RocketMQ部署手册
单MasterRocketMQ集群
系统要求与准备条件
-
64位操作系统,推荐 Linux/Unix/macOS
-
64位 JDK 1.8+
-
Maven
tips
检验java环境与maven环境
java -version

mvn -v

下载安装Apache RocketMQ
RocketMQ 的安装包分为两种,二进制包和源码包,二进制包是已经编译完成后可以直接运行的,源码包是需要编译后运行的。
-
rocketmq-all-5.0.0-ALPHA-bin-release.zip
源码包直接解压
-
rocketmq-all-5.0.0-ALPHA-source-release.zip
二进制包使用maven编译
$ unzip rocketmq-all-5.0.0-source-release.zip $ cd rocketmq-all-5.0.0-source-release/ $ mvn -Prelease-all -DskipTests clean install -U $ cd distribution/target/rocketmq-5.0.0/rocketmq-5.0.0

启动NameServer
### 启动namesrv $ nohup sh bin/mqnamesrv & ### 验证namesrv是否启动成功 $ tail -f ~/logs/rocketmqlogs/namesrv.log The Name Server boot success...

我们可以在namesrv.log 中看到 'The Name Server boot success..', 表示NameServer 已成功启动。

启动Broker
### 先启动broker $ nohup sh bin/mqbroker -n localhost:9876 & ### 验证broker是否启动成功, 比如, broker的ip是192.168.1.2 然后名字是broker-a $ tail -f ~/logs/rocketmqlogs/Broker.log The broker[broker-a,192.169.1.2:10911] boot success...

我们可以在 Broker.log 中看到“The broker[brokerName,ip:port] boot success..”,这表明 broker 已成功启动。

工具测试消息收发
在进行工具测试消息收发之前,我们需要告诉客户端NameServer的地址,RocketMQ有多种方式在客户端中设置NameServer地址,这里我们利用环境变量NAMESRV_ADDR

$ export NAMESRV_ADDR=localhost:9876 $ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer SendResult [sendStatus=SEND_OK, msgId= ... $ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer ConsumeMessageThread_%d Receive New Messages: [MessageExt...


安装可视化控制台
1.下载项目
在 GitHub 中搜索 rocketmq-externals,其中 rocketmq-console 就是 RocketMQ 可视化控制台,我们可以将源码克隆下来,然后自己 mvn package,然后运行 jar 包。
或者直接下载官方提供的 1.0.0 版本的 rocketmq-console
https://github.com/apache/rocketmq-externals/releases/tag/rocketmq-console-1.0.0
下载 zip 包或者 tar 包

-
修改配置文件application.properties
配置rocketmq.config.namesrvAddr属性的值,即nameserver的服务地址
rocketmq.config.namesrvAddr=localhost:9876

-
保存修改后的配置文件,返回rocketmq-console目录
-
使用maven打包命令打包
mvn clean package -Dmaven.test.skip=true -
打包完成后进入target目录

rocketmq-console-ng-2.0.0.jar即为打包后得到的jar包
-
启动程序
nohup java -jar rocketmq-console-ng-2.0.0.jar &

SDK测试消息收发
准备工作
启动 NameServer 和 broker
nohup sh bin/mqnamesrv >mqnamesrv-log.txt & nohup sh bin/mqbroker -n 127.0.0.1:9876 >mqbroker-log.txt &
启动控制台
mvn spring-boot:run
创建一个 topic名为 test_quick_topic

工具测试完成后,我们可以尝试使用 SDK 收发消息。这里以 Java SDK 为例介绍一下消息收发过程。
-
在IDEA中创建一个Java工程。
-
在 pom.xml 文件中添加以下依赖引入Java依赖库。
<dependencies> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.9.4</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>2.0.13.graal</version> </dependency> </dependencies>
生产者
- 同步投递10条消息
package TestProducer; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; public class TestProducer01 { public static final String NAMESRV_ADDR = "127.0.0.1:9876"; public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name"); producer.setNamesrvAddr(NAMESRV_ADDR); producer.start(); for (int i = 0; i <10; i++) { Message message = new Message("test_quick_topic",//主题 "tagA", //标签 "key" + i, //自定义key,唯一标识 ("第" + i+"次消息").getBytes()); //消息内容实体 (byte[]) SendResult result = producer.send(message); System.out.println("第" + i + "条消息发出,结果:" + result); } producer.shutdown(); } }

消费者
- 消费上面生产者生产的10条消息
package TestConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.util.List; public class TestConsumer01 { public static final String NAMESRV_ADDR = "127.0.0.1:9876"; public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_quick_consumer_name"); consumer.setNamesrvAddr(NAMESRV_ADDR); //从最后开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // consumer.subscribe("test_quick_topic","tagA"); //过滤:消费tag为tagA的消息 consumer.subscribe("test_quick_topic", "*"); //消费所有的 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { MessageExt messageExt = list.get(0); try { String topic = messageExt.getTopic(); String tags = messageExt.getTags(); String keys = messageExt.getKeys(); String msgBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET); System.out.println("topic: " + topic + ",tags: " + tags + ", keys: " + keys + ", body: " + msgBody); } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("comsumer start"); } }

关闭服务器
$ sh bin/mqshutdown broker The mqbroker(36695) is running... Send shutdown request to mqbroker(36695) OK $ sh bin/mqshutdown namesrv The mqnamesrv(36664) is running... Send shutdown request to mqnamesrv(36664) OK