开心一刻
今天坐在太阳下刷着手机
老妈走过来问我:这么好的天气,怎么没出去玩
我:我要是有钱,你都看不见我的影子
老妈:你就不知道带个碗,别要边玩?
我:......
优先级队列
说到队列,相信大家一定不陌生,是一种很基础的数据结构,它有一个很重要的特点:先进先出
但说到优先级队列,可能有些小伙伴还不清楚,因为接触的不多嘛
示例基于: RabbitMQ 3.9.11
业务场景
我手头上正好有一个项目,系统之间通过 RabbitMQ 通信,调度系统 是消息生产者, 文件生成系统 是消息消费者
默认情况下,先下发的消息会先被消费,也就是先进队列的消息会先出队列
业务高峰期,重要程度不同的文件都需要生成,那如何保证重要文件先生成了?
1、调整调度
1.1 将重要文件的调度提前,保证重要文件的消息先进入队列;但需要考虑调度能否提前,如果生成文件依赖的上游数据还未就绪了?
1.2 将普通文件的调度延后,有点围魏救赵的感觉,万一某一天不需要生成重要文件,那服务器岂不是有一段时间的空置期,而这段空置期本可以生成普通文件
总的来说就是不够灵活:有重要文件的时候先生成重要文件,没有重要文件的时候生成普通文件
2、提高服务器配置
这个就不用过多解释了把,加大 文件生成系统 的硬件配置,提高其文件生成能力
保证文件(不论重要还是普通)都能在调度的时间开始生成,也就无需区分重要与普通了
那么重要文件先生成这个命题就不成立了
想想都美,可实际情况,大家都懂的!
3、优先级队列
RabbitMQ 的 Priority Queue 非常契合这个业务场景,详情请往下看
队列优先级
相较于普通队列,优先级队列肯定有一个标志来标明它是一个优先级队列
这个标志就是参数: x-max-priority ,定义优先级的最大值
我们先来看下 RabbitMQ 控制台如何配置
相关参数配置好之后,点击 Add queue 即创建出了一个 优先级队列
创建完成之后,你会发现队列上有一个 Pri 标志,说明这是一个优先级队列
实际开发工程中,一般不会在 RabbitMQ 控制台创建队列,往往是服务启动的时候,通过服务自动创建 exchange 、 queue
实现也非常简单
@Configuration public class RabbitConfig { @Bean public DirectExchange directExchange() { return new DirectExchange(QSL_EXCHANGE, true, false); } @Bean public Queue queue() { Map<String, Object> args = new HashMap<>(); args.put("x-max-priority", 5); return new Queue(QSL_QUEUE, true, false, false, args); } @Bean public Binding bindingQueue() { return BindingBuilder.bind(queue()).to(directExchange()).with("com.qsl"); } }
View Code
服务启动成功后,我们可以在 RabbitMQ 控制台看到队列: com.qsl.queue ,其 x-max-priority 等于 5
消息优先级
消息属性 priority 可以指定消息的优先级
停止服务后,我们手动往队列 com.qsl.queue 中放一些带有优先级的消息
优先级分别是: 3,1,5,5,10,4 对应的消息体分别是: 3,1,5_1,5_2,10,4
此时队列中共有 6 个消息准备就绪
启动服务,进行消息消费,消费顺序如下
可以总结出一个规律:优先级高的先出队列,优先级相同的,先进先出
那优先级是 10 的那个消息是什么情况,它为什么不是第一个出队?
因为队列 com.qsl.queue 的最大优先级是 5,即使消息的优先级设置成 10,其实际优先级也只有 5,这样是不是就理解了?
实际开发工程中,基本不会在 RabbitMQ 控制台手动发消息,肯定是由服务发送消息
我们模拟下带有优先级的消息发送
是不是 so easy !
x-max-priority
值支持范围是 1 ~ 255 ,推荐使用 1 ~ 5 之间的值,如果需要更高的优先级则推荐 1 ~ 10
1 ~ 10 已经足够使用,不推荐使用更高的优先级,更高的优先级值需要更多的 CPU 和 内存资源
没有设置优先级的消息将被视为优先级为 0,优先级高于队列最大优先级的消息将被视为以队列最大优先级发布的消息
数据结构
底层数据结构:堆
具体请看:数据结构之堆 → 不要局限于堆排序
ACK超时
之前一直不知道这一点,直到有一次碰到了如下异常
一查才知道ACK超时了
超时异常
从消费者获取到消息(消息投递成功)开始,在超时时间(默认30分钟)内未确认回复,则关闭通道,并抛出 PRECONDITION_FAILED 通道异常
并且消息会重新进入队列,等待再次被消费
ACK超时的配置项: consumer_timeout ,默认值是 1800000 ,单位是毫秒,也就是 30 分钟
可用命令 rabbitmqctl eval 'application:get_env(rabbit,consumer_timeout).' 查看
判断是否ACK超时的调度间隔是一分钟,所以 consumer_timeout 不支持低于一分钟的值,也不建议低于五分钟的值
我们将 consumer_timeout 调整成 2 分钟,看看超时异常
有 2 种调整方式
1、修改 /etc/rabbitmq.conf
配置文件没有则新建,然后在配置文件中将 consumer_timeout 设置成 120000 (没有该配置项则新增)
然后重启 rabbitmq
2、动态修改
执行命令 rabbitmqctl eval 'application:set_env(rabbit,consumer_timeout,120000).' 即可
不需要重启 rabbitmq
需要注意的是,这种修改不是永久生效,一旦 rabbitmq 重启, consumer_timeout 将会恢复到默认值
我们用第 2 种方式进行调整
然后我们在消费端睡眠 3 分钟后进行ACK
最后在 rabbitmq 控制台手动发送一个消息,异常信息如下
2024-02-15 13:08:47|org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1|com.qsl.rabbit.listener.TestListener|INFO|28|消费者接收到消息:6 2024-02-15 13:10:47|AMQP Connection 192.168.3.225:5672|org.springframework.amqp.rabbit.connection.CachingConnectionFactory|ERROR|1575|Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 120000 ms. This timeout value can be configured, see consumers doc guide to learn more, class-id=0, method-id=0) 2024-02-15 13:11:47|org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1|com.qsl.rabbit.listener.TestListener|ERROR|33|消息确认异常: java.lang.IllegalStateException: Channel closed; cannot ack/nack at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1175) at com.sun.proxy.$Proxy50.basicAck(Unknown Source) at com.qsl.rabbit.listener.TestListener.onMessage(TestListener.java:31) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171) at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:53) at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:220) at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:148) at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:133) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1591) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1510) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1498) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1489) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1433) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:975) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:921) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1296) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1202) at java.lang.Thread.run(Thread.java:745) 2024-02-15 13:11:47|org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1|org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer|INFO|1436|Restarting Consumer@2e6f610d: tags=[[amq.ctag-hE7fVqLNKO44ytMHalsf2A]], channel=Cached Rabbit Channel: AMQChannel(amqp://admin@192.168.3.225:5672/,1), conn: Proxy@3b1ed14b Shared Rabbit Connection: SimpleConnection@13275d8 [delegate=amqp://admin@192.168.3.225:5672/, localPort= 55710], acknowledgeMode=MANUAL local queue size=0 2024-02-15 13:11:47|org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-2|com.qsl.rabbit.listener.TestListener|INFO|28|消费者接收到消息:6
View Code
从 RabbitMQ 3.12 开始,可以为每个队列配置过期时长,而之前只能为每个 Rabbit 节点配置过期时长
如何处理
如果碰到ACK超时,那么我们该如何处理
1、增加超时时长
这往往是最容易想到的,默认 30 分钟不行就改成 60 分钟嘛
但并不是无脑增加超时时长,默认值往往是综合情况下比较优的一个值,并不推荐加长
2、异步处理
用线程池处理异步处理消息,及时进行消息ACK
但需要考虑拒绝策略,如果用的是: CallerRunsPolicy ,还是有可能触发ACK超时
3、幂等处理
消息消费做幂等处理,是规范,而不仅仅只是针对ACK超时
消息正在消费中,或者已经消费完成,这个消息就不应该再次被消费,可以打印日志然后直接ACK,而无需进行业务处理
4、自动ACK
虽然自动ACK可以简化消息确认的流程,但它也可能带来一些潜在的问题,例如:
消息丢失风险:自动ACK意味着一旦消费者接收到消息, RabbitMQ 就会将其从队列中移除。如果消费者在处理消息时发生故障或崩溃,未处理的消息可能会丢失
限流作用减弱:ACK机制可以帮助限流,即通过控制ACK的发送速率来限制消费者处理消息的速度。如果使用自动ACK,这种限流作用会减弱,可能导致消费者过快地消费消息,超出其实际处理能力
缺乏灵活性:自动ACK不允许消费者在处理完消息后再决定是否要确认消息,这限制了消费者的灵活性。例如,消费者可能需要根据消息内容或处理结果来决定是否重新入队或丢弃消息
等等
总之,自动ACK慎用
具体如何处理,需要结合具体业务,选择比较合适的方式
总结
优先级队列
通过配置 x-max-priority 参数标明队列是优先级队列
队列的优先级取值范围推荐 1 ~ 5 ,不推荐超过 10
通过属性 priority 可以指定消息的优先级,没有设置优先级的消息将被视为优先级为 0,优先级高于队列最大优先级的消息将被视为以队列最大优先级发布的消息
优先级高的消息先出队列(先被处理),优先级低的消息后出队列(后被处理),优先级相同的则是先进先出
ACK超时
ACK超时是一种保护机制,其实可以类比 HTTP 请求超时、数据库连接查询超时
RabbitMQ 的ACK超时默认是 30 分钟,可以修改配置项 consumer_timeout 进行调整
至于如何避免ACK超时,需要结合具体的业务选择合适的方式