好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

RocketMQ之Consumer整体介绍启动源码分析

前言

从本篇文章开始,我们将逐步开始分析Consumer的源码,首先我们将整体介绍Consumer的接口和相关实现类以及DefaultMQPushConsumer的主要API和关键属性,然后我们将分析Consumer的启动过程源码,通过对启动过程的分析,之前我们分析过Producer和Broker的启动源码,Consumer的启动源码与Producer还是有很多相似的地方。

Consumer整体介绍

Consumer实现类

RocketMQ给我们提供的Consumer实现类如下图所示,包括推送式的 DefaultMQPushConsumer 和拉取式的 DefaultMQPullConsumer 、 DefaultLitePullConsumer ,从图中可以看到 DefaultMQPullConsumer 已经被标注为deprecated,如果需要使用拉取式的Consumer,官方推荐使用DefaultLitePullConsumer。

Consumer消费类型

拉取式消费

Consumer主动从Broker拉去消息,消费消息的主动权由Consumer控制。一旦获取了批量消息,就会启动消费过程。不过这种方式实时性较弱,即Broker中有了新的消息时消费者并不能及时发现并消费。

推送式消费

该模式下Broker收到数据后会主动推送给Consumer,这种方式一般实时性比较高。

RocketMQ官方更推荐我们在日常工作中使用 DefaultMQPushConsumer ,它已经能够满足我们大多数使用场景。从技术上讲,这个 DefaultMQPushConsumer 客户端 实际上是底层拉取服务的包装器 。当从代理中提取的消息到达时,它大致调用注册的回调处理程序来馈送消息。本篇文章,我们将介绍 DefaultMQPushConsumer 的启动流程

DefaultMQPushConsumer主要API

DefaultMQPushConsumer实现了MQConsumer和MQPushConsumer接口,DefaultMQPushConsumer的主要API都在这两个接口中定义了,如下所示

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

// org.apache.rocketmq.client.consumer.MQConsumer

public interface MQConsumer extends MQAdmin {

     // 如果消费失败,消息将被发送回代理,并延迟消耗一些时间

     void sendMessageBack( final MessageExt msg /*消息*/ , final int delayLevel /*延迟级别*/ , final String brokerName); 

     // 根据topic从使用者缓存中获取消息队列

     Set<MessageQueue> fetchSubscribeMessageQueues( final String topic) throws MQClientException;

}

// org.apache.rocketmq.client.consumer.MQPushConsumer

public interface MQPushConsumer extends MQConsumer {

     // 启动Consumer

     void start() throws MQClientException;

     // 关闭Consumer

     void shutdown();

     // 注册并发消息Listener

     void registerMessageListener( final MessageListenerConcurrently messageListener);

     // 注册顺序消息Listener,将会有序地接收消息。一个队列一个线程

     void registerMessageListener( final MessageListenerOrderly messageListener);

     // 订阅Topic

     void subscribe( final String topic, final String subExpression) throws MQClientException;

     // 退订topic

     void unsubscribe( final String topic);

}

DefaultMQPushConsumer关键属性

DefaultMQPushConsumer的关键属性如下所示

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

// org.apache.rocketmq.client.consumer.DefaultMQPushConsumer

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

     // DefaultMQPushConsumer的默认实现,DefaultMQPushConsumer中大部分功能都是对它的代理

     protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;

     // 相同角色的消费者需要具有完全相同的subscriptions和consumerGroup才能正确实现负载平衡,它需要全局唯一

     private String consumerGroup;

     // 消息模型定义了如何将消息传递到每个消费者客户端的方式,默认是集群模式

     private MessageModel messageModel = MessageModel.CLUSTERING;

     // 第一次消费时指定的消费策略,默认是CONSUME_FROM_LAST_OFFSET

     private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;

     // 队列分配算法,指定如何将消息队列分配给每个使用者客户端。

     private AllocateMessageQueueStrategy allocateMessageQueueStrategy;

     // 订阅关系

     private Map<String /* topic */ , String /* sub expression */ > subscription = new HashMap<String, String>();

     // 消息监听器

     private MessageListener messageListener;

     // 消息消费进度存储器

     private OffsetStore offsetStore;

     // 最小消费线程数

     private int consumeThreadMin = 20 ;

     // 最大消费线程数

     private int consumeThreadMax = 20 ;

     // 推送模式下拉去消息的间隔时间,默认一次拉取消息完成后立刻继续拉取

     private long pullInterval = 0 ;

     // 批量消费数量

     private int consumeMessageBatchMaxSize = 1 ;

     // 批量拉取的数量

     private int pullBatchSize = 32 ;

     // 每次拉取时是否更新订阅关系,默认是false

     private boolean postSubscriptionWhenPull = false ;

     // 消息最大重试次数,如果消息消费最大次数超过maxReconsumeTimes还未成功,则消息将被转移到一个失败队列

     private int maxReconsumeTimes = - 1 ;

     //延迟将该队列的消息提交到消费者线程的等待时间,默认延迟1s

     private long suspendCurrentQueueTimeMillis = 1000 ;

     // 消息阻塞消费线程的最大超时时间,默认15分钟

     private long consumeTimeout = 15 ;

     // 关闭使用者时等待消息的最长时间,0表示没有等待。

     private long awaitTerminationMillisWhenShutdown = 0 ;

}

Consumer消费模式

Consumer提供下面两种消费模式,由上面DefaultMQPushConsumer的messageModel定义

广播模式(BROADCASTING)

广播消费模式下,相同Consumer Group的每个Consumer实例都接收同一个Topic的全量消息。即每条消息会被相同Consumer Group中的所有Consumer消费

集群模式(CLUSTERING)

集群模式是Consumer默认的消费模式,集群消费模式下,相同Consumer Group的每个Consumer按照负载均衡策略分摊同一个Topic消息,即每条消息只会被相同Consumer Group中的一个Consumer消费

Consumer消费策略

Consumer主要提了下面三种消费策略

CONSUME_FROM_LAST_OFFSET

这是Consumer默认的消费策略,它分为两种情况,如果Broker的磁盘消息未过期且未被删除,则从最小偏移量开始消费。如果磁盘已过期,并被删除,则从最大偏移量开始消费。

CONSUME_FROM_FIRST_OFFSET

从最早可用的消息开始消费

CONSUME_FROM_TIMESTAMP

从指定的时间戳开始消费,这意味着在 consumeTimestamp 之前生成的消息将被忽略

Consumer使用

要使用Consumer开始消费消息,至少需要下面5个步骤

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

public static void main(String[] args) throws MQClientException {

     // 1. 传入CONSUMER_GROUP,创建DefaultMQPushConsumer

     DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);

     // 2. 设置namesrvAddr

     consumer.setNamesrvAddr( "127.0.0.1:9876" );

     // 3. 订阅Topic

     consumer.subscribe(TOPIC, "*" );

     // 4.注册消息Listener

     consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -&gt; {

         System.out.printf( "%s Receive New Messages: %s %n" , Thread.currentThread().getName(), msg);

         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

     });

     // 5.启动Consumer

     consumer.start();

}

DefaultMQPushConsumer源码分析

启动源码分析

DefaultMQPushConsumer只是设置属性,Consumer的初始化实际是在 DefaultMQPushConsumer#start 中执行的, DefaultMQPushConsumer#start 实际调用了 DefaultMQPushConsumerImpl#start 执行初始化。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

// org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#start

public void start() throws MQClientException {

     // consumerGroup封装namespace

     setConsumerGroup(NamespaceUtil.wrapNamespace( this .getNamespace(), this .consumerGroup));

     // DefaultMQPushConsumerImpl启动

     this .defaultMQPushConsumerImpl.start();

     // 消息轨迹跟踪服务,默认null

     if ( null != traceDispatcher) {

         try {

             traceDispatcher.start( this .getNamesrvAddr(), this .getAccessChannel());

         } catch (MQClientException e) {

             log.warn( "trace dispatcher start failed " , e);

         }

     }

}

下面我们来分步骤分析 DefaultMQPushConsumerImpl#start 代码

第一步:

先将Consumer的状态更新为START_FAILED 校验Consumer的配置。主要校验ConsumerGroup, 消费模式校验(MessageModel),消费开始位置(ConsumeFromWhere),消费时间戳(默认是半小时之前),队列分配策略(默认是AllocateMessageQueueAveragely),订阅Topic和Subscription关系校验,消息监听器(MessageListener)校验等。 将Consumer中的订阅关系拷贝到RebalanceImpl中,Consumer中订阅关系的来源主要包括 DefaultMQPushConsumerImpl#subscribe 方法获取,也会订阅重试topic,其主题名为 %RETRY%+消费者组名 ,消费者启动时会自动订阅该主题 如果是集群模式,则修改消费者名称为 PID#时间戳

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start

public synchronized void start() throws MQClientException {

   //...

   // 状态先设置为启动失败

   this .serviceState = ServiceState.START_FAILED;

   // 校验配置,ConsumerGroup校验,

   this .checkConfig();

   // 订阅关系copy到RebalanceImpl中

   this .copySubscription();

   // 如果是集群模式,消费者名称如果是DEFAULT,则会改成:PID#时间戳

   if ( this .defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {

       this .defaultMQPushConsumer.changeInstanceNameToPID();

   }

   //...

}

第二步:

主要是初始化MQClientInstance、RebalanceImpl和pullAPIWrapper。

**MQClientInstance:**是消息拉取服务,主要用于拉取消息,同一个进程内的所有Consumer会使用同一个MQClientInstance

**RebalanceImpl:**是消费者负载均衡服务,用于确定消费者消费的消息队列以及负载均衡。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start

// 生成一个MQClientInstance

this .mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance( this .defaultMQPushConsumer, this .rpcHook);

// 设置消费者组

this .rebalanceImpl.setConsumerGroup( this .defaultMQPushConsumer.getConsumerGroup());

// 消息消费模式

this .rebalanceImpl.setMessageModel( this .defaultMQPushConsumer.getMessageModel());

// 设置消息消费模式

this .rebalanceImpl.setAllocateMessageQueueStrategy( this .defaultMQPushConsumer.getAllocateMessageQueueStrategy());

// 设置MQClientInstance

this .rebalanceImpl.setmQClientFactory( this .mQClientFactory);

// 构建拉消息包装器

this .pullAPIWrapper = new PullAPIWrapper(

     mQClientFactory,

     this .defaultMQPushConsumer.getConsumerGroup(), isUnitMode());

this .pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

第三步:

根据消息消费模式的不同设置不同的消息消费进度存储器(OffsetStore),如果是 广播模式 ,则使用LocalFileOffsetStore作为消息进度存储器,如果是 集群模式 则使用RemoteBrokerOffsetStore作为消息进度存储器。创建完成之后调用load()方法加载偏移量,如果是LocalFileOffsetStore将会从本地加载。

广播模式下:LocalFileOffsetStore将消费进度存储在Consumer本地的 ${user.home}/.rocketmq_offsets/clientId/consumerGroup/offsets.json 文件中

集群模式下:RemoteBrokerOffsetStore将消费进度存储在Broker

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start

if ( this .defaultMQPushConsumer.getOffsetStore() != null ) {

     this .offsetStore = this .defaultMQPushConsumer.getOffsetStore();

} else {

     switch ( this .defaultMQPushConsumer.getMessageModel()) {

         case BROADCASTING:

             // 如果是广播模式,则使用LocalFileOffsetStore存储偏移量

             this .offsetStore = new LocalFileOffsetStore( this .mQClientFactory, this .defaultMQPushConsumer.getConsumerGroup());

             break ;

         case CLUSTERING:

             // 如果是集群模式,则使用RemoteBrokerOffsetStore存储偏移量

             this .offsetStore = new RemoteBrokerOffsetStore( this .mQClientFactory, this .defaultMQPushConsumer.getConsumerGroup());

             break ;

         default :

             break ;

     }

     this .defaultMQPushConsumer.setOffsetStore( this .offsetStore);

}

// 如果是广播模式,则从本地文件load偏移量,如果是集群模式则是一个空实现

this .offsetStore.load();

第四步:

根据消息监听器的类型不同创建不同的消息消费服务(并发/顺序消息消费服务),并启动。然后注册消费者组和消费者信息到MQClientInstance中的consumerTable中,注册成功后启动MQClientInstance客户端通信实例。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start

// 如果是顺序消费

if ( this .getMessageListenerInner() instanceof MessageListenerOrderly) {

     this .consumeOrderly = true ;

     this .consumeMessageService =

         new ConsumeMessageOrderlyService( this , (MessageListenerOrderly) this .getMessageListenerInner());

// 如果是并发消费

} else if ( this .getMessageListenerInner() instanceof MessageListenerConcurrently) {

     this .consumeOrderly = false ;

     this .consumeMessageService =

         new ConsumeMessageConcurrentlyService( this , (MessageListenerConcurrently) this .getMessageListenerInner());

}

this .consumeMessageService.start();

// 将自身注册到MQClientInstance

boolean registerOK = mQClientFactory.registerConsumer( this .defaultMQPushConsumer.getConsumerGroup(), this );

// ...

mQClientFactory.start();

第五步:

?

1

2

3

4

5

6

7

8

9

// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start

// 向Namesrv拉取并更新当前消费者订阅topic路由信息

this .updateTopicSubscribeInfoWhenSubscriptionChanged();

// 随机选择一个Broker,发送检查客户端tag配置的请求,主要是检测Broker是否支持SQL92类型的tag过滤以及SQL92的tag语法是否正确

this .mQClientFactory.checkClientInBroker();

// 给所有Broker发送心跳

this .mQClientFactory.sendHeartbeatToAllBrokerWithLock();

// 唤醒负载均衡服务rebalanceService,并进行rebalance

this .mQClientFactory.rebalanceImmediately();

总结

本篇文章我们介绍了Consumer的API,属性,接口和实现类,通过对这几部分的了解,我们能够对Consumer有一个整体的认识。我们还分析了DefaultMQPushConsumer的启动的源码,通过对 DefaultMQPushConsumer#start 开始逐渐深入分析DefaultMQPushConsumer的启动过程,能够帮助我们对Consumer消费消息一些关键的类如MQClientInstance,OffsetStore,RebalanceImpl,ConsumeMessageService由一个初步的认识,由助于我们后续详细了解这些服务的工作原理。

以上就是RocketMQ 源码分析之Consumer整体介绍启动分析的详细内容,更多关于RocketMQ Consumer源码解析的资料请关注其它相关文章!

原文链接:https://juejin.cn/post/7230558835228278843

查看更多关于RocketMQ之Consumer整体介绍启动源码分析的详细内容...

  阅读:16次