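Message Consumption
When RocketMQ consumes messages, the submitConsumeRequest method of ConsumeMessageConcurrentlyService submits them to a thread pool. The logic is as follows:
- If the number of messages is less than or equal to the batch consume size consumeBatchSize (the consumer's consumeMessageBatchMaxSize, 1 by default), build one ConsumeRequest and submit it to the thread pool directly.
- If the number of messages is greater than consumeBatchSize, submit them in chunks, with each ConsumeRequest carrying at most consumeBatchSize messages.
- If the thread pool rejects a submission, call submitConsumeRequestLater to resubmit it after a delay. In the chunked case, all messages not yet submitted are first appended to the rejected request.
In short, RocketMQ consumes messages in batches. A batch that does not exceed the configured batch size becomes a single consume task, while a larger one is split into several tasks.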
```java
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
    @Override
    public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispatchToConsume) {
        final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
        // The number of messages is less than or equal to the batch size
        if (msgs.size() <= consumeBatchSize) {
            // Build a consume request
            ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
            try {
                // Submit it to the consume thread pool
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                this.submitConsumeRequestLater(consumeRequest);
            }
        } else {
            // Walk through the messages
            for (int total = 0; total < msgs.size(); ) {
                // Message list of size consumeBatchSize, used for one batch
                List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
                for (int i = 0; i < consumeBatchSize; i++, total++) {
                    if (total < msgs.size()) {
                        // Add the message to the batch
                        msgThis.add(msgs.get(total));
                    } else {
                        break;
                    }
                }
                // Create the ConsumeRequest
                ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
                try {
                    // Submit it to the consume thread pool
                    this.consumeExecutor.submit(consumeRequest);
                } catch (RejectedExecutionException e) {
                    // Append all remaining messages to the rejected batch
                    for (; total < msgs.size(); total++) {
                        msgThis.add(msgs.get(total));
                    }
                    // The submission was rejected: resubmit later
                    this.submitConsumeRequestLater(consumeRequest);
                }
            }
        }
    }
}
```
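The splitting and the rejection fallback above can be reproduced without RocketMQ. Below is a minimal sketch that assumes plain lists. `BatchSplitter` and `split` are hypothetical names. A `Predicate` stands in for `consumeExecutor.submit`, and returning `false` simulates a `RejectedExecutionException`. A rejected batch absorbs every message not yet submitted, as in the original loop. When the list is no larger than the batch size, the loop yields a single batch, which matches the first branch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper mirroring the batching in submitConsumeRequest.
final class BatchSplitter {
    private BatchSplitter() {}

    // Splits msgs into batches of at most batchSize. trySubmit stands in for the
    // thread pool: returning false simulates RejectedExecutionException. A rejected
    // batch absorbs all remaining messages and goes to 'later', which plays the role
    // of submitConsumeRequestLater. Returns the batches that were accepted.
    static <T> List<List<T>> split(List<T> msgs, int batchSize,
                                   Predicate<List<T>> trySubmit, List<List<T>> later) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        List<List<T>> submitted = new ArrayList<>();
        int total = 0;
        while (total < msgs.size()) {
            List<T> batch = new ArrayList<>(batchSize);
            for (int i = 0; i < batchSize && total < msgs.size(); i++, total++) {
                batch.add(msgs.get(total));
            }
            if (trySubmit.test(batch)) {
                submitted.add(batch);
            } else {
                // Rejected: append everything not yet submitted and retry it later
                while (total < msgs.size()) {
                    batch.add(msgs.get(total++));
                }
                later.add(batch);
            }
        }
        return submitted;
    }
}
```

In the real service the rejected request is retried after a delay rather than collected in a list.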
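Running the Consume Task
ConsumeRequest is an inner class of ConsumeMessageConcurrentlyService that implements Runnable. Its run method processes the consume task as follows:
- Check whether the process queue (processQueue) the messages belong to has been dropped. If it has, return without processing.
- Reset the retry topic of the messages.
  A failed message is redelivered under the consumer group's retry topic (%RETRY% + consumer group name), after passing through SCHEDULE_TOPIC_XXXX while it waited out its delay. The original topic is kept in the PROPERTY_RETRY_TOPIC property, so it is restored here.
- If consume hooks are registered, run the executeHookBefore hook.
- Get the message listener and call its consumeMessage method, which consumes the messages and returns a consume status. There are two statuses, CONSUME_SUCCESS and RECONSUME_LATER:
  - CONSUME_SUCCESS: the messages were consumed successfully.
  - RECONSUME_LATER: consumption failed, and the messages will be consumed again after a delay.
  If the listener throws an exception or returns null, the status is treated as RECONSUME_LATER.
- Compute the consume duration and check whether it exceeded the timeout.
- If consume hooks are registered, run the executeHookAfter hook.
- Check again whether the process queue has been dropped. If not, call processConsumeResult to handle the consume result.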
```java
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
    class ConsumeRequest implements Runnable {
        private final List<MessageExt> msgs;
        private final ProcessQueue processQueue; // process queue
        private final MessageQueue messageQueue; // message queue

        @Override
        public void run() {
            // The process queue has been dropped
            if (this.processQueue.isDropped()) {
                log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
                return;
            }
            // Get the message listener
            MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
            ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
            ConsumeConcurrentlyStatus status = null;
            // Reset the retry topic of the messages
            defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
            ConsumeMessageContext consumeMessageContext = null;
            // Hooks are registered
            if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                // ...
                // Run the before hook
                ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
            }
            long beginTimestamp = System.currentTimeMillis();
            boolean hasException = false;
            ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
            try {
                if (msgs != null && !msgs.isEmpty()) {
                    for (MessageExt msg : msgs) {
                        // Record the consume start timestamp
                        MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                    }
                }
                // Consume through the listener's consumeMessage and get the consume status
                status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
            } catch (Throwable e) {
                log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
                    RemotingHelper.exceptionSimpleDesc(e),
                    ConsumeMessageConcurrentlyService.this.consumerGroup,
                    msgs,
                    messageQueue), e);
                hasException = true;
            }
            // Compute the consume duration
            long consumeRT = System.currentTimeMillis() - beginTimestamp;
            if (null == status) {
                if (hasException) {
                    // An exception was thrown
                    returnType = ConsumeReturnType.EXCEPTION;
                } else {
                    // The listener returned null
                    returnType = ConsumeReturnType.RETURNNULL;
                }
            } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
                // Timed out (consumeTimeout is in minutes)
                returnType = ConsumeReturnType.TIME_OUT;
            } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
                // Reconsume later: mark the return type as failed
                returnType = ConsumeReturnType.FAILED;
            } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
                // Success
                returnType = ConsumeReturnType.SUCCESS;
            }
            // ...
            // The consume status is null
            if (null == status) {
                log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
                    ConsumeMessageConcurrentlyService.this.consumerGroup, msgs, messageQueue);
                // Treat it as reconsume later
                status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            // Hooks are registered
            if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                consumeMessageContext.setStatus(status.toString());
                consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
                // Run the executeHookAfter hook
                ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
            }
            ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
                .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
            if (!processQueue.isDropped()) {
                // Process the consume result
                ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
            } else {
                log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
            }
        }
    }
}

// Reset the retry topic
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    public void resetRetryAndNamespace(final List<MessageExt> msgs, String consumerGroup) {
        // The group's retry topic: %RETRY% + consumer group name
        final String groupTopic = MixAll.getRetryTopic(consumerGroup);
        for (MessageExt msg : msgs) {
            // The original topic saved in the PROPERTY_RETRY_TOPIC property
            String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
            // The property is set and the message currently carries the group's retry topic
            if (retryTopic != null && groupTopic.equals(msg.getTopic())) {
                // Restore the original topic
                msg.setTopic(retryTopic);
            }
            if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) {
                msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
            }
        }
    }
}

// Consume status
public enum ConsumeConcurrentlyStatus {
    /**
     * Consumed successfully
     */
    CONSUME_SUCCESS,
    /**
     * Consumption failed; consume again later
     */
    RECONSUME_LATER;
}
```
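The branch order in run() is easy to misread, so here is a minimal sketch of the classification as a pure function. `ListenerStatus`, `OutcomeType` and `ConsumeOutcome` are hypothetical stand-ins for ConsumeConcurrentlyStatus and ConsumeReturnType, not RocketMQ types. The sketch also makes one point visible. In the code shown, the timeout only changes returnType, not status. A slow but successful batch therefore still reaches processConsumeResult as CONSUME_SUCCESS.

```java
// Hypothetical stand-ins for ConsumeConcurrentlyStatus and ConsumeReturnType.
enum ListenerStatus { CONSUME_SUCCESS, RECONSUME_LATER }

enum OutcomeType { SUCCESS, TIME_OUT, EXCEPTION, RETURNNULL, FAILED }

final class ConsumeOutcome {
    private ConsumeOutcome() {}

    // Same branch order as ConsumeRequest.run(): a null status is classified first,
    // then the timeout (given in minutes, like getConsumeTimeout()), then the status itself.
    static OutcomeType classify(ListenerStatus status, boolean hasException,
                                long consumeRtMillis, long consumeTimeoutMinutes) {
        if (status == null) {
            return hasException ? OutcomeType.EXCEPTION : OutcomeType.RETURNNULL;
        }
        if (consumeRtMillis >= consumeTimeoutMinutes * 60 * 1000) {
            return OutcomeType.TIME_OUT;
        }
        return status == ListenerStatus.RECONSUME_LATER ? OutcomeType.FAILED : OutcomeType.SUCCESS;
    }

    // The status handed to processConsumeResult: null (exception or null return)
    // becomes RECONSUME_LATER; the timeout does not change it.
    static ListenerStatus effectiveStatus(ListenerStatus status) {
        return status == null ? ListenerStatus.RECONSUME_LATER : status;
    }
}
```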
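Processing the Consume Result
1. Setting ackIndex
ackIndex is the index of the last successfully consumed message in the batch, and the messages after it count as failed. processConsumeResult sets ackIndex according to the consume status, which, as shown above, has two values:
- CONSUME_SUCCESS: the messages were consumed successfully. ackIndex defaults to Integer.MAX_VALUE in ConsumeConcurrentlyContext, so unless the listener set a smaller value, it is clamped to (number of messages - 1). This means all messages succeeded.
- RECONSUME_LATER: consumption failed and a delayed retry was requested. ackIndex is set to -1, meaning all messages failed.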
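The ackIndex rules above boil down to one index range, sketched here as a small function. `AckRange` and `failedIndices` are hypothetical names. The partial-success case assumes a listener lowered ackIndex through the context's setAckIndex before returning CONSUME_SUCCESS.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper reproducing how processConsumeResult derives the failed range.
final class AckRange {
    private AckRange() {}

    // ackIndex is whatever the listener left in the context (Integer.MAX_VALUE if untouched);
    // success is true for CONSUME_SUCCESS and false for RECONSUME_LATER.
    // Returns the indices ackIndex + 1 .. size - 1, i.e. the messages treated as failed.
    static List<Integer> failedIndices(boolean success, int ackIndex, int size) {
        if (success) {
            if (ackIndex >= size) {
                ackIndex = size - 1; // clamp: every message acknowledged
            }
        } else {
            ackIndex = -1; // RECONSUME_LATER: nothing acknowledged
        }
        List<Integer> failed = new ArrayList<>();
        for (int i = ackIndex + 1; i < size; i++) {
            failed.add(i);
        }
        return failed;
    }
}
```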
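2. Handling Failed Messages
Broadcasting mode
In broadcasting mode, a message that fails to be consumed is only logged and then dropped. Nothing else is done.
Clustering mode
A for loop starts at i = ackIndex + 1 and runs while i < consumeRequest.getMsgs().size(). From the above, ackIndex takes one of two values:
- Success: ackIndex is (number of messages - 1), so ackIndex + 1 equals the number of messages and the loop body never runs. All messages succeeded, and there are no failed messages to handle.
- Reconsume later: ackIndex is -1, so ackIndex + 1 is 0 and the loop walks from the first message to the last. For each message, sendMessageBack sends a CONSUMER_SEND_MSG_BACK request to the Broker.
  - If the request succeeds, the Broker puts the message into the delay queue for its delay level, and once the delay expires the consumer pulls it again.
  - If it fails, the message's reconsume count is incremented and the message is added to a list of failed messages. These messages are removed from the current request, and a new consume task for them is submitted later.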
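3. Removing Messages and Updating the Offset
After the steps above, removeMessage removes the messages from the process queue and returns an offset. updateOffset then updates the stored consume offset with it.
The returned value is the smallest offset still held in the process queue, or the largest offset seen plus one if the queue is now empty. The committed offset therefore never moves past a message that is still being consumed. Messages whose send-back failed were removed from the request above, so they stay in the process queue and hold the offset back.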
```java
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
    public void processConsumeResult(
        final ConsumeConcurrentlyStatus status,
        final ConsumeConcurrentlyContext context,
        final ConsumeRequest consumeRequest
    ) {
        // Get ackIndex
        int ackIndex = context.getAckIndex();
        if (consumeRequest.getMsgs().isEmpty())
            return;
        switch (status) {
            case CONSUME_SUCCESS: // consumed successfully
                // ackIndex is greater than or equal to the number of messages
                if (ackIndex >= consumeRequest.getMsgs().size()) {
                    // Clamp it to (number of messages - 1)
                    ackIndex = consumeRequest.getMsgs().size() - 1;
                }
                // Number of successfully consumed messages
                int ok = ackIndex + 1;
                // Number of failed messages
                int failed = consumeRequest.getMsgs().size() - ok;
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                break;
            case RECONSUME_LATER: // reconsume later
                // Set ackIndex to -1
                ackIndex = -1;
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                    consumeRequest.getMsgs().size());
                break;
            default:
                break;
        }
        // Branch on the message model
        switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING: // broadcasting mode
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                }
                break;
            case CLUSTERING: // clustering mode
                List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                // Iterate over the failed messages
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    // Get the message
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    // Send it back to the Broker for delayed redelivery
                    boolean result = this.sendMessageBack(msg, context);
                    // Sending back failed
                    if (!result) {
                        // Increment the reconsume count
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        // Add it to the failed list
                        msgBackFailed.add(msg);
                    }
                }
                // Some messages could not be sent back
                if (!msgBackFailed.isEmpty()) {
                    consumeRequest.getMsgs().removeAll(msgBackFailed);
                    // Consume them again later
                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
            default:
                break;
        }
        // Remove the messages from the process queue
        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            // Update the offset
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }
}
```
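To see why removeMessage never lets the offset jump past an in-flight message, here is a single-threaded model. It is based on my reading of ProcessQueue.removeMessage in 4.9.x and should be treated as an assumption. `ProcessQueueModel` is hypothetical and omits the locking and size counters of the real class. Messages are keyed by queue offset in a TreeMap, and the returned value is the smallest key still present, or the maximum offset plus one when the map empties.

```java
import java.util.Collection;
import java.util.TreeMap;

// Hypothetical, single-threaded model of how ProcessQueue.removeMessage chooses
// the offset to commit: cached messages are keyed by queue offset in a TreeMap.
final class ProcessQueueModel {
    private final TreeMap<Long, String> msgTreeMap = new TreeMap<>();
    private long queueOffsetMax = 0L;

    void put(long queueOffset, String body) {
        msgTreeMap.put(queueOffset, body);
        queueOffsetMax = Math.max(queueOffsetMax, queueOffset);
    }

    // Returns -1 if nothing was cached; otherwise the smallest offset still in flight,
    // or queueOffsetMax + 1 once every cached message has been removed.
    long removeMessage(Collection<Long> offsets) {
        long result = -1;
        if (!msgTreeMap.isEmpty()) {
            result = queueOffsetMax + 1;
            for (Long offset : offsets) {
                msgTreeMap.remove(offset);
            }
            if (!msgTreeMap.isEmpty()) {
                result = msgTreeMap.firstKey();
            }
        }
        return result;
    }
}
```

Finishing offset 101 while 100 is still being consumed commits only 100. If the consumer restarts at that point, 101 may be consumed again, which is one reason RocketMQ consumption is at-least-once.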
Sending the CONSUMER_SEND_MSG_BACK Request
Delay levels
The delay times for RocketMQ's delay levels are defined in the messageDelayLevel field of MessageStoreConfig:
```java
public class MessageStoreConfig {
    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
}
```
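Mapping between delay levels and delay times (levels are numbered from 1; level 0 means no delay):
Delay level 1 ---> delay 1s, i.e. the consumer pulls the message from the Broker again after 1 second
Delay level 2 ---> delay 5s
Delay level 3 ---> delay 10s
...
and so on, up to level 18, whose 2h is the maximum delay.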
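The mapping can be checked by parsing the string above. This sketch is only similar in spirit to ScheduleMessageService.parseDelayLevel: `DelayLevels` is a hypothetical class, and the supported units (s, m, h, d) are an assumption. The key point is that the level is the list index plus one.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical parser for a messageDelayLevel-style string. Levels are numbered
// from 1, so level 1 is the first entry ("1s") and level 18 the last ("2h").
final class DelayLevels {
    private DelayLevels() {}

    static Map<Integer, Long> parse(String levelString) {
        Map<Character, Long> unitMillis = new HashMap<>();
        unitMillis.put('s', 1000L);
        unitMillis.put('m', 60_000L);
        unitMillis.put('h', 3_600_000L);
        unitMillis.put('d', 86_400_000L);

        Map<Integer, Long> delayByLevel = new HashMap<>();
        String[] parts = levelString.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String value = parts[i];
            char unit = value.charAt(value.length() - 1);
            Long millisPerUnit = unitMillis.get(unit);
            if (millisPerUnit == null) {
                throw new IllegalArgumentException("unknown time unit in " + value);
            }
            long amount = Long.parseLong(value.substring(0, value.length() - 1));
            delayByLevel.put(i + 1, amount * millisPerUnit); // level = index + 1
        }
        return delayByLevel;
    }
}
```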
In sendMessageBack, the delay level is first taken from the context (ConsumeConcurrentlyContext shows that it defaults to 0). The namespace is then added to the topic, and DefaultMQPushConsumerImpl's sendMessageBack is called to send the message:
```java
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
    public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
        // Get the delay level
        int delayLevel = context.getDelayLevelWhenNextConsume();
        // Add the namespace to the topic
        msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));
        try {
            // Send the message back to the Broker
            this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
            return true;
        } catch (Exception e) {
            log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
        }
        return false;
    }
}

// Concurrent consume context
public class ConsumeConcurrentlyContext {
    /**
     * -1: no retry, put the message into the DLQ directly
     *  0: the Broker controls the retry frequency
     * >0: the client controls the retry frequency
     */
    private int delayLevelWhenNextConsume = 0; // defaults to 0
}
```
DefaultMQPushConsumerImpl's sendMessageBack in turn calls MQClientAPIImpl's consumerSendMessageBack to send the request:
```java
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        try {
            // Get the Broker address
            String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
                : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
            // Send the request through consumerSendMessageBack
            this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
                this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
        } catch (Exception e) {
            // ...
        } finally {
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
        }
    }
}
```
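In MQClientAPIImpl's consumerSendMessageBack, the request code is set to CONSUMER_SEND_MSG_BACK and the message's details are filled into the request header. The request is then sent to the Broker synchronously: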
```java
public class MQClientAPIImpl {
    public void consumerSendMessageBack(
        final String addr,
        final MessageExt msg,
        final String consumerGroup,
        final int delayLevel,
        final long timeoutMillis,
        final int maxConsumeRetryTimes
    ) throws RemotingException, MQBrokerException, InterruptedException {
        // Create the request header
        ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
        // Request code: CONSUMER_SEND_MSG_BACK
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
        // Consumer group
        requestHeader.setGroup(consumerGroup);
        requestHeader.setOriginTopic(msg.getTopic());
        // Physical offset of the message
        requestHeader.setOffset(msg.getCommitLogOffset());
        // Delay level
        requestHeader.setDelayLevel(delayLevel);
        // Message ID
        requestHeader.setOriginMsgId(msg.getMsgId());
        // Maximum reconsume count
        requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
        // Send the request to the Broker
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
            request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                return;
            }
            default:
                break;
        }
        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    }
}
```
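How the Broker Handles the Request
The Broker handles CONSUMER_SEND_MSG_BACK requests in SendMessageProcessor, as follows:
- Look up the subscription group config by consumer group. If none is found, return an error response.
- Get the consumer group's retry topic, randomly pick one of its retry queues, and create the TopicConfig for the retry topic.
- Look up the message in the CommitLog by its physical offset.
- Check whether the message's reconsume count is greater than or equal to the maximum reconsume count, or the delay level is less than 0:
  - If so, the message must go to the dead-letter queue (DLQ), and a DLQ queue ID is chosen.
  - If not, and the delay level is 0, use 3 + the message's reconsume count as the new delay level.
- Create a new MessageExtBrokerInner and copy the message's details into it. This is effectively a brand-new message (it records the original message ID) that is appended to the CommitLog again. Its topic is set in one of two ways:
  - If the DLQ condition is met, the topic is the DLQ topic (%DLQ% + consumer group name), and the message is added to the chosen DLQ queue.
  - Otherwise, the topic is the retry topic (%RETRY% + consumer group name), and the message will be consumed again.
- Call asyncPutMessage to store the message. See the earlier article 【Message Storage】 for the details.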
```java
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
    // Handle a request
    public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
                                                                  RemotingCommand request) throws RemotingCommandException {
        final SendMessageContext mqtraceContext;
        switch (request.getCode()) {
            case RequestCode.CONSUMER_SEND_MSG_BACK:
                // Handle the send-back request
                return this.asyncConsumerSendMsgBack(ctx, request);
            default:
                // ...
        }
    }

    private CompletableFuture<RemotingCommand> asyncConsumerSendMsgBack(ChannelHandlerContext ctx,
                                                                        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final ConsumerSendMsgBackRequestHeader requestHeader =
            (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
        // ...
        // Look up the subscription group config by consumer group
        SubscriptionGroupConfig subscriptionGroupConfig =
            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
        // Not found: return directly
        if (null == subscriptionGroupConfig) {
            response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
            response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
                + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
            return CompletableFuture.completedFuture(response);
        }
        // ...
        // The consumer group's retry topic
        String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
        // Randomly pick one of the retry queues
        int queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % subscriptionGroupConfig.getRetryQueueNums();
        int topicSysFlag = 0;
        if (requestHeader.isUnitMode()) {
            topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
        }
        // Create the TopicConfig for the retry topic
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
            newTopic,
            subscriptionGroupConfig.getRetryQueueNums(),
            PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
        // ...
        // Look up the message in the CommitLog by physical offset
        MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
        if (null == msgExt) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("look message by offset failed, " + requestHeader.getOffset());
            return CompletableFuture.completedFuture(response);
        }
        // Read the retry-topic property; on the first send-back, store the original topic in it
        final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
        if (null == retryTopic) {
            MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
        }
        msgExt.setWaitStoreMsgOK(false);
        // Delay level
        int delayLevel = requestHeader.getDelayLevel();
        // Maximum reconsume count
        int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
        if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
            Integer times = requestHeader.getMaxReconsumeTimes();
            if (times != null) {
                maxReconsumeTimes = times;
            }
        }
        // Reconsume count >= maximum reconsume count, or delay level < 0
        if (msgExt.getReconsumeTimes() >= maxReconsumeTimes || delayLevel < 0) {
            // DLQ topic
            newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
            // Pick a queue
            queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP;
            // Create the TopicConfig for the DLQ
            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                DLQ_NUMS_PER_GROUP,
                PermName.PERM_WRITE | PermName.PERM_READ, 0);
            // ...
        } else {
            // The delay level is 0
            if (0 == delayLevel) {
                // Derive it from the reconsume count
                delayLevel = 3 + msgExt.getReconsumeTimes();
            }
            // Set the delay level
            msgExt.setDelayTimeLevel(delayLevel);
        }
        // Create a new message
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(newTopic); // topic
        msgInner.setBody(msgExt.getBody()); // body
        msgInner.setFlag(msgExt.getFlag());
        MessageAccessor.setProperties(msgInner, msgExt.getProperties()); // properties
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
        msgInner.setQueueId(queueIdInt); // queue ID
        msgInner.setSysFlag(msgExt.getSysFlag());
        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
        msgInner.setBornHost(msgExt.getBornHost());
        msgInner.setStoreHost(msgExt.getStoreHost());
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1); // reconsume count
        // Original message ID
        String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
        // Keep the original message ID
        MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
        // Store the retry message
        CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        return putMessageResult.thenApply((r) -> {
            if (r != null) {
                switch (r.getPutMessageStatus()) {
                    case PUT_OK:
                        // ...
                        return response;
                    default:
                        break;
                }
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark(r.getPutMessageStatus().name());
                return response;
            }
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("putMessageResult is null");
            return response;
        });
    }
}
```
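The Broker's choice between the DLQ and another retry fits in a few lines. This sketch keeps only that decision and drops queue selection and storage. `SendBackDecision` is a hypothetical class; the topic prefixes are the ones named above.

```java
// Hypothetical summary of the DLQ-vs-retry decision in asyncConsumerSendMsgBack.
final class SendBackDecision {
    final String topic;
    final int delayLevel; // 0 on the DLQ path: no delay level is set there

    private SendBackDecision(String topic, int delayLevel) {
        this.topic = topic;
        this.delayLevel = delayLevel;
    }

    static SendBackDecision decide(String group, int reconsumeTimes,
                                   int maxReconsumeTimes, int requestedDelayLevel) {
        if (reconsumeTimes >= maxReconsumeTimes || requestedDelayLevel < 0) {
            return new SendBackDecision("%DLQ%" + group, 0);
        }
        // Level 0 means "let the Broker decide": back off by reconsume count
        int level = requestedDelayLevel == 0 ? 3 + reconsumeTimes : requestedDelayLevel;
        return new SendBackDecision("%RETRY%" + group, level);
    }
}
```

Combined with the delay table, a first retry (reconsume count 0) lands on level 3, i.e. 10 s. If I recall correctly, the push consumer's maximum reconsume count is 16 when left unset, which would make level 18 (2 h) the last retry before the DLQ.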
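Delayed Message Handling
As described in the 【Message Storage】 article, storing a message goes through asyncPutMessage. It first reads the transaction type. If the message is not transactional or is a transaction commit, it checks the delay level. A delay level greater than 0 means the message must be delivered later, and the following is done:
- If the delay level exceeds the maximum delay level, the maximum delay level is used instead.
- Get RMQ_SYS_SCHEDULE_TOPIC, a constant defined in TopicValidator whose value is SCHEDULE_TOPIC_XXXX:
```java
public class TopicValidator {
    // ...
    public static final String RMQ_SYS_SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
}
```
- Select the queue by delay level. ScheduleMessageService.delayLevel2QueueId maps each delay level to its own queue (queue ID = delay level - 1), so messages with the same delay level end up in the same queue.
- Back up the original topic and queue ID in the message properties.
- Change the message's topic to RMQ_SYS_SCHEDULE_TOPIC. The delayed message is therefore stored under RMQ_SYS_SCHEDULE_TOPIC, in the queue for its delay level, until it is due.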
```java
public class CommitLog {
    public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
        // ...
        // Transaction type
        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        // Not a transactional message, or a transaction commit
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // Check the delay level
            if (msg.getDelayTimeLevel() > 0) {
                // Clamp to the maximum delay level
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }
                // RMQ_SYS_SCHEDULE_TOPIC
                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                // Pick the queue for the delay level
                int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
                // Back up the real topic and queue ID
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
                // Switch to SCHEDULE_TOPIC_XXXX
                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }
        // ...
    }
}
```
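The rewrite can be modeled on a bare message object. `SimpleMsg` and `DelayRewrite` are hypothetical. The property keys "REAL_TOPIC" and "REAL_QID" are placeholders for MessageConst.PROPERTY_REAL_TOPIC and PROPERTY_REAL_QUEUE_ID, and I have not verified them against the constants. The queue mapping follows the delay level - 1 rule described above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified message with only the fields the delay rewrite touches.
final class SimpleMsg {
    String topic;
    int queueId;
    int delayLevel;
    final Map<String, String> properties = new HashMap<>();

    SimpleMsg(String topic, int queueId, int delayLevel) {
        this.topic = topic;
        this.queueId = queueId;
        this.delayLevel = delayLevel;
    }
}

final class DelayRewrite {
    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";

    private DelayRewrite() {}

    // One queue per delay level, as in ScheduleMessageService.delayLevel2QueueId.
    static int delayLevel2QueueId(int delayLevel) {
        return delayLevel - 1;
    }

    static void apply(SimpleMsg msg, int maxDelayLevel) {
        if (msg.delayLevel <= 0) {
            return; // not a delayed message
        }
        if (msg.delayLevel > maxDelayLevel) {
            msg.delayLevel = maxDelayLevel;
        }
        // Back up the real destination (placeholder property keys)
        msg.properties.put("REAL_TOPIC", msg.topic);
        msg.properties.put("REAL_QID", String.valueOf(msg.queueId));
        msg.topic = SCHEDULE_TOPIC;
        msg.queueId = delayLevel2QueueId(msg.delayLevel);
    }
}
```

When the delay expires, the schedule service uses the backed-up values to put the message back on its real topic and queue. For a retry message, that real topic is %RETRY% + group, which is why the consumer later resets it.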
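Persisting Consume Progress
RocketMQ has two consumption modes, broadcasting and clustering. In broadcasting mode the consume progress is stored on each consumer. In clustering mode it is stored on the Broker.
Broadcasting mode
Updating progress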
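LocalFileOffsetStore uses a ConcurrentMap field, offsetTable, to store the offset of each message queue. The key is the message queue, and the value is that queue's offset.
When the progress is updated, the queue's current offset is looked up in offsetTable. If there is none, a new entry is created and saved in offsetTable. Otherwise, the existing offset is updated. Note that this only changes the data in offsetTable and does not persist it to disk; persistence happens in the persistAll method: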
```java
public class LocalFileOffsetStore implements OffsetStore {
    // offsetTable: key is the message queue, value is that queue's offset
    private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>();

    @Override
    public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
        if (mq != null) {
            // Get the previous offset
            AtomicLong offsetOld = this.offsetTable.get(mq);
            if (null == offsetOld) {
                // Not present yet: create it (returns null if this call inserted it)
                offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
            }
            // An entry already existed
            if (null != offsetOld) {
                if (increaseOnly) {
                    MixAll.compareAndIncreaseOnly(offsetOld, offset);
                } else {
                    // Update the offset
                    offsetOld.set(offset);
                }
            }
        }
    }
}
```
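The increaseOnly flag matters because several consume threads can finish batches of the same queue out of order. processConsumeResult passes true, so a late thread cannot move the offset backwards. Below is a minimal sketch of the table and the increase-only CAS loop. `OffsetTable` is hypothetical and uses a String key instead of MessageQueue. The loop is written the way I understand MixAll.compareAndIncreaseOnly to work, which is an assumption.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical in-memory offset table keyed by a queue name instead of MessageQueue.
final class OffsetTable {
    private final ConcurrentMap<String, AtomicLong> offsetTable = new ConcurrentHashMap<>();

    // Only ever moves the value forward; retries if another thread changed it meanwhile.
    static boolean compareAndIncreaseOnly(AtomicLong target, long value) {
        long prev = target.get();
        while (value > prev) {
            if (target.compareAndSet(prev, value)) {
                return true;
            }
            prev = target.get();
        }
        return false;
    }

    void updateOffset(String mq, long offset, boolean increaseOnly) {
        AtomicLong old = offsetTable.get(mq);
        if (old == null) {
            // putIfAbsent returns null when this call inserted the entry
            old = offsetTable.putIfAbsent(mq, new AtomicLong(offset));
        }
        if (old != null) {
            if (increaseOnly) {
                compareAndIncreaseOnly(old, offset);
            } else {
                old.set(offset);
            }
        }
    }

    // -1 when the queue has no recorded offset
    long read(String mq) {
        AtomicLong v = offsetTable.get(mq);
        return v == null ? -1 : v.get();
    }
}
```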
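Loading progress
Because the consume progress is stored on the consumer in broadcasting mode, the previously saved progress file must be loaded from local disk.
LOCAL_OFFSET_STORE_DIR is the root directory of the progress files. It defaults to .rocketmq_offsets under the user's home directory and can be overridden with the rocketmq.client.localOffsetStoreDir system property: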
```java
public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
    "rocketmq.client.localOffsetStoreDir",
    System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
```
The LocalFileOffsetStore constructor sets the path of the offset file to LOCAL_OFFSET_STORE_DIR/<client ID>/<consumer group name>/offsets.json. As the file name suggests, the progress is stored as JSON:
```java
this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator +
    this.mQClientFactory.getClientId() + File.separator +
    this.groupName + File.separator +
    "offsets.json";
```
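The load method reads the local offsets.json file, deserializes it into an OffsetSerializeWrapper object, and adds the saved progress to offsetTable. If the file cannot be read, is empty, or cannot be parsed, readLocalOffsetBak reads the backup file instead: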
```java
public class LocalFileOffsetStore implements OffsetStore {
    // Root directory of the offset files
    public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
        "rocketmq.client.localOffsetStoreDir",
        System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
    private final String storePath;
    // ...

    public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) {
        this.mQClientFactory = mQClientFactory;
        this.groupName = groupName;
        // Set the path of the offset file
        this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator +
            this.mQClientFactory.getClientId() + File.separator +
            this.groupName + File.separator +
            "offsets.json";
    }

    @Override
    public void load() throws MQClientException {
        // Read the offsets from local disk
        OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
        if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
            // Add them to offsetTable
            offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
            for (Entry<MessageQueue, AtomicLong> mqEntry : offsetSerializeWrapper.getOffsetTable().entrySet()) {
                AtomicLong offset = mqEntry.getValue();
                log.info("load consumer's offset, {} {} {}",
                    this.groupName,
                    mqEntry.getKey(),
                    offset.get());
            }
        }
    }

    // Load the file from local disk
    private OffsetSerializeWrapper readLocalOffset() throws MQClientException {
        String content = null;
        try {
            // Read the file
            content = MixAll.file2String(this.storePath);
        } catch (IOException e) {
            log.warn("Load local offset store file exception", e);
        }
        if (null == content || content.length() == 0) {
            return this.readLocalOffsetBak();
        } else {
            OffsetSerializeWrapper offsetSerializeWrapper = null;
            try {
                // Deserialize the JSON
                offsetSerializeWrapper =
                    OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);
            } catch (Exception e) {
                log.warn("readLocalOffset Exception, and try to correct", e);
                return this.readLocalOffsetBak();
            }
            return offsetSerializeWrapper;
        }
    }
}
```
OffsetSerializeWrapper
OffsetSerializeWrapper also uses a ConcurrentMap. After the data is read from offsets.json on disk, the JSON is converted into an OffsetSerializeWrapper object. Its offsetTable then provides the saved progress of each message queue, which is added to LocalFileOffsetStore's offsetTable:
```java
public class OffsetSerializeWrapper extends RemotingSerializable {
    private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>();

    public ConcurrentMap<MessageQueue, AtomicLong> getOffsetTable() {
        return offsetTable;
    }

    public void setOffsetTable(ConcurrentMap<MessageQueue, AtomicLong> offsetTable) {
        this.offsetTable = offsetTable;
    }
}
```
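Persisting progress
updateOffset only changes the data in memory and does not write it to disk. Persistence is implemented in persistAll:
- Create an OffsetSerializeWrapper object.
- Iterate over LocalFileOffsetStore's offsetTable and add the entries of the requested queues to OffsetSerializeWrapper's offsetTable.
- Convert the OffsetSerializeWrapper to JSON.
- Call string2File to write the JSON to the file on disk.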
```java
public class LocalFileOffsetStore implements OffsetStore {
    @Override
    public void persistAll(Set<MessageQueue> mqs) {
        if (null == mqs || mqs.isEmpty())
            return;
        // Create the OffsetSerializeWrapper
        OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
        // Iterate over offsetTable
        for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
            if (mqs.contains(entry.getKey())) {
                // Get the offset
                AtomicLong offset = entry.getValue();
                // Add it to the wrapper's offsetTable
                offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
            }
        }
        // Convert the object to JSON
        String jsonString = offsetSerializeWrapper.toJson(true);
        if (jsonString != null) {
            try {
                // Write the JSON to the file on disk
                MixAll.string2File(jsonString, this.storePath);
            } catch (IOException e) {
                log.error("persistAll consumer offset Exception, " + this.storePath, e);
            }
        }
    }
}
```
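readLocalOffset above falls back to readLocalOffsetBak, so a backup copy must exist. My understanding is that MixAll.string2File writes to a .tmp file first, copies the previous content to .bak, and then moves the temp file into place. Treat that as an assumption. The sketch below shows that pattern with java.nio. `SafeFileStore` is a hypothetical helper, not RocketMQ's implementation.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

// Hypothetical helper: write through a temp file, keep the previous content as .bak,
// and fall back to .bak when the main file is missing or empty.
final class SafeFileStore {
    private SafeFileStore() {}

    static void write(String content, Path file) throws IOException {
        Path tmp = Paths.get(file.toString() + ".tmp");
        Path bak = Paths.get(file.toString() + ".bak");
        if (file.getParent() != null) {
            Files.createDirectories(file.getParent());
        }
        Files.write(tmp, content.getBytes(StandardCharsets.UTF_8));
        if (Files.exists(file)) {
            // Keep the last good version before replacing it
            Files.copy(file, bak, StandardCopyOption.REPLACE_EXISTING);
        }
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING);
    }

    static String read(Path file) throws IOException {
        String content = readOrNull(file);
        if (content == null || content.isEmpty()) {
            return readOrNull(Paths.get(file.toString() + ".bak"));
        }
        return content;
    }

    private static String readOrNull(Path p) throws IOException {
        return Files.exists(p) ? new String(Files.readAllBytes(p), StandardCharsets.UTF_8) : null;
    }
}
```

With this pattern, a crash during a write leaves either the old main file or a usable .bak behind, instead of a half-written JSON file.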
Clustering mode
In clustering mode, consume progress is stored on the Broker.
Updating progress
Updating progress in clustering mode works like broadcasting mode: only the data in offsetTable is updated:
```java
public class RemoteBrokerOffsetStore implements OffsetStore {
    private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>();

    @Override
    public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
        if (mq != null) {
            // Get the queue's offset
            AtomicLong offsetOld = this.offsetTable.get(mq);
            if (null == offsetOld) {
                // Save the progress in offsetTable
                offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
            }
            if (null != offsetOld) {
                if (increaseOnly) {
                    MixAll.compareAndIncreaseOnly(offsetOld, offset);
                } else {
                    // Update the offset
                    offsetOld.set(offset);
                }
            }
        }
    }
}
```
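Loading
In clustering mode the consume progress has to come from the Broker. It is fetched on demand when a queue's offset is needed. For example, during rebalance the consumer calls readOffset with ReadOffsetType.READ_FROM_STORE, which queries the Broker. RemoteBrokerOffsetStore's load method therefore is empty and does nothing: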
```java
public class RemoteBrokerOffsetStore implements OffsetStore {
    @Override
    public void load() {
    }
}
```
Persisting
Because the progress is stored on the Broker in clustering mode, persistAll calls updateConsumeOffsetToBroker to send a request asking the Broker to save it:
```java
public class RemoteBrokerOffsetStore implements OffsetStore {
    @Override
    public void persistAll(Set<MessageQueue> mqs) {
        if (null == mqs || mqs.isEmpty())
            return;
        final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
        for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
            MessageQueue mq = entry.getKey();
            AtomicLong offset = entry.getValue();
            if (offset != null) {
                if (mqs.contains(mq)) {
                    try {
                        // Ask the Broker to update the offset
                        this.updateConsumeOffsetToBroker(mq, offset.get());
                        log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
                            this.groupName,
                            this.mQClientFactory.getClientId(),
                            mq,
                            offset.get());
                    } catch (Exception e) {
                        log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
                    }
                } else {
                    unusedMQ.add(mq);
                }
            }
        }
        // ...
    }
}
```
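When Persistence Is Triggered
In its startScheduledTask method, MQClientInstance registers a periodic task that calls persistAllConsumerOffset to persist the offsets. The first run happens after 10 seconds, and the task then repeats every persistConsumerOffsetInterval milliseconds (5000 by default). persistAllConsumerOffset in turn calls the persistConsumerOffset method of each MQConsumerInner: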
```java
public class MQClientInstance {
    private void startScheduledTask() {
        // ...
        // Register a periodic task that persists the offsets
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    // Persist
                    MQClientInstance.this.persistAllConsumerOffset();
                } catch (Exception e) {
                    log.error("ScheduledTask persistAllConsumerOffset exception", e);
                }
            }
        }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
        // ...
    }

    private void persistAllConsumerOffset() {
        Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQConsumerInner> entry = it.next();
            MQConsumerInner impl = entry.getValue();
            // Call persistConsumerOffset to persist
            impl.persistConsumerOffset();
        }
    }
}
```
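The try/catch inside run() above has a practical purpose. The ScheduledExecutorService.scheduleAtFixedRate contract suppresses all later executions once a run throws, so one failed flush would otherwise stop offset persistence for the rest of the client's life. Below is a minimal sketch of the same pattern. `PeriodicPersister` is a hypothetical name, and printing to stderr stands in for RocketMQ's logger.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical periodic persister modeled on startScheduledTask.
final class PeriodicPersister {
    private PeriodicPersister() {}

    // Runs persistAll every intervalMillis after initialDelayMillis. Exceptions are
    // caught inside the task: an uncaught one would suppress all later runs.
    static ScheduledExecutorService start(Runnable persistAll, long initialDelayMillis, long intervalMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(() -> {
            try {
                persistAll.run();
            } catch (RuntimeException e) {
                System.err.println("persistAllConsumerOffset exception: " + e);
            }
        }, initialDelayMillis, intervalMillis, TimeUnit.MILLISECONDS);
        return executor;
    }
}
```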
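DefaultMQPushConsumerImpl is one implementation of MQConsumerInner. Taking it as an example, persistConsumerOffset collects the message queues currently assigned to this consumer (the keys of the rebalance's processQueueTable) and calls offsetStore's persistAll to persist them: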
```java
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    @Override
    public void persistConsumerOffset() {
        try {
            this.makeSureStateOK();
            Set<MessageQueue> mqs = new HashSet<MessageQueue>();
            Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
            mqs.addAll(allocateMq);
            // Persist the offsets
            this.offsetStore.persistAll(mqs);
        } catch (Exception e) {
            log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
        }
    }
}
```
Summary
References
Ding Wei, Zhou Jifeng, 《RocketMQ技术内幕》 (RocketMQ Internals)
RocketMQ version: 4.9.3