好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

RocketMQ源码分析之Broker过期消息清理机制

前言

前面文章讲了消息是如何保存的以及consumeQueue与Index文件更新机制。随着消息的增加,Broker不可能一直保存所有消息,Broker是按照什么规则清理消息的呢?被消费过后的消息就会被清理掉吗?下面我们来介绍Broker消息清理机制。

Broker消息清理机制简介

消息是被顺序存储在CommitLog文件中的,且消息长度不定长,因此消息的清理不是以消息为单位进行的,而是以CommitLog为单位进行的。默认情况下,Broker会清理单个CommitLog文件中最后一条消息超过 72小时的CommitLog文件 ,除了用户手动清理为,下面几种情况会被默认清理。

CommitLog清理机制

CommitLog文件过期(72小时),且达到清理时间点( 默认为04:00~05:00 ),自动清理过期的CommitLog文件

CommitLog文件过期(72小时),且CommitLog所在磁盘分区占用率已经达到 过期清理警戒线 (默认75%),无论是否到达清理时间点都会自动清理过期文件

CommitLog所在磁盘分区占用率已经达到 清理警戒线 (默认85%),无论是否过期,都会从最早的文件开始清理,一次最多清理10个文件

CommitLog所在磁盘分区占用率已经达到 系统危险警戒线 (默认90%),Broker将拒绝消息写入

Broker至少会保留最新的CommitLog文件

ConsumeQueue清理机制

如果ConsumeQueue文件关联CommitLog都被清理,则清理此ConsumeQueue文件 Broker每个Topic-QueueId至少会保留最新的文件

IndexFile清理机制

如果IndexFile所有索引单元关联CommitLog都被清理,则清理此IndexFile

Broker与消息清理相关配置

?

1

2

3

4

5

6

7

8

# 文件自动清理时间,单位H,默认 72

fileReservedTime= 72

# CommitLog物理文件删除间隔,但是ms,默认 100

deleteCommitLogFilesInterval = 100

# 文件自动清理时间,默认 04 ,即凌晨 4 点

deleteWhen = "04"

# 硬盘占用率所在分区过期清理警戒线,超过这个值,无论是否到达清理时间,都会自动清理过期文件

diskMaxUsedSpaceRatio = 75

消息清理机制源码分析

消息定时清理的是由DefaultMessageStore类负责的,它在启动时(start)会调用 DefaultMessageStore#addScheduleTask 添加和消息存储相关的定时任务,其中就包括消息删除相关的定时任务 DefaultMessageStore.this.cleanFilesPeriodically() ,这个定时任务在 Broker启动后60s开始,每隔10秒执行一次 。

?

1

2

3

4

5

6

7

8

9

10

11

// org.apache.rocketmq.store.DefaultMessageStore#addScheduleTask

private void addScheduleTask() {

     this .scheduledExecutorService.scheduleAtFixedRate( new Runnable() {

         @Override

         public void run() {

             // commitLog、consumeQueue和IndexFile定时删除

             DefaultMessageStore. this .cleanFilesPeriodically();

         }

     }, 1000 * 60 , this .messageStoreConfig.getCleanResourceInterval() /*10s*/ , TimeUnit.MILLISECONDS);

     // ...

}

在cleanFilesPeriodically()中有两个方法, cleanCommitLogService.run() 负责清理CommitLog, cleanConsumeQueueService.run() 负责清理ConsumeQueue和IndexFile。

?

1

2

3

4

5

6

7

// org.apache.rocketmq.store.DefaultMessageStore#cleanFilesPeriodically

private void cleanFilesPeriodically() {

     // 清理CommitLog

     this .cleanCommitLogService.run();

     // 清理ConsumeQueue和IndexFile

     this .cleanConsumeQueueService.run();

}

CommitLog清理源码分析

CommitLog清理方法 CleanCommitLogService#run 调用了 CleanCommitLogService#deleteExpiredFiles ,deleteExpiredFiles方法的核心代码逻辑如下,以下三种情况会触发CommitLog文件的删除

当前时间是凌晨4点 CommitLog所在磁盘分区硬盘占用率超过75% 手动删除CommitLog

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

// org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#deleteExpiredFiles

private void deleteExpiredFiles() {

     // 是否是凌晨4点,用小时匹配[04:00,05:00)

     boolean timeup = this .isTimeToDelete();

     // >75%就会返回true,如果大于85%,则触发强制删除

     boolean spacefull = this .isSpaceToDelete();

     // 手动删除次数是否>0

     boolean manualDelete = this .manualDeleteFileSeveralTimes > 0 ;

     if (timeup /*凌晨4点*/ || spacefull /*空间满了*/ || manualDelete /*手动删除*/ ) {

         boolean cleanAtOnce = DefaultMessageStore. this .getMessageStoreConfig().isCleanFileForciblyEnable() /*默认true*/ && this .cleanImmediately /*空间占用超过85%,触发强制删除*/ ;

                 // 删除CommitLog

         deleteCount = DefaultMessageStore. this 测试数据mitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,

             destroyMapedFileIntervalForcibly, cleanAtOnce);

         if (deleteCount > 0 ) {

         } else if (spacefull) {

             log.warn( "disk space will be full soon, but delete file failed." );

         }

     }

}

CommitLog的清理逻辑在 MappedFileQueue#deleteExpiredFileByTime ,其核心代码如下所示,主要分为下面几个步骤

复制MappedFileQueue中的mappedFiles,循环处理删除逻辑,循环 mfsLength-1 次,也就是无论如何都会保留最新的MappedFile 如果MappedFile的最后修改时间超过72小时或CommitLog所在磁盘分区硬盘占用率超过85%触发强制删除MappedFile,则会删除MappedFile,每次删除最多删除10个MappedFile,相邻MappedFile删除时间间隔默认100ms 删除MappedFileQueue的mappedFiles数组中已删除的MappedFile,并返回删除MappedFile的数量

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

// org.apache.rocketmq.store.MappedFileQueue#deleteExpiredFileByTime

public int deleteExpiredFileByTime( final long expiredTime, final int deleteFilesInterval, final long intervalForcibly, final boolean cleanImmediately) {

     // 复制一份当前mappedFile

     Object[] mfs = this .copyMappedFiles( 0 );

         // 会保留最后一个MappedFile

     int mfsLength = mfs.length - 1 ;

     int deleteCount = 0 ;

     List<MappedFile> files = new ArrayList<MappedFile>();

     if ( null != mfs) {

         for ( int i = 0 ; i < mfsLength; i++) {

             MappedFile mappedFile = (MappedFile) mfs[i];

             // 最后修改时间+过期时间

             long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;

             // 如果commitLog所在磁盘分区总容量超过85%,触发立即删除,或者超过了72小时的mappedFile

             if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {

                 // 删除mappedFile

                 if (mappedFile.destroy(intervalForcibly)) {

                     files.add(mappedFile);

                     deleteCount++;

                     // 一次最多删除10个mappedFile

                     if (files.size() >= DELETE_FILES_BATCH_MAX /*10*/ ) {

                         break ;

                     }

                     if (deleteFilesInterval > 0 && (i + 1 ) < mfsLength) {

                         try {

                             // 删除文件时间间隔,默认100ms

                             Thread.sleep(deleteFilesInterval);

                         } catch (InterruptedException e) {

                         }

                     }

                 } else {

                     break ;

                 }

             } else {

                 //avoid deleting files in the middle

                 break ;

             }

         }

     }

     // 从MappedFileQueue的mappedFiles中删除这个mappedFile

     deleteExpiredFile(files);

     return deleteCount;

}

ConsumeQueue和IndexFile清理源码分析

ConsumeQueue和IndexFile清理方法 CleanConsumeQueueService#run 调用了 CleanConsumeQueueService#deleteExpiredFiles 方法清理ConsumeQueue和IndexFile。 CleanConsumeQueueService#deleteExpiredFiles 核心代码如下,包括两个主要逻辑

遍历consumeQueueTable中的ConsumeQueue,调用 ConsumeQueue#deleteExpiredFile 删除过期ConsumeQueue 调用 IndexService#deleteExpiredFile 删除过期IndexFile

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

// org.apache.rocketmq.store.DefaultMessageStore.CleanConsumeQueueService#deleteExpiredFiles

private void deleteExpiredFiles() {

     if (minOffset > this .lastPhysicalMinOffset) {

         ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore. this .consumeQueueTable;

                 // 遍历ConsumeQueue

         for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {

             for (ConsumeQueue logic : maps.values()) {

                 // 删除consumeQueue

                 int deleteCount = logic.deleteExpiredFile(minOffset);

                                 // ... 间隔100ms

             }

         }

                 // 删除indexFile

         DefaultMessageStore. this .indexService.deleteExpiredFile(minOffset);

     }

}

ConsumeQueue文件清理

ConsumeQueue文件底层也是MappedFile,清理ConsumeQueue调用 MappedFileQueue#deleteExpiredFileByOffset 清理ConsumeQueue的过期MappedFile,源码如下,核心逻辑

复制MappedFileQueue中的mappedFiles,循环处理清理逻辑,循环 mfsLength-1 次,也就是无论如何都会保留最新的MappedFile 如果ConsumeQueue的MappedFile最后一个存储单元对应消息在CommitLog中的偏移量小于CommitLog的最小偏移量,说明当前MappedFile所有存储单元对应所有CommitLog的消息都已经被清理,因此调用 MappedFile#destroy 清理当前MappedFile 删除MappedFileQueue缓存的mappedFiles列表中已经被清理MappedFile

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

// org.apache.rocketmq.store.MappedFileQueue#deleteExpiredFileByOffset

public int deleteExpiredFileByOffset( long offset, int unitSize) {

     // 复制一份mappedFiles

     Object[] mfs = this .copyMappedFiles( 0 );

     List<MappedFile> files = new ArrayList<MappedFile>();

     int deleteCount = 0 ;

     if ( null != mfs) {

         int mfsLength = mfs.length - 1 ;

         for ( int i = 0 ; i < mfsLength; i++) {

             boolean destroy;

             MappedFile mappedFile = (MappedFile) mfs[i];

             // 取consumeQueue最后一条消息Buffer切片

             SelectMappedBufferResult result = mappedFile.selectMappedBuffer( this .mappedFileSize - unitSize);

             if (result != null ) {

                 // consumeQueue最后一个存储单元消息在commitLog的偏移量

                 long maxOffsetInLogicQueue = result.getByteBuffer().getLong();

                 result.release();

                 // 如果consumeQueue最后一条消息已经小于commitLog的最小offset,则说明要删除了

                 destroy = maxOffsetInLogicQueue < offset;

                 if (destroy) {

                     log.info( "physic min offset " + offset + ", logics in current mappedFile max offset "

                         + maxOffsetInLogicQueue + ", delete it" );

                 }

             }

             // 删除ConsumeQueue的MappedFile

             if (destroy && mappedFile.destroy( 1000 * 60 )) {

                 files.add(mappedFile);

                 deleteCount++;

             } else {

                 break ;

             }

         }

     }

     // 删除MappedFileQueue的mappedFiles列表中已经删除的MappedFile

     deleteExpiredFile(files);

     return deleteCount;

}

IndexFile清理

IndexFile清理逻辑与ConsumeQueue类似,都是删除文件中关联的CommitLog消息全部被删除的文件。核心逻辑包括下面两个

获取IndexFileList中所有最大offset小于CommitLog最小offset的IndexFile 删除过期的IndexFile

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

// org.apache.rocketmq.store.index.IndexService#deleteExpiredFile(long)

public void deleteExpiredFile( long offset) {

     Object[] files = null ;

     try {

         // indexFileList的第一个索引文件的最后一个offset

         long endPhyOffset = this .indexFileList.get( 0 ).getEndPhyOffset();

         if (endPhyOffset < offset) {

             files = this .indexFileList.toArray();

         }

     }

     if (files != null ) {

         List<IndexFile> fileList = new ArrayList<IndexFile>();

         for ( int i = 0 ; i < (files.length - 1 ); i++) {

             IndexFile f = (IndexFile) files[i];

             // IndexFile中最大的offset小于CommitLog最小offset,说明文件可以被删除

             if (f.getEndPhyOffset() < offset) {

                 fileList.add(f);

             } else {

                 break ;

             }

         }

         // 删除过期的IndexFile,并将其从indexFileList缓存中删除

         this .deleteExpiredFile(fileList);

     }

}

总结

Broker消息清理机制由DefaultMessageStore负责,CommitLog、ConsumeQueue和IndexFile的清理都是按照文件颗粒度进行。

每10s检查一次,通常情况下每天凌晨4点删除超过72小时的CommitLog;如果CommitLog所在磁盘分区的磁盘占用率超过75%,则会触发CommitLog文件清理;如果CommitLog所在磁盘分区的磁盘占用率超过85%,则会强制删除CommitLog文件;

如果ConsumeQueue和IndexFile关联CommitLog都被删除,ConsumeQueue文件和IndexFile也会被清理。

以上就是RocketMQ | 源码分析】Broker过期消息清理机制的详细内容,更多关于RocketMQ | 源码分析】Broker过期消息清理机制的资料请关注其它相关文章!

原文链接:https://juejin.cn/post/7229799197096149049

查看更多关于RocketMQ源码分析之Broker过期消息清理机制的详细内容...

  阅读:17次