Redis+Lua分布式限流实现
基本架构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17请求流转图:
┌──────────┐ 1.请求 ┌──────────┐
│ Client 1 │─────────────►│ │
└──────────┘ │ │
┌──────────┐ 2.限流 │ API │
│ Client 2 │◄────────────►│ Gateway │
└──────────┘ │ │
┌──────────┐ 3.计数 │ │
│ Client 3 │─────────────►│ │
└──────────┘ └────┬─────┘
│
4.执行Lua脚本
│
▼
┌──────────┐
│ Redis │
└──────────┘Lua限流脚本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28-- 限流脚本
-- KEYS[1]: 限流key
-- ARGV[1]: 时间窗口大小(秒)
-- ARGV[2]: 限流阈值
-- ARGV[3]: 当前时间戳
local key = KEYS[1]
local window = tonumber(ARGV[1])
local threshold = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
-- 1. 移除时间窗口之前的数据
redis.call('ZREMRANGEBYSCORE', key, 0, now - window * 1000)
-- 2. 获取当前窗口的请求数
local count = redis.call('ZCARD', key)
-- 3. 判断是否超过阈值
if count >= threshold then
return 0
end
-- 4. 记录本次请求
redis.call('ZADD', key, now, now)
-- 5. 设置过期时间
redis.call('EXPIRE', key, window)
return 1Java实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public class RedisLimiter {
private final StringRedisTemplate redisTemplate;
private final String luaScript;
public RedisLimiter(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
// 加载Lua脚本
this.luaScript = loadLuaScript();
}
public boolean isAllowed(String key, int window, int threshold) {
List<String> keys = Collections.singletonList(key);
long now = System.currentTimeMillis();
// 执行Lua脚本
Long result = redisTemplate.execute(
new DefaultRedisScript<>(luaScript, Long.class),
keys,
String.valueOf(window),
String.valueOf(threshold),
String.valueOf(now)
);
return result != null && result == 1;
}
}动态调整实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30public class DynamicRateLimiter {
private static final String LIMIT_CONFIG_KEY = "rate:limit:config";
// 更新限流阈值
public void updateThreshold(String key, int threshold) {
redisTemplate.opsForHash().put(
LIMIT_CONFIG_KEY,
key,
String.valueOf(threshold)
);
}
// 获取当前阈值
private int getCurrentThreshold(String key) {
String value = (String) redisTemplate.opsForHash()
.get(LIMIT_CONFIG_KEY, key);
return value == null ?
defaultThreshold : Integer.parseInt(value);
}
// 限流检查
public boolean isAllowed(String key) {
int threshold = getCurrentThreshold(key);
return redisLimiter.isAllowed(
key,
window,
threshold
);
}
}滑动时间窗口示意
1
2
3
4
5
6
7
8
9
10
11
12
13时间窗口滑动:
now-60s now
│ │
▼ ▼
┌─────────────────────┐
│ 时间窗口(60s) │
└─────────────────────┘
│ │
│ ┌──────┐ │
│ │请求量│ │
│ └──────┘ │
│ │
过期的请求 新请求使用示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ApiController {
private final DynamicRateLimiter limiter;
public String test() {
String key = "api:test";
if (!limiter.isAllowed(key)) {
throw new RuntimeException("请求被限流");
}
// 业务逻辑
return "success";
}
public void updateLimit(
String key,
int threshold
) {
limiter.updateThreshold(key, threshold);
}
}