好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Sentinel实现动态配置的集群流控的方法

介绍

为什么要使用集群流控呢?

相对于单机流控而言,我们给每台机器设置单机限流阈值,在理想情况下整个集群的限流阈值为机器数量??单机阈值。不过实际情况下流量到每台机器可能会不均匀,会导致总量没有到的情况下某些机器就开始限流。因此仅靠单机维度去限制的话会无法精确地限制总体流量。而集群流控可以精确地控制整个集群的调用总量,结合单机限流兜底,可以更好地发挥流量控制的效果。

基于单机流量不均的问题以及如何设置集群整体的QPS的问题,我们需要创建一种集群限流的模式,这时候我们很自然地就想到,可以找一个 server 来专门统计总的调用量,其它的实例都与这台 server 通信来判断是否可以调用。这就是最基础的集群流控的方式。

原理

集群限流的原理很简单,和单机限流一样,都需要对 qps 等数据进行统计,区别就在于单机版是在每个实例中进行统计,而集群版是有一个专门的实例进行统计。

这个专门的用来统计数据的称为 Sentinel 的 token server,其他的实例作为 Sentinel 的 token client 会向 token server 去请求 token,如果能获取到 token,则说明当前的 qps 还未达到总的阈值,否则就说明已经达到集群的总阈值,当前实例需要被 block,如下图所示:

和单机流控相比,集群流控中共有两种身份:

Token Client:集群流控客户端,用于向所属 Token Server 通信请求 token。集群限流服务端会返回给客户端结果,决定是否限流。 Token Server:即集群流控服务端,处理来自 Token Client 的请求,根据配置的集群规则判断是否应该发放 token(是否允许通过)。

而单机流控中只有一种身份,每个 sentinel 都是一个 token server。

注意,集群限流中的 token server 是单点的,一旦 token server 挂掉,那么集群限流就会退化成单机限流的模式。

Sentinel 集群流控支持限流规则和热点规则两种规则,并支持两种形式的阈值计算方式:

集群总体模式:即限制整个集群内的某个资源的总体 qps 不超过此阈值。 单机均摊模式:单机均摊模式下配置的阈值等同于单机能够承受的限额,token server 会根据连接数来计算总的阈值(比如独立模式下有 3 个 client 连接到了 token server,然后配的单机均摊阈值为 10,则计算出的集群总量就为 30),按照计算出的总的阈值来进行限制。这种方式根据当前的连接数实时计算总的阈值,对于机器经常进行变更的环境非常适合。

部署方式

token server 有两种部署方式:

一种是独立部署,就是单独启动一个 token server 服务来处理 token client 的请求,如下图所示:

如果独立部署的 token server 服务挂掉的话,那其他的 token client 就会退化成本地流控的模式,也就是单机版的流控,所以这种方式的集群限流需要保证 token server 的高可用性。

一种是嵌入部署,即作为内置的 token server 与服务在同一进程中启动。在此模式下,集群中各个实例都是对等的,token server 和 client 可以随时进行转变,如下图所示:

嵌入式部署的模式中,如果 token server 服务挂掉的话,我们可以将另外一个 token client 升级为token server来,当然啦如果我们不想使用当前的 token server 的话,也可以选择另外一个 token client 来承担这个责任,并且将当前 token server 切换为 token client。Sentinel 为我们提供了一个 api 来进行 token server 与 token client 的切换:


	 
		 
			 http  :  //<ip>:<port>/setClusterMode?mode=<xxx>  
	 

其中 mode 为 0 代表 client, 1 代表 server, -1 代表关闭。

PS:注意应用端需要引入集群限流客户端或服务端的相应依赖。

集群限流控制台

sentinel为用户提供集群限流控制台功能,能够通过控制台配置集群的限流规则以及配置集群的Server与Client。

集群限流客户端

要想使用集群限流功能,必须引入集群限流 client 相关依赖:


	 
		 
			 <dependency>  
		 
			 <groupId>  com.alibaba.csp  </groupId>  
		 
			 <artifactId>  sentinel-cluster-client-default  </artifactId>  
		 
			 <version>  1.8.0  </version>  
		 
			 </dependency>  
	 

集群限流服务端

要想使用集群限流服务端,必须引入集群限流 server 相关依赖:


	 
		 
			 <dependency>  
		 
			 <groupId>  com.alibaba.csp  </groupId>  
		 
			 <artifactId>  sentinel-cluster-server-default  </artifactId>  
		 
			 <version>  1.8.0  </version>  
		 
			 </dependency>  
	 

我们结合server和client实现一个嵌入式模式。在pom中同时引入上面的两个依赖,并配置sentinel控制台地址,实现一个查询订单的接口。

pom


	 
		 
			 <dependency>  
		 
			 <groupId>  com.alibaba.csp  </groupId>  
		 
			 <artifactId>  sentinel-cluster-server-default  </artifactId>  
		 
			 </dependency>  
		 
			    
		 
			 <dependency>  
		 
			 <groupId>  com.alibaba.csp  </groupId>  
		 
			 <artifactId>  sentinel-cluster-client-default  </artifactId>  
		 
			 </dependency>  
	 

application.yml


	 
		 
			 server  :  
		 
			 port  :     9091  
		 
			    
		 
			 spring  :  
		 
			 application  :  
		 
			 name  :   cloudalibaba  -  sentinel  -  clusterServer  
		 
			 cloud  :  
		 
			 sentinel  :  
		 
			 transport  :  
		 
			 #配置sentinel dashboard地址  
		 
			 dashboard  :   localhost  :  8080  
		 
			 port  :     8719     #默认8719端口  
	 

OrderController


	 
		 
			 @RestController  
		 
			 public     class     OrderController     {  
		 
			 /**  
		 
			 * 查询订单  
		 
			 * @return  
		 
			 */  
		 
			 @GetMapping  (  "/order/{id}"  )  
		 
			 public     CommonResult  <  Order  >   getOrder  (  @PathVariable  (  "id"  )     Long   id  ){  
		 
			    
		 
			 Order   order   =     new     Order  (  id  ,     "212121"  );  
		 
			 return     CommonResult  .  success  (  order  .  toString  ());  
		 
			 }  
		 
			 }  
	 

代码示例如cloudalibaba-sentinel-cluster-embedded9091

修改VM options配置,启动三个不同端口的实例,即可。


	 
		 
			 -  Dserver  .  port  =  9091     -  Dproject  .  name  =  cloudalibaba  -  sentinel  -  clusterServer   -  Dcsp  .  sentinel  .  log  .  use  .  pid  =  true     
		 
			 -  Dserver  .  port  =  9092     -  Dproject  .  name  =  cloudalibaba  -  sentinel  -  clusterServer   -  Dcsp  .  sentinel  .  log  .  use  .  pid  =  true     
		 
			 -  Dserver  .  port  =  9093     -  Dproject  .  name  =  cloudalibaba  -  sentinel  -  clusterServer   -  Dcsp  .  sentinel  .  log  .  use  .  pid  =  true     
	 

控制台配置

登录sentinel的控制台,并有访问量后,我们就可以在 Sentinel上面看到集群流控,如下图所示:

点击添加Token Server。

从实例列表中选择一个作为Server端,其他作为Client端,并选中到右侧Client列表,配置token sever端的最大允许的QPS,用于对 Token Server 的资源使用进行限制,防止在嵌入模式下影响应用本身。

配置完成之后的Token Server列表,如下图所示

使用控制台配置token Server、token Client以及限流规则,有很多的缺点:

1、限流规则,不能持久化,应用重启之后,规则丢失。

2、token Server 、token Client配置也会丢失。

官方推荐给集群限流服务端注册动态配置源来动态地进行配置。我们使用nacos作为配置中心,动态配置客户端与服务端属性以及限流规则,实现动态集群限流。

sentinel结合nacos实现集群限流

我们使用Nacos对cloudalibaba-sentinel-cluster-embedded9091进行改造,实现动态配置源来动态进行配置。

配置源注册的相关逻辑可以置于 InitFunc 实现类中,并通过 SPI 注册,在 Sentinel 初始化时即可自动进行配置源加载监听。

嵌入模式部署

添加ClusterInitFunc类


	 
		 
			 public     class     ClusterInitFunc     implements     InitFunc     {  
		 
			    
		 
			 //应用名称  
		 
			 private     static     final     String   APP_NAME   =     AppNameUtil  .  getAppName  ();  
		 
			    
		 
			 //nacos集群地址  
		 
			 private     final     String   remoteAddress   =     "localhost:8848"  ;  
		 
			    
		 
			 //nacos配置的分组名称  
		 
			 private     final     String   groupId   =     "SENTINEL_GROUP"  ;  
		 
			    
		 
			 //配置的dataId  
		 
			 private     final     String   flowDataId   =   APP_NAME   +     Constants  .  FLOW_POSTFIX  ;  
		 
			 private     final     String   paramDataId   =   APP_NAME   +     Constants  .  PARAM_FLOW_POSTFIX  ;  
		 
			 private     final     String   configDataId   =   APP_NAME   +     Constants  .  CLIENT_CONFIG_POSTFIX  ;  
		 
			 private     final     String   clusterMapDataId   =   APP_NAME   +     Constants  .  CLUSTER_MAP_POSTFIX  ;  
		 
			    
		 
			 private     static     final     String   SEPARATOR   =     "@"  ;  
		 
			    
		 
			    
		 
			 @Override  
		 
			 public     void   init  ()     {  
		 
			 // Register client dynamic rule data source.  
		 
			 //动态数据源的方式配置sentinel的流量控制和热点参数限流的规则。  
		 
			 initDynamicRuleProperty  ();  
		 
			    
		 
			 // Register token client related data source.  
		 
			 // Token client common config  
		 
			 // 集群限流客户端的配置属性  
		 
			 initClientConfigProperty  ();  
		 
			 // Token client assign config (e.g. target token server) retrieved from assign map:  
		 
			 //初始化Token客户端  
		 
			 initClientServerAssignProperty  ();  
		 
			    
		 
			 // Register token server related data source.  
		 
			 // Register dynamic rule data source supplier for token server:  
		 
			 //集群的流控规则,比如限制整个集群的流控阀值,启动的时候需要添加-Dproject.name=项目名  
		 
			 registerClusterRuleSupplier  ();  
		 
			 // Token server transport config extracted from assign map:  
		 
			 //初始化server的端口配置  
		 
			 initServerTransportConfigProperty  ();  
		 
			    
		 
			 // Init cluster state property for extracting mode from cluster map data source.  
		 
			 //初始化集群中服务是客户端还是服务端  
		 
			 initStateProperty  ();  
		 
			 }  
		 
			    
		 
			 private     void   initDynamicRuleProperty  ()     {  
		 
			    
		 
			 //流量控制的DataId分别是APP_NAME + Constants.FLOW_POSTFIX;热点参数限流规则的DataId是APP_NAME + Constants.PARAM_FLOW_POSTFIX;  
		 
			    
		 
			 ReadableDataSource  <  String  ,     List  <  FlowRule  >>   ruleSource   =     new     NacosDataSource  <>(  remoteAddress  ,   groupId  ,  
		 
			 flowDataId  ,   source   ->     js  ON  .  parseObject  (  source  ,     new     TypeReference  <  List  <  FlowRule  >>()     {}));  
		 
			 FlowRuleManager  .  register2Property  (  ruleSource  .  getProperty  ());  
		 
			    
		 
			 ReadableDataSource  <  String  ,     List  <  ParamFlowRule  >>   paramRuleSource   =     new     NacosDataSource  <>(  remoteAddress  ,   groupId  ,  
		 
			 paramDataId  ,   source   ->   JSON  .  parseObject  (  source  ,     new     TypeReference  <  List  <  ParamFlowRule  >>()     {}));  
		 
			 ParamFlowRuleManager  .  register2Property  (  paramRuleSource  .  getProperty  ());  
		 
			 }  
		 
			    
		 
			 private     void   initClientConfigProperty  ()     {  
		 
			 ReadableDataSource  <  String  ,     ClusterClientConfig  >   clientConfigDs   =     new     NacosDataSource  <>(  remoteAddress  ,   groupId  ,  
		 
			 configDataId  ,   source    编程客栈   ->   JSON  .  parseObject  (  source  ,     new     TypeReference  <  ClusterClientConfig  >()     {}));  
		 
			 ClusterClientConfigManager  .  registerClientConfigProperty  (  clientConfigDs  .  getProperty  ());  
		 
			 }  
		 
			    
		 
			 private     void   initServerTransportConfigProperty  ()     {  
		 
			 ReadableDataSource  <  String  ,     ServerTransportConfig  >   serverTransportDs   =     new     NacosDataSource  <>(  remoteAddress  ,   groupId  ,  
		 
			 clusterMapDataId  ,   source   ->     {  
		 
			 List  <  ClusterGroupEntity  >   groupList   =     new     Gson  ().  fromJson  (  source  ,     new     TypeToken  <  List  <  ClusterGroupEntity  >>(){}.  getType  ());  
		 
			 return     Optional  .  ofNullable  (  groupList  )  
		 
			  www  .  cppcns  .  com      .  flatMap  (  this  ::  extractServerTransportConfig  )  
		 
			 .  orElse  (  null  );  
		 
			 });  
		 
			 ClusterServerConfigManager  .  registerServerTransportProperty  (  serverTransportDs  .  getProperty  ());  
		 
			 }  
		 
			    
		 
			 private     void   registerClusterRuleSupplier  ()     {  
		 
			 // Register cluster flow rule property supplier which creates data source by namespace.  
		 
			 // Flow rule dataId format: ${namespace}-flow-rules  
		 
			 ClusterFlowRuleManager  .  setPropertySupplier  (  namespace     ->     {  
		 
			 ReadableDataSource  <  String  ,     List  <  FlowRule  >>   ds   =     new     NacosDataSource  <>(  remoteAddress  ,   groupId  ,  
		 
			 namespace     +     Constants  .  FLOW_POSTFIX  ,   source   ->   JSON  .  parseObject  (  source  ,     new     TypeReference  <  List  <  FlowRule  >>()     {}));  
		 
			 return   ds  .  getProperty  ();  
		 
			 });  
		 
			 // Register cluster parameter flow rule property supplier which creates data source by namespace.  
		 
			 ClusterParamFlowRuleManager  .  setPropertySupplier  (  namespace     ->     {  
		 
			 ReadableDataSource  <  String  ,     List  <  ParamFlowRule  >>   ds   =     new     NacosDataSource  <>(  remoteAddress  ,   groupId  ,  
		 
			 namespace     +     Constants  .  PARAM_FLOW_POSTFIX  ,   source   ->   JSON  .  parseObject  (  source  ,     new     TypeReference  <  List  <  ParamFlowRule  >>()     {}));  
		 
			 return   ds  .  getProperty  ();  
		 
			 });  
		 
			 }  
		 
			    
		 
			 private     void   initClientServerAssignProperty  ()     {  
		 
			 // Cluster map format:  
		 
			 // [{"clientSet":["112.12.88.66@8729","112.12.88.67@8727"],"ip":"112.12.88.68","serverId":"112.12.88.68@8728","port":11111}]  
		 
			 // serverId: <ip@commandPort>, commandPort for port exposed to Sentinel dashboard (transport module)  
		 
			 ReadableDataSource  <  String  ,     ClusterClientAssignConfig  >   clientAssignDs   =     new     NacosDataSource  <>(  remoteAddress  ,   groupId  ,  
		 
			 clusterMapDataId  ,   source   ->     {  
		 
			 List  <  ClusterGroupEntity  >   groupList   =     new     Gson  ().  fromJson  (  source  ,     new     TypeToken  <  List  <  ClusterGroupEntity  >>(){}.  getType  ());  
		 
			 return     Optional  .  ofNullable  (  groupList  )  
		 
			 .  flatMap  (  this  ::  extractClientAssignment  )  
		 
			 .  orElse  (  null  );  
		 
			 });  
		 
			 ClusterClientConfigManager  .  registerServerAssignProperty  (  clientAssignDs  .  getProperty  ());  
		 
			 }  
		 
			    
		 
			 private     void   initStateProperty  ()     {  
		 
			 // Cluster map format:  
		 
			 // [{"clientSet":["112.12.88.66@8729","112.12.88.67@8727"],"ip":"112.12.88.68","serverId":"112.12.88.68@8728","port":11111}]  
		 
			 // serverId: <ip@commandPort>, commandPort for port exposed to Sentinel dashboard (transport module)  
		 
			 ReadableDataSource  <  String  ,     Integer  >   clusterModeDs   =     new     NacosDataSource  <>(  remoteAddress  ,   groupId  ,  
		 
			 clusterMapDataId  ,   source   ->     {  
		 
			 List  <  ClusterGroupEntity  >   groupList   =     new     Gson  ().  fromJson  (  source  ,     new     TypeToken  <  List  <  ClusterGroupEntity  >>(){}.  getType  ());  
		 
			 return     Optional  .  ofNullable  (  groupList  )  
		 
			 .  map  (  this  ::  extractMode  )  
		 
			 .  orElse  (  ClusterStateManager  .  CLUSTER_NOT_STARTED  );  
		 
			 });  
		 
			 ClusterStateManager  .  registerProperty  (  clusterModeDs  .  getProperty  ());  
		 
			 }  
		 
			    
		 
			 private     int   extractMode  (  List  <  ClusterGroupEntity  >   groupList  )     {  
		 
			 // If any server group serverId matches current, then it's token server.  
		 
			 if     (  groupList  .  stream  ().  anyMatch  (  this  ::  machineEqual  ))     {  
		 
			 return     ClusterStateManager  .  CLUSTER_SERVER  ;  
		 
			 }  
		 
			 // If current machine belongs to any of the token server group, then it's token client.  
		 
			 // Otherwise it's unassigned, should be set to NOT_STARTED.  
		 
			 boolean   canBeClient   =   groupList  .  stream  ()  
		 
			 .  flatMap  (  e   ->   e  .  getClientSet  ().  stream  ())  
		 
			 .  filter  (  Objects  ::  nonNull  )  
		 
			 .  anyMatch  (  e   ->   e  .  equals  (  getCurrentMachineId  ()));  
		 
			 return   canBeClient   ?     ClusterStateManager  .  CLUSTER_CLIENT   :     ClusterStateManager  .  CLUSTER_NOT_STARTED  ;  
		 
			 }  
		 
			    
		 
			 private     Optional  <  ServerTransportConfig  >   extractServerTransportConfig  (  List  <  ClusterGroupEntity  >   groupList  )     {  
		 
			 return   groupList  .  stream  ()  
		 
			 .  filter  (  this  ::  machineEqual  )  
		 
			 .  findAny  ()  
		 
			 .  map  (  e   ->     new     ServerTransportConfig  ().  setPort  (  e  .  getPort  ()).  setIdleSeconds  (  600  ));  
		 
			 }  
		 
			    
		 
			 private     Optional  <  ClusterClientAssignConfig  >   extractClientAssignment  (  List  <  ClusterGroupEntity  >   groupList  )     {  
		 
			 if     (  groupList  .  stream  ().  anyMatch  (  this  ::  m   www  .  cppcns  .  com   achineEqual  ))     {  
		 
			 return     Optional  .  empty  ();  
		 
			 }  
		 
			 // Build client assign config from t   JTVlR   he client set of target server group.  
		 
			 for     (  ClusterGroupEntity     group     :   groupList  )     {  
		 
			 if     (  group  .  getClientSet  ().  contains  (  getCurrentMachineId  ()))     {  
		 
			 String   ip   =     group  .  getIp  ();  
		 
			 Integer   port   =     group  .  getPort  ();  
		 
			 return     Optional  .  of  (  new     ClusterClientAssignConfig  (  ip  ,   port  ));  
		 
			 }  
		 
			 }  
		 
			 return     Optional  .  empty  ();  
		 
			 }  
		 
			    
		 
			 private     boolean   machineEqual  (  /*@Valid*/     ClusterGroupEntity     group  )     {  
		 
			 return   getCurrentMachineId  ().  equals  (  group  .  getServerId  ());  
		 
			 }  
		 
			    
		 
			 private     String   getCurrentMachineId  ()     {  
		 
			 // Note: this may not work well for container-based env.  
		 
			 return     HostNameUtil  .  getIp  ()     +   SEPARATOR   +     TransportConfig  .  getRuntimePort  ();  
		 
			 }  
		 
			 }  
	 

在resources文件夹下创建META-INF/service,,然后创建一个叫做com.alibaba.csp.sentinel.init.InitFunc的文件,在文件中指名实现InitFunc接口的类全路径,内容如下:


	 
		 
			 com  .  liang  .  springcloud  .  alibaba  .  init  .  ClusterInitFunc  
	 

添加配置的解析类:


	 
		 
			 public     class     ClusterGroupEntity     implements     Serializable     {  
		 
			    
		 
			 private     String   serverId  ;  
		 
			 private     String   ip  ;  
		 
			 private     Integer   port  ;  
		 
			 private     Set  <  String  >   clientSet  ;  
		 
			    
		 
			 public     String   getServerId  ()     {  
		 
			 return   serverId  ;  
		 
			 }  
		 
			    
		 
			 public     void   setServerId  (  String   serverId  )     {  
		 
			 this  .  serverId   =   serverId  ;  
		 
			 }  
		 
			    
		 
			 public     String   getIp  ()     {  
		 
			 return   ip  ;  
		 
			 }  
		 
			    
		 
			 public     void   setIp  (  String   ip  )     {  
		 
			 this  .  ip   =   ip  ;  
		 
			 }  
		 
			    
		 
			 public     Integer   getPort  ()     {  
		 
			 return   port  ;  
		 
			 }  
		 
			    
		 
			 public     void   setPort  (  Integer   port  )     {  
		 
			 this  .  port   =   port  ;  
		 
			 }  
		 
			    
		 
			 public     Set  <  String  >   getClientSet  ()     {  
		 
			 return   clientSet  ;  
		 
			 }  
		 
			    
		 
			 public     void   setClientSet  (  Set  <  String  >   clientSet  )     {  
		 
			 this  .  clientSet   =   clientSet  ;  
		 
			 }  
		 
			    
		 
			 @Override  
		 
			 public     String   toString  ()     {  
		 
			 return     "ClusterGroupEntity{"     +  
		 
			 "serverId='"     +   serverId   +     '\''     +  
		 
			 ", ip='"     +   ip   +     '\''     +  
		 
			 ", port="     +   port   +  
		 
			 ", clientSet="     +   clientSet   +  
		 
			 '}'  ;  
		 
			 }  
		 
			 }  
	 

在Nacos中添加动态规则配置,以及token server与token client的配置:

DataId:cloudalibaba-sentinel-clusterServer-flow-rules Group:SENTINEL_GROUP 配置内容(json格式):


	 
		 
			 [  
		 
			 {  
		 
			 "resource"     :     "/order/{id}"  ,     // 限流的资源名称  
		 
			 "grade"     :     1  ,     // 限流模式为:qps,线程数限流0,qps限流1  
		 
			 "count"     :     20  ,     // 阈值为:20  
		 
			 "clusterMode"     :     true  ,     // 是否是集群模式,集群模式为:true  
		 
			 "clusterConfig"     :     {  
		 
			 "flowId"     :     111  ,     // 全局唯一id  
		 
			 "thresholdType"     :     1  ,     // 阈值模式为:全局阈值,0是单机均摊,1是全局阀值  
		 
			 "fallbackToLocalWhenFail"     :     true     // 在 client 连接失败或通信失败时,是否退化到本地的限流模式  
		 
			 }  
		 
			 }  
		 
			 ]  
	 

DataId:cloudalibaba-sentinel-clusterServer-cluster-client-config Group:SENTINEL_GROUP 配置内容(json格式):


	 
		 
			 {  
		 
			 "requestTimeout"  :     20  
		 
			 }  
	 

DataId:cloudalibaba-sentinel-clusterServer-cluster-map Group:SENTINEL_GROUP 配置内容(json格式):


	 
		 
			 [{  
		 
			 "clientSet"  :     [  "10.133.40.30@8721"  ,     "10.133.40.30@8722"  ],  
		 
			 "ip"  :     "10.133.40.30"  ,  
		 
			 "serverId"  :     "10.133.40.30@8720"  ,  
		 
			 "port"  :     18730     //这个端口是token server通信的端口  
		 
			 }]  
	 

重新启动服务,并访问接口,我们可以看到流控规则与集群流控都自动配置完成。我们需要测试,我们集群流控是否已经生效。

不断执行以下命令:


	 
		 
			 ab   -  n   100     -  c   50   http  :  //localhost:9091/order/1  
		 
			 ab   -  n   100     -  c   50   http  :  //localhost:9092/order/3  
		 
			 ab   -  n   100     -  c   50   http  :  //localhost:9093/order/1  
	 

测试效果图:

我们从实时监控图上可以看出,资源名为 /order/{id} ,整个集群的QPS为20,跟我们的配置是一样的。当作为token server的机器挂掉后,集群限流会退化到 local 模式的限流,即在本地按照单机阈值执行限流检查。

Token Server 分配配置:

上面这张图可以很好帮忙我们解释嵌入模式的具体实现。通过配置信息解析,管理我们的token server与token client。

适用范围:

嵌入模式适合某个应用集群内部的流控。由于隔离性不佳,token server会影响应用本身,需要限制 token server 的总QPS。

独立模式部署

独立模式相对于嵌入模式而言就是将token server与应用隔离,进行独立部署。将嵌入模式中token server和token client分离,分别进行配置。我们只需要将 InitFunc 实现类进行拆分。

token server的nacos配置

server的名称空间配置,(集群的namespace或客户端项目名)如下:

DataId:cluster-server-namespace-set Group:SENTINEL_ALONE_GROUP 配置内容(json格式):


	 
		 
			 [  
		 
			 "cloudalibaba-sentinel-cluster-client-alone"  
		 
			 ]  
	 

server的通信端口配置,如下:

DataId:cluster-server-transport-config Group:SENTINEL_ALONE_GROUP 配置内容(json格式):


	 
		 
			 {  
		 
			 "idleSecods"  :  600  ,  
		 
			 "port"  :     18730  
		 
			 }  
	 

Token sever的流控限制配置,如下:

DataId:cluster-server-flow-config Group:SENTINEL_ALONE_GROUP 配置内容(json格式):


	 
		 
			 {  
		 
			 "exceedCount"  :  1.0  ,  
		 
			 "maxAllowedQps"  :  20000  ,  
		 
			 "namespace"  :  "cloudalibaba-sentinel-clust   编程客栈   er-client-alone"  
		 
			 }  
	 

token server的host地址与端口号配置,如下:

DataId: cluster-server-config Group:SENTINEL_ALONE_GROUP 配置内容(json格式):


	 
		 
			 {  
		 
			 "serverHost"  :     "10.133.40.30"  ,  
		 
			 "serverPort"  :     18730  
		 
			 }  
	 

token server的InitFunc类:


	 
		 
			 /**  
		 
			 * @PROJECT_NAME: SpringCloud-Learning  
		 
			 * @USER: yuliang  
		 
			 * @DESCRIPTION:  
		 
			 * @DATE: 2021-04-01 10:01  
		 
			 */  
		 
			 public     class     ClusterServerInitFunc     implements     InitFunc     {  
		 
			    
		 
			 //nacos集群地址  
		 
			 private     final     String   remoteAddress   =     "localhost:8848"  ;  
		 
			 //配置的分组名称  
		 
			 private     final     String   groupId   =     "SENTINEL_ALONE_GROUP"  ;  
		 
			    
		 
			 //配置的dataId  
		 
			 private     final     String   namespaceSetDataId   =     "cluster-server-namespace-set"  ;  
		 
			 private     final     String   serverTransportDataId   =     "cluster-server-transport-config"  ;  
		 
			 private     final     String   serverFlowDataId   =     "cluster-server-flow-config"  ;  
		 
			    
		 
			 @Override  
		 
			 public     void   init  ()     {  
		 
			    
		 
			 //监听特定namespace(集群的namespace或客户端项目名)下的集群限流规则  
		 
			 initPropertySupplier  ();  
		 
			 // 设置tokenServer管辖的作用域(即管理哪些应用)  
		 
			 initTokenServerNameSpaces  ();  
		 
			    
		 
			 // Server transport configuration data source.  
		 
			 //Server端配置  
		 
			 initServerTransportConfig  ();  
		 
			    
		 
			 // 初始化最大qps  
		 
			 initServerFlowConfig  ();  
		 
			    
		 
			 //初始化服务器状态  
		 
			 initStateProperty  ();  
		 
			    
		 
			 }  
		 
			    
		 
			 private     void   initPropertySupplier  (){  
		 
			    
		 
			 // Register cluster flow rule property supplier which creates data source by namespace.  
		 
			 // Flow rule dataId format: ${namespace}-flow-rules  
		 
			 ClusterFlowRuleManager  .  setPropertySupplier  (  namespace     ->     {  
		 
			 ReadableDataSource  <  String  ,     List  <  FlowRule  >>   ds   =     new     NacosDataSource  <>(  remoteAddress  ,   groupId  ,  
		 
			 namespace     +     Constants  .  FLOW_POSTFIX  ,  
		 
			 source   ->   JSON  .  parseObject  (  source  ,     new     TypeReference  <  List  <  FlowRule  >>()     {}));  
		 
			 return   ds  .  getProperty  ();  
		 
			 });  
		 
			 // Register cluster parameter flow rule property supplier which creates data source by namespace.  
		 
			 ClusterParamFlowRuleManager  .  setPropertySupplier  (  namespace     ->     {  
		 
			 ReadableDataSource  <  String  ,     List  <  ParamFlowRule  >>   ds   =     new     NacosDataSource  <>(  remoteAddress  ,   groupId  ,  
		 
			 namespace     +     Constants  .  PARAM_FLOW_POSTFIX  ,   source   ->   JSON  .  parseObject  (  source  ,     new     TypeReference  <  List  <  ParamFlowRule  >>()     {}));  
		 
			 return   ds  .  getProperty  ();  
		 
			 });  
		 
			    
		 
			 }  
		 
			    
		 
			    
		 
			 private     void   initTokenServerNameSpaces  (){  
		 
			 // Server namespace set (scope) data source.  
		 
			 ReadableDataSource  <  String  ,     Set  <  String  >>   namespaceDs   =     new     NacosDataSource  <>(  remoteAddress  ,   groupId  ,  
		 
			 namespaceSetDataId  ,   source   ->   JSON  .  parseObject  (  source  ,     new     TypeReference  <  Set  <  String  >>()     {}));  
		 
			 ClusterServerConfigManager  .  registerNamespaceSetProperty  (  namespaceDs  .  getProperty  ());  
		 
			 }  
		 
			    
		 
			 private     void   initServerTransportConfig  (){  
		 
			 // Server transport configuration data source.  
		 
			 ReadableDataSource  <  String  ,     ServerTransportConfig  >   transportConfigDs   =     new     NacosDataSource  <>(  remoteAddress  ,  
		 
			 groupId  ,   serverTransportDataId  ,  
		 
			 source   ->   JSON  .  parseObject  (  source  ,     new     TypeReference  <  ServerTransportConfig  >()     {}));  
		 
			 ClusterServerConfigManager  .  registerServerTransportProperty  (  transportConfigDs  .  getProperty  ());  
		 
			 }  
		 
			    
		 
			    
		 
			 private     void   initServerFlowConfig  (){  
		 
			    
		 
			 // Server namespace set (scope) data source.  
		 
			 ReadableDataSource  <  String  ,     ServerFlowConfig  >   serverFlowConfig   =     new     NacosDataSource  <>(  remoteAddress  ,   groupId  ,  
		 
			 serverFlowDataId  ,   source   ->   JSON  .  parseObject  (  source  ,     new     TypeReference  <  ServerFlowConfig  >()     {}));  
		 
			    
		 
			 ClusterServerConfigManager  .  registerGlobalServerFlowProperty  (  serverFlowConfig  .  getProperty  ());  
		 
			 }  
		 
			    
		 
			 private     void   initStateProperty  ()     {  
		 
			 ClusterStateManager  .  applyState  (  ClusterStateManager  .  CLUSTER_SERVER  );  
		 
			    
		 
			 }  
		 
			 }  
	 

token client的nacos配置

客户端请求超时配置,如下:

DataId:cluster-client-config Group:SENTINEL_ALONE_GROUP 配置内容(json格式):


	 
		 
			 {  
		 
			 "requestTimeout"  :     20  
		 
			 }  
	 

流控限流配置,如下:

DataId: cloudalibaba-sentinel-cluster-client-alone-flow-rules Group:SENTINEL_ALONE_GROUP 配置内容(json格式):


	 
		 
			 [  
		 
			 {  
		 
			 "resource"     :     "/order/{id}"  ,     // 限流的资源名称  
		 
			 "grade"     :     1  ,     // 限流模式为:qps  
		 
			 "count"     :     30  ,     // 阈值为:30  
		 
			 "clusterMode"     :     true  ,     // 集群模式为:true  
		 
			 "clusterConfig"     :     {  
		 
			 "flowId"     :     111  ,     // 全局唯一id  
		 
			 "thresholdType"     :     1  ,     // 阈值模式为:全局阈值  
		 
			 "fallbackToLocalWhenFail"     :     true     // 在 client 连接失败或通信失败时,是否退化到本地的限流模式  
		 
			 }  
		 
			 }  
		 
			 ]  
	 

热点限流配置,如下:

DataId:cloudalibaba-sentinel-cluster-client-alone-param-rules Group:SENTINEL_ALONE_GROUP 配置内容(json格式):


	 
		 
			 [  
		 
			 {  
		 
			 "resource"     :     "order"  ,     // 限流的资源名称  
		 
			 "paramIdx"     :     1  ,     //参数索引  
		 
			 "grade"     :     1  ,     // 限流模式为:qps  
		 
			 "count"     :     10  ,     // 阈值为:10  
		 
			 "clusterMode"     :     true  ,     // 集群模式为:true  
		 
			 "clusterConfig"     :     {  
		 
			 "flowId"     :     121  ,     // 全局唯一id  
		 
			 "thresholdType"     :     1  ,     // 阈值模式为:全局阈值  
		 
			 "fallbackToLocalWhenFail"     :     true     // 在 client 连接失败或通信失败时,是否退化到本地的限流模式  
		 
			 },  
		 
			 "paramFlowItemList"  :[     //索引为1的参数值为hot时,接口阈值为50,其他值均为10  
		 
			 {  
		 
			 object  :     "hot"  ,  
		 
			 count  :     50  ,  
		 
			 classType  :     "  java  .lang.String"  
		 
			 }  
		 
			 ]  
		 
			 }  
		 
			 ]  
	 

Token client的InitFunc类:


	 
		 
			 /**  
		 
			 * @PROJECT_NAME: SpringCloud-Learning  
		 
			 * @USER: yuliang  
		 
			 * @DESCRIPTION:  
		 
			 * @DATE: 2021-04-01 17:47  
		 
			 */  
		 
			 public     class     ClusterClientInitFunc     implements     InitFunc     {  
		 
			    
		 
			 //项目名称  
		 
			 private     static     final     String   APP_NAME   =     AppNameUtil  .  getAppName  ();  
		 
			 //nacos集群地址  
		 
			 private     final     String   remoteAddress   =     "localhost:8848"  ;  
		 
			 //nacos配置的分组名称  
		 
			 private     final     String   groupId   =     "SENTINEL_ALONE_GROUP"  ;  
		 
			    
		 
			 //项目名称 + Constants的配置名称,组成配置的dataID  
		 
			 private     final     String   flowDataId   =   APP_NAME   +     Constants  .  FLOW_POSTFIX  ;  
		 
			 private     final     String   paramDataId   =   APP_NAME   +     Constants  .  PARAM_FLOW_POSTFIX  ;  
		 
			 private     final     String   configDataId   =     "cluster-client-config"  ;  
		 
			 private     final     String   serverDataId   =     "cluster-server-config"  ;  
		 
			    
		 
			    
		 
			 @Override  
		 
			 public     void   init  ()     throws     Exception     {  
		 
			    
		 
			 // Register client dynamic rule data source.  
		 
			 //客户端,动态数据源的方式配置sentinel的流量控制和热点参数限流的规则。  
		 
			 initDynamicRuleProperty  ();  
		 
			    
		 
			 // Register token client related data source.  
		 
			 // Token client common config  
		 
			 // 集群限流客户端的配置属性  
		 
			 initClientConfigProperty  ();  
		 
			 // Token client assign config (e.g. target token server) retrieved from assign map:  
		 
			 //初始化Token客户端  
		 
			 initClientServerAssignProperty  ();  
		 
			    
		 
			 //初始化客户端状态  
		 
			 initStateProperty  ();  
		 
			 }  
		 
			    
		 
			 private     void   initDynamicRuleProperty  ()     {  
		 
			    
		 
			 //流量控制的DataId分别是APP_NAME + Constants.FLOW_POSTFIX;热点参数限流规则的DataId是APP_NAME + Constants.PARAM_FLOW_POSTFIX;  
		 
			    
		 
			 ReadableDataSource  <  String  ,     List  <  FlowRule  >>   ruleSource   =     new     NacosDataSource  <>(  remoteAddress  ,   groupId  ,  
		 
			 flowDataId  ,   source   ->   JSON  .  parseObject  (  source  ,     new     TypeReference  <  List  <  FlowRule  >>()     {}));  
		 
			 FlowRuleManager  .  register2Property  (  ruleSource  .  getProperty  ());  
		 
			    
		 
			 ReadableDataSource  <  String  ,     List  <  ParamFlowRule  >>   paramRuleSource   =     new     NacosDataSource  <>(  remoteAddress  ,   groupId  ,  
		 
			 paramDataId  ,   source   ->   JSON  .  parseObject  (  source  ,     new     TypeReference  <  List  <  ParamFlowRule  >>()     {}));  
		 
			 ParamFlowRuleManager  .  register2Property  (  paramRuleSource  .  getProperty  ());  
		 
			 }  
		 
			    
		 
			 private     void   initClientConfigProperty  ()     {  
		 
			 ReadableDataSource  <  String  ,     ClusterClientConfig  >   clientConfigDs   =     new     NacosDataSource  <>(  remoteAddress  ,   groupId  ,  
		 
			 configDataId  ,   source   ->   JSON  .  parseObject  (  source  ,     new     TypeReference  <  ClusterClientConfig  >()     {}));  
		 
			 ClusterClientConfigManager  .  registerClientConfigProperty  (  clientConfigDs  .  getProperty  ());  
		 
			 }  
		 
			    
		 
			 private     void   initClientServerAssignProperty  ()     {  
		 
			 ReadableDataSource  <  String  ,     ClusterClientAssignConfig  >   clientAssignDs   =     new     NacosDataSource  <>(  remoteAddress  ,   groupId  ,  
		 
			 serverDataId  ,   source   ->   JSON  .  parseObject  (  source  ,     new     TypeReference  <  ClusterClientAssignConfig  >()     {}));  
		 
			 ClusterClientConfigManager  .  registerServerAssignProperty  (  clientAssignDs  .  getProperty  ());  
		 
			 }  
		 
			    
		 
			 private     void   initStateProperty  ()     {  
		 
			 ClusterStateManager  .  applyState  (  ClusterStateManager  .  CLUSTER_CLIENT  );  
		 
			    
		 
			 }  
		 
			 }  
	 

核心的代码与配置,如上所示,其他代码,可以访问:


	 
		 
			 <module>  cloudalibaba-sentinel-cluster-server-alone9092  </module>  
		 
			 <module>  cloudalibaba-sentinel-cluster-client-alone9093  </module>  
	 

测试:

启动cloudalibaba-sentinel-cluster-server-alone9092,我们启动两个实例,模拟集群(可以启动多个):


	 
		 
			 -  Dserver  .  port  =  9092     -  Dcsp  .  sentinel  .  log  .  use  .  pid  =  true  
		 
			 -  Dserver  .  port  =  9094     -  Dcsp  .  sentinel  .  log  .  use  .  pid  =  true  
	 

启动cloudalibaba-sentinel-cluster-client-alone9093,我们启动1个实例,模拟server(实现master选举之后,可以启动多个):


	 
		 
			 -  Dserver  .  port  =  9093     -  Dcsp  .  sentinel  .  log  .  use  .  pid  =  true  
	 

不断执行以下命令,进行接口访问测试:


	 
		 
			 ab   -  n   100     -  c   50   http  :  //localhost:9092/order/1  
		 
			 ab   -  n   100     -  c   50   http  :  //localhost:9094/order/3  
	 

我们从实时监控图上可以看出,资源名为 /order/{id} ,整个集群的QPS为30,跟我们的配置是一样的。当作为token server的机器挂掉后,集群限流会退化到 local 模式的限流,即在本地按照单机阈值执行限流检查。

热点限流已经为大家实现了,大家可以自行测试,比较简单,不再累述。


	 
		 
			 ab   -  n   100     -  c   50   http  :  //localhost:9092/hot_order/1/hot  
		 
			 ab   -  n   100     -  c   50   http  :  //localhost:9094/hot_order/1/hot  
		 
			    
		 
			 ab   -  n   100     -  c   50   http  :  //localhost:9092/hot_order/1/nothot  
		 
			 ab   -  n   100     -  c   50   http  :  //localhost:9094/hot_order/1/nothot  
	 

其它

若在生产环境使用集群限流,管控端还需要关注以下的问题:

Token Server 自动管理、调度(分配/选举 Token Server) Token Server 高可用,在某个 server 不可用时自动 failover 到其它机器

 总结

集群流控,有两种模式,嵌入模式和独立模式,个人不建议在业务系统使用集群流控,集群流控可以在网关层做,业务层的话可以使用单机流控,相对来说简单好上手。token server目前存在单点问题,需要个人实现master选举,并修改 cluster-server-config 的IP即可。

代码示例

本文示例读者可以通过查看下面仓库中的项目,如下所示:


	 
		 
			 <module>  cloudalibaba-sentinel-cluster  </module>  
	 

github: https://github测试数据/jiuqiyuliang/SpringCloud-Learning

到此这篇关于Sentinel实现动态配置的集群流控的文章就介绍到这了,更多相关Sentinel集群流控内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

原文链接:https://blog.csdn.net/jiuqiyuliang/article/details/115524800

查看更多关于Sentinel实现动态配置的集群流控的方法的详细内容...

  阅读:16次