RocketMQ有两种获取消息的方式,分别为推模式和拉模式。
推模式
推模式在【RocketMQ】消息的拉取一文中已经讲过,虽然从名字上看起来是消息到达Broker后推送给消费者,实际上还是需要消费向Broker发送拉取请求获取消息内容,推模式对应的消息消费实现类为DefaultMQPushConsumerImpl
,回顾一下推模式下的消息消费过程:
- 消费者在启动的时候做一些初始化工作,它会创建MQClientInstance并进行启动;
MQClientInstance
中引用了消息拉取服务PullMessageService
和负载均衡服务RebalanceService
,它们都继承了ServiceThread,MQClientInstance在启动后也会对它们进行启动,所以消息拉取线程和负载均衡线程也就启动了;- 负载均衡服务启动后,会对该消费者订阅的主题进行负载均衡,为消费者分配消息队列,并创建
PullRequest
拉取请求,用于拉取消息; PullMessageService
中等待阻塞队列中PullRequest
拉取请求的到来,接着会调用DefaultMQPushConsumerImpl
的pullMessage
方法进行消息拉取;- 消费者向Broker发送拉取消息的请求,从Broker拉取消息;
- 消费者对Broker返回的响应数据进行处理,解析消息进行消费;
推模式下进行消息消费的例子:
@RunWith(MockitoJUnitRunner.class) public class DefaultMQPushConsumerTest { private String consumerGroup; private String topic = "FooBar"; private String brokerName = "BrokerA"; private MQClientInstance mQClientFactory; @Mock private MQClientAPIImpl mQClientAPIImpl; private static DefaultMQPushConsumer pushConsumer; @Before public void init() throws Exception { // ... // 消费者组 consumerGroup = "FooBarGroup" + System.currentTimeMillis(); // 实例化DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(consumerGroup); pushConsumer.setNamesrvAddr("127.0.0.1:9876"); // 设置拉取间隔 pushConsumer.setPullInterval(60 * 1000); // 注册消息监听器 pushConsumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { Optional.ofNullable(result).orElse(new ArrayList<MessageExt>()).stream().forEach(x-> { // 处理消息 System.out.println(new String(x.getBody())); }); return null; } }); // ... // 设置订阅的主题 pushConsumer.subscribe(topic, "*"); // 启动消费者 pushConsumer.start(); } }
消息推模式的详细过程可参考【RocketMQ】消息的拉取,接下来我们看一下拉模式。
拉模式
首先来看一下拉模式下进行消息消费的例子,拉模式下需要消费者不断调用poll方法获取消息,底层是一个阻塞队列,如果队列中没有数据,会进入等待直到队列中增加了数据:
private void testPull() { // 创建DefaultLitePullConsumer DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("LitePullConsumerGroup");; try { litePullConsumer.setNamesrvAddr("127.0.0.1:9876"); litePullConsumer.subscribe("LitePullConsumerTest", "*"); litePullConsumer.start(); litePullConsumer.setPollTimeoutMillis(20 * 1000); while(true) { // 获取消息 List<MessageExt> result = litePullConsumer.poll(); Optional.ofNullable(result).orElse(new ArrayList<MessageExt>()).stream().forEach(x-> { // 处理消息 System.out.println(new String(x.getBody())); }); } } catch (Exception e) { e.printStackTrace(); } finally { litePullConsumer.shutdown(); } }
推模式与拉模式的区别
对比上面推模式进行消费的例子,从使用方式上来讲,推模式不需要消费者主动去拉取消息,只需要注册消息监听器,当有消息到达时,触发consumeMessage方法进行消息消费,从表面上看就像是Broker主动推送给消费者一样,所以叫做推模式,尽管底层还是需要消费者发起拉取请求向Broker拉取消息。
拉模式在使用方式上,需要消费者主动调用poll方法获取消息,从表面上看消费者需要不断主动进行消息拉取,所以叫做拉模式。
拉模式实现原理
拉模式下对应的消息拉取实现类为DefaultLitePullConsumerImpl
,在DefaultLitePullConsumer
(DefaultMQPullConsumer
被标注了@Deprecated,已不推荐使用)的构造函数中,可以看到对其进行了实例化,并在start方进行了启动:
public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer { // 拉模式下默认的消息拉取实现类 private final DefaultLitePullConsumerImpl defaultLitePullConsumerImpl; public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) { this.namespace = namespace; this.consumerGroup = consumerGroup; // 创建DefaultLitePullConsumerImpl defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook); } @Override public void start() throws MQClientException { setTraceDispatcher(); setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup)); // 启动DefaultLitePullConsumerImpl this.defaultLitePullConsumerImpl.start(); // ... } }
与消息推模式类似,DefaultLitePullConsumerImpl
的start的方法主要做一些初始化的工作:
- 初始化客户端实例对象mQClientFactory,对应实现类为
MQClientInstance
,拉取服务线程、负载均衡线程都是通过MQClientInstance
启动的; - 初始化负载均衡类,拉模式对应的负载均衡类为
RebalanceLitePullImpl
; - 创建消息拉取API对象
PullAPIWrapper
,用于向Broker发送拉取消息的请求; - 初始化消息拉取偏移量;
- 启动一些定时任务;
public class DefaultLitePullConsumerImpl implements MQConsumerInner { public synchronized void start() throws MQClientException { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; this.checkConfig(); if (this.defaultLitePullConsumer.getMessageModel() == MessageModel.CLUSTERING) { this.defaultLitePullConsumer.changeInstanceNameToPID(); } // 初始化MQClientInstance initMQClientFactory(); // 初始化负载均衡 initRebalanceImpl(); // 初始化消息拉取API对象 initPullAPIWrapper(); // 初始化拉取偏移量 initOffsetStore(); // 启动MQClientInstance mQClientFactory.start(); // 启动一些定时任务 startScheduleTask(); this.serviceState = ServiceState.RUNNING; log.info("the consumer [{}] start OK", this.defaultLitePullConsumer.getConsumerGroup()); operateAfterRunning(); break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The PullConsumer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; } } }
负载均衡
拉取模式对应的负载均衡类为RebalanceLitePullImpl
(推模式使用的是RebalanceService
),在initRebalanceImpl方法中设置了消费者组、消费模式、分配策略等信息:
public class DefaultLitePullConsumerImpl implements MQConsumerInner { // 实例化,拉模式使用的是RebalanceLitePullImpl private RebalanceImpl rebalanceImpl = new RebalanceLitePullImpl(this); private void initRebalanceImpl() { // 设置消费者组 this.rebalanceImpl.setConsumerGroup(this.defaultLitePullConsumer.getConsumerGroup()); // 设置消费模式 this.rebalanceImpl.setMessageModel(this.defaultLitePullConsumer.getMessageModel()); // 设置分配策略 this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultLitePullConsumer.getAllocateMessageQueueStrategy()); // 设置mQClientFactory this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); } }
在【RocketMQ】消息的拉取一文中已经讲到过,消费者启动后会进行负载均衡,对每个主题进行负载均衡,拉模式下处理逻辑也是如此,所以这里跳过中间的过程,进入到rebalanceByTopic
方法,可以负载均衡之后如果消费者负载的ProcessQueue发生了变化,会调用messageQueueChanged方法触发变更事件:
public abstract class RebalanceImpl { private void rebalanceByTopic(final String topic, final boolean isOrder) { switch (messageModel) { case BROADCASTING: { // ... } case CLUSTERING: { Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); // ... if (mqSet != null && cidAll != null) { // ... try { // 分配消息队列 allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll); } catch (Throwable e) { log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), e); return; } Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>(); if (allocateResult != null) { allocateResultSet.addAll(allocateResult); } // 更新处理队列 boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); if (changed) { log.info( "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), allocateResultSet.size(), allocateResultSet); // 触发变更事件 this.messageQueueChanged(topic, mqSet, allocateResultSet); } } break; } default: break; } } }
触发消息队列变更事件
RebalanceLitePullImpl
的messageQueueChanged
方法中又调用了MessageQueueListener
的messageQueueChanged
方法触发消息队列改变事件:
public class RebalanceLitePullImpl extends RebalanceImpl { @Override public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) { MessageQueueListener messageQueueListener = this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListener(); if (messageQueueListener != null) { try { // 触发改变事件 messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided); } catch (Throwable e) { log.error("messageQueueChanged exception", e); } } } }
MessageQueueListenerImpl
是DefaultLitePullConsumerImpl
的内部类,在messageQueueChanged
方法中,不管是广播模式还是集群模式,都会调用updatePullTask
更新拉取任务:
public class DefaultLitePullConsumerImpl implements MQConsumerInner { class MessageQueueListenerImpl implements MessageQueueListener { @Override public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) { MessageModel messageModel = defaultLitePullConsumer.getMessageModel(); switch (messageModel) { case BROADCASTING: updateAssignedMessageQueue(topic, mqAll); updatePullTask(topic, mqAll); // 更新拉取任务 break; case CLUSTERING: updateAssignedMessageQueue(topic, mqDivided); updatePullTask(topic, mqDivided); // 更新拉取任务 break; default: break; } } } }
更新拉取任务
在updatePullTask方法中,从拉取任务表taskTable中取出了所有的拉取任务进行遍历,taskTable中记录了之前分配的拉取任务,负载均衡之后可能发生变化,所以需要对其进行更新,这一步主要是处理原先分配给当前消费者的消息队列,在负载均衡之后不再由当前消费者负责,所以需要从taskTable中删除,之后调用startPullTask启动拉取任务:
public class DefaultLitePullConsumerImpl implements MQConsumerInner { private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable = new ConcurrentHashMap<MessageQueue, PullTaskImpl>(); private void updatePullTask(String topic, Set<MessageQueue> mqNewSet) { // 从拉取任务表中获取之前分配的消息队列进行遍历 Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator(); while (it.hasNext()) { Map.Entry<MessageQueue, PullTaskImpl> next = it.next(); // 如果与重新进行负载均衡的主题一致 if (next.getKey().getTopic().equals(topic)) { // 如果重新分配的消息队列集合中不包含此消息独立 if (!mqNewSet.contains(next.getKey())) { next.getValue().setCancelled(true); // 从任务表移除 it.remove(); } } } // 启动拉取任务 startPullTask(mqNewSet); } }
提交拉取任务
startPullTask方法入参中传入的是负载均衡后重新分配的消息队列集合,在startPullTask中会对重新分配的集合进行遍历,如果taskTable中不包含某个消息队列,就构建PullTaskImpl对象,加入taskTable,这一步主要是处理负载均衡后新增的消息队列,为其构建PullTaskImpl加入到taskTable,之后将拉取消息的任务PullTaskImpl提交到线程池周期性的执行:
public class DefaultLitePullConsumerImpl implements MQConsumerInner { private void startPullTask(Collection<MessageQueue> mqSet) { // 遍历最新分配的消息队列集合 for (MessageQueue messageQueue : mqSet) { // 如果任务表中不包含 if (!this.taskTable.containsKey(messageQueue)) { // 创建拉取任务 PullTaskImpl pullTask = new PullTaskImpl(messageQueue); // 加入到任务表 this.taskTable.put(messageQueue, pullTask); // 将任务提交到线程池定时执行 this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS); } } } }
拉取消息
PullTaskImpl继承了Runnable,在run方法中的处理逻辑如下:
- 获取消息队列对应处理队列ProcessQueue;
- 获取消息拉取偏移量,也就是从何处开始拉取消息;
- 调用
pull
方法进行消息拉取; - 判断拉取结果,如果拉取到了消息,将拉取到的结果封装为
ConsumeRequest
进行提交,也就是放到了阻塞队列中,后续消费者从队列中获取数据进行消费;
public class PullTaskImpl implements Runnable { private final MessageQueue messageQueue; private volatile boolean cancelled = false; private Thread currentThread; @Override public void run() { // 如果未取消 if (!this.isCancelled()) { this.currentThread = Thread.currentThread(); // ... // 获取消息队列对应的ProcessQueue ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue); // ... 跳过一系列校验 long offset = 0L; try { // 获取拉取偏移量 offset = nextPullOffset(messageQueue); } catch (Exception e) { log.error("Failed to get next pull offset", e); scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_ON_EXCEPTION, TimeUnit.MILLISECONDS); return; } if (this.isCancelled() || processQueue.isDropped()) { return; } long pullDelayTimeMills = 0; try { SubscriptionData subscriptionData; // 获取主题 String topic = this.messageQueue.getTopic(); // 获取主题对应的订阅信息SubscriptionData if (subscriptionType == SubscriptionType.SUBSCRIBE) { subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic); } else { subscriptionData = FilterAPI.buildSubscriptionData(topic, SubscriptionData.SUB_ALL); } // 拉取消息 PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize()); if (this.isCancelled() || processQueue.isDropped()) { return; } // 判断拉取结果 switch (pullResult.getPullStatus()) { case FOUND: // 如果获取到了数据 final Object objLock = messageQueueLock.fetchLockObject(messageQueue); synchronized (objLock) { // 加锁 if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && assignedMessageQueue.getSeekOffset(messageQueue) == -1) { processQueue.putMessage(pullResult.getMsgFoundList()); // 将拉取结果封装为ConsumeRequest,提交消费请求 submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue)); } } break; case OFFSET_ILLEGAL: log.warn("The pull request offset illegal, {}", pullResult.toString()); break; default: break; } updatePullOffset(messageQueue, pullResult.getNextBeginOffset(), processQueue); } catch (InterruptedException interruptedException) { log.warn("Polling thread was interrupted.", interruptedException); } catch (Throwable e) { pullDelayTimeMills = pullTimeDelayMillsWhenException; log.error("An error occurred in pull message process.", e); } // ... } } }
在submitConsumeRequest
方法中可以看到将创建的ConsumeRequest
对象放入了阻塞队列consumeRequestCache
中:
public class DefaultLitePullConsumerImpl implements MQConsumerInner { // 阻塞队列 private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>(); private void submitConsumeRequest(ConsumeRequest consumeRequest) { try { // 放入阻塞队列consumeRequestCache中 consumeRequestCache.put(consumeRequest); } catch (InterruptedException e) { log.error("Submit consumeRequest error", e); } } }
消息消费
在前面的例子中,可以看到消费者是调用poll
方法获取数据的,进入到poll
方法中,可以看到是从consumeRequestCache中获取消费请求的,然后从中解析出消息内容返回:
public class DefaultLitePullConsumerImpl implements MQConsumerInner { public synchronized List<MessageExt> poll(long timeout) { try { // ... long endTime = System.currentTimeMillis() + timeout; // 从consumeRequestCache中获取数据进行处理 ConsumeRequest consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); // ... if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) { // 获取消息内容 List<MessageExt> messages = consumeRequest.getMessageExts(); long offset = consumeRequest.getProcessQueue().removeMessage(messages); assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset); this.resetTopic(messages); // 返回消息内容 return messages; } } catch (InterruptedException ignore) { } return Collections.emptyList(); } }
参考
RocketMQ版本:4.9.3