好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

RocketMQ broker文件清理源码解析

1. broker 清理文件介绍

本系列 RocketMQ4.8注释github地址 ,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈

1.1 哪些文件需要清理

首先我们需要介绍下在RocketMQ中哪些文件需要清理,其实可以想一想,在RocketMQ中哪些文件是一直在往里面写入东西的,最容易想到的就是 commitlog 了,因为在一个broker 进程中,所有的普通消息,事务消息,系统消息啥的都往这个 commitlog 中写,随着时间的越来越长,然后 commitlog 就会越积攒越多,肯定会有磁盘放不下的那一天,而且我们消息消费完成后,那些被消费完成后的消息其实作用就很小了,可能会有这么一个场景,比如说我线上出现了某个问题,我想看下关于这个问题的消息有没有被消费到,可能你会用到这个消息,但是这种问题一般就是比较紧急的,最近实效的,之前那些消息其实作用就基本没有了,所以就需要清理掉之前的消息。其实不光 commitlog 需要清理,还需要清理一下 ConsumeQueue 与 indexFile , 因为你 commitlog 里面的消息都被清理了, ConsumeQueue 与 indexFile 再保存着之前的一些数据,就是纯粹浪费空间了。

所以说 broker 文件清理主要是清理 commitlog , ConsumeQueue , indexFile 。

1.2 RocketMQ文件清理的机制

我们介绍下 RocketMQ 文件清理的机制, RocketMQ 默认是 清理72小时之前的消息 ,然后它有几个触发条件, 默认是 凌晨4点触发清 理, 除非你你这个 磁盘空间占用到75% 以上了。在清理 commitlog 的时候,并不是一条消息一条消息的清理,拿到所有的 MappedFile (抛去现在还在用着的,也就是最后一个) ,然后比对每个 MappedFile 的最后一条消息的时间,如果是 72小时之前 的就把 MappedFile 对应的文件删除了,销毁对应 MappedFile ,这种情况的话只要你 MappedFile 最后一条消息还在存活实效内的话 ,它就不会清理你这个 MappedFile ,就算你这个 MappedFile 靠前的消息过期了。但是有一种情况它不管你消息超没超过72小时,直接就是删,那就是磁盘空间不足的时候,也就是占了 85% 以上了,就会立即清理。

清理完成 commitlog 之后,就会拿到 commitlog 中最小的 offset ,然后去 ConsumeQueue 与 indexFile 中把小于 offset 的记录删除掉。清理 ConsumeQueue 的时候也是遍历 MappedFile ,然后它的最后一条消息(unit)小于 commitlog 中最小的 offset 的话,就说明这个 MappedFile 都小于 offset ,因为他们是顺序追加写的,这个MappedFile 就会清理掉,如果你 MappedFile 最后一个unit不是小于 offset 的话,这个 MappedFile 就不删了。

2. 源码解析

我们来看下源码是怎样实现的: 在broker 存储器 DefaultMessageStore 启动(start)的时候,会添加几个任务调度,其中有一个就是文件清理的:

?
1
2
3
4
5
6
7
8
9
10
11
private void addScheduleTask() {
     // todo 清理过期文件 每隔10s
     this .scheduledExecutorService.scheduleAtFixedRate( new Runnable() {
         @Override
         public void run() {
             // todo
             DefaultMessageStore. this .cleanFilesPeriodically();
         }
     }, 1000 * 60 , this .messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
     ...
}

默认是 10s 执行一次,可以看到它调用了 DefaultMessageStore 的 cleanFilesPeriodically 方法:

?
1
2
3
4
5
6
private void cleanFilesPeriodically() {
     // todo 清除CommitLog文件
     this .cleanCommitLogService.run();
     // todo 清除ConsumeQueue文件
     this .cleanConsumeQueueService.run();
}

2.1 清理commitlog

我们先来看下关于 commitlog 的清理工作:

?
1
2
3
4
5
6
7
8
9
public void run() {
     try {
         // todo 删除过期文件
         this .deleteExpiredFiles();
         this .redeleteHangedFile();
     } catch (Throwable e) {
         DefaultMessageStore.log.warn( this .getServiceName() + " service has exception. " , e);
     }
}

我们看下 deleteExpiredFiles 方法的实现:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private void deleteExpiredFiles() {
     int deleteCount = 0 ;
     // 文件保留时间,如果超过了该时间,则认为是过期文件,可以被删除
     long fileReservedTime = DefaultMessageStore. this .getMessageStoreConfig().getFileReservedTime();
     // 删除物理文件的间隔时间,在一次清除过程中,可能需要被删除的文件不止一个,该值指定两次删除文件的间隔时间
     int deletePhysicFilesInterval = DefaultMessageStore. this .getMessageStoreConfig().getDeleteCommitLogFilesInterval();
     // 在清除过期文件时,如
     //果该文件被其他线程占用(引用次数大于0,比如读取消息),此时会
     //阻止此次删除任务,同时在第一次试图删除该文件时记录当前时间
     //戳,destroyMapedFileIntervalForcibly表示第一次拒绝删除之后能
     //保留文件的最大时间,在此时间内,同样可以被拒绝删除,超过该时
     //间后,会将引用次数设置为负数,文件将被强制删除
     int destroyMapedFileIntervalForcibly = DefaultMessageStore. this .getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
     // 指定删除文件的时间点,RocketMQ通过deleteWhen设置每天在
     //固定时间执行一次删除过期文件操作,默认凌晨4点
     boolean timeup = this .isTimeToDelete();
     // todo 检查磁盘空间是否充足,如果磁盘空间不充足,则返回true,表示应该触发过期文件删除操作
     boolean spacefull = this .isSpaceToDelete();
     // 预留手工触发机制,可以通过调用excuteDeleteFilesManualy
     //方法手工触发删除过期文件的操作,目前RocketMQ暂未封装手工触发
     //文件删除的命令
     boolean manualDelete = this .manualDeleteFileSeveralTimes &gt; 0 ;
     if (timeup || spacefull || manualDelete) {
         if (manualDelete)
             this .manualDeleteFileSeveralTimes--;
         boolean cleanAtOnce = DefaultMessageStore. this .getMessageStoreConfig().isCleanFileForciblyEnable() &amp;&amp; this .cleanImmediately;
         log.info( "begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}" ,
             fileReservedTime,
             timeup,
             spacefull,
             manualDeleteFileSeveralTimes,
             cleanAtOnce);
         fileReservedTime *= 60 * 60 * 1000 ;
         // todo 文件的销毁和删除
         deleteCount = DefaultMessageStore. this .commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
             destroyMapedFileIntervalForcibly, cleanAtOnce);
         if (deleteCount &gt; 0 ) {
         } else if (spacefull) {
             log.warn( "disk space will be full soon, but delete file failed." );
         }
     }
}

开始几个参数,一个是文件保留实效 默认是72小时 ,你可以使用 fileReservedTime 来配置,一个是删除文件的 间隔100ms ,再就是强行销毁 MappedFile 的 120s (这个为啥要强行销毁,因为它还害怕还有地方用着这个MappedFile,它有个专门的引用计数器,比如说我还有地方要读它的消息,这个时候计数器就是+1的)。

接着就是判断到没到删除的那个时间,它 默认是凌晨4点才能删除 :

?
1
2
3
4
5
6
7
8
9
10
private boolean isTimeToDelete() {
     // 什么时候删除,默认是凌晨4点 -&gt; 04
     String when = DefaultMessageStore. this .getMessageStoreConfig().getDeleteWhen();
     // 判断是不是到点了 就是判断的当前小时 是不是等于 默认的删除时间
     if (UtilAll.isItTimeToDo(when)) {
         DefaultMessageStore.log.info( "it's time to reclaim disk space, " + when);
         return true ;
     }
     return false ;
}

再接着就是看看空间是不是充足,看看磁盘空间使用占比是什么样子的:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private boolean isSpaceToDelete() {
     // 表示CommitLog文件、ConsumeQueue文件所在磁盘分区的最大使用量,如果超过该值,则需要立即清除过期文件
     double ratio = DefaultMessageStore. this .getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0 ;
     // 表示是否需要立即执行清除过期文件的操作
     cleanImmediately = false ;
     {
         // 当前CommitLog目录所在的磁盘分区的磁盘使用率,通过File#getTotalSpace方法获取文件所在磁盘分区的总容量,
         //通过File#getFreeSpace方法获取文件所在磁盘分区的剩余容量
         double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic());
         // diskSpaceWarningLevelRatio:默认0.90。如果磁盘分区使用率超过该阈值,将设置磁盘为不可写,此时会拒绝写入新消息
         // 如果当前磁盘分区使用率大于diskSpaceWarningLevelRatio,应该立即启动过期文件删除操作
         if (physicRatio &gt; diskSpaceWarningLevelRatio) {
             // 设置 磁盘不可写
             boolean diskok = DefaultMessageStore. this .runningFlags.getAndMakeDiskFull();
             if (diskok) {
                 DefaultMessageStore.log.error( "physic disk maybe full soon " + physicRatio + ", so mark disk full" );
             }
             cleanImmediately = true ;
         //diskSpaceCleanForciblyRatio:默认0.85 如果磁盘分区使用超过该阈值,建议立即执行过期文件删除,但不会拒绝写入新消息
         // 如果当前磁盘分区使用率大于diskSpaceCleanForciblyRatio,建议立即执行过期文件清除
         } else if (physicRatio &gt; diskSpaceCleanForciblyRatio) {
             cleanImmediately = true ;
         } else {
             // 设置 磁盘可以写入
             boolean diskok = DefaultMessageStore. this .runningFlags.getAndMakeDiskOK();
             if (!diskok) {
                 DefaultMessageStore.log.info( "physic disk space OK " + physicRatio + ", so mark disk ok" );
             }
         }
         // 如果当前磁盘使用率小于diskMaxUsedSpaceRatio,则返回false,表示磁盘使用率正常,
         // 否则返回true,需要执行删除过期文件
         if (physicRatio &lt; 0 || physicRatio &gt; ratio) {
             DefaultMessageStore.log.info( "physic disk maybe full soon, so reclaim space, " + physicRatio);
             return true ;
         }
     }
     /**
      * 对consumeQueue 做同样的判断
      */
     ...
     return false ;
}

这里其实不光是判断 commitlog 的存储区域,后面还有段判断 ConsumeQueue 的存储区域的,然后与这块逻辑一样,就没有放上。这里就是获取默认的最大使用占比 就是 75% ,接着就是看看 commitlog 存储的那地方使用了多少了,如果是使用 90% 了,就设置 runningFlag 说磁盘满了,立即清理设置成 true ,这个参数设置成true之后,就不会管你消息有没有超过72小时,如果你使用了 85% 以上了,也是设置立即清理,如果超过 75% 返回true。好了,磁盘占用空间这块我们就看完了。

接着看上面 deleteExpiredFiles 方法实现,还有一个 手动清除 的,这块我没有找到哪里有用到的,如果后续找到,会补充上, 判断 到了清理的点 或者是磁盘空间满了 或者是手动删除了,满足一个条件就ok了,如果是立即清除是个true,它这里这个 cleanAtOnce 变量就是true了,因为前面那个 强制清理是默认开启 的。

接着计算了一下 fileReservedTime 就是将小时转成了毫秒,为了后面好比对,最后就是调用 commitlo g的 deleteExpiredFile 方法清理了:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
  * 删除过期的文件
  * @param expiredTime 过期时间 默认72小时
  * @param deleteFilesInterval 删除文件的间隔 100ms
  * @param intervalForcibly  强制删除 1000 * 120
  * @param cleanImmediately 是不是要一次性清理了
  * @return
  */
public int deleteExpiredFile(
     final long expiredTime,
     final int deleteFilesInterval,
     final long intervalForcibly,
     final boolean cleanImmediately
) {
     // todo
     return this .mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
}

可以看到 commitlog 对象调用 mappedFileQueue 的 deleteExpiredFileByTime 方法来处理的,这个 mappedFileQueue 就是管理了一堆 MappedFile :

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
  * 删除文件
  *
  * 从倒数第二个文件开始遍历,计算文件的最大存活时间,即文件的最后一次更新时间+文件存活时间(默认
  * 72小时),如果当前时间大于文件的最大存活时间或需要强制删除文
  * 件(当磁盘使用超过设定的阈值)时,执行MappedFile#destory方
  * 法,清除MappedFile占有的相关资源,如果执行成功,将该文件加入
  * 待删除文件列表中,最后统一执行File#delete方法将文件从物理磁盘
  * 中删除。
  */
public int deleteExpiredFileByTime( final long expiredTime,
     final int deleteFilesInterval,
     final long intervalForcibly,
     final boolean cleanImmediately) {
     // 拿到mappedFile的引用
     Object[] mfs = this .copyMappedFiles( 0 );
     if ( null == mfs)
         return 0 ;
     int mfsLength = mfs.length - 1 ;
     int deleteCount = 0 ;
     List<MappedFile> files = new ArrayList<MappedFile>();
     if ( null != mfs) {
         for ( int i = 0 ; i < mfsLength; i++) {
             MappedFile mappedFile = (MappedFile) mfs[i];
             // 计算文件的最大存活时间,即文件的最后一次更新时间+文件存活时间(默认72小时)
             long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
             // 如果当前时间大于文件的最大存活时间 或 需要强制删除文件(当磁盘使用超过设定的阈值)时
             if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
                 // todo 执行destroy方法
                 if (mappedFile.destroy(intervalForcibly)) {
                     files.add(mappedFile);
                     deleteCount++;
                     // 一批 最多删除10 个
                     if (files.size() >= DELETE_FILES_BATCH_MAX) {
                         break ;
                     }
                     // 删除间隔
                     if (deleteFilesInterval > 0 && (i + 1 ) < mfsLength) {
                         try {
                             Thread.sleep(deleteFilesInterval);
                         } catch (InterruptedException e) {
                         }
                     }
                 } else {
                     break ;
                 }
             } else {
                 //avoid deleting files in the middle
                 break ;
             }
         }
     }
     // todo 统一执行File#delete方法将文件从物理磁盘中删除
     deleteExpiredFile(files);
     return deleteCount;
}

这里首先是拿到所有 MappedFile 的引用,然后就是遍历了,可以看到它这个length是-1的,也就是最后一个 MappedFile 是遍历不到的,这个是肯定的,因为最后一个 MappedFile 肯定是在用着的,如果你来个强制清理,一下清理了,就没法提供服务了。

遍历的时候,拿到对应 MappedFile 里面最后一条消息,看看它的写入时间是不是已经过了这个过期时间了,或者直接强制删除,就会执行 MappedFile 的销毁方法,而且带着销毁时间:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
  * 销毁方法
  * @param intervalForcibly 表示拒绝被销毁的最大存活时间
  * @return
  */
public boolean destroy( final long intervalForcibly) {
     // todo
     this .shutdown(intervalForcibly);
     // 清理结束
     if ( this .isCleanupOver()) {
         try {
             // 关闭文件通道,
             this .fileChannel.close();
             log.info( "close file channel " + this .fileName + " OK" );
             long beginTime = System.currentTimeMillis();
             // 删除物理文件
             boolean result = this .file.delete();
             log.info( "delete file[REF:" + this .getRefCount() + "] " + this .fileName
                 + (result ? " OK, " : " Failed, " ) + "W:" + this .getWrotePosition() + " M:"
                 + this .getFlushedPosition() + ", "
                 + UtilAll.computeElapsedTimeMilliseconds(beginTime));
         } catch (Exception e) {
             log.warn( "close file channel " + this .fileName + " Failed. " , e);
         }
         return true ;
     } else {
         log.warn( "destroy mapped file[REF:" + this .getRefCount() + "] " + this .fileName
             + " Failed. cleanupOver: " + this .cleanupOver);
     }
     return false ;
}
public void shutdown( final long intervalForcibly) {
     // 关闭MappedFile
     if ( this .available) {
         this .available = false ;
         // 初次关闭的时间戳
         this .firstShutdownTimestamp = System.currentTimeMillis();
         // todo 尝试释放资源
         this .release();
     } else if ( this .getRefCount() > 0 ) {
         if ((System.currentTimeMillis() - this .firstShutdownTimestamp) >= intervalForcibly) {
             this .refCount.set(- 1000 - this .getRefCount());
             this .release();
         }
     }
}

这里就不详细说了,其实就是 shutdown ,然后过了 120s后强制把引用清了 ,之后就是关闭 channel ,删除对应文件。

接着往下说,就是销毁成功了,会记录删除数量,判断删了多少了,一批是最多删10个的,这块应该是怕影响性能的,你一直删的的话,这东西很消耗磁盘性能,容易影响其他写入,读取功能,如果你销毁失败,直接就停了。最后就是将删除的这些 MappedFile 从 MappedFileQueue 中删除掉。再回到 commitlog clean service 的 run 方法:

?
1
2
3
4
5
6
7
8
9
10
public void run() {
     try {
         // todo 删除过期文件
         this .deleteExpiredFiles();
         // todo
         this .redeleteHangedFile();
     } catch (Throwable e) {
         DefaultMessageStore.log.warn( this .getServiceName() + " service has exception. " , e);
     }
}

我们 deleteExpiredFiles 方法已经介绍完了,然后再来看看第二个方法是干嘛的,这个其实就是判断第一个 MappedFile 还可不可用了,如果不可用的话,就删了,这块有可能是上面 deleteExpiredFiles 方法 MappedFile 销毁失败,然后设置了不可用,但是没有清理掉,所以这块再来善后下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private void redeleteHangedFile() {
     // redeleteHangedFileInterval间隔 默认1000*120
     int interval = DefaultMessageStore. this .getMessageStoreConfig().getRedeleteHangedFileInterval();
     // 当前时间戳
     long currentTimestamp = System.currentTimeMillis();
     if ((currentTimestamp - this .lastRedeleteTimestamp) > interval) {
         this .lastRedeleteTimestamp = currentTimestamp;
         // 获取强制销毁Mapped文件间隔
         int destroyMapedFileIntervalForcibly =
             DefaultMessageStore. this .getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
         // todo 重新删除第一个MappedFile
         if (DefaultMessageStore. this .commitLog.retryDeleteFirstFile(destroyMapedFileIntervalForcibly)) {
         }
     }
}
public boolean retryDeleteFirstFile( final long intervalForcibly) {
     // 获取到 第一个mappedFile
     MappedFile mappedFile = this .getFirstMappedFile();
     if (mappedFile != null ) {
         // 不可用
         if (!mappedFile.isAvailable()) {
             log.warn( "the mappedFile was destroyed once, but still alive, " + mappedFile.getFileName());
             // 销毁
             boolean result = mappedFile.destroy(intervalForcibly);
             if (result) {
                 log.info( "the mappedFile re delete OK, " + mappedFile.getFileName());
                 List<MappedFile> tmpFiles = new ArrayList<MappedFile>();
                 tmpFiles.add(mappedFile);
                 this .deleteExpiredFile(tmpFiles);
             } else {
                 log.warn( "the mappedFile re delete failed, " + mappedFile.getFileName());
             }
             return result;
         }
     }
     return false ;
}

这块就是看第一个 MappedFile 还可不可用,不可用的话,就销毁掉。好了 commitlog 文件清理源码就解析完成了。接下来看下这个 ConsumeQueue与indexFile 的清理。

2.2 ConsumeQueue 清理

?
1
2
3
4
5
6
private void cleanFilesPeriodically() {
     // todo 清除CommitLog文件
     this .cleanCommitLogService.run();
     // todo 清除ConsumeQueue文件
     this .cleanConsumeQueueService.run();
}

DefaultMessageStore.CleanConsumeQueueService#run:

?
1
2
3
4
5
6
7
8
public void run() {
     try {
         // 删除 过期的file
         this .deleteExpiredFiles();
     } catch (Throwable e) {
         DefaultMessageStore.log.warn( this .getServiceName() + " service has exception. " , e);
     }
}

接下来DefaultMessageStore.CleanConsumeQueueService#deleteExpiredFiles:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void deleteExpiredFiles() {
     // 删除间隔 100
     int deleteLogicsFilesInterval = DefaultMessageStore. this .getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();
     // 获取 commitLog 的最小offset
     long minOffset = DefaultMessageStore. this .commitLog.getMinOffset();
     if (minOffset > this .lastPhysicalMinOffset) {
         // 上次 清理 到哪了
         this .lastPhysicalMinOffset = minOffset;
         ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore. this .consumeQueueTable;
         // 遍历删除
         for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
             for (ConsumeQueue logic : maps.values()) {
                 // 进行删除
                 int deleteCount = logic.deleteExpiredFile(minOffset);
                 // 间隔
                 if (deleteCount > 0 && deleteLogicsFilesInterval > 0 ) {
                     try {
                         Thread.sleep(deleteLogicsFilesInterval);
                     } catch (InterruptedException ignored) {
                     }
                 }
             }
         }
         // todo 删除 过期的 indexFile
         DefaultMessageStore. this .indexService.deleteExpiredFile(minOffset);
     }
}

首先是获取删除间隔,然后拿到 commitlog 中最小的那个 offset ,接着就是判断上次清理位置与最小 offset 比较,如果 offset 大于它上次清理的位置的话,就说明 它得把最小 offset 之前的清理掉。先是记录最后一次清理的 offset 是最小 offset , 接着就是遍历所有的 ConsumeQueue ,调用每个 ConsumeQueue 的 deleteExpiredFile 方法来清理,我们来看下这个方法:

?
1
2
3
4
5
6
7
public int deleteExpiredFile( long offset) {
     // 进行销毁 然后得到销毁个数
     int cnt = this .mappedFileQueue.deleteExpiredFileByOffset(offset, CQ_STORE_UNIT_SIZE);
     // 纠正最小偏移量
     this .correctMinOffset(offset);
     return cnt;
}

CQ_STORE_UNIT_SIZE 这个就是每个 unit 占20个字节,见。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
  * 删除过期的file
  * @param offset 最小offset
  * @param unitSize 大小为20字节
  * @return
  */
public int deleteExpiredFileByOffset( long offset, int unitSize) {
     Object[] mfs = this .copyMappedFiles( 0 );
     List<MappedFile> files = new ArrayList<MappedFile>();
     int deleteCount = 0 ;
     if ( null != mfs) {
         int mfsLength = mfs.length - 1 ;
         for ( int i = 0 ; i < mfsLength; i++) {
             boolean destroy;
             MappedFile mappedFile = (MappedFile) mfs[i];
             // 最后一个单元位置到这个MappedFile结束,其实就是获取最后一个单元
             SelectMappedBufferResult result = mappedFile.selectMappedBuffer( this .mappedFileSize - unitSize);
             if (result != null ) {
                 // 获取最大的offset
                 long maxOffsetInLogicQueue = result.getByteBuffer().getLong();
                 result.release();
                 // 判断是否销毁 如果小于offset 就要销毁
                 destroy = maxOffsetInLogicQueue < offset;
                 if (destroy) {
                     log.info( "physic min offset " + offset + ", logics in current mappedFile max offset "
                         + maxOffsetInLogicQueue + ", delete it" );
                 }
             } else if (!mappedFile.isAvailable()) { // Handle hanged file.
                 log.warn( "Found a hanged consume queue file, attempting to delete it." );
                 destroy = true ;
             } else {
                 log.warn( "this being not executed forever." );
                 break ;
             }
             // 进行销毁
             if (destroy && mappedFile.destroy( 1000 * 60 )) {
                 files.add(mappedFile);
                 deleteCount++;
             } else {
                 break ;
             }
         }
     }
     // 删除引用
     deleteExpiredFile(files);
     return deleteCount;
}

它的删除跟 commitlog 的差不多,只不过commitlog 是根据时间来判断的,它是根据commitlog 的offset 来判断的,判断要不要删除这个MappedFile,如果这个MappedFile最后一个unit 存储的offset 小于 commitlog 最小的offset 的话就要销毁了。接着就是销毁,超时时间是1分钟,最后是删除引用。

2.3 indexFile 清理

最后我们来看下 indexFile 的清理工作: DefaultMessageStore.CleanConsumeQueueService#deleteExpiredFiles:

?
1
2
3
4
5
6
7
8
9
10
11
private void deleteExpiredFiles() {
     // 删除间隔 100
     int deleteLogicsFilesInterval = DefaultMessageStore. this .getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();
     // 获取 commitLog 的最小offset
     long minOffset = DefaultMessageStore. this .commitLog.getMinOffset();
     if (minOffset > this .lastPhysicalMinOffset) {
         ...
         // todo 删除 过期的 indexFile
         DefaultMessageStore. this .indexService.deleteExpiredFile(minOffset);
     }
}
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
  * 删除 过期文件
  * @param offset 最小的offset 小于这个offset都要删除
  */
public void deleteExpiredFile( long offset) {
     Object[] files = null ;
     try {
         // 获取读锁
         this .readWriteLock.readLock().lock();
         if ( this .indexFileList.isEmpty()) {
             return ;
         }
         // 获取第一个indexFile 的一个offset
         long endPhyOffset = this .indexFileList.get( 0 ).getEndPhyOffset();
         if (endPhyOffset < offset) {
             files = this .indexFileList.toArray();
         }
     } catch (Exception e) {
         log.error( "destroy exception" , e);
     } finally {
         this .readWriteLock.readLock().unlock();
     }
     if (files != null ) {
         // 找到需要删除的indexFile
         List<IndexFile> fileList = new ArrayList<IndexFile>();
         for ( int i = 0 ; i < (files.length - 1 ); i++) {
             IndexFile f = (IndexFile) files[i];
             if (f.getEndPhyOffset() < offset) {
                 fileList.add(f);
             } else {
                 break ;
             }
         }
         // 删除
         this .deleteExpiredFile(fileList);
     }
}

可以看到,先是拿第一个 indexFile 看看有没有小于 commitlog 最小 offset 的情况发生,这里也是拿的 indexFile 最后一个 offset 做的对比,因为这块也是按照 offset 大小 前后顺序处理的,最后一个的 offest 肯定是这个 indexFile 中最大的了,如果第一个 indexFile 满足了的话,就会拿到所有引用,然后遍历找出符合条件的 indexFile , 调用 deleteExpiredFile 方法遍历销毁:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void deleteExpiredFile(List<IndexFile> files) {
     if (!files.isEmpty()) {
         try {
             this .readWriteLock.writeLock().lock();
             for (IndexFile file : files) {
                 // 销毁
                 boolean destroyed = file.destroy( 3000 );
                 // 从index 集合中移除
                 destroyed = destroyed && this .indexFileList.remove(file);
                 if (!destroyed) {
                     log.error( "deleteExpiredFile remove failed." );
                     break ;
                 }
             }
         } catch (Exception e) {
             log.error( "deleteExpiredFile has exception." , e);
         } finally {
             this .readWriteLock.writeLock().unlock();
         }
     }
}

这里就是遍历销毁,然后移除对这个 indexFile 管理。

3. 总结

本文主要是介绍了RocketMQ broker 消息清理机制 ,介绍了主要清理哪些文件 : commitlog ,ConsumeQueue,indexFile

接着就是介绍了什么时候触发清理,比如说凌晨4点 ,磁盘没满85% 以上的话,就是清理72小时之前的,如果是满了85%就除了还在用着的那个先清10个看看, 还有就是磁盘使用空间75% 以上也是会触发的, 低于85 % 清理72小时之前的,高于85% 先清理10个文件看看,这是commitlog的清理机制,关于ConsumeQueue与indexFile的话,就是与commitlog中最小的那个offset 有关了,小于commitlog中最小offset 的那些还是要清理掉的。 最后就是分别解析了一下commitlog 文件清理,ConsumeQueue 文件清理与indexFile 文件清理。

参考  

RocketMQ源码分析专栏

以上就是RocketMQ broker文件清理源码解析的详细内容,更多关于RocketMQ broker文件清理的资料请关注其它相关文章!

原文链接:https://juejin.cn/post/7216845797143429178

查看更多关于RocketMQ broker文件清理源码解析的详细内容...

  阅读:17次