Elasticsearch ZenDiscovery and the master election mechanism: how it works

Preface

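The previous article used the ElectMasterService source code to cover most of how master election works: sorting the master-eligible candidate nodes by ID keeps the election consistent, and requiring a minimum number of visible candidate nodes avoids split brain. Electing after sorting only guarantees local consistency, though. If a node has received an incorrect cluster state, it will elect the wrong master, so further measures are needed to keep the election consistent. This is the second point mentioned in the previous article: a node can become master only when enough nodes have elected it and it has also elected itself. That rule is implemented in ZenDiscovery, and this article follows the node discovery process to explain the rest of the master election mechanism.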
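As a quick recap of the two rules from the previous article, here is a minimal sketch of ID-ordered election guarded by a minimum candidate count. It is a simplified illustration, not the real ElectMasterService: the `ElectionSketch` and `Node` types, the string IDs, and the `masterEligible` flag are all assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified illustration only; names and types are hypothetical, not the real ElectMasterService.
final class ElectionSketch {

    // Hypothetical stand-in for a discovery node: an ID plus a master-eligible flag.
    static final class Node {
        final String id;
        final boolean masterEligible;

        Node(String id, boolean masterEligible) {
            this.id = id;
            this.masterEligible = masterEligible;
        }
    }

    private final int minimumMasterNodes;

    ElectionSketch(int minimumMasterNodes) {
        this.minimumMasterNodes = minimumMasterNodes;
    }

    // Keeps only the nodes that are allowed to become master.
    private static List<Node> candidates(List<Node> nodes) {
        List<Node> result = new ArrayList<>();
        for (Node node : nodes) {
            if (node.masterEligible) {
                result.add(node);
            }
        }
        return result;
    }

    // Split-brain guard: an election is only valid when enough candidates are visible.
    boolean hasEnoughMasterNodes(List<Node> nodes) {
        return candidates(nodes).size() >= minimumMasterNodes;
    }

    // Sorts the candidates by ID and picks the first one; returns null when there are none.
    Node electMaster(List<Node> nodes) {
        List<Node> sorted = candidates(nodes);
        if (sorted.isEmpty()) {
            return null;
        }
        sorted.sort((a, b) -> a.id.compareTo(b.id));
        return sorted.get(0);
    }

    public static void main(String[] args) {
        ElectionSketch election = new ElectionSketch(2);
        List<Node> visible = Arrays.asList(
                new Node("node-c", true), new Node("node-a", true), new Node("node-b", false));
        if (election.hasEnoughMasterNodes(visible)) {
            System.out.println("elected: " + election.electMaster(visible).id);
        }
    }
}
```

With two master-eligible nodes visible and a minimum of 2, the check passes and the candidate with the smallest ID (node-a) is chosen. Any node that sees the same candidate list reaches the same result, which is the local consistency the sorting provides.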

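After a node starts, it first launches the join thread, which looks for the cluster's master node. If the cluster is already up and running well, the node tries to connect to the master and join the cluster. Otherwise (the cluster is still starting up), it elects a master. If the node itself is elected, it sends a cluster state update task to the other nodes in the cluster; if another node is the master, it tries to join that node's cluster.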

The join code

private void innerJoinCluster() {
    DiscoveryNode masterNode = null;
    final Thread currentThread = Thread.currentThread();
    // Block until a master node is found. When the cluster has just started or the master
    // has been lost, this blocking keeps the cluster consistent.
    while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
        masterNode = findMaster();
    }
    // The local node may itself have been elected master (cluster startup, or an election
    // was in progress when it joined).
    if (clusterService.localNode().equals(masterNode)) {
        // The local node is the master, so it must send a cluster state update to all other nodes.
        clusterService.submitStateUpdateTask("zen-disco-join (elected_as_master)", Priority.IMMEDIATE, new ProcessedClusterStateNonMasterUpdateTask() {
            @Override
            public ClusterState execute(ClusterState currentState) {
                // The election was wrong: the previous master is still healthy, so do not
                // update the state and keep using the existing one.
                if (currentState.nodes().masterNode() != null) {
                    return currentState;
                }
                DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder(currentState.nodes()).masterNodeId(currentState.nodes().localNode().id());
                // update the fact that we are the master...
                ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(discoverySettings.getNoMasterBlock()).build();
                currentState = ClusterState.builder(currentState).nodes(builder).blocks(clusterBlocks).build();
                // eagerly run reroute to remove dead nodes from routing table
                RoutingAllocation.Result result = allocationService.reroute(currentState);
                return ClusterState.builder(currentState).routingResult(result).build();
            }

            @Override
            public void onFailure(String source, Throwable t) {
                logger.error("unexpected failure during [{}]", t, source);
                joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
            }

            @Override
            public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                if (newState.nodes().localNodeMaster()) {
                    // we only starts nodesFD if we are master (it may be that we received a cluster state while pinging)
                    joinThreadControl.markThreadAsDone(currentThread);
                    nodesFD.updateNodesAndPing(newState); // start the nodes FD
                } else {
                    // if we're not a master it means another node published a cluster state while we were pinging
                    // make sure we go through another pinging round and actively join it
                    joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                }
                sendInitialStateEventIfNeeded();
                long count = clusterJoinsCounter.incrementAndGet();
                logger.trace("cluster joins counter set to [{}] (elected as master)", count);
            }
        });
    } else {
        // The node that was found is not the local node, so try to connect to that master.
        final boolean success = joinElectedMaster(masterNode);
        // finalize join through the cluster state update thread
        final DiscoveryNode finalMasterNode = masterNode;
        clusterService.submitStateUpdateTask("finalize_join (" + masterNode + ")", new ClusterStateNonMasterUpdateTask() {
            @Override
            public ClusterState execute(ClusterState currentState) throws Exception {
                if (!success) {
                    // failed to join. Try again...
                    joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                    return currentState;
                }
                if (currentState.getNodes().masterNode() == null) {
                    // Post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have
                    // a valid master.
                    logger.debug("no master node is set, despite of join request completing. retrying pings.");
                    joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                    return currentState;
                }
                if (!currentState.getNodes().masterNode().equals(finalMasterNode)) {
                    return joinThreadControl.stopRunningThreadAndRejoin(currentState, "master_switched_while_finalizing_join");
                }
                // Note: we do not have to start master fault detection here because it's set at {@link #handleNewClusterStateFromMaster }
                // when the first cluster state arrives.
                joinThreadControl.markThreadAsDone(currentThread);
                return currentState;
            }

            @Override
            public void onFailure(String source, @Nullable Throwable t) {
                logger.error("unexpected error while trying to finalize cluster join", t);
                joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
            }
        });
    }
}

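That is the whole join process. When ZenDiscovery starts, it launches a join thread that calls this method. The thread is also restarted, and runs the same join method again, when a node leaves, when the master is lost, and in similar situations.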
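Stripped of the cluster state handling, the control flow of the join method comes down to a blocking loop followed by a two-way decision. The sketch below shows only that skeleton, under loud assumptions: `JoinLoopSketch`, `waitForMaster`, and `nextStep` are hypothetical names, nodes are plain string IDs, and the returned strings merely label what the real code does at each branch.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Simplified illustration only; all names here are hypothetical.
final class JoinLoopSketch {

    // Keeps calling findMaster until it yields a node ID or the join thread is deactivated.
    static String waitForMaster(Supplier<String> findMaster, BooleanSupplier joinThreadActive) {
        String master = null;
        while (master == null && joinThreadActive.getAsBoolean()) {
            master = findMaster.get();
        }
        return master;
    }

    // Labels the branch the join method would take once the loop has ended.
    static String nextStep(String localNodeId, String masterId) {
        if (masterId == null) {
            return "stopped";
        }
        return localNodeId.equals(masterId)
                ? "publish-cluster-state-as-master"
                : "send-join-request-to-" + masterId;
    }

    public static void main(String[] args) {
        // Two ping rounds find nothing, the third one finds node-a.
        Iterator<String> rounds = Arrays.<String>asList(null, null, "node-a").iterator();
        String master = waitForMaster(rounds::next, () -> true);
        System.out.println(nextStep("node-b", master));
        System.out.println(nextStep("node-a", master));
    }
}
```

In this run the loop ends on the third round with node-a. From node-b's point of view the next step is a join request to node-a, while node-a itself would go on to publish the cluster state as master.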

The findMaster method

This method embodies the master election mechanism. The code is as follows:

private DiscoveryNode findMaster() {
    // Ping the nodes in the cluster.
    ZenPing.PingResponse[] fullPingResponses = pingService.pingAndWait(pingTimeout);
    if (fullPingResponses == null) {
        return null;
    }
    // Filter the ping responses, dropping client nodes and pure data nodes.
    List<ZenPing.PingResponse> pingResponses = Lists.newArrayList();
    for (ZenPing.PingResponse pingResponse : fullPingResponses) {
        DiscoveryNode node = pingResponse.node();
        if (masterElectionFilterClientNodes && (node.clientNode() || (!node.masterNode() && !node.dataNode()))) {
            // filter out the client node, which is a client node, or also one that is not data and not master (effectively, client)
        } else if (masterElectionFilterDataNodes && (!node.masterNode() && node.dataNode())) {
            // filter out data node that is not also master
        } else {
            pingResponses.add(pingResponse);
        }
    }
    final DiscoveryNode localNode = clusterService.localNode();
    List<DiscoveryNode> pingMasters = newArrayList();
    // Collect the master reported in each ping response, skipping it when it is the local node.
    // pingMasters ends up either empty (no node other than the local one is reported as master)
    // or holding one and the same node. Different nodes in the list mean the cluster has a
    // problem, but that is fine because an election follows.
    for (ZenPing.PingResponse pingResponse : pingResponses) {
        if (pingResponse.master() != null) {
            if (!localNode.equals(pingResponse.master())) {
                pingMasters.add(pingResponse.master());
            }
        }
    }
    // nodes discovered during pinging
    Set<DiscoveryNode> activeNodes = Sets.newHashSet();
    // nodes discovered who has previously been part of the cluster and do not ping for the very first time
    Set<DiscoveryNode> joinedOnceActiveNodes = Sets.newHashSet();
    Version minimumPingVersion = localNode.version();
    for (ZenPing.PingResponse pingResponse : pingResponses) {
        activeNodes.add(pingResponse.node());
        minimumPingVersion = Version.smallest(pingResponse.node().version(), minimumPingVersion);
        if (pingResponse.hasJoinedOnce() != null && pingResponse.hasJoinedOnce()) {
            joinedOnceActiveNodes.add(pingResponse.node());
        }
    }
    // A master-eligible local node is also added to the candidates for the election.
    if (localNode.masterNode()) {
        activeNodes.add(localNode);
        long joinsCounter = clusterJoinsCounter.get();
        if (joinsCounter > 0) {
            logger.trace("adding local node to the list of active nodes who has previously joined the cluster (joins counter is [{}})", joinsCounter);
            joinedOnceActiveNodes.add(localNode);
        }
    }
    // pingMasters is empty: no other node is reported as master, so the local node may become master.
    if (pingMasters.isEmpty()) {
        if (electMaster.hasEnoughMasterNodes(activeNodes)) {
            // Quorum check: enough nodes are present to elect the local node as master. That alone
            // is not sufficient. The local node runs the election once more, and only if the result
            // is still itself can it become master. This is the second principle of master election.
            DiscoveryNode master = electMaster.electMaster(joinedOnceActiveNodes);
            if (master != null) {
                return master;
            }
            return electMaster.electMaster(activeNodes);
        } else {
            // if we don't have enough master nodes, we bail, because there are not enough master to elect from
            logger.trace("not enough master nodes [{}]", activeNodes);
            return null;
        }
    } else {
        // pingMasters is not empty (its entries should all be the same node): the local node
        // was not elected master, so accept the earlier election.
        return electMaster.electMaster(pingMasters);
    }
}
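To make the branching in findMaster easier to follow, here is a minimal, self-contained sketch of the same decision. It is an illustration under stated assumptions rather than the real implementation: `FindMasterSketch` and `Ping` are hypothetical types, nodes are plain string IDs, every node is assumed to be master-eligible (so the client and data node filtering is skipped), and "smallest ID wins" stands in for the ID-ordered election.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.TreeSet;

// Simplified illustration only; all nodes are assumed master-eligible and are plain string IDs.
final class FindMasterSketch {

    // Hypothetical stand-in for a ping response.
    static final class Ping {
        final String node;        // ID of the node that answered
        final String master;      // master that node currently reports, or null
        final boolean joinedOnce; // whether that node has been part of a cluster before

        Ping(String node, String master, boolean joinedOnce) {
            this.node = node;
            this.master = master;
            this.joinedOnce = joinedOnce;
        }
    }

    // Returns the ID of the node to treat as master, or null when no election is possible yet.
    static String findMaster(String localNode, boolean localJoinedOnce,
                             List<Ping> pings, int minimumMasterNodes) {
        List<String> pingMasters = new ArrayList<>();
        TreeSet<String> activeNodes = new TreeSet<>();
        TreeSet<String> joinedOnceActiveNodes = new TreeSet<>();
        for (Ping ping : pings) {
            // Masters reported by others, ignoring reports that name the local node.
            if (ping.master != null && !ping.master.equals(localNode)) {
                pingMasters.add(ping.master);
            }
            activeNodes.add(ping.node);
            if (ping.joinedOnce) {
                joinedOnceActiveNodes.add(ping.node);
            }
        }
        // The local node is a candidate too.
        activeNodes.add(localNode);
        if (localJoinedOnce) {
            joinedOnceActiveNodes.add(localNode);
        }
        if (!pingMasters.isEmpty()) {
            // Someone else is already master: accept that election.
            return Collections.min(pingMasters);
        }
        if (activeNodes.size() < minimumMasterNodes) {
            // Not enough candidates visible: no election.
            return null;
        }
        // Prefer nodes that were part of a cluster before, then fall back to all active nodes.
        return joinedOnceActiveNodes.isEmpty() ? activeNodes.first() : joinedOnceActiveNodes.first();
    }

    public static void main(String[] args) {
        List<Ping> fresh = Arrays.asList(new Ping("node-a", null, false), new Ping("node-c", null, false));
        List<Ping> running = Arrays.asList(new Ping("node-a", "node-c", true), new Ping("node-c", "node-c", true));
        System.out.println(findMaster("node-b", false, fresh, 2));
        System.out.println(findMaster("node-b", false, running, 2));
        System.out.println(findMaster("node-b", false, new ArrayList<Ping>(), 2));
    }
}
```

The three calls cover the three outcomes: a fresh cluster with enough candidates elects the smallest ID (node-a), a running cluster simply reports its existing master (node-c), and a node that sees nobody else cannot reach the minimum of 2 and gets null, which sends the join loop into another ping round.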

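The key parts above are annotated in the comments, so they are not analyzed further here. Besides findMaster, another method also reflects master election: handleMasterGone. Below is part of its code, the part that submits the master-lost task: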

clusterService.submitStateUpdateTask("zen-disco-master_failed (" + masterNode + ")", Priority.IMMEDIATE, new ProcessedClusterStateNonMasterUpdateTask() {
    @Override
    public ClusterState execute(ClusterState currentState) {
        // Start from all the nodes in the current cluster state.
        DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(currentState.nodes())
                // make sure the old master node, which has failed, is not part of the nodes we publish
                .remove(masterNode.id())
                .masterNodeId(null).build();
        // The rejoin process simply repeats the findMaster process.
        if (rejoin) {
            return rejoin(ClusterState.builder(currentState).nodes(discoveryNodes).build(), "master left (reason = " + reason + ")");
        }
        // The election quorum cannot be reached, so go through findMaster again.
        if (!electMaster.hasEnoughMasterNodes(discoveryNodes)) {
            return rejoin(ClusterState.builder(currentState).nodes(discoveryNodes).build(), "not enough master nodes after master left (reason = " + reason + ")");
        }
        // With the current cluster state, if the number of candidates reaches the required count,
        // the elected node is guaranteed to be the same everywhere, because all nodes see the
        // same cluster state.
        final DiscoveryNode electedMaster = electMaster.electMaster(discoveryNodes); // elect master
        final DiscoveryNode localNode = currentState.nodes().localNode();
        ....
    }

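The code above shows where master election is applied. Both findMaster and handleMasterGone guarantee a consistent election: the number of nodes taking part must reach a set threshold, otherwise the election is not considered successful and the node goes back to waiting. If the current node has been elected master by other nodes, it still runs the election once more itself to confirm the result. Consistency therefore comes from two things together: enforcing the required number of participants and sorting the candidate nodes.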
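The handleMasterGone case can be reduced to a few lines. The sketch below is a hedged illustration, not the real method: `MasterGoneSketch` and `electAfterMasterGone` are hypothetical names, nodes are plain string IDs that are all assumed to be master-eligible, and a null result stands for "rejoin", that is, a fresh findMaster round. What happens after a node has been elected is left out, as it is in the excerpt above.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.TreeSet;

// Simplified illustration only; all names here are hypothetical.
final class MasterGoneSketch {

    // Returns the ID that every node holding this cluster state would elect,
    // or null when too few candidates remain and a rejoin is needed.
    static String electAfterMasterGone(Collection<String> clusterStateNodes,
                                       String failedMaster, int minimumMasterNodes) {
        TreeSet<String> remaining = new TreeSet<>(clusterStateNodes);
        // The failed master must not take part in the new election.
        remaining.remove(failedMaster);
        if (remaining.isEmpty() || remaining.size() < minimumMasterNodes) {
            return null;
        }
        // Sorted order makes the choice deterministic: the smallest ID wins.
        return remaining.first();
    }

    public static void main(String[] args) {
        List<String> clusterState = Arrays.asList("node-c", "node-a", "node-b");
        System.out.println(electAfterMasterGone(clusterState, "node-a", 2));
        System.out.println(electAfterMasterGone(clusterState, "node-a", 3));
    }
}
```

Because the function depends only on the node list, the failed master, and the threshold, every surviving node that holds the same cluster state computes the same answer. With a minimum of 2 the survivors agree on node-b; with a minimum of 3 the two remaining nodes are too few and the result is null.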

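Discovering and joining the cluster is ZenDiscovery's main job. It also has other functions, such as handling node departures (handleLeaveRequest) and handling new cluster states sent by the master (handleNewClusterStateFromMaster). They are not covered one by one here; see the relevant source code if you are interested.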

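Summary

This article used ZenDiscovery to analyze the remaining part of master election. ZenDiscovery bundles the functionality a node needs to discover a cluster: its main job is to discover (elect) the cluster's master node and try to join the cluster. In addition, if the local node is the master, it handles nodes leaving and nodes being lost; if it is not the master, it handles state updates coming from the master.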


Original link: https://www.cnblogs.com/zziawanblog/p/6583107.html
