微服务项目使用RabbitMQ
很久未用RabbitMQ了,根据网上的Demo,大多数是一个服务包含所有的生产者和消费者和配置,当自己去搭建服务的时候,还需要一些思考各种包的划分.无法无脑CV大法,所以,下文,我根据实际项目抽离出一个比较完整的小Demo演示微服务项目使用RabbitMQ.注意:这个小Demo并没有做消息的可靠性相关操作!
公共服务
一般微服务有一个公共服务,用于存放一些配置类或者常量类等,这里我以service_util举例,这个服务下,应该会有以下四个包,如下:
├─config -- 配置类的包 │ RabbitMQConfig.java │ ├─MQConstant -- 常量的包 │ RabbitMQConstant.java │ ├─service -- 封装的服务的包 │ RabbitService.java │ └─vo -- Vo的包 OrderVo.java
RabbitMQConfig
这个类下,我们可以配置基本的RabbitMQ的消息的序列化,在消息的传输过程中,对象和Json的互相转化,详细代码如下:
package com.leixin.mq.config; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @Classname RabbitMQConfig * @Description TODO * @Date 2023-08-07 21:25 * @Created by LeiXin */ /* 这个配置类的作用是将一个消息转换器添加到RabbitMQ中, 这样在消息发送和接收时, 就能够自动地将Java对象转换为JSON格式的消息, 以及将JSON格式的消息转换回Java对象。 */ @Configuration public class RabbitMQConfig { // 声明一个@Bean注解,表示这是一个Spring Bean,会由Spring容器进行管理和实例化 @Bean public MessageConverter getMessageConverter() { // 创建并返回一个Jackson2JsonMessageConverter实例作为消息转换器 // 这个转换器用于在消息发送和接收之间进行JSON格式的转换 return new Jackson2JsonMessageConverter(); } }
RabbitMQConstant
这个包用于指定消息队列的常量
package com.leixin.mq.MQConstant; /** * @Classname RabbitMQConstant * @Description TODO * @Date 2023-08-07 23:23 * @Created by LeiXin */ public class RabbitMQConstant { /** * 预约下单 */ public static final String EXCHANGE_DIRECT_ORDER = "exchange.direct.order"; public static final String ROUTING_ORDER = "order"; //队列 public static final String QUEUE_ORDER = "queue.order"; /** * 短信 */ public static final String EXCHANGE_DIRECT_SMS = "exchange.direct.msm"; public static final String ROUTING_SMS_ITEM = "msm.item"; //队列 public static final String QUEUE_MSM_SMS = "queue.msm.item"; }
RabbitService
这个包用于封装消息的发送方式,之后发送消息只要使用RabbitService.
package com.leixin.mq.service; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * @Classname RabbitService * @Description TODO * @Date 2023-08-07 23:07 * @Created by LeiXin */ @Service public class RabbitService { @Autowired private RabbitTemplate rabbitTemplate; /** * 发送消息 * @param exchange * @param routingKey * @param message * @return */ public boolean sendMessage(String exchange,String routingKey,Object message){ rabbitTemplate.convertAndSend(exchange,routingKey,message); return true; } }
OrderVo
这里注意Vo包都要放在公共包中,因为生产者和消费者的服务都使用这个对象进行消息的封装传输,这个OrderVo类,只是简单举个例子.
package com.leixin.mq.vo; /** * @Classname OrderVo * @Description TODO * @Date 2023-08-08 0:04 * @Created by LeiXin */ public class OrderVo { String id; String orderName; Integer count; public Integer getCount() { return count; } public void setCount(Integer count) { this.count = count; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getOrderName() { return orderName; } public void setOrderName(String orderName) { this.orderName = orderName; } }
依赖
<parent> <groupId>com.leixin</groupId> <artifactId>SpringBoot-RabbitMq</artifactId> <version>1.0-SNAPSHOT</version> </parent> <artifactId>service_util</artifactId> <properties> <maven.compiler.source>17</maven.compiler.source> <maven.compiler.target>17</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <!-- web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- 提供 Spring Boot 对 AMQP(高级消息队列协议)的支持,包括 RabbitMQ --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!-- 提供 Java 的 Lombok 库,可以简化代码中的重复和样板代码,提高代码可读性和可维护性 --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <!-- 提供 Spring Boot 单元测试和集成测试所需的支持 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
生产者
生产者用于发送消息,简单举例,就直接在Controller层发送消息
└─producer │ XxxApplication.java │ └─controller XxxController.java
XxxApplication
SpringBoot的启动类,注意,这个CompontScan主要是为了扫描到公共包的一些配置或者组件
@SpringBootApplication @ComponentScan("com.leixin") public class XxxApplication { public static void main(String[] args) { SpringApplication.run(XxxApplication.class, args); } }
XxxController
生产者的服务发送至消息队列
@RestController @RequestMapping("/test") public class XxxController { @Autowired private RabbitService rabbitService; @GetMapping public void testSendMessage(){ OrderVo orderVo = new OrderVo(); orderVo.setCount(100); rabbitService.sendMessage(RabbitMQConstant.EXCHANGE_DIRECT_ORDER ,RabbitMQConstant.ROUTING_ORDER, orderVo); } }
配置类
application.yml
# Spring配置部分 spring: rabbitmq: # RabbitMQ主机地址 host: localhost # RabbitMQ端口 port: 5672 # 连接RabbitMQ的用户名 username: guest # 连接RabbitMQ的密码 password: guest # 虚拟主机,默认为根虚拟主机"/" virtual-host: / # Spring Boot内嵌服务器配置部分 server: # 内嵌服务器监听的端口号 port: 8092
依赖
<dependencies> <dependency> <groupId>com.leixin</groupId> <artifactId>service_util</artifactId> <version>1.0-SNAPSHOT</version> </dependency> </dependencies>
消费者
消费者主要用于处理一些消息,文件如下:
│ YyyApplication.java │ └─listener YyyMQListener.java
YyyMQListener
这里就是用于处理消息
package com.leixin.mq.listener; import com.leixin.mq.MQConstant.RabbitMQConstant; import com.leixin.mq.vo.OrderVo; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @Classname YyyMQListener * @Description TODO * @Date 2023-08-07 23:54 * @Created by LeiXin */ @Component public class YyyMQListener { // 使用 @RabbitListener 注解标记这个方法作为消息监听器 // bindings 属性用于定义队列、交换机以及路由关系 @RabbitListener(bindings = { @QueueBinding( value = @Queue(name = RabbitMQConstant.QUEUE_ORDER, durable = "true"), // 创建队列 exchange = @Exchange(name = RabbitMQConstant.EXCHANGE_DIRECT_ORDER), // 创建交换机 key = RabbitMQConstant.ROUTING_ORDER ) }) // consume 方法用于处理收到的消息 public void consume(OrderVo order, Message message, Channel channel) { // 从传入的 OrderVo 对象中获取订单数量,并将其减少 Integer count = order.getCount(); count--; // 输出调试信息,展示订单数量的变化 System.out.println(count); // 在这里可以添加你的业务逻辑,处理订单相关的操作 // 例如,更新数据库中的订单状态、发送通知等 // 最后,可能需要手动确认消息的处理,通过 channel.basicAck 方法 } }
配置文件
application.yml
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: / server: port: 8093
依赖
<dependencies> <dependency> <groupId>com.leixin</groupId> <artifactId>service_util</artifactId> <version>1.0-SNAPSHOT</version> </dependency> </dependencies>