侧边栏壁纸
博主头像
再见理想博主等级

只争朝夕,不负韶华

  • 累计撰写 112 篇文章
  • 累计创建 64 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

Redis + Lua 分布式限流

再见理想
2023-01-19 / 0 评论 / 0 点赞 / 711 阅读 / 1,731 字

一,前言

单机版限流仅能保护自身节点,但无法保护应用依赖的各种服务,并且在进行节点扩容、缩容时也无法准确控制整个服务的请求限制。而分布式限流,以集群为维度,可以方便的控制这个集群的请求限制,从而保护下游依赖的各种服务资源。

分布式限流最关键的是要将限流服务做成原子化 ,我们可以借助 Redis 的计数器,Lua 执行的原子性,进行分布式限流。具体实现上存储了两个 key,一个用于计时,一个用于计数。请求每调用一次,计数器加1,若在计时器时间内计数器未超过阈值,则放行。

Redis + Lua 限流基于的是令牌桶算法,系统以恒定速率向桶里放入令牌,当请求来的时候,从桶里拿走一个令牌,如果桶里没有令牌则阻塞或者拒绝新的请求,它允许突发的流量。令牌桶的另外一个好处是可以方便的改变放入桶中的令牌的速率,可以方便实现动态限流

为什么使用 Lua 脚本?

Lua 是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能,Redis 支持 Lua 脚本,所以通过 Lua 实现限流的算法。Lua本身就是一种编程语言(脚本语言),Redis 脚本使用 Lua 解释器来执行脚本。虽然Redis 官方没有直接提供限流相应的API,但却支持了 Lua 脚本的功能,可以使用它实现复杂的令牌桶或漏桶算法,也是分布式系统中实现限流的主要方式之一。

Lua 脚本实现算法对比操作 Redis 实现算法的优点:

  • 减少网络开销:使用Lua脚本,无需向Redis 发送多次请求,执行一次即可,减少网络传输;

  • 原子操作:Redis 将整个 Lua 脚本作为一个命令执行,原子,无需担心并发;

  • 复用:Lua脚本一旦执行,会永久保存 Redis 中,其他客户端可复用;

二,实现步骤

2.1,项目中引入 Redis(已配置即跳过)

maven:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

Redis配置类:

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisConfigurtion {
    @Bean
    @SuppressWarnings("all")
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
        template.setConnectionFactory(factory);
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        // key采用String的序列化方式
        template.setKeySerializer(stringRedisSerializer);
        // hash的key也采用String的序列化方式
        template.setHashKeySerializer(stringRedisSerializer);
        // value序列化方式采用jackson
        template.setValueSerializer(jackson2JsonRedisSerializer);
        // hash的value序列化方式采用jackson
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }
}

2.2,创建自定义限流注解

通过切面拦截带有自定义限流注解的接口或方法,通过注解方式就可轻松给接口加上限流操作。

import java.lang.annotation.*;

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface MyRedisLimiter {
    //限流key,缓存到redis的key=prefix()+key()
    String key();
    //Key的前缀
    String prefix() default "limiter:";
    //给定的时间范围 单位(秒) 默认1秒 即1秒内超过count次的请求将会被限流
    int period() default 1;
    // period()时间内最多访问的次数
    int count();
}

2.3,编写 limit.lua 脚本

通过 limit.lua 脚本实现限流逻辑,将 limit.lua 文件放到项目 resources 目录下。

local count
count = redis.call('get',KEYS[1])
-- 不超过阈值,则直接返回
if count and tonumber(count) > tonumber(ARGV[1]) then
    return count;
end
    -- 自加
    count = redis.call('incr',KEYS[1])
if tonumber(count) == 1 then
    -- 从第一次调用开始限流,设置对应key的过期时间
    redis.call('expire',KEYS[1],ARGV[2])
end
return count;

2.3,切面实现限流业务逻辑

通过 redisTemplate.execute() 执行 lua 脚本返回周期时间内的请求次数,根据次数判断是否触发限流。

import com.google.common.collect.ImmutableList;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.lang.reflect.Method;

@Slf4j
@Aspect
@Component
public class RedisLimitAspect {

    @Autowired
    private RedisTemplate redisTemplate;

    private static final String LIMIT_LUA_PATH = "limit.lua";
    private DefaultRedisScript<Number> redisScript;

    /**
     * 初始化加载Lua脚本
     */
    @PostConstruct
    public void init() {
        redisScript = new DefaultRedisScript<>();
        redisScript.setResultType(Number.class);
        ClassPathResource classPathResource = new ClassPathResource(LIMIT_LUA_PATH);
        try {
            //探测资源是否存在
            classPathResource.getInputStream(); 
            redisScript.setScriptSource(new ResourceScriptSource(classPathResource));
        } catch (IOException e) {
            log.error("未找到文件:{}", LIMIT_LUA_PATH);
        }
    }

    /**
     * 对@MyRedisLimiter注解拦截
     */
    @Pointcut("@annotation(com.ym.framework.limit.MyRedisLimiter)")
    public void pointcut(){}

    /**
     * 对切点进行继续处理
     */
    @Around("pointcut()")
    public Object limit(ProceedingJoinPoint pjp) {
        log.info("[限流切面]进入限流逻辑");
        MethodSignature signature = (MethodSignature) pjp.getSignature();
        Method method = signature.getMethod();
        MyRedisLimiter limitAnnotation = method.getAnnotation(MyRedisLimiter.class);
        int limitPeriod = limitAnnotation.period();
        int limitCount = limitAnnotation.count();
        String limitKey = limitAnnotation.key();

        String key = limitAnnotation.prefix() + limitKey;
        ImmutableList<String> keys = ImmutableList.of(key);
        try {
            // 执行lua脚本,返回count:limitPeriod内的请求次数
            Number count = (Number)redisTemplate.execute(redisScript, keys, limitCount, limitPeriod);
            log.info("[限流切面]try to access, this time count is {} for key: {}", count, key);
            if (count != null && count.intValue() <= limitCount) {
                //放行请求
                return pjp.proceed();
            } else {
                //触发限流,降级或抛异常
                throw new Exception("服务器繁忙,请稍后再试");
            }
        } catch (Throwable e) {
            if (e instanceof RuntimeException) {
                throw new RuntimeException(e.getLocalizedMessage());
            }
            throw new RuntimeException("[限流切面]服务器繁忙,请稍后再试");
        }
    }
}

2.4,接口测试

@Api(tags = "限流测试接口")
@Validated
@RestController
@CrossOrigin
@RequestMapping("/api/limit")
public class TestLimiterController {

    // 1秒内限制10个请求,key=limiter:limitTest
    @MyRedisLimiter(key = "limitTest", count = 10)
    @GetMapping(value = "/test1")
    public String test1() {
        System.out.println("test1");
        return "test1";
    }

    // 10秒内限制5个请求,key=limiter:REDPACKET
    @MyRedisLimiter(key = "REDPACKET", period = 10, count = 5)
    @GetMapping("/test2")
    public String test2() {
        System.out.println("test2");
        return "test2";
    }

    // 10秒内限制5个请求,key=limiter:SECKILL
    @MyRedisLimiter(key = "SECKILL", period = 10, count = 5)
    @GetMapping("/test3")
    public String test3() {
        System.out.println("test3");
        return "test3";
    }
}
0

评论区