使用 rocketmq5 是比较简单的事情。也有些同学对 sdk 原始接口会陌生,会希望有个集成的示例。
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client-java</artifactId> <version>${rocketmq5.version}</version> </dependency>
完整的集成代码参考:
https://gitee.com/opensolon/solon-examples/tree/main/b.Integration/demoB002-rocketmq5
希望更加简化使用的同学,可以使用:
rocketmq5-solon-cloud-plugin (但定制性会变差)
1、看看配置怎么搞?
使用 Solon 初始器 生成一个 Solon Web 模板项目,然后添加上面的 rocketmq5 依赖。
- 添加 yml 配置(具体的配置属性,参考:ClientConfigurationBuilder,ProducerBuilder, PushConsumerBuilder)
solon.app: name: "demo-app" group: "demo" solon.logging: logger: root: level: INFO # 配置可以自由定义,与 @Bean 代码对应起来即可(以下为参考) solon.rocketmq: properties: #公共配置(配置项,参考:ClientConfigurationBuilder) endpoints: "127.0.0.1:8081" sessionCredentialsProvider: "@type": "demoB002.SessionCredentialsProviderImpl" # solon 支持 "@type" 类型申明当前配置节的实例类型 accessKey: "xxx" accessSecret: "xxx" securityToken: "xxx" requestTimeout: "10s" producer: #生产者专属配置(配置项,参考:ProducerBuilder) maxAttempts: 3 consumer: #消费者专属配置(配置项,参考:PushConsumerBuilder) consumerGroup: "${solon.app.group}_${solon.app.name}" consumptionThreadCount: 2 maxCacheMessageCount: 1 maxCacheMessageSizeInBytes: 1
- 添加 java 配置器
@Configuration public class RocketmqConfig { private ClientServiceProvider clientProvider = ClientServiceProvider.loadService(); @Bean public ClientConfiguration client(@Inject("${solon.rocketmq.properties}") Properties common){ ClientConfigurationBuilder builder = ClientConfiguration.newBuilder(); //注入属性 Utils.injectProperties(builder, common); return builder.build(); } @Bean public Producer producer(@Inject("${solon.rocketmq.producer}") Properties producer, ClientConfiguration clientConfiguration) throws ClientException { ProducerBuilder producerBuilder = clientProvider.newProducerBuilder(); //注入属性 if (producer.size() > 0) { Utils.injectProperties(producerBuilder, producer); } producerBuilder.setClientConfiguration(clientConfiguration); return producerBuilder.build(); } @Bean public PushConsumer consumer(@Inject("${solon.rocketmq.consumer}") Properties consumer, ClientConfiguration clientConfiguration, MessageListener messageListener) throws ClientException{ //按需选择 PushConsumerBuilder 或 SimpleConsumerBuilder PushConsumerBuilder consumerBuilder = clientProvider.newPushConsumerBuilder(); //注入属性 Utils.injectProperties(consumerBuilder, consumer); Map<String, FilterExpression> subscriptionExpressions = new HashMap<>(); subscriptionExpressions.put("topic.test", new FilterExpression("*")); consumerBuilder.setSubscriptionExpressions(subscriptionExpressions); consumerBuilder.setClientConfiguration(clientConfiguration); consumerBuilder.setMessageListener(messageListener); return consumerBuilder.build(); } } //这个实现类,(相对于 StaticSessionCredentialsProvider)方便配置自动注入 public class SessionCredentialsProviderImpl implements SessionCredentialsProvider { private String accessKey; private String accessSecret; private String securityToken; private SessionCredentials sessionCredentials; @Override public SessionCredentials getSessionCredentials() { if (sessionCredentials == null) { if (securityToken == null) { sessionCredentials = new SessionCredentials(accessKey, accessSecret); } else { sessionCredentials = new SessionCredentials(accessKey, accessSecret, securityToken); } } return sessionCredentials; } }
2、代码应用
- 发送(或生产),这里代控制器由用户请求再发送消息(仅供参考):
@Controller public class DemoController { @Inject private Producer producer; @Mapping("/send") public void send(String msg) throws ClientException { //发送 producer.send(new MessageBuilderImpl() .setTopic("topic.test") .setBody(msg.getBytes()) .build()); } }
- 监听(或消费),这里采用订阅回调的方式:(仅供参考)
@Component public class DemoMessageListener implements MessageListener { @Override public ConsumeResult consume(MessageView messageView) { System.out.println(messageView); return ConsumeResult.SUCCESS; } }