好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

elasticsearch集群发现zendiscovery的Ping机制分析

 zenDiscovery实现机制

ping是集群发现的基本手段,通过在网络上广播或者指定ping某些节点获取集群信息,从而可以找到集群的master加入集群。zenDiscovery实现了两种ping机制:广播与单播。本篇将详细分析一些这MulticastZenPing机制的实现为后面的集群发现和master选举做好铺垫。

广播的过程

首先看一下广播(MulticastZenPing),广播的原理很简单,节点启动后向网络发送广播信息,任何收到的节点只要集群名字相同都应该对此广播信息作出回应。这样该节点就获取了集群的相关信息。它定义了一个action:"internal:discovery/zen/multicast"和广播的信息头:INTERNAL_HEADER 。之前说过NettyTransport是cluster通信的基础,但是广播却没有使它。它使用了java的MulticastSocket。这里简单的介绍一下MulticastSocket的使用。它是一个UDP 机制的socket,用来进行多个数据包的广播。它可以帮到一个ip形成一个group,任何MulticastSocket都可以join进来,组内的socket发送的信息会被订阅了改组的所有机器接收到。elasticsearch对其进行了封装形成了MulticastChannel,有兴趣可以参考相关源码。 

首先看一下MulticastZenPing的几个辅助内部类:

它总共定义了4个内部类,这些内部类和它一起完成广播功能。FinalizingPingCollection是一pingresponse的容器,所有的响应都用它来存储。MulticastPingResponseRequestHandler它是response处理类,类似于之前所说的nettytransportHandler,它虽然使用的不是netty,但是它也定义了一个messageReceived的方法,当收到请求时直接返回一个response。

MulticastPingResponse就不用细说了,它就是一个响应类。最后要着重说一下Receiver类,因为广播并不是使用NettyTransport,因此对于消息处理逻辑都在Receiver中。在初始化MulticastZenPing时会将receiver注册进去。

?

1

2

3

4

5

6

7

8

9

10

protected void doStart() throws ElasticsearchException {

         try {

             ....

             multicastChannel = MulticastChannel.getChannel(nodeName(), shared,

                     new MulticastChannel.Config(port, group, bufferSize, ttl, networkService.resolvePublishHostAddress(address)),

                     new Receiver()); //将receiver注册到channel中

         } catch (Throwable t) {

           ....

         }

     }

Receiver类基础了Listener,实现了3个方法,消息经过onMessage方法区分,如果是内部ping则使用handleNodePingRequest方法处理,否则使用handleExternalPingRequest处理,区分方法很简单,就是读取信息都看它是否符合所定义的INTERNAL_HEADER 信息头。

nodeping处理代码

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

private void handleNodePingRequest( int id, DiscoveryNode requestingNodeX, ClusterName requestClusterName) {

            ....

             final DiscoveryNodes discoveryNodes = contextProvider.nodes();

             final DiscoveryNode requestingNode = requestingNodeX;

             if (requestingNode.id().equals(discoveryNodes.localNodeId())) {

                 // 自身发出的ping,忽略

                 return ;

             }

         //只接受本集群ping

             if (!requestClusterName.equals(clusterName)) {

             ... return ;

             }

             // 两个client间不需要ping

             if (!discoveryNodes.localNode().shouldConnectTo(requestingNode)) { return ;

             }

         //新建一个response

             final MulticastPingResponse multicastPingResponse = new MulticastPingResponse();

             multicastPingResponse.id = id;

             multicastPingResponse.pingResponse = new PingResponse(discoveryNodes.localNode(), discoveryNodes.masterNode(), clusterName, contextProvider.nodeHasJoinedClusterOnce());

         //无法连接的情况

             if (!transportService.nodeConnected(requestingNode)) {

                 // do the connect and send on a thread pool

                 threadPool.generic().execute( new Runnable() {

                     @Override

                     public void run() {

                         // connect to the node if possible

                         try {

                             transportService.connectToNode(requestingNode);

                             transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {

                                 @Override

                                 public void handleException(TransportException exp) {

                                     logger.warn( "failed to receive confirmation on sent ping response to [{}]" , exp, requestingNode);

                                 }

                             });

                         } catch (Exception e) {

                             if (lifecycle.started()) {

                                 logger.warn( "failed to connect to requesting node {}" , e, requestingNode);

                             }

                         }

                     }

                 });

             } else {

                 transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {

                     @Override

                     public void handleException(TransportException exp) {

                         if (lifecycle.started()) {

                             logger.warn( "failed to receive confirmation on sent ping response to [{}]" , exp, requestingNode);

                         }

                     }

                 });

             }

         }

     }

另外的一个方法是处理外部ping信息,处理过程是返回cluster的信息(这种外部ping的具体作用没有研究不是太清楚)。以上是响应MulticastZenPing的过程,收到其它节点的响应信息后它会把本节点及集群的master节点相关信息返回给广播节点。这样广播节点就获知了集群的相关信息。在MulticastZenPing类中还有一个类 MulticastPingResponseRequestHandler,它的作用是广播节点对其它节点对广播信息响应的回应,广播节点的第二次发送信息的过程。它跟其它TransportRequestHandler一样它有messageReceived方法,在启动时注册到transportserver中,只处理一类action:"internal:discovery/zen/multicast"。

ping请求的发送策略

代码如下:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

public void ping( final PingListener listener, final TimeValue timeout) {

        ....

     //产生一个id

         final int id = pingIdGenerator.incrementAndGet();

         try {

             receivedResponses.put(id, new PingCollection());

             sendPingRequest(id); //第一次发送ping请求

             // 等待时间的1/2后再次发送一个请求

             threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2 ), ThreadPool.Names.GENERIC, new AbstractRunnable() {

                 @Override

                 public void onFailure(Throwable t) {

                     logger.warn( "[{}] failed to send second ping request" , t, id);

                     finalizePingCycle(id, listener);

                 }

                 @Override

                 public void doRun() {

                     sendPingRequest(id);

             //再过1/2时间再次发送一个请求

                     threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2 ), ThreadPool.Names.GENERIC, new AbstractRunnable() {

                         @Override

                         public void onFailure(Throwable t) {

                             logger.warn( "[{}] failed to send third ping request" , t, id);

                             finalizePingCycle(id, listener);

                         }

                         @Override

                         public void doRun() {

                             // make one last ping, but finalize as soon as all nodes have responded or a timeout has past

                             PingCollection collection = receivedResponses.get(id);

                             FinalizingPingCollection finalizingPingCollection = new FinalizingPingCollection(id, collection, collection.size(), listener);

                             receivedResponses.put(id, finalizingPingCollection);

                             logger.trace( "[{}] sending last pings" , id);

                             sendPingRequest(id);

                 //最后一次发送请求,超时的1/4后

                             threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 4 ), ThreadPool.Names.GENERIC, new AbstractRunnable() {

                                 @Override

                                 public void onFailure(Throwable t) {

                                     logger.warn( "[{}] failed to finalize ping" , t, id);

                                 }

                                 @Override

                                 protected void doRun() throws Exception {

                                     finalizePingCycle(id, listener);

                                 }

                             });

                         }

                     });

                 }

             });

         } catch (Exception e) {

             logger.warn( "failed to ping" , e);

             finalizePingCycle(id, listener);

         }

     }

发送过程主要是调用sendPingRequest(id)方法,在该方法中会将id,信息头,版本,本地节点信息一起写入到BytesStreamOutput中然后将其进行广播,这个广播信息会被其它机器上的Receiver接收并处理,并且响应该ping请求。另外一个需要关注的是以上加说明的部分,它通过链时的定期发送请求,在等待时间内可能会发出4次请求,这种发送方式会造成大量的ping请求重复,幸好ping的资源消耗小,但是好处是可以尽可能保证在timeout这个时间段内集群的新增节点都能收到这个ping信息。在单播中也采用了该策略。

总结

广播的过程:广播使用的是jdk的MulticastSocket,在timeout时间内4次发生ping请求,ping请求包括一个id,信息头,本地节点的一些信息;这些信息在其它节点中被接收到交给Receiver处理,Receiver会将集群的master和本机的相关信息通过transport返回给广播节点。广播节点收到这些信息后会理解使用transport返回一个空的response。至此一个广播过程完成。

在节点分布在多个网段时,广播就失效了,因为广播信息不可达。这个时间就需要使用单播去ping指定的节点获取cluster的相关信息。这就是单播的用处。单播使用的是NettyTransport,它会使用跟广播一样的链式请求向指定的节点发送请求。信息的处理方式是之前所介绍的NettyTransport标准的信息处理过程。

以上就是elasticsearch集群发现zendiscovery的Ping机制分析的详细内容,更多关于elasticsearch集群发现zendiscovery Ping的资料请关注其它相关文章!

原文链接:https://www.cnblogs.com/zziawanblog/p/6551549.html

查看更多关于elasticsearch集群发现zendiscovery的Ping机制分析的详细内容...

  阅读:13次