RocketMQ设定了延迟级别可以让消息延迟消费,延迟消息会使用SCHEDULE_TOPIC_XXXX这个主题,每个延迟等级对应一个消息队列,并且与普通消息一样,会保存每个消息队列的消费进度(delayOffset.json中的offsetTable):
public class MessageStoreConfig { private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; }
延迟级别与延迟时间对应关系:
延迟级别0 ---> 对应延迟时间1s,也就是延迟1秒后消费者重新从Broker拉取进行消费
延迟级别1 ---> 延迟时间5s
延迟级别2 ---> 延迟时间10s
...
以此类推,最大的延迟时间为2h。
延迟消息
使用延迟消息时,只需设定延迟级别即可,Broker在存储的时候会判断是否设定了延迟级别,如果设置了延迟级别就按延迟消息来处理,由【消息的存储】文章可知,消息存储之前会进入到asyncPutMessage方法中,延迟消息的处理就是在这里做的,处理逻辑如下:
-
判断消息的延迟级别是否超过了最大延迟级别,如果超过了就使用最大延迟级别;
-
获取
RMQ_SYS_SCHEDULE_TOPIC,它是在TopicValidator中定义的常量,值为SCHEDULE_TOPIC_XXXX:public class TopicValidator { // ... public static final String RMQ_SYS_SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX"; } -
根据延迟级别选取对应的队列,一般会把相同延迟级别的消息放在同一个队列中;
-
将消息原本的TOPIC和队列ID设置到消息属性中;
-
更改消息队列的主题为
RMQ_SYS_SCHEDULE_TOPIC,所以延迟消息的主题最终被设置为RMQ_SYS_SCHEDULE_TOPIC,会将消息投递到延迟队列中;
public class CommitLog { public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) { // ... // 获取事务类型 final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); // 如果未使用事务或者提交事务 if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { // 判断延迟级别 if (msg.getDelayTimeLevel() > 0) { // 如果超过了最大延迟级别 if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } // 获取RMQ_SYS_SCHEDULE_TOPIC topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; // 根据延迟级别选取对应的队列 int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // 将消息原本的TOPIC和队列ID设置到消息属性中 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); // 设置SCHEDULE_TOPIC msg.setTopic(topic); msg.setQueueId(queueId); } } // ... } }
延迟消息被投递到延迟队列中之后,会由定时任务去处理队列中的消息,接下来就去看下定时任务的处理过程。
注册定时任务
Broker启动的时候会调用ScheduleMessageService的start方法,start方法中为不同的延迟级别创建了对应的定时任务来处理延迟消息,然后从offsetTable中获取当前延迟等级对应那个消息队列的消费进度,如果未获取到,则使用0,从队列的第一条消息开始处理,然后创建定时任务DeliverDelayedMessageTimerTask,可以看到首次是延迟1000ms执行:
public class ScheduleMessageService extends ConfigManager { // 首次执行延迟的时间 private static final long FIRST_DELAY_TIME = 1000L; public void start() { if (started.compareAndSet(false, true)) { super.load(); this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_")); if (this.enableAsyncDeliver) { this.handleExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_")); } // 遍历所有的延迟级别 for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { Integer level = entry.getKey(); Long timeDelay = entry.getValue(); Long offset = this.offsetTable.get(level); if (null == offset) { // 如果获取的消费进度为空 offset = 0L; // 默认为0,从第一条消息开始处理 } if (timeDelay != null) { if (this.enableAsyncDeliver) { this.handleExecutorService.schedule(new HandlePutResultTask(level), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS); } // 为每个延迟级别创建对应的定时任务 this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS); } } // ... } } }
运行定时任务
DeliverDelayedMessageTimerTask是ScheduleMessageService的内部类,它实现了Runnable接口,在run方法中调用了executeOnTimeup方法来处理延迟消息:
public class ScheduleMessageService extends ConfigManager { class DeliverDelayedMessageTimerTask implements Runnable { @Override public void run() { try { if (isStarted()) { // 执行任务 this.executeOnTimeup(); } } catch (Exception e) { // XXX: warn and notify me log.error("ScheduleMessageService, executeOnTimeup exception", e); this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_PERIOD); } } } }
executeOnTimeup方法的处理逻辑如下:
- 根据主题名称以及延迟等级获取
ConsumeQueue,如果获取为空,会重新创建一个任务提交到线程池中,延迟时间为DELAY_FOR_A_WHILE,延迟一段时间后重新执行; - 根据当前延迟消息队列的消费进度,从ConsumeQueue获取数据,如果获取为空,处理同上,重新创建一个任务延迟一段时间之后重新执行;
- 因为队列中的消息是按写入顺序进行存储的,所以根据偏移量获取到的第一条消息开始,向后处理:
(1)获取消息存储时间戳
(2)根据延迟等级和消息的存储时间戳计算消息的到期时间
(3)获取当前时间,使用当前时间减去消息的到期时间- 如果值大于0,表示还未到达指定的延迟时间,需要继续等待,重新创建一个任务延迟一段时间之后重新执行;
- 如果值小于等于0,表示已经到达了指定的延迟时间,会调用messageTimeup对消息处理,恢复消息原本的Topic;
- 根据是否开启了异步来决定同步投递消息还是异步投递消息,这一步会将消息投递到原本Topic中的消息队列,之后与普通消息的存储流程一致;
public class ScheduleMessageService extends ConfigManager { class DeliverDelayedMessageTimerTask implements Runnable { public void executeOnTimeup() { // 根据主题名称以及延迟等级获取ConsumeQueue ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel)); // 如果ConsumeQueue为空,新建定时任务等待下次执行 if (cq == null) { this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE); return; } // 根据偏移量从ConsumeQueue获取数据 SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset); if (bufferCQ == null) { // ... // 如果获取为空,新建定时任务等待下次执行 this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE); return; } long nextOffset = this.offset; try { int i = 0; ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); // 开始处理延迟消息 for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { // 获取消息在CommitLog中的偏移量 long offsetPy = bufferCQ.getByteBuffer().getLong(); // 消息大小 int sizePy = bufferCQ.getByteBuffer().getInt(); // tag哈希值 long tagsCode = bufferCQ.getByteBuffer().getLong(); if (cq.isExtAddr(tagsCode)) { if (cq.getExt(tagsCode, cqExtUnit)) { tagsCode = cqExtUnit.getTagsCode(); } else { //can't find ext content.So re compute tags code. log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}", tagsCode, offsetPy, sizePy); // 获取消息存储时间戳 long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy); // 根据延迟等级和消息的存储时间计算消息的到期时间 tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime); } } // 获取当前时间 long now = System.currentTimeMillis(); long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode); nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); // 计算消息的到期时间 long countdown = deliverTimestamp - now; // 如果大于0,表示还未到达指定的延迟时间,需要继续等待 if (countdown > 0) { // 新建定时任务等待下次执行 this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE); return; } // 走到这里,表示已经到了消息的延迟时间,从CommitLog取出消息 MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy); if (msgExt == null) { continue; } // 处理消息,这里会恢复消息原本的Topic MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt); if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) { log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}", msgInner.getTopic(), msgInner); continue; } boolean deliverSuc; // 投递消息到原本的主题中 if (ScheduleMessageService.this.enableAsyncDeliver) { // 异步投递 deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy); } else { // 同步投递 deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy); } if (!deliverSuc) { this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE); return; } } // 计算下一条消息的偏移量 nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); } catch (Exception e) { log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e); } finally { bufferCQ.release(); } this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE); } } private MessageExtBrokerInner messageTimeup(MessageExt msgExt) { MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); msgInner.setBody(msgExt.getBody()); // 设置消息体 msgInner.setFlag(msgExt.getFlag()); // 设置falg MessageAccessor.setProperties(msgInner, msgExt.getProperties()); // ... msgInner.setWaitStoreMsgOK(false); MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL); // 恢复原本的Topic msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC)); String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID); int queueId = Integer.parseInt(queueIdStr); msgInner.setQueueId(queueId); return msgInner; } }