MQ系列15:MQ实现批量消息处理

MQ系列1:消息中间件执行原理
MQ系列2:消息中间件的技术选型
MQ系列3:RocketMQ 架构分析
MQ系列4:NameServer 原理解析
MQ系列5:RocketMQ消息的发送模式
MQ系列6:消息的消费
MQ系列7:消息通信,追求极致性能
MQ系列8:数据存储,消息队列的高可用保障
MQ系列9:高可用架构分析
MQ系列10:如何保证消息幂等性消费
MQ系列11:如何保证消息可靠性传输
MQ系列12:如何保证消息顺序性
MQ系列13:消息大量堆积如何为解决
MQ系列14:MQ如何做到消息延时处理

1 背景

在互联网业务的实际应用场景中,消息的批量处理是非常必要的,因为我们时刻面临着大量数据的并发执行。
例如,我们在一个业务交互的时候会有大量的分支行为需要异步去处理,但是这些动作又是在不同的业务粒度上的,所以我们需要多次调用MQ写入消息,可能有多次的连接和消息发送。
这个写MySQL数据库是一样的,多次建连和写入,跟一次建连和批量数据库,性能是完全不能比的。
所以我们需要有MQ有批量消息的能力来对我们的业务数据进行快速处理。

2 批量消息实现过程

Rocket MQ的批量消息,可以提高消息的吞吐能力和处理效率,降低下游系统的API调用频率,同时对消息服务的稳定性也有帮助。
MQ系列15:MQ实现批量消息处理

2.1 批量消息的特点

  • 批量消息具有相同的topic。
  • 批量消息具有相同的waitStoreMsgOK属性。
  • 批量消息不支持延迟消息。
  • 批量消息的大小不超过4M(4.4版本之后要求不超过1M)。

2.2 批量消息的使用场景

  • 消息的吞吐能力和处理效率:通过将多条消息打包成一批进行发送,可以减少网络传输开销和消息处理的时间,从而提高整体的消息处理效率。
  • 下游系统的API调用频率:通过将多条消息合并成一条批量消息进行发送,可以减少下游系统接收和处理消息的次数,从而降低API调用频率,减轻下游系统的负载压力。

2.3 批量消息的发送示例

Rocket MQ提供了批量发送消息的功能,可以通过调用DefaultMQProducer的send()方法,将多条消息以列表的形式发送给指定的topic。
以下是一个简单的示例代码:

DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName_1"); String topic = "BatchSendTest_1"; producer.start();   List<Message> msgs = new ArrayList<>();   msgs.add(new Message(topic, "Tag1", "OrderID-063105013", "Hello world".getBytes()));   msgs.add(new Message(topic, "Tag1", "OrderID-063105014", "Brand".getBytes()));   msgs.add(new Message(topic, "Tag1", "OrderID-063105015", "handsome boy ".getBytes()));   try {    producer.send(msgs); } catch (Exception e) {    e.printStackTrace();    // 处理异常 } finally {    // 如果不再发送消息,关闭生产者Producer   producer.shutdown(); } 

在以上示例代码中,创建了一个DefaultMQProducer实例,并调用其start()方法启动生产者。
然后构造了一个包含三条消息的列表,通过调用producer的send()方法将列表中的消息发送给指定的topic。
如果消息的总长度可能大于1MB时,这时候最好把消息进行分割,参考下面的代码:

public class ListSplitter implements Iterator<List<Message>> {     private final int SIZE_LIMIT = 1024 * 1024;     private final List<Message> messages;     private int currIndex;     public ListSplitter(List<Message> messages) {             this.messages = messages;     }     @Override public boolean hasNext() {         return currIndex < messages.size();     }     @Override public List<Message> next() {         int nextIndex = currIndex;         int totalSize = 0;         for (; nextIndex < messages.size(); nextIndex++) {             Message message = messages.get(nextIndex);             int tmpSize = message.getTopic().length() + message.getBody().length;             Map<String, String> properties = message.getProperties();             for (Map.Entry<String, String> entry : properties.entrySet()) {                 tmpSize += entry.getKey().length() + entry.getValue().length();             }             tmpSize = tmpSize + 20; //for log overhead             if (tmpSize > SIZE_LIMIT) {                 //it is unexpected that single message exceeds the SIZE_LIMIT                 //here just let it go, otherwise it will block the splitting process                 if (nextIndex - currIndex == 0) {                    //if the next sublist has no element, add this one and then break, otherwise just break                    nextIndex++;                   }                 break;             }             if (tmpSize + totalSize > SIZE_LIMIT) {                 break;             } else {                 totalSize += tmpSize;             }              }         List<Message> subList = messages.subList(currIndex, nextIndex);         currIndex = nextIndex;         return subList;     } } //then you could split the large list into small ones: ListSplitter splitter = new ListSplitter(messages); while (splitter.hasNext()) {    try {        List<Message>  listItem = splitter.next();        producer.send(listItem);    } catch (Exception e) {        e.printStackTrace();        // handle the error    } }  

可以看出来,Rocket MQ的批量消息可以提高消息的吞吐能力和处理效率,降低下游系统的API调用频率,是一种优化消息传输和处理的有效手段。

3 总结

  • 对于同类型、同特征的消息,可以聚合进行批量发送,减少MQ的连接发送次数,能够显著提升性能。
  • 批量发送消息须有相同的topic,相同的waitStoreMsgOK,且不能是延时消息。
发表评论

评论已关闭。

相关文章