好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

详解Spring Cloud Gateway 限流操作

开发高并发系统时有三把利器用来保护系统:缓存、降级和限流。

api网关作为所有请求的入口,请求量大,我们可以通过对并发访问的请求进行限速来保护系统的可用性。

常用的限流算法比如有令牌桶算法,漏桶算法,计数器算法等。

在zuul中我们可以自己去实现限流的功能 (zuul中如何限流在我的书 《spring cloud微服务-全栈技术与案例解析》  中有详细讲解) ,spring cloud gateway的出现本身就是用来替代zuul的。

要想替代那肯定得有强大的功能,除了性能上的优势之外,spring cloud gateway还提供了很多新功能,比如今天我们要讲的限流操作,使用起来非常简单,今天我们就来学习在如何在spring cloud gateway中进行限流操作。

目前限流提供了基于redis的实现,我们需要增加对应的依赖:

?

1

2

3

4

<dependency>

  <groupid>org.springframework.boot</groupid>

  <artifactid>spring-boot-starter-data-redis-reactive</artifactid>

</dependency>

可以通过keyresolver来指定限流的key,比如我们需要根据用户来做限流,ip来做限流等等。

ip限流

?

1

2

3

4

@bean

public keyresolver ipkeyresolver() {

  return exchange -> mono.just(exchange.getrequest().getremoteaddress().gethostname());

}

通过exchange对象可以获取到请求信息,这边用了hostname,如果你想根据用户来做限流的话这边可以获取当前请求的用户id或者用户名就可以了,比如:

用户限流

使用这种方式限流,请求路径中必须携带userid参数。

?

1

2

3

4

@bean

keyresolver userkeyresolver() {

  return exchange -> mono.just(exchange.getrequest().getqueryparams().getfirst( "userid" ));

}

接口限流

获取请求地址的uri作为限流key。

?

1

2

3

4

@bean

keyresolver apikeyresolver() {

  return exchange -> mono.just(exchange.getrequest().getpath().value());

}

然后配置限流的过滤器信息:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

server:

  port: 8084

spring:

  redis:

  host: 127.0 . 0.1

  port: 6379

  cloud:

  gateway:

   routes:

   - id: fsh-house

   uri: lb: //fsh-house

   predicates:

   - path=/house/**

   filters:

   - name: requestratelimiter

    args:

    redis-rate-limiter.replenishrate: 10

    redis-rate-limiter.burstcapacity: 20

    key-resolver: "#{@ipkeyresolver}"

filter名称必须是requestratelimiter redis-rate-limiter.replenishrate:允许用户每秒处理多少个请求 redis-rate-limiter.burstcapacity:令牌桶的容量,允许在一秒钟内完成的最大请求数 key-resolver:使用spel按名称引用bean

可以访问接口进行测试,这时候redis中会有对应的数据:

127.0.0.1:6379> keys *
1) "request_rate_limiter.{localhost}.timestamp"
2) "request_rate_limiter.{localhost}.tokens"

大括号中就是我们的限流key,这边是ip,本地的就是localhost

timestamp:存储的是当前时间的秒数,也就是system.currenttimemillis() / 1000或者instant.now().getepochsecond() tokens:存储的是当前这秒钟的对应的可用的令牌数量

spring cloud gateway目前提供的限流还是相对比较简单的,在实际中我们的限流策略会有很多种情况,比如:

每个接口的限流数量不同,可以通过配置中心动态调整 超过的流量被拒绝后可以返回固定的格式给调用方 对某个服务进行整体限流(这个大家可以思考下用spring cloud gateway如何实现,其实很简单) ……

当然我们也可以通过重新redisratelimiter来实现自己的限流策略,这个我们后面再进行介绍。

限流源码

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

// routeid也就是我们的fsh-house,id就是限流的key,也就是localhost。

public mono<response> isallowed(string routeid, string id) {

  // 会判断redisratelimiter是否初始化了

  if (! this .initialized.get()) {

   throw new illegalstateexception( "redisratelimiter is not initialized" );

  }

  // 获取routeid对应的限流配置

  config routeconfig = getconfig().getordefault(routeid, defaultconfig);

 

  if (routeconfig == null ) {

   throw new illegalargumentexception( "no configuration found for route " + routeid);

  }

 

  // 允许用户每秒做多少次请求

  int replenishrate = routeconfig.getreplenishrate();

 

  // 令牌桶的容量,允许在一秒钟内完成的最大请求数

  int burstcapacity = routeconfig.getburstcapacity();

 

  try {

   // 限流key的名称(request_rate_limiter.{localhost}.timestamp,request_rate_limiter.{localhost}.tokens)

   list<string> keys = getkeys(id);

 

 

   // the arguments to the lua script. time() returns unixtime in seconds.

   list<string> scriptargs = arrays.aslist(replenishrate + "" , burstcapacity + "" ,

     instant.now().getepochsecond() + "" , "1" );

   // allowed, tokens_left = redis.eval(script, keys, args)

   // 执行lua脚本

   flux<list< long >> flux = this .redistemplate.execute( this .script, keys, scriptargs);

     // .log("redisratelimiter", level.finer);

   return flux.onerrorresume(throwable -> flux.just(arrays.aslist(1l, -1l)))

     .reduce( new arraylist< long >(), (longs, l) -> {

      longs.addall(l);

      return longs;

     }) .map(results -> {

      boolean allowed = results.get( 0 ) == 1l;

      long tokensleft = results.get( 1 );

 

      response response = new response(allowed, getheaders(routeconfig, tokensleft));

 

      if (log.isdebugenabled()) {

       log.debug( "response: " + response);

      }

      return response;

     });

  }

  catch (exception e) {

   log.error( "error determining if user allowed from redis" , e);

  }

  return mono.just( new response( true , getheaders(routeconfig, -1l)));

}

lua脚本在:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

local tokens_key = keys[ 1 ]

local timestamp_key = keys[ 2 ]

--redis.log(redis.log_warning, "tokens_key " .. tokens_key)

 

local rate = tonumber(argv[ 1 ])

local capacity = tonumber(argv[ 2 ])

local now = tonumber(argv[ 3 ])

local requested = tonumber(argv[ 4 ])

 

local fill_time = capacity/rate

local ttl = math.floor(fill_time* 2 )

 

--redis.log(redis.log_warning, "rate " .. argv[ 1 ])

--redis.log(redis.log_warning, "capacity " .. argv[ 2 ])

--redis.log(redis.log_warning, "now " .. argv[ 3 ])

--redis.log(redis.log_warning, "requested " .. argv[ 4 ])

--redis.log(redis.log_warning, "filltime " .. fill_time)

--redis.log(redis.log_warning, "ttl " .. ttl)

 

local last_tokens = tonumber(redis.call( "get" , tokens_key))

if last_tokens == nil then

  last_tokens = capacity

end

--redis.log(redis.log_warning, "last_tokens " .. last_tokens)

 

local last_refreshed = tonumber(redis.call( "get" , timestamp_key))

if last_refreshed == nil then

  last_refreshed = 0

end

--redis.log(redis.log_warning, "last_refreshed " .. last_refreshed)

 

local delta = math.max( 0 , now-last_refreshed)

local filled_tokens = math.min(capacity, last_tokens+(delta*rate))

local allowed = filled_tokens >= requested

local new_tokens = filled_tokens

local allowed_num = 0

if allowed then

  new_tokens = filled_tokens - requested

  allowed_num = 1

end

 

--redis.log(redis.log_warning, "delta " .. delta)

--redis.log(redis.log_warning, "filled_tokens " .. filled_tokens)

--redis.log(redis.log_warning, "allowed_num " .. allowed_num)

--redis.log(redis.log_warning, "new_tokens " .. new_tokens)

 

redis.call( "setex" , tokens_key, ttl, new_tokens)

redis.call( "setex" , timestamp_key, ttl, now)

 

return { allowed_num, new_tokens }

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。

原文链接:https://mp.weixin.qq.com/s/70i4X_b-gtegofF-8W-uBQ

查看更多关于详解Spring Cloud Gateway 限流操作的详细内容...

  阅读:42次