URL
date
AI summary
slug
status
tags
summary
type

背景

最近每到高峰期,生产环境就会有各种负载告警,排查之后发现个别用户像是陷入了“死循环”一样,一直不停的针对某些接口发送大量请求,并且看起来不像是恶意通过手动调用的接口。

排查 & 解决

找前端同学排查是否存在代码bug导致“死循环”,排查了很久也并没有找到根本原因。但是我们不能坐以待毙,这种几个用户的“死循环”都足以让系统的压力陡然上升。于是准备上一套针对于单用户请求的限流策略。
由于我们是在一个运行了很久的系统上增加限流,那么这套策略势必不能影响当前正常的用户行为,于是我们先通过过往的请求日志来分析出异常流量的特点:短时间内同用户、同IP、同接口、同请求参数的请求大量发送。这也就是我们的限流策略:针对同用户、同接口、同入参做限流。
💡
此前我们的spring-cloud-gateway已经接了sentinel,第一想法是通过sentinel来做限流。但遗憾的是sentinel对于多维度限流策略并没有很好的支持。
而spring-cloud-gateway原生带有RequestRateLimiterGatewayFilter,使用它可以实现我们的需求。

RequestRateLimiterGatewayFilter

RequestRateLimiterGatewayFilter的逻辑非常简单:
  1. 通过指定的KeyResolver从请求里解析出限流key
  1. 通过指定的RateLimiter来判定该请求应该被放行or被限流
  1. 如放行则filterChain继续往下走,如限流则直接返回响应码429(可配置)
由此可见spring-cloud-gateway限流器的两大核心就是KeyResolver和RateLimiter。

KeyResolver

KeyResolver的默认实现是PrincipalNameKeyResolver,是从ServerWebExchange拿到Principal.name作为限流key,看起来像是用户信息,暂时不了解完整的赋值逻辑。不过反正我们要自定义,就不去深究了。
我们根据前面提到的限流策略(同用户、同接口、同入参)实现了自定义的KeyResolver,这里用了个小技巧:因为我们的web请求统一做了签名逻辑,所以每个请求里都有签名参数(sign),而它本身就是对用户信息+入参的特征提取,所以我们只需要通过 接口和sign 来作为限流key即可。并且这样还有一个好处就是这个key尽量短,我们都知道redis的性能和使用的字符串长度有很大的关系。
public class UrlWithUserParamKeyResolver implements KeyResolver { @Override public Mono<String> resolve(ServerWebExchange exchange) { ServerHttpRequest request = exchange.getRequest(); String authorization = request.getHeaders().getFirst("Authorization"); if (StringUtils.isBlank(authorization)) { return Mono.justOrEmpty(authorization); } String path = request.getURI().getPath(); String sign = request.getQueryParams().getFirst("sign"); if (StringUtils.isBlank(sign)) { return Mono.justOrEmpty(authorization); } return Mono.just(path + "|" + sign); } }

RateLimiter

RateLimiter的默认实现是RedisRateLimiter,通过redis用令牌桶算法实现了一套限流策略。核心参数有3个,也都是和令牌桶息息相关的:
  1. replenishRate 填充速率,也就是每秒可以往桶里补充的令牌数
  1. burstCapacity 桶容量,到达这个数量令牌就补充不进去了
  1. requestedTokens 请求消耗数量,每次请求消耗多少令牌
关于令牌桶算法可以参考这里
spring-cloud-gateway通过一个lua脚本实现了限流逻辑,在本文的最后我们也会简单分析一下这段代码

接入RedisRateLimiter

引入依赖

由于依赖了Redis,所以首先得引入spring-boot-starter-data-redis-reactive以及增加相关配置
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis-reactive</artifactId> </dependency>

自定义逻辑

另外,由于是在已经长时间稳定运行的系统上增加限流逻辑,我们希望更加谨慎一些,所以我们在原生RequestRateLimiterGatewayFilterFactory的基础上自定义了一些逻辑:
  1. 支持通过开关来控制限流逻辑
  1. 支持通过开关来控制限流后是真正限流还是只打印观测日志
  1. 调用redis执行限流脚本支持超时配置,超时后放行,不影响业务
  1. 支持被限流后的异常文案可配置化
自定义的完整源代码如下(为了篇幅省略了一些get/set方法):
@Slf4j @ConfigurationProperties("com.xxx.gateway.filter.request-rate-limiter") @Component public class RequestRateLimiter2GatewayFilterFactory extends AbstractGatewayFilterFactory<RequestRateLimiter2GatewayFilterFactory.Config> { /** * Key-Resolver key. */ public static final String KEY_RESOLVER_KEY = "keyResolver"; private static final String EMPTY_KEY = "____EMPTY_KEY__"; private final RateLimiter defaultRateLimiter; private final KeyResolver defaultKeyResolver; /** * Switch to deny requests if the Key Resolver returns an empty key, defaults to true. */ private boolean denyEmptyKey = true; /** HttpStatus to return when denyEmptyKey is true, defaults to FORBIDDEN. */ private String emptyKeyStatusCode = HttpStatus.FORBIDDEN.name(); public RequestRateLimiter2GatewayFilterFactory(RateLimiter defaultRateLimiter, KeyResolver defaultKeyResolver) { super(Config.class); this.defaultRateLimiter = defaultRateLimiter; this.defaultKeyResolver = defaultKeyResolver; } @SuppressWarnings("unchecked") @Override public GatewayFilter apply(Config config) { KeyResolver resolver = getOrDefault(config.keyResolver, defaultKeyResolver); RateLimiter<Object> limiter = getOrDefault(config.rateLimiter, defaultRateLimiter); boolean denyEmpty = getOrDefault(config.denyEmptyKey, this.denyEmptyKey); HttpStatusHolder emptyKeyStatus = HttpStatusHolder .parse(getOrDefault(config.emptyKeyStatus, this.emptyKeyStatusCode)); return (exchange, chain) -> { if (!config.isOpen()) { return chain.filter(exchange); } return resolver.resolve(exchange).defaultIfEmpty(EMPTY_KEY) .flatMap(key -> { if (EMPTY_KEY.equals(key)) { if (denyEmpty) { setResponseStatus(exchange, emptyKeyStatus); return exchange.getResponse().setComplete(); } return chain.filter(exchange); } String routeId = config.getRouteId(); if (routeId == null) { Route route = exchange .getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR); routeId = route.getId(); } long start = System.currentTimeMillis(); return limiter.isAllowed(routeId, key).timeout(Duration.ofMillis(config.getTimeout())).onErrorResume(throwable -> { long end = System.currentTimeMillis(); log.warn("rate limit occurs error, cost:{} ms", (end - start), throwable); return Mono.just(new RateLimiter.Response(true, Collections.emptyMap())); }).flatMap(response -> { long end = System.currentTimeMillis(); log.info("rate limit lua script cost:{} ms", (end - start)); for (Map.Entry<String, String> header : response.getHeaders() .entrySet()) { exchange.getResponse().getHeaders().add(header.getKey(), header.getValue()); } if (response.isAllowed()) { return chain.filter(exchange); } if (!config.isBlock()) { exchange.getResponse().getHeaders().add("x-exceed-unblock", "1"); log.info("did not block, but record, key:{}, userId:{}", key, exchange.getRequest().getHeaders().getFirst("x-user-id")); return chain.filter(exchange); } // setResponseStatus(exchange, config.getStatusCode()); String text = "{\\"error\\":{\\"code\\":\\"%s\\",\\"message\\":\\"%s\\",\\"detail\\":\\"\\",\\"exception\\":\\"\\"}}"; return ExceptionUtils.exceedLimit(exchange, config.getStatusCode(), String.format(text, config.getStatusCode().value(), config.getMsg())); }); }); }; } private <T> T getOrDefault(T configValue, T defaultValue) { return (configValue != null) ? configValue : defaultValue; } public static class Config implements HasRouteId { private KeyResolver keyResolver; private RateLimiter rateLimiter; private HttpStatus statusCode = HttpStatus.TOO_MANY_REQUESTS; private Boolean denyEmptyKey; private String emptyKeyStatus; private String routeId; private String msg = "请求过于频繁,请稍后再试"; private boolean block; private boolean open; private long timeout = 100; } }

配置

com.xxx.gateway.filter.request-rate-limiter.deny-empty-key=false com.xxx.gateway.filter.request-rate-limiter.empty-key-status-code=429 spring.cloud.gateway.routes[0].id=test_host_route spring.cloud.gateway.routes[0].uri=glb://test-service spring.cloud.gateway.routes[0].predicates[0].name=Host spring.cloud.gateway.routes[0].predicates[0].args.patterns[0]=test.com spring.cloud.gateway.routes[0].filters[0].name=RequestRateLimiter2 # 控制是否打开限流逻辑 spring.cloud.gateway.routes[0].filters[0].args.open=true # 控制达到限流阈值后是阻塞请求还是打印观测日志 spring.cloud.gateway.routes[0].filters[0].args.block=true # 控制限流逻辑执行超时时间 spring.cloud.gateway.routes[0].filters[0].args.timeout=150 spring.cloud.gateway.routes[0].filters[0].args.redis-rate-limiter.replenishRate=5 spring.cloud.gateway.routes[0].filters[0].args.redis-rate-limiter.burstCapacity=5 spring.cloud.gateway.routes[0].filters[0].args.redis-rate-limiter.requestedTokens=1 spring.cloud.gateway.routes[0].filters[1].name=Retry spring.cloud.gateway.routes[0].filters[1].args.retries=1 spring.cloud.gateway.routes[0].filters[1].args.methods=GET,POST
到这里,针对test_host_route这个route的限流就生效了!
这里针对超时配置特殊说明一下,本来是想通过lettuce的socket timeout来配置的,但是尝试了一下貌似不生效。就使用reactor的timeout来做了。当时找的相关文章:

后续优化思路

接入限流器可以限制请求的频率,不过如果考虑得更深入一点,这里可能还存在恶意请求的问题。
  • 比如爬虫来遍历整站数据
  • 比如DDos攻击
针对爬虫的场景
  1. 我们可以做多维度的限流器,比如上面说的是 用户 + 请求 + 参数 维度。那我们还可以针对用户维度,限制一段时间的请求量
  1. 可以增加恶意流量识别的逻辑,并针对恶意用户,做用户维度或者IP维度的封禁。再结合产品化的解禁策略,比如图形验证码等等

限流脚本代码分析

下面是spring-cloud-gateway自带的RedisRateLimiter的限流脚本
local tokens_key = KEYS[1] local timestamp_key = KEYS[2] local rate = tonumber(ARGV[1]) local capacity = tonumber(ARGV[2]) local now = tonumber(ARGV[3]) local requested = tonumber(ARGV[4]) local fill_time = capacity/rate local ttl = math.floor(fill_time*2) local last_tokens = tonumber(redis.call("get", tokens_key)) if last_tokens == nil then last_tokens = capacity end local last_refreshed = tonumber(redis.call("get", timestamp_key)) if last_refreshed == nil then last_refreshed = 0 end local delta = math.max(0, now-last_refreshed) local filled_tokens = math.min(capacity, last_tokens+(delta*rate)) local allowed = filled_tokens >= requested local new_tokens = filled_tokens local allowed_num = 0 if allowed then new_tokens = filled_tokens - requested allowed_num = 1 end if ttl > 0 then redis.call("setex", tokens_key, ttl, new_tokens) redis.call("setex", timestamp_key, ttl, now) end return { allowed_num, new_tokens }
一图胜千言,我们直接用一张流程图来解析上面这段代码吧
notion image

存在的问题

上述脚本里存在一个比较严重的问题,因为它的时间now使用的是客户端时间,一旦客户端发生了时间回拨,则可能出现限流失效的问题。下面我通过一个例子来说明:
假设10:18:21的令牌已经消耗完了,同一时间再进来的请求就要等待了,但是此时某一台客户端的时间被往前回拨了1s——10:18:20,而它执行限流脚本时确实是会被限流,但是在限流脚本的最后,redis里存储的timestamp被更新成了10:18:20。这导致后续时钟没有被回拨的请求就能继续通行了。
当然时钟回拨只是一个可能的case,还存在其他的场景,比如:客户端生成完时间之后,发生了一次较长时间的gc,也会导致执行脚本时的时间比传入的时间戳要大。
不过这个问题在3.0.6版本已经被解决了,使用了redis-server的时间:
the inaccurate rate limit in redis when the node time is different in the multi-node scenario
Updated Nov 8, 2021
issue是在压测过程中遇到的问题,我猜想应该是发生了gc等原因导致的,而非时间回拨。对于时间回拨问题,即使切换到了redis的time命令,我想也还是存在的。

redis.replicate_commands

另外,我注意到上面这个fix的改动里还出现了一行代码,这个和本篇文章没有太大关系,只是单纯好奇,记录一下:
redis.replicate_commands()
这个命令是影响redis主从复制的。对于lua脚本来说,在redis3.2之前只支持脚本级的复制,也就是拿着一模一样的脚本再去slave执行一遍,所以lua脚本里对于一些不确定性的写命令是不支持的:比如这个脚本里新增的time命令,在从库执行的时间就无法保证和主库一致,如果这个值用于了后续的写命令,就会让数据同步出现问题。
而redis3.2增加了上述的命令,可以把lua脚本内发生数据变更的命令以事务的方式做持久化和主从复制

一点优化

把2个key优化成一个hash结构更加节约内存,但是不知道对性能的影响如何,回头测试一下

参考

  1. Spring Cloud RedisRateLimit限频存在的几个问题
  1. Redis · 引擎特性 · Lua脚本新姿势
如何解析离线binlog文件spring-cloud-gateway内存泄漏?