SpringBoot3集成RocketMq

标签:RocketMq5.Dashboard;

一、简介

RocketMQ因其架构简单、业务功能丰富、具备极强可扩展性等特点被广泛应用,比如金融业务、互联网、大数据、物联网等领域的业务场景;

二、环境部署

1、编译打包

1、下载5.0版本源码包 rocketmq-all-5.0.0-source-release.zip  2、解压后进入目录,编译打包 mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U 

SpringBoot3集成RocketMq

2、修改配置

在distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/runserver.sh 

SpringBoot3集成RocketMq

distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/runbroker.sh 

SpringBoot3集成RocketMq

3、服务启动

1、该目录下 distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/  2、启动NameServer sh mqnamesrv  输出日志 The Name Server boot success. serializeType=JSON  3、启动Broker+Proxy sh mqbroker -n localhost:9876 --enable-proxy  输出日志 rocketmq-proxy startup successfully  4、关闭服务 sh mqshutdown namesrv Send shutdown request to mqnamesrv(18636) OK  sh mqshutdown broker Send shutdown request to mqbroker with proxy enable OK(18647) 

4、控制台安装

1、下载master源码包 rocketmq-dashboard-master  2、解压后进入目录,编译打包 mvn clean package -Dmaven.test.skip=true  3、启动服务 java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar  4、输出日志 INFO main - Tomcat started on port(s): 8080 (http) with context path ''  5、访问服务:localhost:8080 

SpringBoot3集成RocketMq

三、工程搭建

1、工程结构

SpringBoot3集成RocketMq

2、依赖管理

rocketmq-starter组件中,实际上依赖的是rocketmq-client组件的5.0版本,由于两个新版框架间的兼容问题,需要添加相关配置解决该问题;

<dependency>     <groupId>org.apache.rocketmq</groupId>     <artifactId>rocketmq-spring-boot-starter</artifactId>     <version>${rocketmq-starter.version}</version> </dependency> 

3、配置文件

配置RocketMq服务地址,消息生产者和消费者;

rocketmq:   name-server: 127.0.0.1:9876   # 生产者   producer:     group: boot_group_1     # 消息发送超时时间     send-message-timeout: 3000     # 消息最大长度4M     max-message-size: 4096     # 消息发送失败重试次数     retry-times-when-send-failed: 3     # 异步消息发送失败重试次数     retry-times-when-send-async-failed: 2   # 消费者   consumer:     group: boot_group_1     # 每次提取的最大消息数     pull-batch-size: 5 

4、配置类

在配置类中主要定义两个Bean的加载,即RocketMQTemplateDefaultMQProducer,主要是提供消息发送的能力,即生产消息;

@Configuration public class RocketMqConfig {      @Value("${rocketmq.name-server}")     private String nameServer;      @Value("${rocketmq.producer.group}")     private String producerGroup;      @Value("${rocketmq.producer.send-message-timeout}")     private Integer sendMsgTimeout;      @Value("${rocketmq.producer.max-message-size}")     private Integer maxMessageSize;      @Value("${rocketmq.producer.retry-times-when-send-failed}")     private Integer retryTimesWhenSendFailed ;      @Value("${rocketmq.producer.retry-times-when-send-async-failed}")     private Integer retryTimesWhenSendAsyncFailed ;      @Bean     public RocketMQTemplate rocketMqTemplate(){         RocketMQTemplate rocketMqTemplate = new RocketMQTemplate();         rocketMqTemplate.setProducer(defaultMqProducer());         return rocketMqTemplate;     }      @Bean     public DefaultMQProducer defaultMqProducer() {         DefaultMQProducer producer = new DefaultMQProducer();         producer.setNamesrvAddr(this.nameServer);         producer.setProducerGroup(this.producerGroup);         producer.setSendMsgTimeout(this.sendMsgTimeout);         producer.setMaxMessageSize(this.maxMessageSize);         producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);         producer.setRetryTimesWhenSendAsyncFailed(this.retryTimesWhenSendAsyncFailed);         return producer;     } } 

四、基础用法

1、消息生产

编写一个生产者接口类,分别使用RocketMQTemplateDefaultMQProducer实现消息发送的功能,然后可以通过Dashboard控制面板查看消息详情;

@RestController public class ProducerWeb {     private static final Logger log = LoggerFactory.getLogger(ProducerWeb.class);      @Autowired     private RocketMQTemplate rocketMqTemplate;      @GetMapping("/send/msg1")     public String sendMsg1 (){         try {             // 构建消息主体             JsonMapper jsonMapper = new JsonMapper();             String msgBody = jsonMapper.writeValueAsString(new MqMsg(1,"boot_mq_msg"));             // 发送消息             rocketMqTemplate.convertAndSend("boot-mq-topic",msgBody);         } catch (Exception e) {             e.printStackTrace();         }         return "OK" ;     }      @Autowired     private DefaultMQProducer defaultMqProducer ;      @GetMapping("/send/msg2")     public String sendMsg2 (){         try {             // 构建消息主体             JsonMapper jsonMapper = new JsonMapper();             String msgBody = jsonMapper.writeValueAsString(new MqMsg(2,"boot_mq_msg"));             // 构建消息对象             Message message = new Message();             message.setTopic("boot-mq-topic");             message.setTags("boot-mq-tag");             message.setKeys("boot-mq-key");             message.setBody(msgBody.getBytes());             // 发送消息,打印日志             SendResult sendResult = defaultMqProducer.send(message);             log.info("msgId:{},sendStatus:{}",sendResult.getMsgId(),sendResult.getSendStatus());         } catch (Exception e) {             e.printStackTrace();         }         return "OK" ;     } } 

2、消息消费

编写消息监听类,实现RocketMQListener接口,通过RocketMQMessageListener注解控制监听的具体信息;

@Service @RocketMQMessageListener(consumerGroup = "boot_group_1",topic = "boot-mq-topic") public class ConsumerListener implements RocketMQListener<String> {      private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class);      @Override     public void onMessage(String message) {         log.info("n=====n message:{} n=====n",message);     } } 

SpringBoot3集成RocketMq

五、参考源码

文档仓库: https://gitee.com/cicadasmile/butte-java-note  源码仓库: https://gitee.com/cicadasmile/butte-spring-parent 

发表评论

评论已关闭。

相关文章