好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

一篇带给你Sentinel 流控原理

我们在项目中添加 Spring Cloud Sentinel 依赖添加后 spring-cloud-starter-alibaba-sentinel 在 Spring-Boot 启动的过程中回去初始化 spring.factories 中的配置信息,如:SentinelWebAutoConfiguration 、SentinelAutoConfiguration 等配置文件来初始化

再讲代码之前我先声明一下我的版本号sentinel 1.8.0 。后续的所有内容均基于该版本进行

@ResoureSetinel 工作原理

 

配置 流控 规则我们最简单的方式就是通过 @ResoureSetinel 的方式来管理,该注解可以直接定义流控规则、降级规则。下面是一个简单的使用例子:

@SentinelResource(value =  "ResOrderGet" ,                    fallback =  "fallback" ,                    fallbackClass = SentinelResourceExceptionHandler.class,                    blockHandler =  "blockHandler" ,                    blockHandlerClass = SentinelResourceExceptionHandler.class                   )  @GetMapping( "/order/get/{id}" )  public  CommonResult<StockModel> getStockDetails(@PathVariable  Integer  id) {    StockModel stockModel = new StockModel();    stockModel.setCode( "STOCK==>1000" );    stockModel.setId(id);     return  CommonResult.success(stockModel);  } 

如果大家熟悉 Spring 相关的组件大家都可以想到,这里多半是通过Spring Aop. 的方式来拦截 getStockDetails 方法。我们先看看SentinelAutoConfiguration 配置文件,我们可以找到 SentinelResourceAspect Bean 的定义方法。

@Bean  @ConditionalOnMissingBean  public  SentinelResourceAspect sentinelResourceAspect() {      return  new SentinelResourceAspect();  } 

让后我们再来看看 SentinelResourceAspect 具体是怎么处理的,源码如下:

// 定义 Pointcut  @Pointcut( "@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)" )  public  void sentinelResourceAnnotationPointcut() {  }  // Around 来对被标记 @SentinelResource 注解的方法进行处理  @Around( "sentinelResourceAnnotationPointcut()" )  public  Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {    Method originMethod = resolveMethod(pjp);    // 获取注解信息    SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);    // 获取资源名称    String resourceName = getResourceName(annotation.value(), originMethod);    EntryType entryType = annotation.entryType();     int  resourceType = annotation.resourceType();    Entry entry =  null ;    try {      // 执行 entry      entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());      // 执行业务方法      Object result = pjp.proceed();      // 返回       return  result;    } catch (BlockException ex) {      // 处理 BlockException       return  handleBlockException(pjp, annotation, ex);    } catch (Throwable ex) {      Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();      // The  ignore  list will be checked  first .      if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {        throw ex;      }      if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {        traceException(ex);        // 处理降级         return  handleFallback(pjp, annotation, ex);      }      //  No  fallback  function  can handle the exception, so throw it  out .      throw ex;    }  } 

我们总结一下, @SentinelResource 的执行过程, 首先是通过 Aop 进行拦截,然后通过 SphU.entry 执行对应的流控规则,最后调用业务方法。如果触发流控规则首先处理流控异常 BlockException 然后在判断是否有服务降级的处理,如果有就调用 fallback 方法。通过 handleBlockException 、handleFallback 进行处理。

责任链模式处理流控

 

通过上面的梳理,我们知道对于流控的过程,核心处理方法就是 SphU.entry 。在这个方法中其实主要就是初始化流控 Solt 和执行 Solt. 在这个过程中会对:簇点定义、流量控制、熔断降级、系统白名单等页面功能进行处理。

1. 初始化责任链

 

下面是初始化 Solt 的核心代码在 SphU.entryWithPriority

// 删减部分代码  private Entry entryWithPriority(ResourceWrapper resourceWrapper,  int   count , boolean prioritized, Object... args)    throws BlockException {    // 初始化责任链    ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);    Entry e = new CtEntry(resourceWrapper, chain, context);    try {      // 执行 entry      chain.entry(context, resourceWrapper,  null ,  count , prioritized, args);    } catch (BlockException e1) {      e.exit( count , args);      // 异常抛出,让 SentinelResourceAspect.invokeResourceWithSentinel 统一处理      throw e1;    } catch (Throwable e1) {      // This should  not  happen, unless there are errors existing  in  Sentinel internal.      RecordLog.info( "Sentinel unexpected exception" , e1);    }     return  e;  } 

通过 lookProcessChain 方法我逐步的查找,我们可以看到最终的责任链初始化类,默认是 DefaultSlotChainBuilder

 

public  class DefaultSlotChainBuilder implements SlotChainBuilder {      @Override       public  ProcessorSlotChain build() {          ProcessorSlotChain chain = new DefaultProcessorSlotChain();          // Note: the instances  of  ProcessorSlot should be different, since they are  not  stateless.          // 通过 SPI 去加载所有的  ProcessorSlot 实现,通过  Order  排序          List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);           for  (ProcessorSlot slot : sortedSlotList) {              if (!(slot instanceof AbstractLinkedProcessorSlot)) {                  RecordLog.warn( "The ProcessorSlot("  + slot.getClass().getCanonicalName() +  ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain" );                   continue ;              }                        // 添加到 chain 尾部              chain.addLast((AbstractLinkedProcessorSlot<?>) slot);          }           return  chain;      }  } 

2. 责任链的处理过程

 

我们可以通过断点的方式来查看在 sortedSlotList 集合中所有的 solt 顺序如下图所示:

我们可以通过如下的顺序进行逐个的简单的分析一下

NodeSelectorSolt CusterBuilderSolt LogSlot StatisicSlot AuthoritySolt SystemSolts ParamFlowSolt FlowSolt DegradeSlot

对于 Sentinel 的 Slot 流控协作流程可以参考官方给出的文档, 如下图所示:

FlowSolt 流控

 

通过 NodeSelectorSolt、CusterBuilderSolt、StatisicSlot 等一系列的请求数据处理,在 FlowSolt会进入流控规则,所有的 Solt 都会执行 entry 方法, 如下所示

// FlowSolt 的 entry 方法  @Override  public  void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node,  int   count ,                    boolean prioritized, Object... args) throws Throwable {    // 检查流量    checkFlow(resourceWrapper, context, node,  count , prioritized);    fireEntry(context, resourceWrapper, node,  count , prioritized, args);  } 

在后续的流程中,会执进行判断具体的流控策略,默认是快速失败,会执行 DefaultController 方法。

// DefaultController  @Override  public  boolean canPass(Node node,  int  acquireCount, boolean prioritized) {    // 当前资源的调用次数     int  curCount = avgUsedTokens(node);    // 当前资源的调用次数 + 1 > 当前阈值    if (curCount + acquireCount >  count ) {      // 删减比分代码      // 不通过       return   false ;    }    // 通过     return   true ;  }  private  int  avgUsedTokens(Node node) {    if (node ==  null ) {       return  DEFAULT_AVG_USED_TOKENS;    }     return  grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : ( int )(node.passQps());  } 

如果上面返回不通过会回到,那么会抛出 FlowException

public  void checkFlow( Function <String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,                        Context context, DefaultNode node,  int   count , boolean prioritized) throws BlockException {    if (ruleProvider ==  null  || resource ==  null ) {       return ;    }    Collection<FlowRule> rules = ruleProvider.apply(resource.getName());    if (rules !=  null ) {       for  (FlowRule  rule  : rules) {        if (!canPassCheck( rule , context, node,  count , prioritized)) {          // 流控规则不通过,会抛出 FlowException          throw new FlowException( rule .getLimitApp(),  rule );        }      }    }  } 

然后会在 StatisticSlot 中增加统计信息, 最后会抛出给 SentinelResourceAspect 进行处理,完成流控功能。我们再来看看这个异常信息,如果是BlockException 异常,会进入 handleBlockException 方法处理, 如果是其他的业务异常首先会判断是否有配置 fallback 处理如果有,就调用 handleFallback 没有就继续往外抛,至此完成流控功能

try {    entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());    Object result = pjp.proceed();     return  result;  } catch (BlockException ex) {     return  handleBlockException(pjp, annotation, ex);  } catch (Throwable ex) {    Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();    // The  ignore  list will be checked  first .    if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {      throw ex;    }    if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {      traceException(ex);       return  handleFallback(pjp, annotation, ex);    }    //  No  fallback  function  can handle the exception, so throw it  out .    throw ex;  } 

DegradeSlot 降级

 

断路器的作用是当某些资源一直出现故障时,将触发断路器。断路器不会继续访问已经发生故障的资源,而是拦截请求并返回故障信号。

Sentinel 在 DegradeSlot 这个 Slot 中实现了熔断降级的功能,它有三个状态 OPEN 、HALF_OPEN、CLOSED 以ResponseTimeCircuitBreaker RT 响应时间维度来分析, 断路器工作的过程。下面是一个标准断路器的工作流程:

在 Sentinel 实现的源码过程如下图所示:

Sentinel 通过 Web 拦截器

 

Sentinel 在默认情况下, 不使用 @ResourceSentinel 注解实现流控的时候, Sentinel 通过拦截器进行流控实现的。初始化类在 SentinelWebAutoConfiguration 它实现了 WebMvcConfigurer 接口,在 sentinelWebInterceptor 方法初始化 SentinelWebInterceptor 等 Bean。

@Bean  @ConditionalOnProperty( name  =  "spring.cloud.sentinel.filter.enabled" ,                         matchIfMissing =  true )  public  SentinelWebInterceptor sentinelWebInterceptor(    SentinelWebMvcConfig sentinelWebMvcConfig) {     return  new SentinelWebInterceptor(sentinelWebMvcConfig);  } 

我们在 SentinelWebInterceptor 的核心方法 preHandle 中处理,这里面我们又可以看到 SphU.entry 熟悉的过程调用流控的责任链。由于逻辑都类似,此处不再多说。代码如下:

public  boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)    throws Exception {    try {      String resourceName = getResourceName(request);      if (StringUtil.isEmpty(resourceName)) {         return   true ;      }      if (increaseReferece(request, this.baseWebMvcConfig.getRequestRefName(), 1) != 1) {         return   true ;      }      // Parse the request origin using registered origin parser.      String origin = parseOrigin(request);      String contextName = getContextName(request);      ContextUtil.enter(contextName, origin);      Entry entry = SphU.entry(resourceName, ResourceTypeConstants.COMMON_WEB, EntryType. IN );      request.setAttribute(baseWebMvcConfig.getRequestAttributeName(), entry);       return   true ;    } catch (BlockException e) {      try {        handleBlockException(request, response, e);      } finally {        ContextUtil.exit();      }       return   false ;    }  } 

参考文档

https://github.com/alibaba/Sentinel/wiki

https://martinfowler.com/bliki/CircuitBreaker.html

原文链接:https://mp.weixin.qq.com/s/88ZnMVwvMKVk_uWTOvK-qg

查看更多关于一篇带给你Sentinel 流控原理的详细内容...

  阅读:43次