RabbitMQ快速使用代码手册

本篇博客的内容为RabbitMQ在开发过程中的快速上手使用,侧重于代码部分,几乎没有相关概念的介绍,相关概念请参考以下csdn博客,两篇都是我找的精华帖,供大家学习。本篇博客也持续更新~~~
内容代码部分由于word转md格式有些问题,可以直接查看我的有道云笔记,链接:https://note.youdao.com/s/Ab7Cjiu

参考文档

csdn博客:

基础部分:https://blog.csdn.net/qq_35387940/article/details/100514134

高级部分:https://blog.csdn.net/weixin_49076273/article/details/124991012

application.yml

server:  port: 8021  spring:  #给项目来个名字  application:  name: rabbitmq-provider  #配置rabbitMq 服务器  rabbitmq:  host: 127.0.0.1  port: 5672  username: root  password: root  #虚拟host 可以不设置,使用server默认host  virtual-host: JCcccHost  #确认消息已发送到交换机(Exchange)  #publisher-confirms: true  publisher-confirm-type: correlated  #确认消息已发送到队列(Queue)  publisher-returns: true 

完善更多信息

spring:  rabbitmq:  host: localhost  port: 5672  virtual-host: /  username: guest  password: guest  publisher-confirm-type: correlated  publisher-returns: true  template:  mandatory: true  retry:  #发布重试,默认false  enabled: true  #重试时间 默认1000ms  initial-interval: 1000  #重试最大次数 最大3  max-attempts: 3  #重试最大间隔时间  max-interval: 10000  #重试的时间隔乘数,比如配2,0 第一次等于10s,第二次等于20s,第三次等于40s  multiplier: 1  listener:  # 默认配置是simple  type: simple  simple:  # 手动ack Acknowledge mode of container. auto none  acknowledge-mode: manual  #消费者调用程序线程的最小数量  concurrency: 10  #消费者最大数量  max-concurrency: 10  #限制消费者每次只处理一条信息,处理完在继续下一条  prefetch: 1  #启动时是否默认启动容器  auto-startup: true  #被拒绝时重新进入队列  default-requeue-rejected: true 

相关注解说明

@RabbitListener 注解是指定某方法作为消息消费的方法,例如监听某 Queue
里面的消息。

@RabbitListener标注在方法上,直接监听指定的队列,此时接收的参数需要与发送市类型一致。

@Component  public class PointConsumer {  //监听的队列名  @RabbitListener(queues = "point.to.point")  public void processOne(String name) {  System.out.println("point.to.point:" + name);  }  } 

@RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用

@RabbitListener 标注在类上面表示当有收到消息的时候,就交给
@RabbitHandler 的方法处理,根据接受的参数类型进入具体的方法中。

@Component  @RabbitListener(queues = "consumer_queue")  public class Receiver {  @RabbitHandler  public void processMessage1(String message) {  System.out.println(message);  }  @RabbitHandler  public void processMessage2(byte[] message) {  System.out.println(new String(message));  }  } 

@Payload

可以获取消息中的 body 信息

@RabbitListener(queues = "debug")  public void processMessage1(@Payload String body) {  System.out.println("body:"+body);  } 

@Header,@Headers

可以获得消息中的 headers 信息

@RabbitListener(queues = "debug")  public void processMessage1(@Payload String body, @Header String token) {  System.out.println("body:"+body);  System.out.println("token:"+token);  }  @RabbitListener(queues = "debug")  public void processMessage1(@Payload String body, @Headers Map<String,Object> headers) {  System.out.println("body:"+body);  System.out.println("Headers:"+headers);  } 

快速使用

配置xml文件

<dependency>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-amqp</artifactId>  </dependency> 

配置exchange、queue

注解快速创建版本

@Configuration  public class RabbitmqConfig {  //创建交换机  //通过ExchangeBuilder能创建direct、topic、Fanout类型的交换机  @Bean("bootExchange")  public Exchange bootExchange() {  return ExchangeBuilder.topicExchange("zx_topic_exchange").durable(true).build();  }  //创建队列  @Bean("bootQueue")  public Queue bootQueue() {  return QueueBuilder.durable("zx_queue").build();  }  /**  * 将队列与交换机绑定  *  * @param queue  * @param exchange  * @return  */  @Bean  public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange) {  return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();  }  } 

Direct

import org.springframework.amqp.core.Binding;  import org.springframework.amqp.core.BindingBuilder;  import org.springframework.amqp.core.DirectExchange;  import org.springframework.amqp.core.Queue;  import org.springframework.context.annotation.Bean;  import org.springframework.context.annotation.Configuration;  /**  * @Author : JCccc  * @CreateTime : 2019/9/3  * @Description :  **/  @Configuration  public class DirectRabbitConfig {  //队列 起名:TestDirectQueue  @Bean  public Queue TestDirectQueue() {  // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效  // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable  // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。  // return new Queue("TestDirectQueue",true,true,false);  //一般设置一下队列的持久化就好,其余两个就是默认false  return new Queue("TestDirectQueue",true);  }  //Direct交换机 起名:TestDirectExchange  @Bean  DirectExchange TestDirectExchange() {  // return new DirectExchange("TestDirectExchange",true,true);  return new DirectExchange("TestDirectExchange",true,false);  }  //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting  @Bean  Binding bindingDirect() {  return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");  }  @Bean  DirectExchange lonelyDirectExchange() {  return new DirectExchange("lonelyDirectExchange");  }  } 

Fanout

import org.springframework.amqp.core.Binding;  import org.springframework.amqp.core.BindingBuilder;  import org.springframework.amqp.core.FanoutExchange;  import org.springframework.amqp.core.Queue;  import org.springframework.context.annotation.Bean;  import org.springframework.context.annotation.Configuration;  /**  * @Author : JCccc  * @CreateTime : 2019/9/3  * @Description :  **/  @Configuration  public class FanoutRabbitConfig {  /**  * 创建三个队列 :fanout.A fanout.B fanout.C  * 将三个队列都绑定在交换机 fanoutExchange 上  * 因为是扇型交换机, 路由键无需配置,配置也不起作用  */  @Bean  public Queue queueA() {  return new Queue("fanout.A");  }  @Bean  public Queue queueB() {  return new Queue("fanout.B");  }  @Bean  public Queue queueC() {  return new Queue("fanout.C");  }  @Bean  FanoutExchange fanoutExchange() {  return new FanoutExchange("fanoutExchange");  }  @Bean  Binding bindingExchangeA() {  return BindingBuilder.bind(queueA()).to(fanoutExchange());  }  @Bean  Binding bindingExchangeB() {  return BindingBuilder.bind(queueB()).to(fanoutExchange());  }  @Bean  Binding bindingExchangeC() {  return BindingBuilder.bind(queueC()).to(fanoutExchange());  }  } 

Topic

import org.springframework.amqp.core.Binding;  import org.springframework.amqp.core.BindingBuilder;  import org.springframework.amqp.core.Queue;  import org.springframework.amqp.core.TopicExchange;  import org.springframework.context.annotation.Bean;  import org.springframework.context.annotation.Configuration;  /**  * @Author : JCccc  * @CreateTime : 2019/9/3  * @Description :  **/  @Configuration  public class TopicRabbitConfig {  //绑定键  public final static String man = "topic.man";  public final static String woman = "topic.woman";  @Bean  public Queue firstQueue() {  return new Queue(TopicRabbitConfig.man);  }  @Bean  public Queue secondQueue() {  return new Queue(TopicRabbitConfig.woman);  }  @Bean  TopicExchange exchange() {  return new TopicExchange("topicExchange");  }  //将firstQueue和topicExchange绑定,而且绑定的键值为topic.man  //这样只要是消息携带的路由键是topic.man,才会分发到该队列  @Bean  Binding bindingExchangeMessage() {  return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);  }  //将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#  // 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列  @Bean  Binding bindingExchangeMessage2() {  return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");  }  } 

生产者发送消息

直接发送给队列

//指定消息队列的名字,直接发送消息到消息队列中  @Test  public void testSimpleQueue() {  // 队列名称  String queueName = "simple.queue";  // 消息  String message = "hello, spring amqp!";  // 发送消息  rabbitTemplate.convertAndSend(queueName, message);  } 

发送给交换机,然后走不同的模式

////指定交换机的名字,将消息发送给交换机,然后不同模式下,消息队列根据key得到消息  @Test  public void testSendDirectExchange() {  // 交换机名称,有三种类型  String exchangeName = "itcast.direct";  // 消息  String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";  // 发送消息,red为队列的key,因此此队列会得到消息  rabbitTemplate.convertAndSend(exchangeName, "red", message);  } 

也可以将发送的消息封装到HashMap中然后发送给交换机

import org.springframework.amqp.rabbit.core.RabbitTemplate;  import org.springframework.beans.factory.annotation.Autowired;  import org.springframework.web.bind.annotation.GetMapping;  import org.springframework.web.bind.annotation.RestController;  import java.time.LocalDateTime;  import java.time.format.DateTimeFormatter;  import java.util.HashMap;  import java.util.Map;  import java.util.UUID;  /**  * @Author : JCccc  * @CreateTime : 2019/9/3  * @Description :  **/  @RestController  public class SendMessageController {  @Autowired  RabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法  @GetMapping("/sendDirectMessage")  public String sendDirectMessage() {  String messageId = String.valueOf(UUID.randomUUID());  String messageData = "test message, hello!";  String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));  Map<String,Object> map=new HashMap<>();  map.put("messageId",messageId);  map.put("messageData",messageData);  map.put("createTime",createTime);  //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange  rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);  return "ok";  }  } 

消费者接收消息

//使用注解@RabbitListener定义当前方法监听RabbitMQ中指定名称的消息队列。  @Component  public class MessageListener {  @RabbitListener(queues = "direct_queue")  public void receive(String id){  System.out.println("已完成短信发送业务(rabbitmq direct),id:"+id);  }  }  参数用Map接收也可以  @Component  @RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue  public class DirectReceiver {  @RabbitHandler  public void process(Map testMessage) {  System.out.println("DirectReceiver消费者收到消息 : " + testMessage.toString());  }  } 

高级特性

消息可靠性传递

有confirm和return两种

在application.yml中添加以下配置项:

server:  port: 8021  spring:  #给项目来个名字  application:  name: rabbitmq-provider  #配置rabbitMq 服务器  rabbitmq:  host: 127.0.0.1  port: 5672  username: root  password: root  #虚拟host 可以不设置,使用server默认host  virtual-host: JCcccHost  #确认消息已发送到交换机(Exchange)  #publisher-confirms: true  publisher-confirm-type: correlated  #确认消息已发送到队列(Queue)  publisher-returns: true 

有两种配置方法:

写到配置类中

写到工具类或者普通类中,但是这个类得实现那两个接口

写法一

编写消息确认回调函数

import org.springframework.amqp.core.Message;  import org.springframework.amqp.rabbit.connection.ConnectionFactory;  import org.springframework.amqp.rabbit.connection.CorrelationData;  import org.springframework.amqp.rabbit.core.RabbitTemplate;  import org.springframework.context.annotation.Bean;  import org.springframework.context.annotation.Configuration;  @Configuration  public class RabbitConfig {  @Bean  public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){  RabbitTemplate rabbitTemplate = new RabbitTemplate();  rabbitTemplate.setConnectionFactory(connectionFactory);  //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数  rabbitTemplate.setMandatory(true);  rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {  @Override  public void confirm(CorrelationData correlationData, boolean ack, String cause) {  System.out.println("ConfirmCallback: "+"相关数据:"+correlationData);  System.out.println("ConfirmCallback: "+"确认情况:"+ack);  System.out.println("ConfirmCallback: "+"原因:"+cause);  }  });  rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {  @Override  public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {  System.out.println("ReturnCallback: "+"消息:"+message);  System.out.println("ReturnCallback: "+"回应码:"+replyCode);  System.out.println("ReturnCallback: "+"回应信息:"+replyText);  System.out.println("ReturnCallback: "+"交换机:"+exchange);  System.out.println("ReturnCallback: "+"路由键:"+routingKey);  }  });  return rabbitTemplate;  }  } 

写法二

@Component  @Slf4j  public class SmsRabbitMqUtils implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {  @Resource  private RedisTemplate<String, String> redisTemplate;  @Resource  private RabbitTemplate rabbitTemplate;  private String finalId = null;  private SmsDTO smsDTO = null;  /**  * 发布者确认的回调  *  * @param correlationData 回调的相关数据。  * @param b ack为真,nack为假  * @param s 一个可选的原因,用于nack,如果可用,否则为空。  */  @Override  public void confirm(CorrelationData correlationData, boolean b, String s) {  // 消息发送成功,将redis中消息的状态(status)修改为1  if (b) {  redisTemplate.opsForHash().put(RedisConstant.SMS_MESSAGE_PREFIX + finalId, "status", 1);  } else {  // 发送失败,放入redis失败集合中,并删除集合数据  log.error("短信消息投送失败:{}-->{}", correlationData, s);  redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + finalId);  redisTemplate.opsForHash().put(RedisConstant.MQ_PRODUCER, finalId, this.smsDTO);  }  }  /**  * 发生异常时的消息返回提醒  *  * @param returnedMessage  */  @Override  public void returnedMessage(ReturnedMessage returnedMessage) {  log.error("发生异常,返回消息回调:{}", returnedMessage);  // 发送失败,放入redis失败集合中,并删除集合数据  redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + finalId);  redisTemplate.opsForHash().put(RedisConstant.MQ_PRODUCER, finalId, this.smsDTO);  }  @PostConstruct  public void init() {  rabbitTemplate.setConfirmCallback(this);  rabbitTemplate.setReturnsCallback(this);  }  } 

消息确认机制

手动确认

yml配置  #手动确认 manual  listener:  simple:  acknowledge-mode: manual 

写法一

首先在消费者项目中创建MessageListenerConfig

import com.elegant.rabbitmqconsumer.receiver.MyAckReceiver;  import org.springframework.amqp.core.AcknowledgeMode;  import org.springframework.amqp.core.Queue;  import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;  import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;  import org.springframework.beans.factory.annotation.Autowired;  import org.springframework.context.annotation.Bean;  import org.springframework.context.annotation.Configuration;  @Configuration  public class MessageListenerConfig {  @Autowired  private CachingConnectionFactory connectionFactory;  @Autowired  private MyAckReceiver myAckReceiver;//消息接收处理类  @Bean  public SimpleMessageListenerContainer simpleMessageListenerContainer() {  SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);  container.setConcurrentConsumers(1);  container.setMaxConcurrentConsumers(1);  container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息  //设置一个队列  container.setQueueNames("TestDirectQueue");  //如果同时设置多个如下: 前提是队列都是必须已经创建存在的  // container.setQueueNames("TestDirectQueue","TestDirectQueue2","TestDirectQueue3");  //另一种设置队列的方法,如果使用这种情况,那么要设置多个,就使用addQueues  //container.setQueues(new Queue("TestDirectQueue",true));  //container.addQueues(new Queue("TestDirectQueue2",true));  //container.addQueues(new Queue("TestDirectQueue3",true));  container.setMessageListener(myAckReceiver);  return container;  }  } 

然后创建手动确认监听类MyAckReceiver(手动确认模式需要实现ChannelAwareMessageListener)

import com.rabbitmq.client.Channel;  import org.springframework.amqp.core.Message;  import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;  import org.springframework.stereotype.Component;  import java.io.ByteArrayInputStream;  import java.io.ObjectInputStream;  import java.util.Map;  @Component  public class MyAckReceiver implements ChannelAwareMessageListener {  @Override  public void onMessage(Message message, Channel channel) throws Exception {  long deliveryTag = message.getMessageProperties().getDeliveryTag();  try {  byte[] body = message.getBody();  ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(body));  Map<String,String> msgMap = (Map<String,String>) ois.readObject();  String messageId = msgMap.get("messageId");  String messageData = msgMap.get("messageData");  String createTime = msgMap.get("createTime");  ois.close();  System.out.println(" MyAckReceiver messageId:"+messageId+" messageData:"+messageData+" createTime:"+createTime);  System.out.println("消费的主题消息来自:"+message.getMessageProperties().getConsumerQueue());  channel.basicAck(deliveryTag, true); //第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息  //channel.basicReject(deliveryTag, true);//第二个参数,true会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝  } catch (Exception e) {  channel.basicReject(deliveryTag, false);  e.printStackTrace();  }  }  } 

如果想实现不同的队列,有不同的监听确认处理机制,做不同的业务处理,那么这样做:

首先需要在配置类中绑定队列,然后只需要根据消息来自不同的队列名进行区分处理即可

import com.rabbitmq.client.Channel;  import org.springframework.amqp.core.Message;  import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;  import org.springframework.stereotype.Component;  import java.io.ByteArrayInputStream;  import java.io.ObjectInputStream;  import java.util.Map;  @Component  public class MyAckReceiver implements ChannelAwareMessageListener {  @Override  public void onMessage(Message message, Channel channel) throws Exception {  long deliveryTag = message.getMessageProperties().getDeliveryTag();  try {  byte[] body = message.getBody();  ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(body));  Map<String,String> msgMap = (Map<String,String>) ois.readObject();  String messageId = msgMap.get("messageId");  String messageData = msgMap.get("messageData");  String createTime = msgMap.get("createTime");  ois.close();  if ("TestDirectQueue".equals(message.getMessageProperties().getConsumerQueue())){  System.out.println("消费的消息来自的队列名为:"+message.getMessageProperties().getConsumerQueue());  System.out.println("消息成功消费到 messageId:"+messageId+" messageData:"+messageData+" createTime:"+createTime);  System.out.println("执行TestDirectQueue中的消息的业务处理流程......");  }  if ("fanout.A".equals(message.getMessageProperties().getConsumerQueue())){  System.out.println("消费的消息来自的队列名为:"+message.getMessageProperties().getConsumerQueue());  System.out.println("消息成功消费到 messageId:"+messageId+" messageData:"+messageData+" createTime:"+createTime);  System.out.println("执行fanout.A中的消息的业务处理流程......");  }  channel.basicAck(deliveryTag, true);  //channel.basicReject(deliveryTag, true);//为true会重新放回队列  } catch (Exception e) {  channel.basicReject(deliveryTag, false);  e.printStackTrace();  }  }  } 

写法二

@Component  @Slf4j  public class SendSmsListener {  @Resource  private RedisTemplate<String, String> redisTemplate;  @Resource  private SendSmsUtils sendSmsUtils;  /**  * 监听发送短信普通队列  * @param smsDTO  * @param message  * @param channel  * @throws IOException  */  @RabbitListener(queues = SMS_QUEUE_NAME)  public void sendSmsListener(SmsDTO smsDTO, Message message, Channel channel) throws IOException {  String messageId = message.getMessageProperties().getMessageId();  int retryCount = (int) redisTemplate.opsForHash().get(RedisConstant.SMS_MESSAGE_PREFIX + messageId, "retryCount");  if (retryCount > 3) {  //重试次数大于3,直接放到死信队列  log.error("短信消息重试超过3次:{}", messageId);  //basicReject方法拒绝deliveryTag对应的消息,第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列。  //该方法reject后,该消费者还是会消费到该条被reject的消息。  channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);  redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + messageId);  return;  }  try {  String phoneNum = smsDTO.getPhoneNum();  String code = smsDTO.getCode();  if(StringUtils.isAnyBlank(phoneNum,code)){  throw new RuntimeException("sendSmsListener参数为空");  }  // 发送消息  SendSmsResponse sendSmsResponse = sendSmsUtils.sendSmsResponse(phoneNum, code);  SendStatus[] sendStatusSet = sendSmsResponse.getSendStatusSet();  SendStatus sendStatus = sendStatusSet[0];  if(!"Ok".equals(sendStatus.getCode()) ||!"send success".equals(sendStatus.getMessage())){  throw new RuntimeException("发送验证码失败");  }  //手动确认消息  channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);  log.info("短信发送成功:{}",smsDTO);  redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + messageId);  } catch (Exception e) {  redisTemplate.opsForHash().put(RedisConstant.SMS_MESSAGE_PREFIX+messageId,"retryCount",retryCount+1);  channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);  }  }  /**  * 监听到发送短信死信队列  * @param sms  * @param message  * @param channel  * @throws IOException  */  @RabbitListener(queues = SMS_DELAY_QUEUE_NAME)  public void smsDelayQueueListener(SmsDTO sms, Message message, Channel channel) throws IOException {  try{  log.error("监听到死信队列消息==>{}",sms);  channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);  }catch (Exception e){  channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);  }  }  } 

消费端限流

#配置RabbitMQ  spring:  rabbitmq:  host: 192.168.126.3  port: 5672  username: guest  password: guest  virtual-host: /  #开启自动确认 none 手动确认 manual  listener:  simple:  #消费端限流机制必须开启手动确认  acknowledge-mode: manual  #消费端最多拉取的消息条数,签收后不满该条数才会继续拉取  prefetch: 5 

消息存活时间TTL

可以设置队列的存活时间,也可以设置具体消息的存活时间

设置队列中所有消息的存活时间

return QueueBuilder

.durable(QUEUE_NAME)//队列持久化

.ttl(10000)//设置队列的所有消息存活10s

.build();

即在创建队列时,设置存活时间

设置某条消息的存活时间

//发送消息,并设置该消息的存活时间

@Test  public void testSendMessage()  {  //1.创建消息属性  MessageProperties messageProperties = new MessageProperties();  //2.设置存活时间  messageProperties.setExpiration("10000");  //3.创建消息对象  Message message = new Message("sendMessage...".getBytes(),messageProperties);  //4.发送消息  rabbitTemplate.convertAndSend("my_topic_exchange1","my_routing",message);  } 

若设置中间的消息的存活时间,当过期时,该消息不会被移除,但是该消息已经不会被消费了,需要等到该消息到队里顶端才会被移除。因为队列是头出,尾进,故而要移除它需要等到它在顶端时才可以。

在队列设置存活时间,也在单条消息设置存活时间,则以时间短的为准

死信队列

死信队列和普通队列没有任何区别,只需要将普通队列需要绑定死信交换机和死信队列就能够实现功能

import org.springframework.amqp.core.*;  import org.springframework.beans.factory.annotation.Qualifier;  import org.springframework.context.annotation.Bean;  import org.springframework.context.annotation.Configuration;  @Configuration//Rabbit配置类  public class RabbitConfig4 {  private final String DEAD_EXCHANGE = "dead_exchange";  private final String DEAD_QUEUE = "dead_queue";  private final String NORMAL_EXCHANGE = "normal_exchange";  private final String NORMAL_QUEUE = "normal_queue";  //创建死信交换机  @Bean(DEAD_EXCHANGE)  public Exchange deadExchange()  {  return ExchangeBuilder  .topicExchange(DEAD_EXCHANGE)//交换机类型 ;参数为名字 topic为通配符模式的交换机  .durable(true)//是否持久化,true即存到磁盘,false只在内存上  .build();  }  //创建死信队列  @Bean(DEAD_QUEUE)  public Queue deadQueue()  {  return QueueBuilder  .durable(DEAD_QUEUE)//队列持久化  //.maxPriority(10)//设置队列的最大优先级,最大可以设置255,但官网推荐不超过10,太高比较浪费资源  .build();  }  //死信交换机绑定死信队列  @Bean  //@Qualifier注解,使用名称装配进行使用  public Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange exchange, @Qualifier(DEAD_QUEUE) Queue queue)  {  return BindingBuilder  .bind(queue)  .to(exchange)  .with("dead_routing")  .noargs();  }  //创建普通交换机  @Bean(NORMAL_EXCHANGE)  public Exchange normalExchange()  {  return ExchangeBuilder  .topicExchange(NORMAL_EXCHANGE)//交换机类型 ;参数为名字 topic为通配符模式的交换机  .durable(true)//是否持久化,true即存到磁盘,false只在内存上  .build();  }  //创建普通队列  @Bean(NORMAL_QUEUE)  public Queue normalQueue()  {  return QueueBuilder  .durable(NORMAL_QUEUE)//队列持久化  //.maxPriority(10)//设置队列的最大优先级,最大可以设置255,但官网推荐不超过10,太高比较浪费资源  .deadLetterExchange(DEAD_EXCHANGE)//绑定死信交换机  .deadLetterRoutingKey("dead_routing")//死信队列路由关键字  .ttl(10000)//消息存活10s  .maxLength(10)//队列最大长度为10  .build();  }  //普通交换机绑定普通队列  @Bean  //@Qualifier注解,使用名称装配进行使用  public Binding bindNormalQueue(@Qualifier(NORMAL_EXCHANGE) Exchange exchange, @Qualifier(NORMAL_QUEUE) Queue queue)  {  return BindingBuilder  .bind(queue)  .to(exchange)  .with("my_routing")  .noargs();  }  } 

延迟队列

RabbitMQ并未实现延迟队列功能,所以可以通过死信队列实现延迟队列的功能

即给普通队列设置存活时间30分钟,过期后发送至死信队列,在死信消费者监听死信队列消息,查看订单状态,是否支付,未支付则取消订单,回退库存即可。

消费者监听延迟队列

@Component  public class ExpireOrderConsumer {  //监听过期订单队列  @RabbitListener(queues = "expire_queue")  public void listenMessage(String orderId)  {  //模拟处理数据库等业务  System.out.println("查询"+orderId+"号订单的状态,如果已支付无需处理,如果未支付则回退库存");  }  }  控制层代码  @RestController  public class OrderController {  @Autowired  private RabbitTemplate rabbitTemplate;  @RequestMapping(value = "/place/{orderId}",method = RequestMethod.GET)  public String placeOrder(@PathVariable String orderId)  {  //模拟service层处理  System.out.println("处理订单数据...");  //将订单id发送到订单队列  rabbitTemplate.convertAndSend("order_exchange","order_routing",orderId);  return "下单成功,修改库存";  }  } 

发表评论

评论已关闭。

相关文章