前言
前面文章讲了消息是如何保存的以及consumeQueue与Index文件更新机制。随着消息的增加,Broker不可能一直保存所有消息,Broker是按照什么规则清理消息的呢?被消费过后的消息就会被清理掉吗?下面我们来介绍Broker消息清理机制。
Broker消息清理机制简介
消息是被顺序存储在CommitLog文件中的,且消息长度不定长,因此消息的清理不是以消息为单位进行的,而是以CommitLog为单位进行的。默认情况下,Broker会清理单个CommitLog文件中最后一条消息超过 72小时的CommitLog文件 ,除了用户手动清理为,下面几种情况会被默认清理。
CommitLog清理机制
CommitLog文件过期(72小时),且达到清理时间点( 默认为04:00~05:00 ),自动清理过期的CommitLog文件
CommitLog文件过期(72小时),且CommitLog所在磁盘分区占用率已经达到 过期清理警戒线 (默认75%),无论是否到达清理时间点都会自动清理过期文件
CommitLog所在磁盘分区占用率已经达到 清理警戒线 (默认85%),无论是否过期,都会从最早的文件开始清理,一次最多清理10个文件
CommitLog所在磁盘分区占用率已经达到 系统危险警戒线 (默认90%),Broker将拒绝消息写入
Broker至少会保留最新的CommitLog文件
ConsumeQueue清理机制
如果ConsumeQueue文件关联CommitLog都被清理,则清理此ConsumeQueue文件 Broker每个Topic-QueueId至少会保留最新的文件IndexFile清理机制
如果IndexFile所有索引单元关联CommitLog都被清理,则清理此IndexFile
Broker与消息清理相关配置
1 2 3 4 5 6 7 8 |
# 文件自动清理时间,单位H,默认 72 fileReservedTime= 72 # CommitLog物理文件删除间隔,但是ms,默认 100 deleteCommitLogFilesInterval = 100 # 文件自动清理时间,默认 04 ,即凌晨 4 点 deleteWhen = "04" # 硬盘占用率所在分区过期清理警戒线,超过这个值,无论是否到达清理时间,都会自动清理过期文件 diskMaxUsedSpaceRatio = 75 |
消息清理机制源码分析
消息定时清理的是由DefaultMessageStore类负责的,它在启动时(start)会调用 DefaultMessageStore#addScheduleTask 添加和消息存储相关的定时任务,其中就包括消息删除相关的定时任务 DefaultMessageStore.this.cleanFilesPeriodically() ,这个定时任务在 Broker启动后60s开始,每隔10秒执行一次 。
1 2 3 4 5 6 7 8 9 10 11 |
// org.apache.rocketmq.store.DefaultMessageStore#addScheduleTask private void addScheduleTask() { this .scheduledExecutorService.scheduleAtFixedRate( new Runnable() { @Override public void run() { // commitLog、consumeQueue和IndexFile定时删除 DefaultMessageStore. this .cleanFilesPeriodically(); } }, 1000 * 60 , this .messageStoreConfig.getCleanResourceInterval() /*10s*/ , TimeUnit.MILLISECONDS); // ... } |
在cleanFilesPeriodically()中有两个方法, cleanCommitLogService.run() 负责清理CommitLog, cleanConsumeQueueService.run() 负责清理ConsumeQueue和IndexFile。
1 2 3 4 5 6 7 |
// org.apache.rocketmq.store.DefaultMessageStore#cleanFilesPeriodically private void cleanFilesPeriodically() { // 清理CommitLog this .cleanCommitLogService.run(); // 清理ConsumeQueue和IndexFile this .cleanConsumeQueueService.run(); } |
CommitLog清理源码分析
CommitLog清理方法 CleanCommitLogService#run 调用了 CleanCommitLogService#deleteExpiredFiles ,deleteExpiredFiles方法的核心代码逻辑如下,以下三种情况会触发CommitLog文件的删除
当前时间是凌晨4点 CommitLog所在磁盘分区硬盘占用率超过75% 手动删除CommitLog
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
// org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#deleteExpiredFiles private void deleteExpiredFiles() { // 是否是凌晨4点,用小时匹配[04:00,05:00) boolean timeup = this .isTimeToDelete(); // >75%就会返回true,如果大于85%,则触发强制删除 boolean spacefull = this .isSpaceToDelete(); // 手动删除次数是否>0 boolean manualDelete = this .manualDeleteFileSeveralTimes > 0 ; if (timeup /*凌晨4点*/ || spacefull /*空间满了*/ || manualDelete /*手动删除*/ ) { boolean cleanAtOnce = DefaultMessageStore. this .getMessageStoreConfig().isCleanFileForciblyEnable() /*默认true*/ && this .cleanImmediately /*空间占用超过85%,触发强制删除*/ ; // 删除CommitLog deleteCount = DefaultMessageStore. this 测试数据mitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval, destroyMapedFileIntervalForcibly, cleanAtOnce); if (deleteCount > 0 ) { } else if (spacefull) { log.warn( "disk space will be full soon, but delete file failed." ); } } } |
CommitLog的清理逻辑在 MappedFileQueue#deleteExpiredFileByTime ,其核心代码如下所示,主要分为下面几个步骤
复制MappedFileQueue中的mappedFiles,循环处理删除逻辑,循环 mfsLength-1 次,也就是无论如何都会保留最新的MappedFile 如果MappedFile的最后修改时间超过72小时或CommitLog所在磁盘分区硬盘占用率超过85%触发强制删除MappedFile,则会删除MappedFile,每次删除最多删除10个MappedFile,相邻MappedFile删除时间间隔默认100ms 删除MappedFileQueue的mappedFiles数组中已删除的MappedFile,并返回删除MappedFile的数量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
// org.apache.rocketmq.store.MappedFileQueue#deleteExpiredFileByTime public int deleteExpiredFileByTime( final long expiredTime, final int deleteFilesInterval, final long intervalForcibly, final boolean cleanImmediately) { // 复制一份当前mappedFile Object[] mfs = this .copyMappedFiles( 0 ); // 会保留最后一个MappedFile int mfsLength = mfs.length - 1 ; int deleteCount = 0 ; List<MappedFile> files = new ArrayList<MappedFile>(); if ( null != mfs) { for ( int i = 0 ; i < mfsLength; i++) { MappedFile mappedFile = (MappedFile) mfs[i]; // 最后修改时间+过期时间 long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime; // 如果commitLog所在磁盘分区总容量超过85%,触发立即删除,或者超过了72小时的mappedFile if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) { // 删除mappedFile if (mappedFile.destroy(intervalForcibly)) { files.add(mappedFile); deleteCount++; // 一次最多删除10个mappedFile if (files.size() >= DELETE_FILES_BATCH_MAX /*10*/ ) { break ; } if (deleteFilesInterval > 0 && (i + 1 ) < mfsLength) { try { // 删除文件时间间隔,默认100ms Thread.sleep(deleteFilesInterval); } catch (InterruptedException e) { } } } else { break ; } } else { //avoid deleting files in the middle break ; } } } // 从MappedFileQueue的mappedFiles中删除这个mappedFile deleteExpiredFile(files); return deleteCount; } |
ConsumeQueue和IndexFile清理源码分析
ConsumeQueue和IndexFile清理方法 CleanConsumeQueueService#run 调用了 CleanConsumeQueueService#deleteExpiredFiles 方法清理ConsumeQueue和IndexFile。 CleanConsumeQueueService#deleteExpiredFiles 核心代码如下,包括两个主要逻辑
遍历consumeQueueTable中的ConsumeQueue,调用 ConsumeQueue#deleteExpiredFile 删除过期ConsumeQueue 调用 IndexService#deleteExpiredFile 删除过期IndexFile
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
// org.apache.rocketmq.store.DefaultMessageStore.CleanConsumeQueueService#deleteExpiredFiles private void deleteExpiredFiles() { if (minOffset > this .lastPhysicalMinOffset) { ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore. this .consumeQueueTable; // 遍历ConsumeQueue for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) { for (ConsumeQueue logic : maps.values()) { // 删除consumeQueue int deleteCount = logic.deleteExpiredFile(minOffset); // ... 间隔100ms } } // 删除indexFile DefaultMessageStore. this .indexService.deleteExpiredFile(minOffset); } } |
ConsumeQueue文件清理
ConsumeQueue文件底层也是MappedFile,清理ConsumeQueue调用 MappedFileQueue#deleteExpiredFileByOffset 清理ConsumeQueue的过期MappedFile,源码如下,核心逻辑
复制MappedFileQueue中的mappedFiles,循环处理清理逻辑,循环 mfsLength-1 次,也就是无论如何都会保留最新的MappedFile 如果ConsumeQueue的MappedFile最后一个存储单元对应消息在CommitLog中的偏移量小于CommitLog的最小偏移量,说明当前MappedFile所有存储单元对应所有CommitLog的消息都已经被清理,因此调用 MappedFile#destroy 清理当前MappedFile 删除MappedFileQueue缓存的mappedFiles列表中已经被清理MappedFile
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
// org.apache.rocketmq.store.MappedFileQueue#deleteExpiredFileByOffset public int deleteExpiredFileByOffset( long offset, int unitSize) { // 复制一份mappedFiles Object[] mfs = this .copyMappedFiles( 0 ); List<MappedFile> files = new ArrayList<MappedFile>(); int deleteCount = 0 ; if ( null != mfs) { int mfsLength = mfs.length - 1 ; for ( int i = 0 ; i < mfsLength; i++) { boolean destroy; MappedFile mappedFile = (MappedFile) mfs[i]; // 取consumeQueue最后一条消息Buffer切片 SelectMappedBufferResult result = mappedFile.selectMappedBuffer( this .mappedFileSize - unitSize); if (result != null ) { // consumeQueue最后一个存储单元消息在commitLog的偏移量 long maxOffsetInLogicQueue = result.getByteBuffer().getLong(); result.release(); // 如果consumeQueue最后一条消息已经小于commitLog的最小offset,则说明要删除了 destroy = maxOffsetInLogicQueue < offset; if (destroy) { log.info( "physic min offset " + offset + ", logics in current mappedFile max offset " + maxOffsetInLogicQueue + ", delete it" ); } } // 删除ConsumeQueue的MappedFile if (destroy && mappedFile.destroy( 1000 * 60 )) { files.add(mappedFile); deleteCount++; } else { break ; } } } // 删除MappedFileQueue的mappedFiles列表中已经删除的MappedFile deleteExpiredFile(files); return deleteCount; } |
IndexFile清理
IndexFile清理逻辑与ConsumeQueue类似,都是删除文件中关联的CommitLog消息全部被删除的文件。核心逻辑包括下面两个
获取IndexFileList中所有最大offset小于CommitLog最小offset的IndexFile 删除过期的IndexFile
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
// org.apache.rocketmq.store.index.IndexService#deleteExpiredFile(long) public void deleteExpiredFile( long offset) { Object[] files = null ; try { // indexFileList的第一个索引文件的最后一个offset long endPhyOffset = this .indexFileList.get( 0 ).getEndPhyOffset(); if (endPhyOffset < offset) { files = this .indexFileList.toArray(); } } if (files != null ) { List<IndexFile> fileList = new ArrayList<IndexFile>(); for ( int i = 0 ; i < (files.length - 1 ); i++) { IndexFile f = (IndexFile) files[i]; // IndexFile中最大的offset小于CommitLog最小offset,说明文件可以被删除 if (f.getEndPhyOffset() < offset) { fileList.add(f); } else { break ; } } // 删除过期的IndexFile,并将其从indexFileList缓存中删除 this .deleteExpiredFile(fileList); } } |
总结
Broker消息清理机制由DefaultMessageStore负责,CommitLog、ConsumeQueue和IndexFile的清理都是按照文件颗粒度进行。
每10s检查一次,通常情况下每天凌晨4点删除超过72小时的CommitLog;如果CommitLog所在磁盘分区的磁盘占用率超过75%,则会触发CommitLog文件清理;如果CommitLog所在磁盘分区的磁盘占用率超过85%,则会强制删除CommitLog文件;
如果ConsumeQueue和IndexFile关联CommitLog都被删除,ConsumeQueue文件和IndexFile也会被清理。
以上就是RocketMQ | 源码分析】Broker过期消息清理机制的详细内容,更多关于RocketMQ | 源码分析】Broker过期消息清理机制的资料请关注其它相关文章!
原文链接:https://juejin.cn/post/7229799197096149049
查看更多关于RocketMQ源码分析之Broker过期消息清理机制的详细内容...