好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Spring Cloud Ribbon的踩坑记录与原理详析

简介

spring cloud ribbon 是一个基于http和tcp的客服端负载均衡工具,它是基于netflix ribbon实现的。它不像服务注册中心、配置中心、api网关那样独立部署,但是它几乎存在于每个微服务的基础设施中。包括前面的提供的声明式服务调用也是基于该ribbon实现的。理解ribbon对于我们使用spring cloud来讲非常的重要,因为负载均衡是对系统的高可用、网络压力的缓解和处理能力扩容的重要手段之一。在上节的例子中,我们采用了声明式的方式来实现负载均衡。实际上,内部调用维护了一个resttemplate对象,该对象会使用ribbon的自动化配置,同时通过@loadbalanced开启客户端负载均衡。其实resttemplate是spring自己提供的对象,不是新的内容。读者不知道resttemplate可以查看相关的文档。

现象

前两天碰到一个ribbon相关的问题,觉得值得记录一下。表象是对外的接口返回内部异常,这个是封装的统

一错误信息,spring的异常处理器catch到未捕获异常统一返回的信息。因此到日志平台查看实际的异常:

org.springframework.web.client.httpclienterrorexception: 404 null

这里介绍一下背景,出现问题的开放网关,做点事情说白了就是转发对应的请求给后端的服务。这里用到了ribbon去做服务负载均衡、eureka负责服务发现。

这里出现404,首先看了下请求的url以及对应的参数,都没有发现问题,对应的后端服务也没有收到请求。这就比较诡异了,开始怀疑是ribbon或者eureka的缓存导致请求到了错误的ip或端口,但由于日志中打印的是eureka的serviceid而不是实际的ip:port,因此先加了个日志:

?

1

2

3

4

5

6

7

8

9

@slf4j

public class customhttprequestinterceptor implements clienthttprequestinterceptor {

 

  @override

  public clienthttpresponse intercept(httprequest request, byte [] body, clienthttprequestexecution execution) throws ioexception {

   log.info( "request , url:{},method:{}." , request.geturi(), request.getmethod());

   return execution.execute(request, body);

  }

}

这里是通过给resttemplate添加拦截器的方式,但要注意,ribbon也是通过给resttemplate添加拦截器实现的解析serviceid到实际的ip:port,因此需要注意下优先级添加到ribbon的 loadbalancerinterceptor 之后,我这里是通过spring的初始化完成事件的回调中添加的,另外也添加了另一条日志,在catch到这个异常的时候,利用eureka的 discoveryclient#getinstances 获取到当前的实例信息。

之后在测试环境中复现了这个问题,看了下日志,eurek中缓存的实例信息是对的,但是实际调用的确实另外一个服务的地址,从而导致了接口404。

源码解析

从上述的信息中可以知道,问题出在ribbon中,具体的原因后面会说,这里先讲一下spring cloud ribbon的初始化流程。

?

1

2

3

4

5

6

7

8

@configuration

@conditionalonclass ({ iclient. class , resttemplate. class , asyncresttemplate. class , ribbon. class })

@ribbonclients

@autoconfigureafter (name = "org.springframework.cloud.netflix.eureka.eurekaclientautoconfiguration" )

@autoconfigurebefore ({loadbalancerautoconfiguration. class , asyncloadbalancerautoconfiguration. class })

@enableconfigurationproperties ({ribboneagerloadproperties. class , serverintrospectorproperties. class })

public class ribbonautoconfiguration {

}

注意这个注解 @ribbonclients , 如果想要覆盖spring cloud提供的默认ribbon配置就可以使用这个注解,最终的解析类是:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

public class ribbonclientconfigurationregistrar implements importbeandefinitionregistrar {

 

  @override

  public void registerbeandefinitions(annotationmetadata metadata,

  beandefinitionregistry registry) {

  map<string, object> attrs = metadata.getannotationattributes(

  ribbonclients. class .getname(), true );

  if (attrs != null && attrs.containskey( "value" )) {

  annotationattributes[] clients = (annotationattributes[]) attrs.get( "value" );

  for (annotationattributes client : clients) {

  registerclientconfiguration(registry, getclientname(client),

   client.get( "configuration" ));

  }

  }

  if (attrs != null && attrs.containskey( "defaultconfiguration" )) {

  string name;

  if (metadata.hasenclosingclass()) {

  name = "default." + metadata.getenclosingclassname();

  } else {

  name = "default." + metadata.getclassname();

  }

  registerclientconfiguration(registry, name,

   attrs.get( "defaultconfiguration" ));

  }

  map<string, object> client = metadata.getannotationattributes(

  ribbonclient. class .getname(), true );

  string name = getclientname(client);

  if (name != null ) {

  registerclientconfiguration(registry, name, client.get( "configuration" ));

  }

  }

 

  private string getclientname(map<string, object> client) {

  if (client == null ) {

  return null ;

  }

  string value = (string) client.get( "value" );

  if (!stringutils.hastext(value)) {

  value = (string) client.get( "name" );

  }

  if (stringutils.hastext(value)) {

  return value;

  }

  throw new illegalstateexception(

  "either 'name' or 'value' must be provided in @ribbonclient" );

  }

 

  private void registerclientconfiguration(beandefinitionregistry registry,

  object name, object configuration) {

  beandefinitionbuilder builder = beandefinitionbuilder

  .genericbeandefinition(ribbonclientspecification. class );

  builder.addconstructorargvalue(name);

  builder.addconstructorargvalue(configuration);

  registry.registerbeandefinition(name + ".ribbonclientspecification" ,

  builder.getbeandefinition());

  }

}

atrrs包含defaultconfiguration,因此会注册ribbonclientspecification类型的bean,注意名称以 default. 开头,类型是ribbonautoconfiguration,注意上面说的ribbonautoconfiguration被@ribbonclients修饰。

然后再回到上面的源码:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

public class ribbonautoconfiguration {

  

 

   //上文中会解析被@ribbonclients注解修饰的类,然后注册类型为ribbonclientspecification的bean。

   //主要有两个: ribbonautoconfiguration、ribboneurekaautoconfiguration

  @autowired (required = false )

  private list<ribbonclientspecification> configurations = new arraylist<>();

  

  @bean

  public springclientfactory springclientfactory() {

     //初始化springclientfactory,并将上面的配置注入进去,这段很重要。

  springclientfactory factory = new springclientfactory();

  factory.setconfigurations( this .configurations);

  return factory;

  }

    //其他的都是提供一些默认的bean配置

 

  @bean

  @conditionalonmissingbean (loadbalancerclient. class )

  public loadbalancerclient loadbalancerclient() {

  return new ribbonloadbalancerclient(springclientfactory());

  }

 

  @bean

  @conditionalonclass (name = "org.springframework.retry.support.retrytemplate" )

  @conditionalonmissingbean

  public loadbalancedretrypolicyfactory loadbalancedretrypolicyfactory(springclientfactory clientfactory) {

  return new ribbonloadbalancedretrypolicyfactory(clientfactory);

  }

 

  @bean

  @conditionalonmissingclass (value = "org.springframework.retry.support.retrytemplate" )

  @conditionalonmissingbean

  public loadbalancedretrypolicyfactory neverretrypolicyfactory() {

  return new loadbalancedretrypolicyfactory.neverretryfactory();

  }

 

  @bean

  @conditionalonclass (name = "org.springframework.retry.support.retrytemplate" )

  @conditionalonmissingbean

  public loadbalancedbackoffpolicyfactory loadbalancedbackoffpolicyfactory() {

  return new loadbalancedbackoffpolicyfactory.nobackoffpolicyfactory();

  }

 

  @bean

  @conditionalonclass (name = "org.springframework.retry.support.retrytemplate" )

  @conditionalonmissingbean

  public loadbalancedretrylistenerfactory loadbalancedretrylistenerfactory() {

  return new loadbalancedretrylistenerfactory.defaultretrylistenerfactory();

  }

 

  @bean

  @conditionalonmissingbean

  public propertiesfactory propertiesfactory() {

  return new propertiesfactory();

  }

 

  @bean

  @conditionalonproperty (value = "ribbon.eager-load.enabled" , matchifmissing = false )

  public ribbonapplicationcontextinitializer ribbonapplicationcontextinitializer() {

  return new ribbonapplicationcontextinitializer(springclientfactory(),

  ribboneagerloadproperties.getclients());

  }

 

  @configuration

  @conditionalonclass (httprequest. class )

  @conditionalonribbonrestclient

  protected static class ribbonclientconfig {

 

  @autowired

  private springclientfactory springclientfactory;

 

  @bean

  public resttemplatecustomizer resttemplatecustomizer(

  final ribbonclienthttprequestfactory ribbonclienthttprequestfactory) {

  return new resttemplatecustomizer() {

  @override

  public void customize(resttemplate resttemplate) {

   resttemplate.setrequestfactory(ribbonclienthttprequestfactory);

  }

  };

  }

 

  @bean

  public ribbonclienthttprequestfactory ribbonclienthttprequestfactory() {

  return new ribbonclienthttprequestfactory( this .springclientfactory);

  }

  }

 

  //todo: support for autoconfiguring restemplate to use apache http client or okhttp

 

  @target ({ elementtype.type, elementtype.method })

  @retention (retentionpolicy.runtime)

  @documented

  @conditional (onribbonrestclientcondition. class )

  @interface conditionalonribbonrestclient { }

 

  private static class onribbonrestclientcondition extends anynestedcondition {

  public onribbonrestclientcondition() {

  super (configurationphase.register_bean);

  }

 

  @deprecated //remove in edgware"

  @conditionalonproperty ( "ribbon.http.client.enabled" )

  static class zuulproperty {}

 

  @conditionalonproperty ( "ribbon.restclient.enabled" )

  static class ribbonproperty {}

  }

}

注意这里的springclientfactory, ribbon默认情况下,每个eureka的serviceid(服务),都会分配自己独立的spring的上下文,即applicationcontext, 然后这个上下文中包含了必要的一些bean,比如: iloadbalancer 、 serverlistfilter 等。而spring cloud默认是使用resttemplate封装了ribbon的调用,核心是通过一个拦截器:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

@bean

  @conditionalonmissingbean

  public resttemplatecustomizer resttemplatecustomizer(

  final loadbalancerinterceptor loadbalancerinterceptor) {

  return new resttemplatecustomizer() {

  @override

  public void customize(resttemplate resttemplate) {

   list<clienthttprequestinterceptor> list = new arraylist<>(

   resttemplate.getinterceptors());

   list.add(loadbalancerinterceptor);

   resttemplate.setinterceptors(list);

  }

  };

  }

因此核心是通过这个拦截器实现的负载均衡:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

public class loadbalancerinterceptor implements clienthttprequestinterceptor {

 

  private loadbalancerclient loadbalancer;

  private loadbalancerrequestfactory requestfactory;

 

  @override

  public clienthttpresponse intercept( final httprequest request, final byte [] body,

  final clienthttprequestexecution execution) throws ioexception {

  final uri originaluri = request.geturi(); //这里传入的url是解析之前的,即http://serviceid/服务地址的形式

  string servicename = originaluri.gethost(); //解析拿到对应的serviceid

  assert .state(servicename != null , "request uri does not contain a valid hostname: " + originaluri);

  return this .loadbalancer.execute(servicename, requestfactory.createrequest(request, body, execution));

  }

}

然后将请求转发给loadbalancerclient:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

public class ribbonloadbalancerclient implements loadbalancerclient {

@override

  public <t> t execute(string serviceid, loadbalancerrequest<t> request) throws ioexception {

  iloadbalancer loadbalancer = getloadbalancer(serviceid); //获取对应的loadbalancer

  server server = getserver(loadbalancer); //获取服务器,这里会执行对应的分流策略,比如轮训

     //、随机等

  if (server == null ) {

  throw new illegalstateexception( "no instances available for " + serviceid);

  }

  ribbonserver ribbonserver = new ribbonserver(serviceid, server, issecure(server,

  serviceid), serverintrospector(serviceid).getmetadata(server));

 

  return execute(serviceid, ribbonserver, request);

  }

}

而这里的loadbalancer是通过上文中提到的springclientfactory获取到的,这里会初始化一个新的spring上下文,然后将ribbon默认的配置类,比如说: ribbonautoconfiguration 、 ribboneurekaautoconfiguration 等添加进去, 然后将当前spring的上下文设置为parent,再调用refresh方法进行初始化。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

public class springclientfactory extends namedcontextfactory<ribbonclientspecification> {

  protected annotationconfigapplicationcontext createcontext(string name) {

  annotationconfigapplicationcontext context = new annotationconfigapplicationcontext();

  if ( this .configurations.containskey(name)) {

  for ( class <?> configuration : this .configurations.get(name)

   .getconfiguration()) {

  context.register(configuration);

  }

  }

  for (map.entry<string, c> entry : this .configurations.entryset()) {

  if (entry.getkey().startswith( "default." )) {

  for ( class <?> configuration : entry.getvalue().getconfiguration()) {

   context.register(configuration);

  }

  }

  }

  context.register(propertyplaceholderautoconfiguration. class ,

  this .defaultconfigtype);

  context.getenvironment().getpropertysources().addfirst( new mappropertysource(

  this .propertysourcename,

  collections.<string, object> singletonmap( this .propertyname, name)));

  if ( this .parent != null ) {

  // uses environment from parent as well as beans

  context.setparent( this .parent);

  }

  context.refresh();

  return context;

  }

}

最核心的就在这一段,也就是说对于每一个不同的serviceid来说,都拥有一个独立的spring上下文,并且在第一次调用这个服务的时候,会初始化ribbon相关的所有bean, 如果不存在 才回去父context中去找。

再回到上文中根据分流策略获取实际的ip:port的代码段:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

public class ribbonloadbalancerclient implements loadbalancerclient {

@override

  public <t> t execute(string serviceid, loadbalancerrequest<t> request) throws ioexception {

  iloadbalancer loadbalancer = getloadbalancer(serviceid); //获取对应的loadbalancer

  server server = getserver(loadbalancer); //获取服务器,这里会执行对应的分流策略,比如轮训

     //、随机等

  if (server == null ) {

  throw new illegalstateexception( "no instances available for " + serviceid);

  }

  ribbonserver ribbonserver = new ribbonserver(serviceid, server, issecure(server,

  serviceid), serverintrospector(serviceid).getmetadata(server));

 

  return execute(serviceid, ribbonserver, request);

  }

}

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

protected server getserver(iloadbalancer loadbalancer) {

  if (loadbalancer == null ) {

  return null ;

  }

     // 选择对应的服务器

  return loadbalancer.chooseserver( "default" ); // todo: better handling of key

  }

public class zoneawareloadbalancer<t extends server> extends dynamicserverlistloadbalancer<t> {

@override

  public server chooseserver(object key) {

   if (!enabled.get() || getloadbalancerstats().getavailablezones().size() <= 1 ) {

    logger.debug( "zone aware logic disabled or there is only one zone" );

    return super .chooseserver(key); //默认不配置可用区,走的是这段

   }

   server server = null ;

   try {

    loadbalancerstats lbstats = getloadbalancerstats();

    map<string, zonesnapshot> zonesnapshot = zoneavoidancerule.createsnapshot(lbstats);

    logger.debug( "zone snapshots: {}" , zonesnapshot);

    if (triggeringload == null ) {

     triggeringload = dynamicpropertyfactory.getinstance().getdoubleproperty(

       "zoneawareniwsdiscoveryloadbalancer." + this .getname() + ".triggeringloadperserverthreshold" , 0 .2d);

    }

 

    if (triggeringblackoutpercentage == null ) {

     triggeringblackoutpercentage = dynamicpropertyfactory.getinstance().getdoubleproperty(

       "zoneawareniwsdiscoveryloadbalancer." + this .getname() + ".avoidzonewithblackoutpercetage" , 0 .99999d);

    }

    set<string> availablezones = zoneavoidancerule.getavailablezones(zonesnapshot, triggeringload.get(), triggeringblackoutpercentage.get());

    logger.debug( "available zones: {}" , availablezones);

    if (availablezones != null && availablezones.size() < zonesnapshot.keyset().size()) {

     string zone = zoneavoidancerule.randomchoosezone(zonesnapshot, availablezones);

     logger.debug( "zone chosen: {}" , zone);

     if (zone != null ) {

      baseloadbalancer zoneloadbalancer = getloadbalancer(zone);

      server = zoneloadbalancer.chooseserver(key);

     }

    }

   } catch (exception e) {

    logger.error( "error choosing server using zone aware logic for load balancer={}" , name, e);

   }

   if (server != null ) {

    return server;

   } else {

    logger.debug( "zone avoidance logic is not invoked." );

    return super .chooseserver(key);

   }

  }

 

  //实际走到的方法

  public server chooseserver(object key) {

   if (counter == null ) {

    counter = createcounter();

   }

   counter.increment();

   if (rule == null ) {

    return null ;

   } else {

    try {

     return rule.choose(key);

    } catch (exception e) {

     logger.warn( "loadbalancer [{}]: error choosing server for key {}" , name, key, e);

     return null ;

    }

   }

  }

}

也就是说最终会调用 irule 选择到一个节点,这里支持很多策略,比如随机、轮训、响应时间权重等:

?

1

2

3

4

5

6

7

8

public interface irule{

 

  public server choose(object key);

 

  public void setloadbalancer(iloadbalancer lb);

 

  public iloadbalancer getloadbalancer();

}

这里的loadbalancer是在baseloadbalancer的构造器中设置的,上文说过,对于每一个serviceid服务来说,当第一次调用的时候会初始化对应的spring上下文,而这个上下文中包含了所有ribbon相关的bean,其中就包括iloadbalancer、irule。

原因

通过跟踪堆栈,发现不同的serviceid,irule是同一个, 而上文说过,每个serviceid都拥有自己独立的上下文,包括独立的loadbalancer、irule,而irule是同一个,因此怀疑是这个bean是通过parent context获取到的,换句话说应用自己定义了一个这样的bean。查看代码果然如此。

这样就会导致一个问题,irule是共享的,而其他bean是隔离开的,因此后面的serviceid初始化的时候,会修改这个irule的loadbalancer, 导致之前的服务获取到的实例信息是错误的,从而导致接口404。

?

1

2

3

4

5

6

7

8

9

10

public class baseloadbalancer extends abstractloadbalancer implements

   primeconnections.primeconnectionlistener, iclientconfigaware {

  public baseloadbalancer() {

   this .name = default_name;

   this .ping = null ;

   setrule(default_rule); // 这里会设置irule的loadbalancer

   setuppingtask();

   lbstats = new loadbalancerstats(default_name);

  }

}

解决方案

解决方法也很简单,最简单就将这个自定义的irule的bean干掉,另外更标准的做法是使用ribbonclients注解,具体做法可以参考文档。

总结

核心原因其实还是对于spring cloud的理解不够深刻,用法有错误,导致出现了一些比较诡异的问题。对于自己使用的组件、框架、甚至于每一个注解,都要了解其原理,能够清楚的说清楚这个注解有什么效果,有什么影响,而不是只着眼于解决眼前的问题。

再次声明:代码不是我写的=_=

好了,以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对的支持。

原文链接:https://github测试数据/aCoder2013/blog/issues/29

查看更多关于Spring Cloud Ribbon的踩坑记录与原理详析的详细内容...

  阅读:12次