好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

RocketMQ设计之故障规避机制

NameServer 为了简化和客户端通信,发现Broker故障时并不会立即通知客户端。故障规避机制就是用来解决当Broker出现故障, Producer 不能及时感知而导致消息发送失败的问题。默认不开启,如果开启,消息发送失败的时候会将失败的Broker暂时排除在队列选择列表外

MQFaultStrategy类的:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

public class MQFaultStrategy {

    private final static InternalLogger log = ClientLogger.getLog();

    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();

 

    private boolean sendLatencyFaultEnable = false ;

 

    private long [] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};

    private long [] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

 

    public long [] getNotAvailableDuration() {

        return notAvailableDuration;

    }

 

    public void setNotAvailableDuration( final long [] notAvailableDuration) {

        this .notAvailableDuration = notAvailableDuration;

    }

 

    public long [] getLatencyMax() {

        return latencyMax;

    }

 

    public void setLatencyMax( final long [] latencyMax) {

        this .latencyMax = latencyMax;

    }

 

    public boolean isSendLatencyFaultEnable() {

        return sendLatencyFaultEnable;

    }

 

    public void setSendLatencyFaultEnable( final boolean sendLatencyFaultEnable) {

        this .sendLatencyFaultEnable = sendLatencyFaultEnable;

    }

 

    public MessageQueue selectOneMessageQueue( final TopicPublishInfo tpInfo, final String lastBrokerName) {

        //是否开启故障延迟机制

        if ( this .sendLatencyFaultEnable) {

            try {

                int index = tpInfo.getSendWhichQueue().getAndIncrement();

                for ( int i = 0 ; i < tpInfo.getMessageQueueList().size(); i++) {

                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();

                    if (pos < 0 )

                        pos = 0 ;

                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);

                    //判断Queue是否可用

                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {

                        if ( null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))

                            return mq;

                    }

                }

 

                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();

                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);

                if (writeQueueNums > 0 ) {

                    final MessageQueue mq = tpInfo.selectOneMessageQueue();

                    if (notBestBroker != null ) {

                        mq.setBrokerName(notBestBroker);

                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);

                    }

                    return mq;

                } else {

                    latencyFaultTolerance.remove(notBestBroker);

                }

            } catch (Exception e) {

                log.error( "Error occurred when selecting message queue" , e);

            }

 

            return tpInfo.selectOneMessageQueue();

        }

 

        //默认轮询

        return tpInfo.selectOneMessageQueue(lastBrokerName);

    }

 

    public void updateFaultItem( final String brokerName, final long currentLatency, boolean isolation) {

        if ( this .sendLatencyFaultEnable) {

            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);

            this .latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);

        }

    }

 

    private long computeNotAvailableDuration( final long currentLatency) {

        for ( int i = latencyMax.length - 1 ; i >= 0 ; i--) {

            if (currentLatency >= latencyMax[i])

                return this .notAvailableDuration[i];

        }

 

        return 0 ;

    }

}

在选择查找路由时,选择消息队列的关键步骤:

先按轮询算法选择一个消息队列 从故障列表判断该消息队列是否可用

LatencyFaultToleranceImpl中判断是否可用:

?

1

2

3

4

5

6

7

8

9

10

11

12

@Override

public boolean isAvailable( final String name) {

    final FaultItem faultItem = this .faultItemTable.get(name);

    if (faultItem != null ) {

        return faultItem.isAvailable();

    }

    return true ;

}

 

public boolean isAvailable() {

            return (System.currentTimeMillis() - startTimestamp) >= 0 ;

        }

判断是否在故障列表中,不在故障列表中代表可用。 在故障列表中判断当前时间是否大于等于故障规避的开始时间 startTimestamp

在消息发送结束后和发送出现异常时调用 updateFaultItem() 方法来更新故障列表, computeNotAvailableDuration() 根据响应时间来计算故障周期时长,响应时间越长故障周期越长。网络异常、Broker异常、客户端异常都是固定响应时长30s,它们故障周期时长为10分钟。消息发送成功或线程中断异常响应时间在100毫秒以内,故障周期时长为0。

LatencyFaultToleranceImpl类的updateFaultItem方法:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

@Override

public void updateFaultItem( final String name, final long currentLatency, final long notAvailableDuration) {

    FaultItem old = this .faultItemTable.get(name);

    if ( null == old) {

        final FaultItem faultItem = new FaultItem(name);

        faultItem.setCurrentLatency(currentLatency);

        faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

 

        //加入故障列表

        old = this .faultItemTable.putIfAbsent(name, faultItem);

        if (old != null ) {

            old.setCurrentLatency(currentLatency);

            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

        }

    } else {

        old.setCurrentLatency(currentLatency);

        old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

    }

}

FaultItem 存储Broker名称、响应时长、故障规避开始时间,最重要的是故障规避开始时间,用来判断Queue是否可用

到此这篇关于RocketMQ设计之故障规避机制的文章就介绍到这了,更多相关RocketMQ故障规避机制内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

原文链接:https://blog.51cto测试数据/u_15460453/5075811

查看更多关于RocketMQ设计之故障规避机制的详细内容...

  阅读:13次