好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

RocketMQ消息存储文件的加载与恢复机制源码分析

前言

前面文章我们介绍了Broker是如何将消息全量存储到CommitLog文件中,并异步生成dispatchRequest任务更新ConsumeQueue,IndexFile的过程以及ConsumeQueue和IndexFile的文件结构。由于是异步转发消息,就可能出现消息成功存储到CommitLog文件,转发请求任务执行失败,Broker宕机了,此时CommitLog和Index消息并未处理完,导致CommitLog与ConsumeQueue和IndexFile文件中的数据不一致。如果由一部分消息在CommitLog中存在,在ConsumeQueue中不存在,那么这部分消息Consumer将永远无法消费到了,那么Broker是如何保证数据一致性的呢?

StoreCheckPoint介绍

StoreCheckPoint的作用是记录CommitLog,ConsumeQueue和IndexFile的刷盘点,当Broker异常结束时会根据StoreCheckPoint的数据恢复,StoreCheckPoint属性如下

?

1

2

3

4

5

6

7

8

public class StoreCheckpoint {

     // commitLog最后一条信息的刷盘时间戳

     private volatile long physicMsgTimestamp = 0 ;

     // consumeQueue最后一个存储单元刷盘时间戳

     private volatile long logicsMsgTimestamp = 0 ;

     // 最近一个已经写完IndexFile的最后一条记录刷盘时间戳

     private volatile long indexMsgTimestamp = 0 ;

}

StoreCheckPoint文件的存储位置是 ${user.home}/store/checkpoint ,文件的固定长度为4K,但StoreCheckPoint只占用了前24个字节,存储格式如下图所示

StoreCheckPoint时间戳更新时机

physicMsgTimestamp

FlushRealTimeService刷盘时更新

?

1

2

3

4

5

6

7

// org.apache.rocketmq.store.CommitLog.FlushRealTimeService#run

public void run() {

   // ...

   // 更新CommitLog刷盘时间戳

   if (storeTimestamp > 0 ) {                         CommitLog. this .defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);

   }

}

GroupCommitService刷盘时更新

?

1

2

3

4

5

6

7

8

// org.apache.rocketmq.store.CommitLog.GroupCommitService#doCommit

private void doCommit() {

   // ...

   // 更新CommitLog刷盘时间戳

   if (storeTimestamp > 0 ) {

CommitLog. this .defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);

   }

}

logicsMsgTimestamp

ConsumeQueue保存消息存储单元时更新

?

1

2

3

4

5

6

7

8

// org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfoWrapper

public void putMessagePositionInfoWrapper(DispatchRequest request, boolean multiQueue) {

   // ...

   // 如果consumeQueue保存成功,则更新ConsumeQueue存储点信息

   if (result) {

this .defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());

   }

}

ConsumeQueue刷盘时更新并触发StoreCheckPoint刷盘

?

1

2

3

4

5

6

7

8

9

10

11

12

// org.apache.rocketmq.store.DefaultMessageStore.FlushConsumeQueueService#doFlush

private void doFlush( int retryTimes) {

   // ...

   // 更新ConsumeQueue存储时间戳,并刷盘

   if ( 0 == flushConsumeQueueLeastPages) {

     if (logicsMsgTimestamp > 0 ) {

   DefaultMessageStore. this .getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);

     }

     // 更新存储点

     DefaultMessageStore. this .getStoreCheckpoint().flush();

   }

}

indexMsgTimestamp

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

// org.apache.rocketmq.store.index.IndexService#getAndCreateLastIndexFile

public IndexFile getAndCreateLastIndexFile() {

   // 获取最新IndexFile,如果IndexFile已经满了,需要创建一个新的IndexFile

   if (indexFile == null ) {

           indexFile =

               new IndexFile(fileName, this .hashSlotNum, this .indexNum, lastUpdateEndPhyOffset,

                   lastUpdateIndexTimestamp);

             // 如果创建新的IndexFile成功,原IndexFile刷盘

       if (indexFile != null ) {

           final IndexFile flushThisFile = prevIndexFile;

           Thread flushThread = new Thread( new Runnable() {

               @Override

               public void run() {

                     // indexFile刷盘

                   IndexService. this .flush(flushThisFile);

               }

           }, "FlushIndexFileThread" );

           flushThread.setDaemon( true );

           flushThread.start();

       }

   }

   return indexFile;

}

// org.apache.rocketmq.store.index.IndexService#flush

public void flush( final IndexFile f) {

     if ( null == f)

         return ;

     long indexMsgTimestamp = 0 ;

     if (f.isWriteFull()) {

         indexMsgTimestamp = f.getEndTimestamp();

     }

     f.flush();

     if (indexMsgTimestamp > 0 ) {

         // 更新checkPoint的indexMsgTimestamp并触发刷盘

         this .defaultMessageStore.getStoreCheckpoint().setIndexMsgTimestamp(indexMsgTimestamp);

         this .defaultMessageStore.getStoreCheckpoint().flush();

     }

}

保存消息Index,获取最新的IndexFile如果满了,则会创建一个新的IndexFile,并且更新IndexMsgTimestamp并触发StoreCheckPoint刷盘

StoreCheckPoint刷盘源码

StoreCheckPoint刷盘源码如下所示,就是将CommitLog,ConsumeQueue和IndexFile刷盘时间戳持久化到硬盘上,由上面源码可知它的刷盘触发时机

ConsumeQueue刷盘时触发 创建新IndexFile文件时触发

StoreCheckPoint刷盘源码如下

?

1

2

3

4

5

6

7

// org.apache.rocketmq.store.StoreCheckpoint#flush

public void flush() {

     this .mappedByteBuffer.putLong( 0 , this .physicMsgTimestamp);

     this .mappedByteBuffer.putLong( 8 , this .logicsMsgTimestamp);

     this .mappedByteBuffer.putLong( 16 , this .indexMsgTimestamp);

     this .mappedByteBuffer.force();

}

消息加载源码分析

在BrokerController启动时会调用 DefaultMessageStore#load 加载存储文件加载和恢复过程主要分为下面几步

判断Broker上次是否正常退出。这个判断逻辑是根据 ${user.home}/store/abort 是否存在。如果文件存在,说明上次是异常退出,如果文件不存在,则说明是正常退出。 加载CommitLog 加载ConsumeQueue 加载StoreCheckPoint 加载IndexFile 恢复ConsumeQueue与IndexFile 加载延迟队列服务

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

// org.apache.rocketmq.store.DefaultMessageStore#load

public boolean load() {

     boolean result = true ;

     try {

         // 1. Broker上次是否正常退出   

         boolean lastExitOK = ! this .isTempFileExist();

         log.info( "last shutdown {}" , lastExitOK ? "normally" : "abnormally" );

         // 2. 加载commitLog

         result = result && this 测试数据mitLog.load();

                 // 3. 加载consumeQueue

         result = result && this .loadConsumeQueue();

         if (result) {

             // 4. 加载StoreCheckPoint

             this .storeCheckpoint =

                 new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint( this .messageStoreConfig.getStorePathRootDir()));

             // 5. 加载IndexFile

             this .indexService.load(lastExitOK);

             // 6. 恢复ConsumeQueue与IndexFile

             this .recover(lastExitOK);

                         // 7. 延迟队列服务加载

             if ( null != scheduleMessageService) {

                 result =  this .scheduleMessageService.load();

             }

         }

     }

     return result;

}

CommitLog加载

前面文章介绍过,CommitLog文件的存储目录是 ${user.home}/store/commitlog/ ,并且CommitLog文件的底层是MappedFile,由MappedFileQueue管理。

CommitLog文件的加载其实调用的是 MappedFileQueue#load 方法,代码如下所示,load()中首先加载CommitLog文件目录下的所有文件,并调用doLoad()方法加载CommitLog。

?

1

2

3

4

5

6

7

8

9

// org.apache.rocketmq.store.MappedFileQueue#load

public boolean load() {

     File dir = new File( this .storePath /*${user.home}/store/commitlog/*/ );

     File[] ls = dir.listFiles();

     if (ls != null ) {

         return doLoad(Arrays.asList(ls));

     }

     return true ;

}

MappedFile的加载过程如下所示,核心逻辑主要分为下面三步

按照文件名称将文件排序,排序好的文件就会按照消息保存的先后顺序存放在列表中 校验文件大小与mappedFile是否一致,如果commitLog文件大小与mappedFileSize不一致,则说明配置被改了,或者CommitLog文件被修改 创建mappedFile,并且设置wrotePosition,flushedPosition,committedPosition为mappedFileSize

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

public boolean doLoad(List<File> files) {

     // 按照文件名称排序

     files.sort(Comparator测试数据paring(File::getName));

     for (File file : files) {

         // 如果commitLog文件大小与mappedFileSize不一致,则说明配置被改了,或者CommitLog文件被修改

         if (file.length() != this .mappedFileSize) {

             return false ;

         }

         try {

             // 创建MappedFile

             MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);

             mappedFile.setWrotePosition( this .mappedFileSize);

             mappedFile.setFlushedPosition( this .mappedFileSize);

             mappedFile.setCommittedPosition( this .mappedFileSize);

             this .mappedFiles.add(mappedFile);

         }

     }

     return true ;

}

看到这里肯定会有疑问,加载后的MappedFile的wrotePosition,flushedPosition和committedPosition的值都为mappedFileSize,如果最后一个MappedFile没有使用完,Broker启动后还会从最后一个MappedFile开始写么?我们可以在后面消息文件恢复源码分析找到答案。

ConsumeQueue加载

从前面文章我们知道,ConsumeQueue文件底层其实也是MappedFile,因此ConsumeQueue文件的加载与CommitLog加载差别不大。ConsumeQueue加载逻辑为

获取ConsumeQueue目录下存储的所有Topic目录,遍历Topic目录 遍历每个Topic目录下的所有queueId目录,逐个加载ququeId中的所有MappedFile

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

// org.apache.rocketmq.store.DefaultMessageStore#loadConsumeQueue

private boolean loadConsumeQueue() {

   // 获取consumeQueue目录

   File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue( this .messageStoreConfig.getStorePathRootDir() /*${user.home}/store */ ));

   // topic文件夹数组

   File[] fileTopicList = dirLogic.listFiles();

   if (fileTopicList != null ) {

       // 遍历topic   

       for (File fileTopic : fileTopicList) {

           // 获取topic名称

           String topic = fileTopic.getName();

           // 获取queueId文件夹数组

           File[] fileQueueIdList = fileTopic.listFiles();

           // 遍历queueId

           if (fileQueueIdList != null ) {

               for (File fileQueueId : fileQueueIdList) {

                   int queueId;

                   // 文件夹名称就是queueId

                   queueId = Integer.parseInt(fileQueueId.getName());

                   // 构建consumeQueue

                   ConsumeQueue logic = new ConsumeQueue( /* ... */ );

                   this .putConsumeQueue(topic, queueId, logic);

                   // ConsumeQueue加载

                   if (!logic.load()) {

                       return false ;

                   }

               }

           }

       }

   }

   return true ;

}

IndexFile加载

IndexFile文件加载过程调用的是 IndexService#load ,首先获取 ${user.home}/store/index 目录下的所有文件,遍历所有文件,如果IndexFile最后存储时间大于StoreCheckPoint中indexMsgTimestamp,则会先删除IndexFile

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

// org.apache.rocketmq.store.index.IndexService#load

public boolean load( final boolean lastExitOK) {

     // indexFile文件目录

     File dir = new File( this .storePath);

     // indexFile文件列表

     File[] files = dir.listFiles();

     if (files != null ) {

         // 文件排序

         Arrays.sort(files);

         for (File file : files) {

             try {

                 IndexFile f = new IndexFile(file.getPath(), this .hashSlotNum, this .indexNum, 0 , 0 );

                 f.load();

                 if (!lastExitOK) {

                     // 文件最后存储时间戳大于刷盘点,则摧毁indexFile,重建

                     if (f.getEndTimestamp() > this .defaultMessageStore.getStoreCheckpoint() /*存储点时间*/

                         .getIndexMsgTimestamp()) {

                         f.destroy( 0 );

                         continue ;

                     }

                 }

                 this .indexFileList.add(f);

             }

         }

     }

     return true ;

}

ConsumeQueue与IndexFile恢复

如果是正常退出,数据都已经正常刷盘,前面我们说到CommitLog在加载时的wrotePosition,flushedPosition,committedPosition都设置为mappedFileSize,

因此即使是正常退出,也会调用 CommitLog#recoverNormally 找到最后一条消息的位置,更新这三个属性。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

// org.apache.rocketmq.store.DefaultMessageStore#recover

private void recover( final boolean lastExitOK) {

     // consumeQueue中最大物理偏移量

     long maxPhyOffsetOfConsumeQueue = this .recoverConsumeQueue();

     if (lastExitOK) {

         // 正常退出文件恢复

         this 测试数据mitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);

     } else {

         // 异常退出文件恢复

         this 测试数据mitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);

     }

     // 恢复topicQueueTable

     this .recoverTopicQueueTable();

}

正常恢复的源码如下,由于Broker是正常关闭,因此CommitLog,ConsumeQueue与IndexFile都已经正确刷盘,并且三者的消息是一致的。正常恢复的主要目的是找到找到最后一条消息的偏移量,然后更新CommitLog的MappedFileQueue中的刷盘点(flushWhere)和提交点(committedWhere),

从最后3个mappedFile开始恢复,如果mappedFile总数不足3个,则从第0个mappedFile开始恢复 逐个遍历mappedFile,找到每个MappedFile的最后一条消息的偏移量,并将其更新到CommitLog中MappedFileQueue的刷盘点和提交点中 清除ConsumeQueue冗余数据

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

public void recoverNormally( long maxPhyOffsetOfConsumeQueue) {

     // 确认消息是否完整,默认是true

     boolean checkCRCOnRecover = this .defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();

     final List<MappedFile> mappedFiles = this .mappedFileQueue.getMappedFiles();

     if (!mappedFiles.isEmpty()) {

         // 默认从最后3个mappedFile开始恢复

         int index = mappedFiles.size() - 3 ;

         // 如果commitLog不足三个,则从第一个文件开始恢复

         if (index < 0 )

             index = 0 ;

         MappedFile mappedFile = mappedFiles.get(index);

         ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();

         // 最后一个MappedFile的文件起始偏移量

         long processOffset = mappedFile.getFileFromOffset();

         // mappedFileOffset偏移量

         long mappedFileOffset = 0 ;

         // 遍历CommitLog文件

         while ( true ) {

             // 校验消息完整性

             DispatchRequest dispatchRequest = this .checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);

             // 获取消息size

             int size = dispatchRequest.getMsgSize();

             // 返回结果为true并且消息size>0,说明消息是完整的

             if (dispatchRequest.isSuccess() && size > 0 ) {

                 mappedFileOffset += size;

             }

         }

         // 最大物理偏移量

         processOffset += mappedFileOffset;

         // 更新flushedWhere和committedPosition指针

         this .mappedFileQueue.setFlushedWhere(processOffset);

         this .mappedFileQueue.setCommittedWhere(processOffset);

         this .mappedFileQueue.truncateDirtyFiles(processOffset);

         // 清除ConsumeQueue冗余数据

         if (maxPhyOffsetOfConsumeQueue >= processOffset) {

             this .defaultMessageStore.truncateDirtyLogicFiles(processOffset /*CommitLog最大物理偏移量*/ );

         }

     }

}

异常恢复源码如下,由于上次Broker没有正常关闭,因此由可能存在CommitLog、ConsumeQueue与IndexFile不一致的情况,因此在异常恢复时可能需要恢复ConsumeQueue和IndexFile,异常恢复核心逻辑主要包括

倒序查CommitLog的mappedFile文件,找到第一条消息存储的时间戳比StoreCheckPoint里的physicMsgTimestamp,logicsMsgTimestamp和indexMsgTimestamp三者都小的最大MappedFile,该mappedFile至少有一部分消息是被正常转发,正常存储,正常刷盘的 从该mappedFile开始逐条转发消息,重新恢复ConsumeQueue和IndexFile 当遍历到最后一条消息,将其偏移量更新到CommitLog中MappedFileQueue的刷盘点和提交点中 清除ConsumeQueue冗余数据

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

// org.apache.rocketmq.store.CommitLog#recoverAbnormally

public void recoverAbnormally( long maxPhyOffsetOfConsumeQueue) {

     // 是否CRC校验

     boolean checkCRCOnRecover = this .defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();

     final List<MappedFile> mappedFiles = this .mappedFileQueue.getMappedFiles();

     if (!mappedFiles.isEmpty()) {

         // 最后一个mappedFile的index

         int index = mappedFiles.size() - 1 ;

         MappedFile mappedFile = null ;

         // 倒序遍历mappedFile数组,

         for (; index >= 0 ; index--) {

             mappedFile = mappedFiles.get(index);

             // 1. 如果第一条消息的时间戳小于存储点时间戳

             if ( this .isMappedFileMatchedRecover(mappedFile)) {

                 break ;

             }

         }

         long processOffset = mappedFile.getFileFromOffset();

         long mappedFileOffset = 0 ;

         while ( true ) {

             DispatchRequest dispatchRequest = this .checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);

             int size = dispatchRequest.getMsgSize();

             if (dispatchRequest.isSuccess()) {

                 if (size > 0 ) {

                     mappedFileOffset += size;

                     // 2. 转发消息

                     if ( this .defaultMessageStore.getMessageStoreConfig().isDuplicationEnable() /*消息是否可以重复,默认是false*/ ) {

                         if (dispatchRequest.getCommitLogOffset() < this .defaultMessageStore.getConfirmOffset()) {

                             this .defaultMessageStore.doDispatch(dispatchRequest);

                         }

                     } else {

                         this .defaultMessageStore.doDispatch(dispatchRequest);

                     }

                 }

         }

                 // 3. 更新MappedFileQueue中的刷盘位置和提交位置

         processOffset += mappedFileOffset;

         this .mappedFileQueue.setFlushedWhere(processOffset);

         this .mappedFileQueue.setCommittedWhere(processOffset);

         this .mappedFileQueue.truncateDirtyFiles(processOffset);

         // 清除ConsumeQueue中的冗余数据

         if (maxPhyOffsetOfConsumeQueue >= processOffset) {

             this .defaultMessageStore.truncateDirtyLogicFiles(processOffset);

         }

     }

}

总结

Broker启动时会分别加载CommitLog、ConsumeQueue与IndexFile。加载完成后,如果Broker上次是正常退出,只需要找到CommitLog的最后一条消息,并更新刷盘点和提交点。如果Broker上次是异常退出,就有可能出现ConsumeQueue、IndexFile与CommitLog不一致的情况,需要根据StoreCheckPoint存储的时间戳从CommitLog找到消息,逐条恢复ConsumeQueue与IndexFile。

以上就是RocketMQ | 源码分析】消息存储文件的加载与恢复机制的详细内容,更多关于RocketMQ 消息存储文件加载恢复的资料请关注其它相关文章!

原文链接:https://juejin.cn/post/7229324815191818298

查看更多关于RocketMQ消息存储文件的加载与恢复机制源码分析的详细内容...

  阅读:15次