【RocketMQ】消息拉模式分析

RocketMQ有两种获取消息的方式,分别为推模式和拉模式。

推模式
推模式在【RocketMQ】消息的拉取一文中已经讲过,虽然从名字上看起来是消息到达Broker后推送给消费者,实际上还是需要消费向Broker发送拉取请求获取消息内容,推模式对应的消息消费实现类为DefaultMQPushConsumerImpl,回顾一下推模式下的消息消费过程:

  1. 消费者在启动的时候做一些初始化工作,它会创建MQClientInstance并进行启动;
  2. MQClientInstance中引用了消息拉取服务PullMessageService和负载均衡服务RebalanceService,它们都继承了ServiceThread,MQClientInstance在启动后也会对它们进行启动,所以消息拉取线程和负载均衡线程也就启动了;
  3. 负载均衡服务启动后,会对该消费者订阅的主题进行负载均衡,为消费者分配消息队列,并创建PullRequest拉取请求,用于拉取消息;
  4. PullMessageService中等待阻塞队列中PullRequest拉取请求的到来,接着会调用DefaultMQPushConsumerImplpullMessage方法进行消息拉取;
  5. 消费者向Broker发送拉取消息的请求,从Broker拉取消息;
  6. 消费者对Broker返回的响应数据进行处理,解析消息进行消费;

推模式下进行消息消费的例子:

@RunWith(MockitoJUnitRunner.class) public class DefaultMQPushConsumerTest {     private String consumerGroup;     private String topic = "FooBar";     private String brokerName = "BrokerA";     private MQClientInstance mQClientFactory;      @Mock     private MQClientAPIImpl mQClientAPIImpl;     private static DefaultMQPushConsumer pushConsumer;      @Before     public void init() throws Exception {         // ...         // 消费者组         consumerGroup = "FooBarGroup" + System.currentTimeMillis();         // 实例化DefaultMQPushConsumer         pushConsumer = new DefaultMQPushConsumer(consumerGroup);         pushConsumer.setNamesrvAddr("127.0.0.1:9876");         // 设置拉取间隔         pushConsumer.setPullInterval(60 * 1000);         // 注册消息监听器         pushConsumer.registerMessageListener(new MessageListenerConcurrently() {             @Override             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,                 ConsumeConcurrentlyContext context) {                 Optional.ofNullable(result).orElse(new ArrayList<MessageExt>()).stream().forEach(x-> {                     // 处理消息                     System.out.println(new String(x.getBody()));                 });                 return null;             }         });         // ...         // 设置订阅的主题         pushConsumer.subscribe(topic, "*");         // 启动消费者         pushConsumer.start();     } } 

消息推模式的详细过程可参考【RocketMQ】消息的拉取,接下来我们看一下拉模式。

拉模式
首先来看一下拉模式下进行消息消费的例子,拉模式下需要消费者不断调用poll方法获取消息,底层是一个阻塞队列,如果队列中没有数据,会进入等待直到队列中增加了数据:

 private void testPull() {         // 创建DefaultLitePullConsumer         DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("LitePullConsumerGroup");;         try {             litePullConsumer.setNamesrvAddr("127.0.0.1:9876");             litePullConsumer.subscribe("LitePullConsumerTest", "*");             litePullConsumer.start();             litePullConsumer.setPollTimeoutMillis(20 * 1000);             while(true) {                 // 获取消息                 List<MessageExt> result = litePullConsumer.poll();                 Optional.ofNullable(result).orElse(new ArrayList<MessageExt>()).stream().forEach(x-> {                     // 处理消息                     System.out.println(new String(x.getBody()));                 });             }         } catch (Exception e) {             e.printStackTrace();         } finally {             litePullConsumer.shutdown();         }     } 

推模式与拉模式的区别
对比上面推模式进行消费的例子,从使用方式上来讲,推模式不需要消费者主动去拉取消息,只需要注册消息监听器,当有消息到达时,触发consumeMessage方法进行消息消费,从表面上看就像是Broker主动推送给消费者一样,所以叫做推模式,尽管底层还是需要消费者发起拉取请求向Broker拉取消息

拉模式在使用方式上,需要消费者主动调用poll方法获取消息,从表面上看消费者需要不断主动进行消息拉取,所以叫做拉模式。

拉模式实现原理

拉模式下对应的消息拉取实现类为DefaultLitePullConsumerImpl,在DefaultLitePullConsumerDefaultMQPullConsumer被标注了@Deprecated,已不推荐使用)的构造函数中,可以看到对其进行了实例化,并在start方进行了启动:

public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {     // 拉模式下默认的消息拉取实现类     private final DefaultLitePullConsumerImpl defaultLitePullConsumerImpl;      public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {         this.namespace = namespace;         this.consumerGroup = consumerGroup;         // 创建DefaultLitePullConsumerImpl         defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);     }      @Override     public void start() throws MQClientException {         setTraceDispatcher();         setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));         // 启动DefaultLitePullConsumerImpl         this.defaultLitePullConsumerImpl.start();         // ...     } } 

与消息推模式类似,DefaultLitePullConsumerImpl的start的方法主要做一些初始化的工作:

  1. 初始化客户端实例对象mQClientFactory,对应实现类为MQClientInstance,拉取服务线程、负载均衡线程都是通过MQClientInstance启动的;
  2. 初始化负载均衡类,拉模式对应的负载均衡类为RebalanceLitePullImpl
  3. 创建消息拉取API对象PullAPIWrapper,用于向Broker发送拉取消息的请求;
  4. 初始化消息拉取偏移量;
  5. 启动一些定时任务;
public class DefaultLitePullConsumerImpl implements MQConsumerInner {     public synchronized void start() throws MQClientException {         switch (this.serviceState) {             case CREATE_JUST:                 this.serviceState = ServiceState.START_FAILED;                 this.checkConfig();                 if (this.defaultLitePullConsumer.getMessageModel() == MessageModel.CLUSTERING) {                     this.defaultLitePullConsumer.changeInstanceNameToPID();                 }                 // 初始化MQClientInstance                 initMQClientFactory();                 // 初始化负载均衡                 initRebalanceImpl();                 // 初始化消息拉取API对象                 initPullAPIWrapper();                 // 初始化拉取偏移量                 initOffsetStore();                 // 启动MQClientInstance                 mQClientFactory.start();                 // 启动一些定时任务                 startScheduleTask();                 this.serviceState = ServiceState.RUNNING;                 log.info("the consumer [{}] start OK", this.defaultLitePullConsumer.getConsumerGroup());                 operateAfterRunning();                 break;             case RUNNING:             case START_FAILED:             case SHUTDOWN_ALREADY:                 throw new MQClientException("The PullConsumer service state not OK, maybe started once, "                     + this.serviceState                     + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),                     null);             default:                 break;         }     } } 

负载均衡

拉取模式对应的负载均衡类为RebalanceLitePullImpl(推模式使用的是RebalanceService),在initRebalanceImpl方法中设置了消费者组、消费模式、分配策略等信息:

public class DefaultLitePullConsumerImpl implements MQConsumerInner {          // 实例化,拉模式使用的是RebalanceLitePullImpl     private RebalanceImpl rebalanceImpl = new RebalanceLitePullImpl(this);      private void initRebalanceImpl() {         // 设置消费者组         this.rebalanceImpl.setConsumerGroup(this.defaultLitePullConsumer.getConsumerGroup());         // 设置消费模式         this.rebalanceImpl.setMessageModel(this.defaultLitePullConsumer.getMessageModel());         // 设置分配策略         this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultLitePullConsumer.getAllocateMessageQueueStrategy());         // 设置mQClientFactory         this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);     } } 

【RocketMQ】消息的拉取一文中已经讲到过,消费者启动后会进行负载均衡,对每个主题进行负载均衡,拉模式下处理逻辑也是如此,所以这里跳过中间的过程,进入到rebalanceByTopic方法,可以负载均衡之后如果消费者负载的ProcessQueue发生了变化,会调用messageQueueChanged方法触发变更事件:

public abstract class RebalanceImpl {      private void rebalanceByTopic(final String topic, final boolean isOrder) {         switch (messageModel) {             case BROADCASTING: {                 // ...             }             case CLUSTERING: {                 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);                 List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);                 // ...                 if (mqSet != null && cidAll != null) {                     // ...                     try {                         // 分配消息队列                         allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll);                     } catch (Throwable e) {                         log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),                             e);                         return;                     }                      Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();                     if (allocateResult != null) {                         allocateResultSet.addAll(allocateResult);                     }                     // 更新处理队列                     boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);                     if (changed) {                         log.info(                             "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",                             strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),                             allocateResultSet.size(), allocateResultSet);                         // 触发变更事件                         this.messageQueueChanged(topic, mqSet, allocateResultSet);                     }                 }                 break;             }             default:                 break;         }     } } 

触发消息队列变更事件

RebalanceLitePullImplmessageQueueChanged方法中又调用了MessageQueueListenermessageQueueChanged方法触发消息队列改变事件:

public class RebalanceLitePullImpl extends RebalanceImpl {     @Override     public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {         MessageQueueListener messageQueueListener = this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListener();         if (messageQueueListener != null) {             try {                 // 触发改变事件                 messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);             } catch (Throwable e) {                 log.error("messageQueueChanged exception", e);             }         }     } } 

MessageQueueListenerImplDefaultLitePullConsumerImpl的内部类,在messageQueueChanged方法中,不管是广播模式还是集群模式,都会调用updatePullTask更新拉取任务:

public class DefaultLitePullConsumerImpl implements MQConsumerInner {     class MessageQueueListenerImpl implements MessageQueueListener {         @Override         public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {             MessageModel messageModel = defaultLitePullConsumer.getMessageModel();             switch (messageModel) {                 case BROADCASTING:                     updateAssignedMessageQueue(topic, mqAll);                     updatePullTask(topic, mqAll); // 更新拉取任务                     break;                 case CLUSTERING:                     updateAssignedMessageQueue(topic, mqDivided);                     updatePullTask(topic, mqDivided); // 更新拉取任务                     break;                 default:                     break;             }         }     } } 

更新拉取任务

在updatePullTask方法中,从拉取任务表taskTable中取出了所有的拉取任务进行遍历,taskTable中记录了之前分配的拉取任务,负载均衡之后可能发生变化,所以需要对其进行更新,这一步主要是处理原先分配给当前消费者的消息队列,在负载均衡之后不再由当前消费者负责,所以需要从taskTable中删除,之后调用startPullTask启动拉取任务:

public class DefaultLitePullConsumerImpl implements MQConsumerInner {     private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable =         new ConcurrentHashMap<MessageQueue, PullTaskImpl>();      private void updatePullTask(String topic, Set<MessageQueue> mqNewSet) {         // 从拉取任务表中获取之前分配的消息队列进行遍历         Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();         while (it.hasNext()) {             Map.Entry<MessageQueue, PullTaskImpl> next = it.next();             // 如果与重新进行负载均衡的主题一致             if (next.getKey().getTopic().equals(topic)) {                 // 如果重新分配的消息队列集合中不包含此消息独立                 if (!mqNewSet.contains(next.getKey())) {                     next.getValue().setCancelled(true);                     // 从任务表移除                     it.remove();                 }             }         }         // 启动拉取任务         startPullTask(mqNewSet);     } } 

提交拉取任务

startPullTask方法入参中传入的是负载均衡后重新分配的消息队列集合,在startPullTask中会对重新分配的集合进行遍历,如果taskTable中不包含某个消息队列,就构建PullTaskImpl对象,加入taskTable,这一步主要是处理负载均衡后新增的消息队列,为其构建PullTaskImpl加入到taskTable,之后将拉取消息的任务PullTaskImpl提交到线程池周期性的执行:

public class DefaultLitePullConsumerImpl implements MQConsumerInner {      private void startPullTask(Collection<MessageQueue> mqSet) {         // 遍历最新分配的消息队列集合         for (MessageQueue messageQueue : mqSet) {             // 如果任务表中不包含             if (!this.taskTable.containsKey(messageQueue)) {                 // 创建拉取任务                 PullTaskImpl pullTask = new PullTaskImpl(messageQueue);                 // 加入到任务表                 this.taskTable.put(messageQueue, pullTask);                 // 将任务提交到线程池定时执行                 this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS);             }         }     } } 

拉取消息

PullTaskImpl继承了Runnable,在run方法中的处理逻辑如下:

  1. 获取消息队列对应处理队列ProcessQueue;
  2. 获取消息拉取偏移量,也就是从何处开始拉取消息;
  3. 调用pull方法进行消息拉取;
  4. 判断拉取结果,如果拉取到了消息,将拉取到的结果封装为ConsumeRequest进行提交,也就是放到了阻塞队列中,后续消费者从队列中获取数据进行消费;
   public class PullTaskImpl implements Runnable {         private final MessageQueue messageQueue;         private volatile boolean cancelled = false;         private Thread currentThread;          @Override         public void run() {             // 如果未取消             if (!this.isCancelled()) {                 this.currentThread = Thread.currentThread();                 // ...                 // 获取消息队列对应的ProcessQueue                 ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);                 // ...  跳过一系列校验                 long offset = 0L;                 try {                     // 获取拉取偏移量                     offset = nextPullOffset(messageQueue);                 } catch (Exception e) {                     log.error("Failed to get next pull offset", e);                     scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_ON_EXCEPTION, TimeUnit.MILLISECONDS);                     return;                 }                  if (this.isCancelled() || processQueue.isDropped()) {                     return;                 }                 long pullDelayTimeMills = 0;                 try {                     SubscriptionData subscriptionData;                     // 获取主题                     String topic = this.messageQueue.getTopic();                     // 获取主题对应的订阅信息SubscriptionData                     if (subscriptionType == SubscriptionType.SUBSCRIBE) {                         subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic);                     } else {                         subscriptionData = FilterAPI.buildSubscriptionData(topic, SubscriptionData.SUB_ALL);                     }                     // 拉取消息                     PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize());                     if (this.isCancelled() || processQueue.isDropped()) {                         return;                     }                     // 判断拉取结果                     switch (pullResult.getPullStatus()) {                         case FOUND: // 如果获取到了数据                             final Object objLock = messageQueueLock.fetchLockObject(messageQueue);                             synchronized (objLock) { // 加锁                                 if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && assignedMessageQueue.getSeekOffset(messageQueue) == -1) {                                     processQueue.putMessage(pullResult.getMsgFoundList());                                     // 将拉取结果封装为ConsumeRequest,提交消费请求                                     submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));                                 }                             }                             break;                         case OFFSET_ILLEGAL:                             log.warn("The pull request offset illegal, {}", pullResult.toString());                             break;                         default:                             break;                     }                     updatePullOffset(messageQueue, pullResult.getNextBeginOffset(), processQueue);                 } catch (InterruptedException interruptedException) {                     log.warn("Polling thread was interrupted.", interruptedException);                 } catch (Throwable e) {                     pullDelayTimeMills = pullTimeDelayMillsWhenException;                     log.error("An error occurred in pull message process.", e);                 }                 // ...             }         }     } 

submitConsumeRequest方法中可以看到将创建的ConsumeRequest对象放入了阻塞队列consumeRequestCache中:

public class DefaultLitePullConsumerImpl implements MQConsumerInner {     // 阻塞队列     private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>();      private void submitConsumeRequest(ConsumeRequest consumeRequest) {         try {             // 放入阻塞队列consumeRequestCache中             consumeRequestCache.put(consumeRequest);         } catch (InterruptedException e) {             log.error("Submit consumeRequest error", e);         }     } } 

消息消费

在前面的例子中,可以看到消费者是调用poll方法获取数据的,进入到poll方法中,可以看到是从consumeRequestCache中获取消费请求的,然后从中解析出消息内容返回:

public class DefaultLitePullConsumerImpl implements MQConsumerInner {          public synchronized List<MessageExt> poll(long timeout) {         try {             // ...             long endTime = System.currentTimeMillis() + timeout;             // 从consumeRequestCache中获取数据进行处理             ConsumeRequest consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);             // ...             if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) {                 // 获取消息内容                 List<MessageExt> messages = consumeRequest.getMessageExts();                 long offset = consumeRequest.getProcessQueue().removeMessage(messages);                 assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);                 this.resetTopic(messages);                 // 返回消息内容                 return messages;             }         } catch (InterruptedException ignore) {         }         return Collections.emptyList();     } } 

参考

RocketMQ源码分析之pull模式consumer

RocketMQ版本:4.9.3

发表评论

相关文章