RocketMQ+Spring Boot的简单实现及其深入分析

Producer搭建

  1. 导入RocketMQ依赖和配置RocketMQ地址及producer的group:name
        <dependency>             <groupId>org.apache.rocketmq</groupId>             <artifactId>rocketmq-spring-boot-starter</artifactId>             <version>2.3.4</version>         </dependency> 

RocketMQ+Spring Boot的简单实现及其深入分析

  1. 创建消费接口

RocketMQ+Spring Boot的简单实现及其深入分析

1. 调用接口进行测试 

RocketMQ+Spring Boot的简单实现及其深入分析

    ## 发送消息模式的类型扩展          > `RocketTemplate`中有许多发送方法,其可应对大多数的场景     >  

RocketMQ+Spring Boot的简单实现及其深入分析

    ### syncSend()          > 同步发送,仅当发送过程完成时返回此方法.需严格保证顺序性,其会阻塞调用线程至到Broker获取响应     >      - 参数列表         - `destination`目标主题,格式为`topicName:tags`tags可选         - `payload`消息体,可以是任意对象,自动序列化         - `message`Spring Message对象,可自定义headers         - `timeout`发送超时时间毫秒,默认3000ms     - 返回对象         - SendResult:包含消息ID,发送状态,队列偏移值等等     - 用于大部分对发送结果严格的场景:如电商,金融等等          ### asyncSend()          > 异步发送,没有返回对象.异步传输一般用于响应时间敏感的业务场景.在发送完成会立即调用其参数列表中的sendCallBack方法     >      - 参数列表         - `String destination`         - `Message<?> message`         - `SendCallback sendCallback`:发送结果调用方法             - `onSuccess(SendResult result)`:发送成功回调             - `onException(Throwable e)`:发送失败回调     - 适用于:高吞吐,但对结果要求不高的场景如日志采集等等 

RocketMQ+Spring Boot的简单实现及其深入分析

    ### syncSendOrderly()          > 顺序发送     >      - 参数列表         - `SendResult syncSendOrderly(String destination, Message<?> message, String hashKey);`         - `SendResult syncSendOrderly(String destination, Object payload, String hashKey);`         - `hasyKey`:分片见,相同的hashKey会被路由到同一个队列             - 基本原理:`int queueId = Math.abs(hashKey.hashCode()) % queueCount;`          # SendMesssageInTransaction()          > 发送MQ分布式事务消息,其采用2PC(两端式协议)+补偿机制(事务回查)的分布式事务功能     >      - 半事务消息:暂不能投递的消息,消息生产者已经成功将消息发送到RocketMQ服务器中,但暂时为收到生产者对消息的二次确认.此时的消息会被标记为”暂不能投递”的状态.处于这种”暂不能投递”状态的消息被称为半事务消息     - 消息回查:由于一些网络问题,生产者自身的问题等等,导致某条事务消息二次丢失,RocketMQ通过扫描某条消息长期处于”半事务消息”时,其会向生产者组询查该消息的最终状态(commit或Rollback),这就是消息回查     - 在RocketMQ中发送食物消息需要三个核心组件         1. 事务消息发送:使用sendMessageInTransaction()方法 

RocketMQ+Spring Boot的简单实现及其深入分析

        2. 事务监听器:实现RocketMQLocalTransactionListener接口         3. 事务监听注册:通过@RocketMQTransactionListener注解注册 

RocketMQ+Spring Boot的简单实现及其深入分析

            - 返回对象:`TransactionSendResult`:含事务状态`LocalTransactionState`         - 采用这一套事务消息发送逻辑,本地的Service只需关心发送消息的逻辑,其余的事务逻辑交由给事务监听器处理 

RocketMQ+Spring Boot的简单实现及其深入分析

    ### 事务基本执行流程          - **第一阶段:发送半事务**         1. 生产者发送半事务消息:生产者将业务数据封装成数据,并将其发送给RocketMQ,此时消息被标记为”半事务消息”         2. RocketMQ确认接收消息:RocketMQ接收到消息并将其持久化到存储系统中,此时会向生产者发送一个确认消息(Ack)表示该消息已经被接收         3. 生产者执行本地事务逻辑:生产者接收到服务端的确认后,则开始本地业务逻辑执行.如更新数据库,修改订单等等     - **第二阶段:提交或回滚事务**         1. 生产者提交二次确认结果:根据本地事务执行结果,生产者向RocketMQ提交二次确认结果             1. 若本地事务执行成功:生产者提交`Commit`操作,服务器端将半事务标记为:”可投递状态”,并将其投递给消费者             2. 如果本地事务执行失败:生产者提交`Rollback`操作,RocketMQ则会回滚,不会将消息投递给消费者         2. 但0由于网络问题生产者自身应用问题导致重启,RocketMQ迟迟未收到生产者的二次确认,或收到的消息结果为`Unknown`未知状态.RocketMQ会发起事务回查.             1. RocketMQ会向生产者发送回查请求,要求查询其本地事务状态             2. 生产者根据本地事务状态再次提交二次确认结果     - **第三阶段:消费者进行消费**         1. 当RocketMQ中的消息被标记为”可投递”之后,消息会被投递到消费者.消费者按其消费逻辑进行消费操作.最后向RocketMQ发送消费结果(成功/失败)         2. 消息被消费后,RocketMQ会标记其消息为”已消费”,RocketMQ会默认保留所有消息.支持消费者回溯历史消息 

RocketMQ+Spring Boot的简单实现及其深入分析

    ### 幂等问题          > 幂等性,值对同一操作多次执行,结果与仅执行一次效果相同     >      - 出现幂等的原因:         1. **生产者重复发送**:生产者客户端有可能因为某些网络问题导致发送失败,届时生产者会尝试发送相同的消息从而会导致RocketMQ重复消费         2. **重试机制**:RocketMQ提供了消息重试机制,在消息发送中出现异常时.消费者会重新拉取相同的消息进行重试.若消费者方没有处理幂等性,则消息会被重复消费         3. **集群下的消息重复消费**:在RocketMQ下的集群,如果多个消费者订阅相同的主题,且每个消费者都独立消费消息,那么同一个消息就会被不同的消费者组重复消费          ### 使用Redssion实现幂等性          ```java      consumer.registerMessageListener(new MessageListenerConcurrently() {                 @Override                 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {                     for (MessageExt msg : msgs) {                         String msgId = msg.getMsgId();                         String lockKey = "rocketmq:msg:" + msgId;                         RLock lock = redissonClient.getLock(lockKey);                         boolean acquired = false;                              try {                             acquired = lock.tryLock(1, 10, TimeUnit.SECONDS);                             if (acquired) {                                 System.out.println("Processing message: " + new String(msg.getBody()) + ", MsgId: " + msgId);                                 Thread.sleep(100); // 模拟业务处理                                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;                             } else {                                 System.out.println("Duplicate message skipped: " + msgId);                                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;                             }                         } catch (Exception e) {                             System.err.println("Error processing message: " + msgId + ", error: " + e.getMessage());                             return ConsumeConcurrentlyStatus.RECONSUME_LATER;                         } finally {                             if (acquired) {                                 lock.unlock();                             }                         }                     }                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;                 }             });     ```          ```java     consumer.registerMessageListener(new MessageListenerOrderly() {         @Override         public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {             for (MessageExt msg : msgs) {                 String msgId = msg.getMsgId();                 String lockKey = "rocketmq:msg:" + msgId;                 RLock lock = redissonClient.getLock(lockKey);                 boolean acquired = false;                      try {                     acquired = lock.tryLock(1, 10, TimeUnit.SECONDS);                     if (acquired) {                         System.out.println("Processing message: " + new String(msg.getBody()) + ", MsgId: " + msgId);                         Thread.sleep(100);                         return ConsumeOrderlyStatus.SUCCESS;                     } else {                         System.out.println("Duplicate message skipped: " + msgId);                         return ConsumeOrderlyStatus.SUCCESS;                     }                 } catch (Exception e) {                     System.err.println("Error processing message: " + msgId + ", error: " + e.getMessage());                     return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;                 } finally {                     if (acquired) {                         lock.unlock();                     }                 }             }             return ConsumeOrderlyStatus.SUCCESS;         }     });     ```          ### sendAndReceive()          - 用于实现请求-响应模式的核心方法,其允许在分布式系统中实现类似RCP同步通信的能力 

RocketMQ+Spring Boot的简单实现及其深入分析

    - 核心特性         - 同步通信:阻塞调用线程直到收到响应         - 双向交互:实现生产者与消费者的双向通信         - 解耦设计:保持MQ解耦特性同时实现同步交互     - 参数列表                  Message<?> sendAndReceive(`String destination,Message<?> requestMessage,long timeout`) throws MessagingException              - 业务场景:实时查询库存信息 

Consumer搭建

  1. 引入依赖配置consumer的group:name
        <dependency>             <groupId>org.apache.rocketmq</groupId>             <artifactId>rocketmq-spring-boot-starter</artifactId>             <version>2.3.4</version>         </dependency> 

RocketMQ+Spring Boot的简单实现及其深入分析

  1. 创建消息监听器

实现RocketMQListener接口,重写其onMessage()方法完成消费逻辑

  • 使用@RocketMQMessageListener(consumerGroup=””,topic=””)注解:来指定消费者组,及目标topic

RocketMQ+Spring Boot的简单实现及其深入分析

发表评论

评论已关闭。

相关文章