刷盘策略
CommitLog
的asyncPutMessage
方法中可以看到在写入消息之后,调用了submitFlushRequest
方法执行刷盘策略:
public class CommitLog { public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) { // ... try { // 获取上一次写入的文件 MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); // ... // 写入消息 result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext); // ... } finally { beginTimeInLock = 0; putMessageLock.unlock(); } // ... // 执行刷盘 CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg); // ... } }
刷盘有两种策略:
-
同步刷盘,表示消息写入到内存之后需要立刻刷到磁盘文件中。
同步刷盘会构建
GroupCommitRequest
组提交请求并设置本次刷盘后的位置偏移量的值(写入位置偏移量+写入数据字节数),然后将请求添加到flushDiskWatcher
和GroupCommitService
中进行刷盘。 -
异步刷盘,表示消息写入内存成功之后就返回,由MQ定时将数据刷入到磁盘中,会有一定的数据丢失风险。
public class CommitLog { // 监控刷盘 private final FlushDiskWatcher flushDiskWatcher; public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) { // 是否是同步刷盘 if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { // 获取GroupCommitService final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; // 是否等待 if (messageExt.isWaitStoreMsgOK()) { // 构建组提交请求,传入本次刷盘后位置的偏移量:写入位置偏移量+写入数据字节数 GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(), this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); // 添加到wather中 flushDiskWatcher.add(request); // 添加到service service.putRequest(request); // 返回 return request.future(); } else { service.wakeup(); return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); } } // 如果是异步刷盘 else { if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { flushCommitLogService.wakeup(); } else { commitLogService.wakeup(); } return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); } } }
同步刷盘
如果使用的是同步刷盘,首先获取了GroupCommitService
,然后构建GroupCommitRequest
组提交请求,将请求添加到flushDiskWatcher
和GroupCommitService
中,其中flushDiskWatcher用于监控刷盘是否超时,GroupCommitService用于提交刷盘数据。
构建GroupCommitRequest提交请求
GroupCommitRequest
是CommitLog
的内部类:
- nextOffset:写入位置偏移量+写入数据字节数,也就是本次刷盘成功后应该对应的flush偏移量
- flushOKFuture:刷盘结果
- deadLine:刷盘的限定时间,值为当前时间 + 传入的超时时间,超过限定时间还未刷盘完毕会被认为超时
public class CommitLog { public static class GroupCommitRequest { private final long nextOffset; // 刷盘状态 private CompletableFuture<PutMessageStatus> flushOKFuture = new CompletableFuture<>(); private final long deadLine;// 刷盘的限定时间,超过限定时间还未刷盘完毕会被认为超时 public GroupCommitRequest(long nextOffset, long timeoutMillis) { this.nextOffset = nextOffset; // 设置限定时间:当前时间 + 超时时间 this.deadLine = System.nanoTime() + (timeoutMillis * 1_000_000); } public void wakeupCustomer(final PutMessageStatus putMessageStatus) { // 结束刷盘,设置刷盘状态 this.flushOKFuture.complete(putMessageStatus); } public CompletableFuture<PutMessageStatus> future() { // 返回刷盘状态 return flushOKFuture; } } }
GroupCommitService处理刷盘
GroupCommitService
是CommitLog
的内部类,从继承关系中可知它实现了Runnable接口,在run方法调用waitForRunning
等待刷盘请求的提交,然后处理刷盘,不过这个线程是在什么时候启动的呢?
public class CommitLog { /** * GroupCommit Service */ class GroupCommitService extends FlushCommitLogService { // ... // run方法 public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { // 等待刷盘请求的到来 this.waitForRunning(10); // 处理刷盘 this.doCommit(); } catch (Exception e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); } } // ... } } }
刷盘线程的启动
在BrokerController的启动方法中,可以看到调用了messageStore
的start方法,前面可知使用的是DefaultMessageStore
,进入到DefaultMessageStore
的start方法,它又调用了commitLog
的start方法,在CommitLog
的start
方法中,启动了刷盘的线程和监控刷盘的线程:
public class BrokerController { public void start() throws Exception { if (this.messageStore != null) { // 启动 this.messageStore.start(); } // ... } } public class DefaultMessageStore implements MessageStore { /** * @throws Exception */ public void start() throws Exception { // ... this.flushConsumeQueueService.start(); // 调用CommitLog的启动方法 this.commitLog.start(); this.storeStatsService.start(); // ... } } public class CommitLog { private final FlushCommitLogService flushCommitLogService; // 刷盘 private final FlushDiskWatcher flushDiskWatcher; // 监控刷盘 private final FlushCommitLogService commitLogService; // commitLogService public void start() { // 启动刷盘的线程 this.flushCommitLogService.start(); flushDiskWatcher.setDaemon(true); // 启动监控刷盘的线程 flushDiskWatcher.start(); if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { this.commitLogService.start(); } } }
刷盘请求的处理
既然知道了线程在何时启动的,接下来详细看一下GroupCommitService
是如何处理刷盘提交请求的。
前面知道在GroupCommitService
的run方法中,调用了waitForRunning
方法等待刷盘请求,waitForRunning
在GroupCommitService
父类ServiceThread
中实现。ServiceThread
是一个抽象类,实现了Runnable接口,里面使用了CountDownLatch进行线程间的通信,大小设为1。
waitForRunning
方法在进入的时候先判断hasNotified
是否为true(已通知),并尝试将其更新为false(未通知),由于hasNotified的初始化值为false,所以首次进入的时候条件不成立,不会进入到这个处理逻辑,会继续执行后面的代码。
接着调用 waitPoint的reset方法将其重置为1,并调用waitPoint的await方法进行等待:
// ServiceThread public abstract class ServiceThread implements Runnable { // 是否通知,初始化为false protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false); // CountDownLatch用于线程间的通信 protected final CountDownLatch2 waitPoint = new CountDownLatch2(1); // 等待运行 protected void waitForRunning(long interval) { // 判断hasNotified是否为true,并尝试将其更新为false if (hasNotified.compareAndSet(true, false)) { // 调用onWaitEnd this.onWaitEnd(); return; } // 重置waitPoint的值,也就是值为1 waitPoint.reset(); try { // 会一直等待waitPoint值降为0 waitPoint.await(interval, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { log.error("Interrupted", e); } finally { // 是否被通知设置为false hasNotified.set(false); this.onWaitEnd(); } } }
一、添加刷盘请求,唤醒刷盘线程
上面可知需要刷盘的时候调用了GroupCommitService
的putRequest
方法添加刷盘请求,在putRequest
方法中,将刷盘请求GroupCommitRequest
添加到了requestsWrite
组提交写请求链表中,然后调用wakeup方法唤醒刷盘线程,wakeup方法在它的父类ServiceThread
中实现。
在wakeup
方法中可以看到,首先将hasNotified
更改为了true表示处于已通知状态,然后调用了countDown方法,此时waitPoint
值变成0,就会唤醒之前waitForRunning
方法中一直在等待的线程。
public class CommitLog { /** * 组提交Service */ class GroupCommitService extends FlushCommitLogService { // 组提交写请求链表 private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>(); // ... // 添加提交请求 public synchronized void putRequest(final GroupCommitRequest request) { // 加锁 lock.lock(); try { // 加入到写请求链表 this.requestsWrite.add(request); } finally { lock.unlock(); } // 唤醒线程执行提交任务 this.wakeup(); } // ... } } // ServiceThread public abstract class ServiceThread implements Runnable { // CountDownLatch用于线程间的通信 protected final CountDownLatch2 waitPoint = new CountDownLatch2(1); // 唤醒刷盘线程 public void wakeup() { // 更改状态为已通知状态 if (hasNotified.compareAndSet(false, true)) { // waitPoint的值减1,由于大小设置为1,减1之后变为0,会唤醒等待的线程 waitPoint.countDown(); } } // ... }
二、线程被唤醒,执行刷盘前的操作
waitForRunning
方法中的await
方法一直在等待countdown的值变为0,当上一步调用了wakeup后,就会唤醒该线程,然后开始往下执行,在finally中可以看到将是否被通知hasNotified又设置为了false,然后调用了onWaitEnd方法,GroupCommitService
方法中重写了该方法,里面又调用了swapRequests
方法将读写请求列表的数据进行了交换,putRequest方法中将提交的刷盘请求放在了写链表中,经过交换,数据会被放在读链表中,后续进行刷盘时会从读链表中获取请求进行处理:
// ServiceThread public abstract class ServiceThread implements Runnable { // CountDownLatch protected final CountDownLatch2 waitPoint = new CountDownLatch2(1); // 等待运行 protected void waitForRunning(long interval) { if (hasNotified.compareAndSet(true, false)) { // 交换 this.onWaitEnd(); return; } // 重置 waitPoint.reset(); try { // 会一直等待countdown为0 waitPoint.await(interval, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { log.error("Interrupted", e); } finally { // 是否被通知设置为false hasNotified.set(false); this.onWaitEnd(); } } } public class CommitLog { /** * 组提交Service */ class GroupCommitService extends FlushCommitLogService { // 组提交写请求链表 private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>(); // 组提交读请求链表 private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<GroupCommitRequest>(); @Override protected void onWaitEnd() { // 交换读写请求列表的数据请求 this.swapRequests(); } private void swapRequests() { // 加锁 lock.lock(); try { // 将读写请求链表的数据进行交换 LinkedList<GroupCommitRequest> tmp = this.requestsWrite; this.requestsWrite = this.requestsRead; this.requestsRead = tmp; } finally { lock.unlock(); } } // ... } }
这里使用读写链表进行交换应该是为了提升性能,如果只使用一个链表,在提交请求的时候需要往链表中添加请求,此时需要加锁,而刷盘线程在处理完请求之后是需要从链表中移除请求的,假设添加请求时加的锁还未释放,刷盘线程就要一直等待,而添加和处理完全可以同时进行,所以使用了两个链表,在添加请求的时候使用写链表,处理请求的时候对读写链表的数据进行交换使用读链表,这样只需在交换数据的时候加锁,以此来提升性能。
三、执行刷盘
waitForRunning
执行完毕后,会回到GroupCommitService
中的run方法开始继续往后执行代码,从代码中可以看到接下来会调用doCommit
方法执行刷盘。
doCommit
方法中对读链表中的数据进行了判空,如果不为空,进行遍历处理每一个提交请求,处理逻辑如下:
- 获取CommitLog映射文件记录的刷盘位置偏移量
flushedWhere
,判断是否大于请求设定的刷盘位置偏移量nextOffset
,正常情况下flush的位置应该小于本次刷入数据后的偏移量,所以如果flush位置大于等于本次请求设置的flush偏移量,本次将不能进行刷盘
-
开启一个循环,调用
mappedFileQueue
的flush
方法执行刷盘(具体的实现在异步刷盘的时候再看),由于CommitLog大小为1G,所以本次刷完之后,如果当前已经刷入的偏移量小于请求设定的位置,表示数据未刷完,需要继续刷,反之表示数据已经刷完,flushOK为true,for循环条件不满足结束执行。 -
请求处理之后会清空读链表。
public class CommitLog { /** * 组提交Service */ class GroupCommitService extends FlushCommitLogService { // 运行 public void run() { CommitLog.log.info(this.getServiceName() + " service started"); // 如果没有停止 while (!this.isStopped()) { try { // 等待唤醒刷盘线程 this.waitForRunning(10); // 进行提交 this.doCommit(); } catch (Exception e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); } } // 睡眠10毫秒 try { Thread.sleep(10); } catch (InterruptedException e) { CommitLog.log.warn(this.getServiceName() + " Exception, ", e); } synchronized (this) { this.swapRequests(); } // 停止之前提交一次 this.doCommit(); CommitLog.log.info(this.getServiceName() + " service end"); } // 提交刷盘 private void doCommit() { // 如果不为空 if (!this.requestsRead.isEmpty()) { // 遍历刷盘请求 for (GroupCommitRequest req : this.requestsRead) { // 获取映射文件的flush位置,判断是否大于请求设定的刷盘位置 boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); for (int i = 0; i < 2 && !flushOK; i++) { // 进行刷盘 CommitLog.this.mappedFileQueue.flush(0); // 由于CommitLog大小为1G,所以本次刷完之后,如果当前已经刷入的偏移量小于请求设定的位置,表示数据未刷完,需要继续刷,反之表示数据已经刷完,flushOK为true,for循环条件不满足结束执行 flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); } // 设置刷盘结果 req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT); } long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } // 请求处理完之后清空链表 this.requestsRead = new LinkedList<>(); } else { // Because of individual messages is set to not sync flush, it // will come to this process CommitLog.this.mappedFileQueue.flush(0); } } } }
刷盘超时监控
FlushDiskWatcher用于监控刷盘请求的耗时,它也继承了ServiceThread
,在Broker启动时开启了该线程,在run方法中,使用while循环,只要服务未停止,会一直从阻塞队列中获取提交的刷盘请求,开启while循环隔一段时间判断一下刷盘是否完成,如果未完成,会做如下判断:
- 使用当前时间减去请求设置的刷盘截止时间,如果已经超过截止时间,说明刷盘时间已经超时,调用
wakeupCustomer
方法设置刷盘结果为已超时 - 如果未超时,为了避免当前线程频繁的进行判断,将当前线程睡眠一会儿,睡眠的计算方式是使用刷盘请求设置的截止时间 - 当前时间,表示剩余的时间,然后除以1000000化为毫秒,得到距离刷盘截止时间的毫秒数sleepTime:
sleepTime
如果为0,只能是当前时间等于截止时间,也就是到了截止时间,此时同样调用wakeupCustomer
方法设置刷盘结果为已超时sleepTime
不为0,在10毫秒和sleepTime的值之间取较小的那个作为睡眠的毫秒数将当前线程睡眠,等待刷盘任务执行
public class FlushDiskWatcher extends ServiceThread { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); // 阻塞队列,存放提交请求 private final LinkedBlockingQueue<GroupCommitRequest> commitRequests = new LinkedBlockingQueue<>(); @Override public String getServiceName() { return FlushDiskWatcher.class.getSimpleName(); } @Override public void run() { // 如果未停止 while (!isStopped()) { GroupCommitRequest request = null; try { // 从阻塞队列中获取提交请求 request = commitRequests.take(); } catch (InterruptedException e) { log.warn("take flush disk commit request, but interrupted, this may caused by shutdown"); continue; } // 如果还未完成 while (!request.future().isDone()) { long now = System.nanoTime(); // 如果已经超时 if (now - request.getDeadLine() >= 0) { // 设置刷盘结果为超时 request.wakeupCustomer(PutMessageStatus.FLUSH_DISK_TIMEOUT); break; } // 避免频繁的判断,使用(截止时间 - 当前时间)/1000000 计算一个毫秒数 long sleepTime = (request.getDeadLine() - now) / 1_000_000; // 在计算的毫秒数与10之间取最小的 sleepTime = Math.min(10, sleepTime); // 如果sleepTime为0表示已经到了截止时间 if (sleepTime == 0) { // 设置刷盘结果为超时 request.wakeupCustomer(PutMessageStatus.FLUSH_DISK_TIMEOUT); break; } try { // 睡眠等待刷盘任务的执行 Thread.sleep(sleepTime); } catch (InterruptedException e) { log.warn( "An exception occurred while waiting for flushing disk to complete. this may caused by shutdown"); break; } } } } }
异步刷盘
上面讲解了同步刷盘,接下来去看下异步刷盘,首先会判断是否使用了暂存池,如果未开启调用flushCommitLogService
的wakeup
唤醒刷盘线程,否则使用commitLogService
先将数据写入到FileChannel,然后统一进行刷盘:
public class CommitLog { private final FlushDiskWatcher flushDiskWatcher; public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) { // 是否是同步刷盘 if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { // ... } // 如果是异步刷盘 else { // 如果未使用暂存池 if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { // 唤醒刷盘线程进行刷盘 flushCommitLogService.wakeup(); } else { // 如果使用暂存池,使用commitLogService,先将数据写入到FILECHANNEL,然后统一进行刷盘 commitLogService.wakeup(); } // 返回结果 return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); } } }
在CommitLog
的构造函数中可以看到,commitLogService使用的是CommitRealTimeService
进行实例化的,flushCommitLogService需要根据设置决定使用哪种类型进行实例化:
- 如果是同步刷盘,使用
GroupCommitService
,由前面的同步刷盘可知,使用的就是GroupCommitService进行刷盘的。 - 如果是异步刷盘,使用
FlushRealTimeService
。
所以接下来需要关注CommitRealTimeService
和FlushRealTimeService
:
public class CommitLog { private final FlushCommitLogService flushCommitLogService; // 刷盘Service private final FlushCommitLogService commitLogService; public CommitLog(final DefaultMessageStore defaultMessageStore) { // 如果设置的同步刷盘 if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { // 使用GroupCommitService this.flushCommitLogService = new GroupCommitService(); } else { // 使用FlushRealTimeService this.flushCommitLogService = new FlushRealTimeService(); } // commitLogService this.commitLogService = new CommitRealTimeService(); } }
CommitRealTimeService
在开启暂存池时,会使用CommitRealTimeService
,它继承了FlushCommitLogService
,所以会实现run方法,处理逻辑如下:
- 从配置信息中获取提交间隔、每次提交的最少页数和两次提交的最大间隔时间
- 如果当前时间大于上次提交时间+两次提交的最大间隔时间,意味着已经有比较长的一段时间没有进行提交了,需要尽快刷盘,此时将每次提交的最少页数设置为0不限制提交页数
- 调用
mappedFileQueue
的commit
方法进行提交,并返回提交的结果:- 如果结果为true表示未提交任何数据
- 如果结果为false表示进行了数据提交,需要等待刷盘
- 判断提交返回结果是否返回false,如果是调用
flushCommitLogService
的wakeup方法唤醒刷盘线程,进行刷盘 - 调用
waitForRunning
等待下一次提交处理
class CommitRealTimeService extends FlushCommitLogService { // 上次提交时间戳 private long lastCommitTimestamp = 0; @Override public void run() { CommitLog.log.info(this.getServiceName() + " service started"); // 如果未停止 while (!this.isStopped()) { // 获取提交间隔 int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog(); // 一次提交的最少页数 int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages(); // 两次提交的最大间隔时间 int commitDataThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval(); // 开始时间 long begin = System.currentTimeMillis(); // 如果当前时间大于上次提交时间+提交的最大间隔时间 if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) { this.lastCommitTimestamp = begin; // 提交时间 commitDataLeastPages = 0;// 最少提交页数设为0,表示不限制提交页数 } try { // 提交 boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages); // 提交结束时间 long end = System.currentTimeMillis(); // 如果返回false表示提交了一部分数据但是还未进行刷盘 if (!result) { // 再次更新提交时间戳 this.lastCommitTimestamp = end; // 唤醒flush线程进行刷盘 flushCommitLogService.wakeup(); } if (end - begin > 500) { log.info("Commit data to file costs {} ms", end - begin); } // 等待下一次提交 this.waitForRunning(interval); } catch (Throwable e) { CommitLog.log.error(this.getServiceName() + " service has exception. ", e); } } boolean result = false; for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { result = CommitLog.this.mappedFileQueue.commit(0); CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); } CommitLog.log.info(this.getServiceName() + " service end"); } }
提交
提交的方法在MappedFileQueue
的commit
方法中实现,处理逻辑如下:
- 根据记录的CommitLog文件提交位置的偏移量获取映射文件,如果获取不为空,调用MappedFile的commit方法进行提交,然后返回本次提交数据的偏移量
- 记录本次提交的偏移量:文件的偏移量 + 提交数据的偏移量
- 判断本次提交的偏移量是否等于上一次的提交偏移量,如果等于表示本次未提交任何数据,返回结果置为true,否则表示提交了数据,等待刷盘,返回结果为false
- 更新上一次提交偏移量
committedWhere
的值为本次的提交偏移量的值
public class MappedFileQueue { protected long flushedWhere = 0; // flush的位置偏移量 private long committedWhere = 0; // 提交的位置偏移量 public boolean commit(final int commitLeastPages) { boolean result = true; // 根据提交位置的偏移量获取映射文件 MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0); if (mappedFile != null) { // 调用mappedFile的commit方法进行提交,返回提交数据的偏移量 int offset = mappedFile.commit(commitLeastPages); // 记录本次提交的偏移量:文件的偏移量 + 提交数据的偏移量 long where = mappedFile.getFileFromOffset() + offset; // 设置返回结果,如果本次提交偏移量等于上一次的提交偏移量为true,表示什么也没干,否则表示提交了数据,等待刷盘 result = where == this.committedWhere; // 更新上一次提交偏移量的值为本次的 this.committedWhere = where; } return result; } }
MappedFile
MappedFile中记录CommitLog的写入位置wrotePosition、提交位置committedPosition以及flush位置flushedPosition,在commit方法中,调用了isAbleToCommit判断是否可以提交数据,判断的流程如下:
-
获取提交数据的位置偏移量和写入数据的位置偏移量
-
如果最少提交页数大于0,计算本次写入的页数是否大于或等于最少提交页数
本次写入数据的页数计算方法:写入位置/页大小 - flush位置/页大小
-
如果以上条件都满足,判断写入位置是否大于flush位置,如果大于表示有一部数据未flush可以进行提交
满足提交条件后,就会调用commit0
方法提交数据,将数据写入到fileChannel中:
public class MappedFile extends ReferenceResource { // 数据写入位置 protected final AtomicInteger wrotePosition = new AtomicInteger(0); // 数据提交位置 protected final AtomicInteger committedPosition = new AtomicInteger(0); // 数据flush位置 private final AtomicInteger flushedPosition = new AtomicInteger(0); // 提交数据 public int commit(final int commitLeastPages) { // 如果writeBuffer为空 if (writeBuffer == null) { // 不需要提交任何数据到,返回之前记录的写入位置 return this.wrotePosition.get(); } // 如果可以提交数据 if (this.isAbleToCommit(commitLeastPages)) { if (this.hold()) { // 提交数据 commit0(); this.release(); } else { log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get()); } } // All dirty data has been committed to FileChannel. if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) { this.transientStorePool.returnBuffer(writeBuffer); this.writeBuffer = null; } // 返回提交位置 return this.committedPosition.get(); } // 是否可以提交数据 protected boolean isAbleToCommit(final int commitLeastPages) { // 获取提交数据的位置偏移量 int flush = this.committedPosition.get(); // 获取写入数据的位置偏移量 int write = this.wrotePosition.get(); if (this.isFull()) { return true; } // 如果最少提交页数大于0 if (commitLeastPages > 0) { // 写入位置/页大小 - flush位置/页大小 是否大于至少提交的页数 return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages; } // 判断是否需要flush数据 return write > flush; } protected void commit0() { // 获取写入位置 int writePos = this.wrotePosition.get(); // 获取上次提交的位置 int lastCommittedPosition = this.committedPosition.get(); if (writePos - lastCommittedPosition > 0) { try { // 创建共享缓冲区 ByteBuffer byteBuffer = writeBuffer.slice(); // 设置上一次提交位置 byteBuffer.position(lastCommittedPosition); byteBuffer.limit(writePos); this.fileChannel.position(lastCommittedPosition); // 数据写入fileChannel this.fileChannel.write(byteBuffer); // 更新写入的位置 this.committedPosition.set(writePos); } catch (Throwable e) { log.error("Error occurred when commit data to FileChannel.", e); } } } }
FlushRealTimeService
如果未开启暂存池,会直接使用FlushRealTimeService
进行刷盘,当然如果开启暂存池,写入一批数据后,同样会使用FlushRealTimeService
进行刷盘,FlushRealTimeService
同样继承了FlushCommitLogService
,是用于执行刷盘的线程,处理逻辑与提交刷盘数据逻辑相似,只不过不是提交数据,而是调用flush方法将提交的数据刷入磁盘:
- 从配置信息中获取flush间隔、每次flush的最少页数和两次flush的最大间隔时间
- 如果当前时间大于上次flush时间+两次flush的最大间隔时间,意味着已经有比较长的一段时间没有进行flush,此时将每次flush的最少页数设置为0不限制flush页数
- 调用
waitForRunning
等待被唤醒 - 如果被唤醒,调用
mappedFileQueue
的flush
方法进行刷盘
class FlushRealTimeService extends FlushCommitLogService { private long lastFlushTimestamp = 0; // 上一次flush的时间 private long printTimes = 0; public void run() { CommitLog.log.info(this.getServiceName() + " service started"); // 如果未停止 while (!this.isStopped()) { // boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed(); // 获取flush间隔 int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog(); // flush至少包含的页数 int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages(); // 两次flush的时间间隔 int flushPhysicQueueThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval(); boolean printFlushProgress = false; long currentTimeMillis = System.currentTimeMillis(); // 如果当前毫秒数 大于上次flush时间 + 两次flush之间的间隔 if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) { this.lastFlushTimestamp = currentTimeMillis; // 更新flush时间 flushPhysicQueueLeastPages = 0; // flush至少包含的页数置为0 printFlushProgress = (printTimes++ % 10) == 0; } try { // if (flushCommitLogTimed) { // 睡眠 Thread.sleep(interval); } else { // 等待flush被唤醒 this.waitForRunning(interval); } if (printFlushProgress) { // 打印刷盘进程 this.printFlushProgress(); } long begin = System.currentTimeMillis(); // 进行刷盘 CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages); long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } long past = System.currentTimeMillis() - begin; if (past > 500) { log.info("Flush data to disk costs {} ms", past); } } catch (Throwable e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); this.printFlushProgress(); } } // 如果服务停止,确保数据被刷盘 boolean result = false; for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { // 进行刷盘 result = CommitLog.this.mappedFileQueue.flush(0); CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); } this.printFlushProgress(); CommitLog.log.info(this.getServiceName() + " service end"); }
刷盘
刷盘的方法在MappedFileQueue
的flush
方法中实现,处理逻辑如下:
- 根据 flush的位置偏移量获取映射文件
- 调用
mappedFile
的flush方法进行刷盘,并返回刷盘后的位置偏移量 - 计算最新的flush偏移量
- 更新flushedWhere的值为最新的flush偏移量
public class MappedFileQueue { protected long flushedWhere = 0; // flush的位置偏移量 private long committedWhere = 0; // 提交的位置偏移量 // flush刷盘 public boolean flush(final int flushLeastPages) { boolean result = true; // 获取flush的位置偏移量映射文件 MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0); if (mappedFile != null) { // 获取时间戳 long tmpTimeStamp = mappedFile.getStoreTimestamp(); // 调用MappedFile的flush方法进行刷盘,返回刷盘后的偏移量 int offset = mappedFile.flush(flushLeastPages); // 计算最新的flush偏移量 long where = mappedFile.getFileFromOffset() + offset; result = where == this.flushedWhere; // 更新flush偏移量 this.flushedWhere = where; if (0 == flushLeastPages) { this.storeTimestamp = tmpTimeStamp; } } // 返回flush的偏移量 return result; } }
flush
的逻辑也与commit
方法的逻辑类似:
-
调用
isAbleToFlush
判断是否满足刷盘条件,获取上次flush位置偏移量和当前写入位置偏移量进行如下校验:-
文件是否已写满,即文件大小是否与写入数据位置相等,如果相等说明文件已经写满需要执行刷盘,满足刷盘条件
-
如果最少flush页数大于0,计算本次flush的页数是否大于或等于最少flush页数,如果满足可以进行刷盘
本次flush数据的页数计算方法:写入位置/页大小 - flush位置/页大小
-
如果写入位置偏移量是否大于flush位置偏移量,如果大于表示有数据未进行刷盘,满足刷盘条件
-
-
调用
fileChannel
的force或者mappedByteBuffer的force方法进行刷盘 -
记录本次flush的位置,并作为结果返回
public class MappedFile extends ReferenceResource { protected final AtomicInteger wrotePosition = new AtomicInteger(0); protected final AtomicInteger committedPosition = new AtomicInteger(0); private final AtomicInteger flushedPosition = new AtomicInteger(0); /** * 进行刷盘并返回flush后的偏移量 */ public int flush(final int flushLeastPages) { // 是否可以刷盘 if (this.isAbleToFlush(flushLeastPages)) { if (this.hold()) { int value = getReadPosition(); try { // 如果writeBuffer不为空 if (writeBuffer != null || this.fileChannel.position() != 0) { // 将数据刷到硬盘 this.fileChannel.force(false); } else { this.mappedByteBuffer.force(); } } catch (Throwable e) { log.error("Error occurred when force data to disk.", e); } // 记录flush位置 this.flushedPosition.set(value); this.release(); } else { log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get()); this.flushedPosition.set(getReadPosition()); } } // 返回flush位置 return this.getFlushedPosition(); } // 是否可以刷盘 private boolean isAbleToFlush(final int flushLeastPages) { // 获取上次flush位置 int flush = this.flushedPosition.get(); // 写入位置偏移量 int write = getReadPosition(); if (this.isFull()) { return true; } // 如果flush的页数大于0,校验本次flush的页数是否满足条件 if (flushLeastPages > 0) { // 本次flush的页数:写入位置偏移量/OS_PAGE_SIZE - 上次flush位置偏移量/OS_PAGE_SIZE,是否大于flushLeastPages return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages; } // 写入位置偏移量是否大于flush位置偏移量 return write > flush; } // 文件是否已写满 public boolean isFull() { // 文件大小是否与写入数据位置相等 return this.fileSize == this.wrotePosition.get(); } /** * 返回当前有效数据的位置 */ public int getReadPosition() { // 如果writeBuffer为空使用写入位置,否则使用提交位置 return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get(); } }