SpringBoot整合RabbitMQ实战附加死信交换机

前言

使用springboot,实现以下功能,有两个队列1、2,往里面发送消息,如果处理失败发生异常,可以重试3次,重试3次均失败,那么就将消息发送到死信队列进行统一处理,例如记录数据库、报警等
完整demo项目代码https://gitee.com/daenmax/rabbit-mq-demo

环境

Windows10,IDEA,otp_win64_25.0,rabbitmq-server-3.10.4
1.双击C:Program FilesRabbitMQ Serverrabbitmq_server-3.10.4sbinrabbitmq-server.bat启动MQ服务
2.然后访问http://localhost:15672/,默认账号密码均为guest,
3.手动添加一个虚拟主机为admin_host,手动创建一个用户账号密码均为admin

pom.xml

        <!-- RabbitMQ -->         <dependency>             <groupId>org.springframework.boot</groupId>             <artifactId>spring-boot-starter-amqp</artifactId>             <version>2.7.0</version>         </dependency> 

配置

spring:   rabbitmq:     host: 127.0.0.1     port: 5672     username: admin     password: admin     virtual-host: admin_host     publisher-confirm-type: correlated     publisher-returns: true     listener:       simple:         acknowledge-mode: manual         retry:           enabled: true    #开启失败重试           max-attempts: 3    #最大重试次数           initial-interval: 1000  #重试间隔时间 毫秒 

配置文件

RabbitConfig

package com.example.rabitmqdemo.mydemo.config;  import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component;  import java.util.HashMap; import java.util.Map;   /**  * Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,  * Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。  * Queue:消息的载体,每个消息都会被投到一个或多个队列。  * Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.  * Routing Key:路由关键字,exchange根据这个关键字进行消息投递。  * vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。  * Producer:消息生产者,就是投递消息的程序.  * Consumer:消息消费者,就是接受消息的程序.  * Channel:消息通道,在客户端的每个连接里,可建立多个channel.  */ @Slf4j @Component public class RabbitConfig {     //业务交换机     public static final String EXCHANGE_PHCP = "phcp";     //业务队列1     public static final String QUEUE_COMPANY = "company";     //业务队列1的key     public static final String ROUTINGKEY_COMPANY = "companyKey";     //业务队列2     public static final String QUEUE_PROJECT = "project";     //业务队列2的key     public static final String ROUTINGKEY_PROJECT = "projectKey";      //死信交换机     public static final String EXCHANGE_PHCP_DEAD = "phcp_dead";     //死信队列1     public static final String QUEUE_COMPANY_DEAD = "company_dead";     //死信队列2     public static final String QUEUE_PROJECT_DEAD = "project_dead";     //死信队列1的key     public static final String ROUTINGKEY_COMPANY_DEAD = "companyKey_dead";     //死信队列2的key     public static final String ROUTINGKEY_PROJECT_DEAD = "projectKey_dead";   //    /** //     * 解决重复确认报错问题,如果没有报错的话,就不用启用这个 //     * //     * @param connectionFactory //     * @return //     */ //    @Bean //    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { //        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); //        factory.setConnectionFactory(connectionFactory); //        factory.setMessageConverter(new Jackson2JsonMessageConverter()); //        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //        return factory; //    }      /**      * 声明业务交换机      * 1. 设置交换机类型      * 2. 将队列绑定到交换机      * FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念      * HeadersExchange :通过添加属性key-value匹配      * DirectExchange:按照routingkey分发到指定队列      * TopicExchange:多关键字匹配      */     @Bean("exchangePhcp")     public DirectExchange exchangePhcp() {         return new DirectExchange(EXCHANGE_PHCP);     }      /**      * 声明死信交换机      * 1. 设置交换机类型      * 2. 将队列绑定到交换机      * FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念      * HeadersExchange :通过添加属性key-value匹配      * DirectExchange:按照routingkey分发到指定队列      * TopicExchange:多关键字匹配      */     @Bean("exchangePhcpDead")     public DirectExchange exchangePhcpDead() {         return new DirectExchange(EXCHANGE_PHCP_DEAD);     }      /**      * 声明业务队列1      *      * @return      */     @Bean("queueCompany")     public Queue queueCompany() {         Map<String,Object> arguments = new HashMap<>(2);         arguments.put("x-dead-letter-exchange",EXCHANGE_PHCP_DEAD);         //绑定该队列到死信交换机的队列1         arguments.put("x-dead-letter-routing-key",ROUTINGKEY_COMPANY_DEAD);         return QueueBuilder.durable(QUEUE_COMPANY).withArguments(arguments).build();     }     /**      * 声明业务队列2      *      * @return      */     @Bean("queueProject")     public Queue queueProject() {         Map<String,Object> arguments = new HashMap<>(2);         arguments.put("x-dead-letter-exchange",EXCHANGE_PHCP_DEAD);         //绑定该队列到死信交换机的队列2         arguments.put("x-dead-letter-routing-key",ROUTINGKEY_PROJECT_DEAD);         return QueueBuilder.durable(QUEUE_PROJECT).withArguments(arguments).build();     }      /**      * 声明死信队列1      *      * @return      */     @Bean("queueCompanyDead")     public Queue queueCompanyDead() {         return new Queue(QUEUE_COMPANY_DEAD);     }     /**      * 声明死信队列2      *      * @return      */     @Bean("queueProjectDead")     public Queue queueProjectDead() {         return new Queue(QUEUE_PROJECT_DEAD);     }      /**      * 绑定业务队列1和业务交换机      * @param queue      * @param directExchange      * @return      */     @Bean     public Binding bindingQueueCompany(@Qualifier("queueCompany") Queue queue, @Qualifier("exchangePhcp") DirectExchange directExchange) {         return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_COMPANY);     }      /**      * 绑定业务队列2和业务交换机      * @param queue      * @param directExchange      * @return      */     @Bean     public Binding bindingQueueProject(@Qualifier("queueProject") Queue queue, @Qualifier("exchangePhcp") DirectExchange directExchange) {         return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_PROJECT);     }      /**      * 绑定死信队列1和死信交换机      * @param queue      * @param directExchange      * @return      */     @Bean     public Binding bindingQueueCompanyDead(@Qualifier("queueCompanyDead") Queue queue, @Qualifier("exchangePhcpDead") DirectExchange directExchange) {         return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_COMPANY_DEAD);     }      /**      * 绑定死信队列2和死信交换机      * @param queue      * @param directExchange      * @return      */     @Bean     public Binding bindingQueueProjectDead(@Qualifier("queueProjectDead") Queue queue, @Qualifier("exchangePhcpDead") DirectExchange directExchange) {         return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_PROJECT_DEAD);     } } 

生产者

RabbltProducer

package com.example.rabitmqdemo.mydemo.producer;  import com.example.rabitmqdemo.mydemo.config.RabbitConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component;  import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.nio.charset.StandardCharsets; import java.util.UUID;  @Component @Slf4j public class RabbltProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{     @Resource     private RabbitTemplate rabbitTemplate;      /**      * 初始化消息确认函数      */     @PostConstruct     public void init() {         rabbitTemplate.setConfirmCallback(this);         rabbitTemplate.setReturnsCallback(this);         rabbitTemplate.setMandatory(true);      }      /**      * 发送消息服务器确认函数      * @param correlationData      * @param ack      * @param cause      */     @Override     public void confirm(CorrelationData correlationData, boolean ack, String cause) {         if (ack) {             System.out.println("消息发送成功" + correlationData);         } else {             System.out.println("消息发送失败:" + cause);         }     }      /**      * 消息发送失败,消息回调函数      * @param returnedMessage      */     @Override     public void returnedMessage(ReturnedMessage returnedMessage) {         String str = new String(returnedMessage.getMessage().getBody());         System.out.println("消息发送失败:" + str);     }      /**      * 处理消息发送到队列1      * @param str      */     public void sendCompany(String str){         MessageProperties messageProperties = new MessageProperties();         messageProperties.setContentType("application/json");         Message message = new Message(str.getBytes(StandardCharsets.UTF_8),messageProperties);         CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());         this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_COMPANY,message,correlationData);         //也可以用下面的方式         //CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());         //this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_COMPANY,str,correlationData);     }      /**      * 处理消息发送到队列2      * @param str      */     public void sendProject(String str){         MessageProperties messageProperties = new MessageProperties();         messageProperties.setContentType("application/json");         Message message = new Message(str.getBytes(StandardCharsets.UTF_8),messageProperties);         CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());         this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_PROJECT,message,correlationData);         //也可以用下面的方式         //CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());         //this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_PROJECT,str,correlationData);     } }  

业务消费者

RabbitConsumer

package com.example.rabitmqdemo.mydemo.consumer;  import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;  import java.io.IOException;  /**  * 监听业务交换机  * @author JeWang  */ @Component @Slf4j public class RabbitConsumer {     /**      * 监听业务队列1      * @param message      * @param channel      * @throws IOException      */     @RabbitListener(queues = "company")     public void company(Message message, Channel channel) throws IOException {         try{             System.out.println("次数" + message.getMessageProperties().getDeliveryTag());             channel.basicQos(1);             Thread.sleep(2000);             String s = new String(message.getBody());             log.info("处理消息"+s);             //下面两行是尝试手动抛出异常,用来测试重试次数和发送到死信交换机             //String str = null;             //str.split("1");             //处理成功,确认应答             channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);         }catch (Exception e){             log.error("处理消息时发生异常:"+e.getMessage());             Boolean redelivered = message.getMessageProperties().getRedelivered();             if(redelivered){                 log.error("异常重试次数已到达设置次数,将发送到死信交换机");                 channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);             }else {                 log.error("消息即将返回队列处理重试");                 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);             }         }     }     /**      * 监听业务队列2      * @param message      * @param channel      * @throws IOException      */     @RabbitListener(queues = "project")     public void project(Message message, Channel channel) throws IOException {         try{             System.out.println("次数" + message.getMessageProperties().getDeliveryTag());             channel.basicQos(1);             Thread.sleep(2000);             String s = new String(message.getBody());             log.info("处理消息"+s);             //下面两行是尝试手动抛出异常,用来测试重试次数和发送到死信交换机             //String str = null;             //str.split("1");             //处理成功,确认应答             channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);         }catch (Exception e){             log.error("处理消息时发生异常:"+e.getMessage());             Boolean redelivered = message.getMessageProperties().getRedelivered();             if(redelivered){                 log.error("异常重试次数已到达设置次数,将发送到死信交换机");                 channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);             }else {                 log.error("消息即将返回队列处理重试");                 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);             }         }     } }   

死信消费者

RabbitConsumer

package com.example.rabitmqdemo.mydemo.consumer;  import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;  import java.io.IOException;  /**  * 监听死信交换机  * @author JeWang  */ @Component @Slf4j public class RabbitConsumerDead {     /**      * 处理死信队列1      * @param message      * @param channel      * @throws IOException      */     @RabbitListener(queues = "company_dead")     public void company_dead(Message message, Channel channel) throws IOException {         try{             channel.basicQos(1);             String s = new String(message.getBody());             log.info("处理死信"+s);             //在此处记录到数据库、报警之类的操作             channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);         }catch (Exception e){             log.error("接收异常:"+e.getMessage());         }     }     /**      * 处理死信队列2      * @param message      * @param channel      * @throws IOException      */     @RabbitListener(queues = "project_dead")     public void project_dead(Message message, Channel channel) throws IOException {         try{             channel.basicQos(1);             String s = new String(message.getBody());             log.info("处理死信"+s);             //在此处记录到数据库、报警之类的操作             channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);         }catch (Exception e){             log.error("接收异常:"+e.getMessage());         }     } } 

测试

MqController

package com.example.rabitmqdemo.mydemo.controller; import com.example.rabitmqdemo.mydemo.producer.RabbltProducer; import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;  import javax.annotation.Resource;  @RequestMapping("/def") @RestController @Slf4j public class MsgController {     @Resource     private RabbltProducer rabbltProducer;          @RequestMapping("/handleCompany")     public void handleCompany(@RequestBody String jsonStr){         rabbltProducer.sendCompany(jsonStr);     }  } 

发表评论

相关文章