【mq】从零开始实现 mq-12-消息的批量发送与回执


前景回顾

【mq】从零开始实现 mq-01-生产者、消费者启动

【mq】从零开始实现 mq-02-如何实现生产者调用消费者?

【mq】从零开始实现 mq-03-引入 broker 中间人

【mq】从零开始实现 mq-04-启动检测与实现优化

【mq】从零开始实现 mq-05-实现优雅停机

【mq】从零开始实现 mq-06-消费者心跳检测 heartbeat

【mq】从零开始实现 mq-07-负载均衡 load balance

【mq】从零开始实现 mq-08-配置优化 fluent

【mq】从零开始实现 mq-09-消费者拉取消息 pull message

【mq】从零开始实现 mq-10-消费者拉取消息回执 pull message ack

【mq】从零开始实现 mq-11-消费者消息回执添加分组信息 pull message ack groupName

【mq】从零开始实现 mq-12-消息的批量发送与回执

批量消息

对于消息的发送,有时候可能需要一次发送多个,比如日志消息等。

批量操作可以提升性能。

本节老马就和大家一起添加一点批量特性。

【mq】从零开始实现 mq-12-消息的批量发送与回执

消息的批量发送

生产者实现

接口定义

/**  * 同步发送消息-批量  * @param mqMessageList 消息类型  * @return 结果  * @since 0.1.3  */ SendBatchResult sendBatch(final List<MqMessage> mqMessageList);  /**  * 单向发送消息-批量  * @param mqMessageList 消息类型  * @return 结果  * @since 0.1.3  */ SendBatchResult sendOneWayBatch(final List<MqMessage> mqMessageList); 

一次支持发送多个消息。

接口实现

生产者实现如下。

@Override public SendBatchResult sendBatch(List<MqMessage> mqMessageList) {     final List<String> messageIdList = this.fillMessageList(mqMessageList);     final MqMessageBatchReq batchReq = new MqMessageBatchReq();     batchReq.setMqMessageList(mqMessageList);     String traceId = IdHelper.uuid32();     batchReq.setTraceId(traceId);     batchReq.setMethodType(MethodType.P_SEND_MSG_BATCH);     return Retryer.<SendBatchResult>newInstance()             .maxAttempt(maxAttempt)             .callable(new Callable<SendBatchResult>() {                 @Override                 public SendBatchResult call() throws Exception {                     return doSendBatch(messageIdList, batchReq, false);                 }             }).retryCall(); }  @Override public SendBatchResult sendOneWayBatch(List<MqMessage> mqMessageList) {     List<String> messageIdList = this.fillMessageList(mqMessageList);     MqMessageBatchReq batchReq = new MqMessageBatchReq();     batchReq.setMqMessageList(mqMessageList);     String traceId = IdHelper.uuid32();     batchReq.setTraceId(traceId);     batchReq.setMethodType(MethodType.P_SEND_MSG_ONE_WAY_BATCH);     return doSendBatch(messageIdList, batchReq, true); }   private SendBatchResult doSendBatch(List<String> messageIdList,                                MqMessageBatchReq batchReq,                                boolean oneWay) {     log.info("[Producer] 批量发送消息 messageIdList: {}, batchReq: {}, oneWay: {}",             messageIdList, JSON.toJSON(batchReq), oneWay);     // 以第一个 sharding-key 为准。     // 后续的会被忽略     MqMessage mqMessage = batchReq.getMqMessageList().get(0);     Channel channel = getChannel(mqMessage.getShardingKey());     //one-way     if(oneWay) {         log.warn("[Producer] ONE-WAY send, ignore result");         return SendBatchResult.of(messageIdList, SendStatus.SUCCESS);     }     MqCommonResp resp = callServer(channel, batchReq, MqCommonResp.class);     if(MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {         return SendBatchResult.of(messageIdList, SendStatus.SUCCESS);     }     throw new MqException(ProducerRespCode.MSG_SEND_FAILED); } 

ps: 这里和单个发送有一个区别,那就是对于 channel 的选择。因为只能选择一个,所以不能兼顾每一个消息的 sharding-key。

Broker 的处理

消息分发

// 生产者消息发送-批量 if(MethodType.P_SEND_MSG_BATCH.equals(methodType)) {     return handleProducerSendMsgBatch(channelId, json); }  // 生产者消息发送-ONE WAY-批量 if(MethodType.P_SEND_MSG_ONE_WAY_BATCH.equals(methodType)) {     handleProducerSendMsgBatch(channelId, json);     return null; } 

具体实现

/**  * 处理生产者发送的消息  *  * @param channelId 通道标识  * @param json 消息体  * @since 0.1.3  */ private MqCommonResp handleProducerSendMsgBatch(String channelId, String json) {     MqMessageBatchReq batchReq = JSON.parseObject(json, MqMessageBatchReq.class);     final ServiceEntry serviceEntry = registerProducerService.getServiceEntry(channelId);     List<MqMessagePersistPut> putList = buildPersistPutList(batchReq, serviceEntry);      MqCommonResp commonResp = mqBrokerPersist.putBatch(putList);      // 遍历异步推送     for(MqMessagePersistPut persistPut : putList) {         this.asyncHandleMessage(persistPut);     }     return commonResp; } 

这里对消息列表进行持久化保存。

演示的持久化策略如下:

@Override public MqCommonResp putBatch(List<MqMessagePersistPut> putList) {     // 构建列表     for(MqMessagePersistPut put : putList) {         this.doPut(put);     }      MqCommonResp commonResp = new MqCommonResp();     commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode());     commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());     return commonResp; } 

消息的批量ACK

说明

以前的实现方式是每一个消息消费完成之后,进行一次 ACK。

对于 pull 策略的消息消费,我们可以等当前批次结束,统一进行 ACK 回执。

消费实现

实现调整如下:

for(MqTopicTagDto tagDto : subscribeList) {     final String topicName = tagDto.getTopicName();     final String tagRegex = tagDto.getTagRegex();     MqConsumerPullResp resp = consumerBrokerService.pull(topicName, tagRegex, size);      if(MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {         List<MqMessage> mqMessageList = resp.getList();         if(CollectionUtil.isNotEmpty(mqMessageList)) {             List<MqConsumerUpdateStatusDto> statusDtoList = new ArrayList<>(mqMessageList.size());             for(MqMessage mqMessage : mqMessageList) {                 IMqConsumerListenerContext context = new MqConsumerListenerContext();                 final String messageId = mqMessage.getTraceId();                 ConsumerStatus consumerStatus = mqListenerService.consumer(mqMessage, context);                 log.info("消息:{} 消费结果 {}", messageId, consumerStatus);                  // 状态同步更新                 if(!ackBatchFlag) {                     MqCommonResp ackResp = consumerBrokerService.consumerStatusAck(messageId, consumerStatus);                     log.info("消息:{} 状态回执结果 {}", messageId, JSON.toJSON(ackResp));                 } else {                     // 批量                     MqConsumerUpdateStatusDto statusDto = new MqConsumerUpdateStatusDto();                     statusDto.setMessageId(messageId);                     statusDto.setMessageStatus(consumerStatus.getCode());                     statusDto.setConsumerGroupName(groupName);                     statusDtoList.add(statusDto);                 }             }              // 批量执行             if(ackBatchFlag) {                 MqCommonResp ackResp = consumerBrokerService.consumerStatusAckBatch(statusDtoList);                 log.info("消息:{} 状态批量回执结果 {}", statusDtoList, JSON.toJSON(ackResp));                 statusDtoList = null;             }         }     } else {         log.error("拉取消息失败: {}", JSON.toJSON(resp));     } } 

如果 ackBatchFlag = false,则处理逻辑和以前一样。

如果 ackBatchFlag = true,则首先把消息放到 list 中,结束后统一执行。

broker 实现

消息分发

//消费者消费状态 ACK-批量 if(MethodType.C_CONSUMER_STATUS_BATCH.equals(methodType)) {     MqConsumerUpdateStatusBatchReq req = JSON.parseObject(json, MqConsumerUpdateStatusBatchReq.class);     final List<MqConsumerUpdateStatusDto> statusDtoList = req.getStatusList();     return mqBrokerPersist.updateStatusBatch(statusDtoList); } 

实现

默认的持久化实现,更新如下:

@Override public MqCommonResp updateStatusBatch(List<MqConsumerUpdateStatusDto> statusDtoList) {     for(MqConsumerUpdateStatusDto statusDto : statusDtoList) {         this.doUpdateStatus(statusDto.getMessageId(), statusDto.getConsumerGroupName(),                 statusDto.getMessageStatus());     }      MqCommonResp commonResp = new MqCommonResp();     commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode());     commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());     return commonResp; } 

遍历每一个元素,进行状态的更新。

小结

异步和批量,是提升性能最常用的 2 种方式。

批量的实现相关来说是最简单,也是效果最显著的。

希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。

我是老马,期待与你的下次重逢。

开源地址

The message queue in java.(java 简易版本 mq 实现) https://github.com/houbb/mq

拓展阅读

rpc-从零开始实现 rpc https://github.com/houbb/rpc

发表评论

相关文章