solon 集成 rocketmq5 sdk

使用 rocketmq5 是比较简单的事情。也有些同学对 sdk 原始接口会陌生,会希望有个集成的示例。

<dependency>     <groupId>org.apache.rocketmq</groupId>     <artifactId>rocketmq-client-java</artifactId>     <version>${rocketmq5.version}</version> </dependency> 

完整的集成代码参考:

https://gitee.com/opensolon/solon-examples/tree/main/b.Integration/demoB002-rocketmq5

希望更加简化使用的同学,可以使用:

rocketmq5-solon-cloud-plugin (但定制性会变差)

1、看看配置怎么搞?

使用 Solon 初始器 生成一个 Solon Web 模板项目,然后添加上面的 rocketmq5 依赖。

  • 添加 yml 配置(具体的配置属性,参考:ClientConfigurationBuilder,ProducerBuilder, PushConsumerBuilder)
solon.app:   name: "demo-app"   group: "demo"  solon.logging:   logger:     root:       level: INFO  # 配置可以自由定义,与 @Bean 代码对应起来即可(以下为参考) solon.rocketmq:   properties:  #公共配置(配置项,参考:ClientConfigurationBuilder)     endpoints: "127.0.0.1:8081"     sessionCredentialsProvider:       "@type": "demoB002.SessionCredentialsProviderImpl" # solon 支持 "@type" 类型申明当前配置节的实例类型       accessKey: "xxx"       accessSecret: "xxx"       securityToken: "xxx"     requestTimeout: "10s"   producer: #生产者专属配置(配置项,参考:ProducerBuilder)     maxAttempts: 3   consumer: #消费者专属配置(配置项,参考:PushConsumerBuilder)     consumerGroup: "${solon.app.group}_${solon.app.name}"     consumptionThreadCount: 2     maxCacheMessageCount: 1     maxCacheMessageSizeInBytes: 1 
  • 添加 java 配置器
@Configuration public class RocketmqConfig {     private ClientServiceProvider clientProvider = ClientServiceProvider.loadService();          @Bean     public ClientConfiguration client(@Inject("${solon.rocketmq.properties}") Properties common){         ClientConfigurationBuilder builder = ClientConfiguration.newBuilder();         //注入属性         Utils.injectProperties(builder, common);          return builder.build();     }      @Bean     public Producer producer(@Inject("${solon.rocketmq.producer}") Properties producer,                              ClientConfiguration clientConfiguration) throws ClientException {         ProducerBuilder producerBuilder = clientProvider.newProducerBuilder();          //注入属性         if (producer.size() > 0) {             Utils.injectProperties(producerBuilder, producer);         }          producerBuilder.setClientConfiguration(clientConfiguration);          return producerBuilder.build();     }      @Bean     public PushConsumer consumer(@Inject("${solon.rocketmq.consumer}") Properties consumer,                                  ClientConfiguration clientConfiguration,                                  MessageListener messageListener) throws ClientException{          //按需选择 PushConsumerBuilder 或 SimpleConsumerBuilder         PushConsumerBuilder consumerBuilder = clientProvider.newPushConsumerBuilder();          //注入属性         Utils.injectProperties(consumerBuilder, consumer);          Map<String, FilterExpression> subscriptionExpressions = new HashMap<>();         subscriptionExpressions.put("topic.test",  new FilterExpression("*"));          consumerBuilder.setSubscriptionExpressions(subscriptionExpressions);         consumerBuilder.setClientConfiguration(clientConfiguration);         consumerBuilder.setMessageListener(messageListener);          return consumerBuilder.build();     } }  //这个实现类,(相对于 StaticSessionCredentialsProvider)方便配置自动注入 public class SessionCredentialsProviderImpl implements SessionCredentialsProvider {     private String accessKey;     private String accessSecret;     private String securityToken;      private SessionCredentials sessionCredentials;      @Override     public SessionCredentials getSessionCredentials() {         if (sessionCredentials == null) {             if (securityToken == null) {                 sessionCredentials = new SessionCredentials(accessKey, accessSecret);             } else {                 sessionCredentials = new SessionCredentials(accessKey, accessSecret, securityToken);             }         }          return sessionCredentials;     } } 

2、代码应用

  • 发送(或生产),这里代控制器由用户请求再发送消息(仅供参考):
@Controller public class DemoController {     @Inject     private Producer producer;      @Mapping("/send")     public void send(String msg) throws ClientException {         //发送         producer.send(new MessageBuilderImpl()                 .setTopic("topic.test")                 .setBody(msg.getBytes())                 .build());     } } 
  • 监听(或消费),这里采用订阅回调的方式:(仅供参考)
@Component public class DemoMessageListener implements MessageListener {      @Override     public ConsumeResult consume(MessageView messageView) {         System.out.println(messageView);          return ConsumeResult.SUCCESS;     } } 

发表评论

评论已关闭。

相关文章