我工作中用MQ的10种场景

前言

最近有球友问我:MQ的使用场景有哪些?工作中一定要使用MQ吗?

记得刚工作那会儿,我总是想不明白:为什么明明直接调用接口就能完成的功能,非要引入MQ这么个"中间商"?

直到经历了系统崩溃、数据丢失、性能瓶颈等一系列问题后,我才真正理解了MQ的价值。

今天我想和大家分享我在实际工作中使用消息队列(MQ)的10种典型场景,希望对你会有所帮助。

一、为什么需要消息队列(MQ)?

在深入具体场景之前,我们先来思考一个基本问题:为什么要使用消息队列?

系统间的直接调用:
我工作中用MQ的10种场景

引入消息队列后:
我工作中用MQ的10种场景

接下来我们将通过10个具体场景,带大家来深入理解MQ的价值。

场景一:系统解耦

背景描述

在我早期参与的一个电商项目中,订单创建后需要通知多个系统:

// 早期的紧耦合设计 public class OrderService {     private InventoryService inventoryService;     private PointsService pointsService;     private EmailService emailService;     private AnalyticsService analyticsService;          public void createOrder(Order order) {         // 1. 保存订单         orderDao.save(order);                  // 2. 调用库存服务         inventoryService.updateInventory(order);                  // 3. 调用积分服务         pointsService.addPoints(order.getUserId(), order.getAmount());                  // 4. 发送邮件通知         emailService.sendOrderConfirmation(order);                  // 5. 记录分析数据         analyticsService.trackOrderCreated(order);                  // 更多服务...     } } 

这种架构存在严重问题:

  • 紧耦合:订单服务需要知道所有下游服务
  • 单点故障:任何一个下游服务挂掉都会导致订单创建失败
  • 性能瓶颈:同步调用导致响应时间慢

MQ解决方案

引入MQ后,架构变为:
我工作中用MQ的10种场景

代码实现

// 订单服务 - 生产者 @Service public class OrderService {     @Autowired     private RabbitTemplate rabbitTemplate;          public void createOrder(Order order) {         // 1. 保存订单         orderDao.save(order);                  // 2. 发送消息到MQ         rabbitTemplate.convertAndSend(             "order.exchange",             "order.created",             new OrderCreatedEvent(order.getId(), order.getUserId(), order.getAmount())         );     } }  // 库存服务 - 消费者 @Component @RabbitListener(queues = "inventory.queue") public class InventoryConsumer {     @Autowired     private InventoryService inventoryService;          @RabbitHandler     public void handleOrderCreated(OrderCreatedEvent event) {         inventoryService.updateInventory(event.getOrderId());     } } 

技术要点

  1. 消息协议选择:根据业务需求选择RabbitMQ、Kafka或RocketMQ
  2. 消息格式:使用JSON或Protobuf等跨语言格式
  3. 错误处理:实现重试机制和死信队列

场景二:异步处理

背景描述

用户上传视频后需要执行转码、生成缩略图、内容审核等耗时操作,如果同步处理,用户需要等待很长时间。

MQ解决方案

// 视频服务 - 生产者 @Service public class VideoService {     @Autowired     private KafkaTemplate<String, Object> kafkaTemplate;          public UploadResponse uploadVideo(MultipartFile file, String userId) {         // 1. 保存原始视频         String videoId = saveOriginalVideo(file);                  // 2. 发送处理消息         kafkaTemplate.send("video-processing", new VideoProcessingEvent(videoId, userId));                  // 3. 立即返回响应         return new UploadResponse(videoId, "upload_success");     } }  // 视频处理服务 - 消费者 @Service public class VideoProcessingConsumer {     @KafkaListener(topics = "video-processing")     public void processVideo(VideoProcessingEvent event) {         // 异步执行耗时操作         videoProcessor.transcode(event.getVideoId());         videoProcessor.generateThumbnails(event.getVideoId());         contentModerationService.checkContent(event.getVideoId());                  // 发送处理完成通知         notificationService.notifyUser(event.getUserId(), event.getVideoId());     } } 

架构优势

  1. 快速响应:用户上传后立即得到响应
  2. 弹性扩展:可以根据处理压力动态调整消费者数量
  3. 故障隔离:处理服务故障不会影响上传功能

场景三:流量削峰

背景描述

电商秒杀活动时,瞬时流量可能是平时的百倍以上,直接冲击数据库和服务。

MQ解决方案

我工作中用MQ的10种场景

代码实现

// 秒杀服务 @Service public class SecKillService {     @Autowired     private RedisTemplate<String, Object> redisTemplate;          @Autowired     private RabbitTemplate rabbitTemplate;          public SecKillResponse secKill(SecKillRequest request) {         // 1. 校验用户资格         if (!checkUserQualification(request.getUserId())) {             return SecKillResponse.failed("用户无资格");         }                  // 2. 预减库存(Redis原子操作)         Long remaining = redisTemplate.opsForValue().decrement(             "sec_kill_stock:" + request.getItemId());                  if (remaining == null || remaining < 0) {             // 库存不足,恢复库存             redisTemplate.opsForValue().increment("sec_kill_stock:" + request.getItemId());             return SecKillResponse.failed("库存不足");         }                  // 3. 发送秒杀成功消息到MQ         rabbitTemplate.convertAndSend(             "sec_kill.exchange",             "sec_kill.success",             new SecKillSuccessEvent(request.getUserId(), request.getItemId())         );                  return SecKillResponse.success("秒杀成功");     } }  // 订单处理消费者 @Component @RabbitListener(queues = "sec_kill.order.queue") public class SecKillOrderConsumer {     @RabbitHandler     public void handleSecKillSuccess(SecKillSuccessEvent event) {         // 异步创建订单         orderService.createSecKillOrder(event.getUserId(), event.getItemId());     } } 

技术要点

  1. 库存预扣:使用Redis原子操作避免超卖
  2. 队列缓冲:MQ缓冲请求,避免直接冲击数据库
  3. 限流控制:在网关层进行限流,拒绝过多请求

场景四:数据同步

背景描述

在微服务架构中,不同服务有自己的数据库,需要保证数据一致性。

MQ解决方案

// 用户服务 - 数据变更时发送消息 @Service public class UserService {     @Transactional     public User updateUser(User user) {         // 1. 更新数据库         userDao.update(user);                  // 2. 发送消息(在事务内)         rocketMQTemplate.sendMessageInTransaction(             "user-update-topic",             MessageBuilder.withPayload(new UserUpdateEvent(user.getId(), user.getStatus()))                 .build(),             null         );                  return user;     } }  // 其他服务 - 消费用户更新消息 @Service @RocketMQMessageListener(topic = "user-update-topic", consumerGroup = "order-group") public class UserUpdateConsumer implements RocketMQListener<UserUpdateEvent> {     @Override     public void onMessage(UserUpdateEvent event) {         // 更新本地用户信息缓存         orderService.updateUserCache(event.getUserId(), event.getStatus());     } } 

一致性保证

  1. 本地事务表:将消息和业务数据放在同一个数据库事务中
  2. 事务消息:使用RocketMQ的事务消息机制
  3. 幂等消费:消费者实现幂等性,避免重复处理

场景五:日志收集

背景描述

分布式系统中,日志分散在各个节点,需要集中收集和分析。

MQ解决方案

我工作中用MQ的10种场景

代码实现

// 日志收集组件 @Component public class LogCollector {     @Autowired     private KafkaTemplate<String, String> kafkaTemplate;          public void collectLog(String appId, String level, String message, Map<String, Object> context) {         LogEvent logEvent = new LogEvent(appId, level, message, context, System.currentTimeMillis());                  // 发送到Kafka         kafkaTemplate.send("app-logs", appId, JsonUtils.toJson(logEvent));     } }  // 日志消费者 @Service public class LogConsumer {     @KafkaListener(topics = "app-logs", groupId = "log-es")     public void consumeLog(String message) {         LogEvent logEvent = JsonUtils.fromJson(message, LogEvent.class);                  // 存储到Elasticsearch         elasticsearchService.indexLog(logEvent);                  // 实时监控检查         if ("ERROR".equals(logEvent.getLevel())) {             alertService.checkAndAlert(logEvent);         }     } } 

技术优势

  1. 解耦:应用节点无需关心日志如何处理
  2. 缓冲:应对日志产生速率波动
  3. 多消费:同一份日志可以被多个消费者处理

场景六:消息广播

背景描述

系统配置更新后,需要通知所有服务节点更新本地配置。

MQ解决方案

// 配置服务 - 广播配置更新 @Service public class ConfigService {     @Autowired     private RedisTemplate<String, Object> redisTemplate;          public void updateConfig(String configKey, String configValue) {         // 1. 更新配置存储         configDao.updateConfig(configKey, configValue);                  // 2. 广播配置更新消息         redisTemplate.convertAndSend("config-update-channel",              new ConfigUpdateEvent(configKey, configValue));     } }  // 服务节点 - 订阅配置更新 @Component public class ConfigUpdateListener {     @Autowired     private LocalConfigCache localConfigCache;          @RedisListener(channel = "config-update-channel")     public void handleConfigUpdate(ConfigUpdateEvent event) {         // 更新本地配置缓存         localConfigCache.updateConfig(event.getKey(), event.getValue());     } } 

应用场景

  1. 功能开关:动态开启或关闭功能
  2. 参数调整:调整超时时间、限流阈值等
  3. 黑白名单:更新黑白名单配置

场景七:顺序消息

背景描述

在某些业务场景中,消息的处理顺序很重要,如订单状态变更。

MQ解决方案

// 订单状态变更服务 @Service public class OrderStateService {     @Autowired     private RocketMQTemplate rocketMQTemplate;          public void changeOrderState(String orderId, String oldState, String newState) {         OrderStateEvent event = new OrderStateEvent(orderId, oldState, newState);                  // 发送顺序消息,使用orderId作为sharding key         rocketMQTemplate.syncSendOrderly(             "order-state-topic",              event,              orderId  // 保证同一订单的消息按顺序处理         );     } }  // 订单状态消费者 @Service @RocketMQMessageListener(     topic = "order-state-topic",     consumerGroup = "order-state-group",     consumeMode = ConsumeMode.ORDERLY  // 顺序消费 ) public class OrderStateConsumer implements RocketMQListener<OrderStateEvent> {     @Override     public void onMessage(OrderStateEvent event) {         // 按顺序处理订单状态变更         orderService.processStateChange(event);     } } 

顺序保证机制

  1. 分区顺序:同一分区内的消息保证顺序
  2. 顺序投递:MQ保证消息按发送顺序投递
  3. 顺序处理:消费者顺序处理消息

场景八:延迟消息

背景描述

需要实现定时任务,如订单超时未支付自动取消。

MQ解决方案

// 订单服务 - 发送延迟消息 @Service public class OrderService {     @Autowired     private RabbitTemplate rabbitTemplate;          public void createOrder(Order order) {         // 保存订单         orderDao.save(order);                  // 发送延迟消息,30分钟后检查支付状态         rabbitTemplate.convertAndSend(             "order.delay.exchange",             "order.create",             new OrderCreateEvent(order.getId()),             message -> {                 message.getMessageProperties().setDelay(30 * 60 * 1000); // 30分钟                 return message;             }         );     } }  // 订单超时检查消费者 @Component @RabbitListener(queues = "order.delay.queue") public class OrderTimeoutConsumer {     @RabbitHandler     public void checkOrderPayment(OrderCreateEvent event) {         Order order = orderDao.findById(event.getOrderId());         if ("UNPAID".equals(order.getStatus())) {             // 超时未支付,取消订单             orderService.cancelOrder(order.getId(), "超时未支付");         }     } } 

替代方案对比

方案 优点 缺点
数据库轮询 实现简单 实时性差,数据库压力大
延时队列 实时性好 实现复杂,消息堆积问题
定时任务 可控性强 分布式协调复杂

场景九:消息重试

背景描述

处理消息时可能遇到临时故障,需要重试机制保证最终处理成功。

MQ解决方案

// 消息消费者 with 重试机制 @Service @Slf4j public class RetryableConsumer {     @Autowired     private RabbitTemplate rabbitTemplate;          @RabbitListener(queues = "business.queue")     public void processMessage(Message message, Channel channel) {         try {             // 业务处理             businessService.process(message);                          // 确认消息             channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);                      } catch (TemporaryException e) {             // 临时异常,重试             log.warn("处理失败,准备重试", e);                          // 拒绝消息,requeue=true             channel.basicNack(                 message.getMessageProperties().getDeliveryTag(),                 false,                 true  // 重新入队             );                      } catch (PermanentException e) {             // 永久异常,进入死信队列             log.error("处理失败,进入死信队列", e);                          channel.basicNack(                 message.getMessageProperties().getDeliveryTag(),                 false,                 false  // 不重新入队             );         }     } } 

重试策略

  1. 立即重试:临时故障立即重试
  2. 延迟重试:逐步增加重试间隔
  3. 死信队列:最终无法处理的消息进入死信队列

场景十:事务消息

背景描述

分布式系统中,需要保证多个服务的数据一致性。

MQ解决方案

// 事务消息生产者 @Service public class TransactionalMessageService {     @Autowired     private RocketMQTemplate rocketMQTemplate;          @Transactional     public void createOrderWithTransaction(Order order) {         // 1. 保存订单(数据库事务)         orderDao.save(order);                  // 2. 发送事务消息         TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(             "order-tx-topic",             MessageBuilder.withPayload(new OrderCreatedEvent(order.getId()))                 .build(),             order  // 事务参数         );                  if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) {             throw new RuntimeException("事务消息发送失败");         }     } }  // 事务消息监听器 @Component @RocketMQTransactionListener public class OrderTransactionListener implements RocketMQLocalTransactionListener {     @Autowired     private OrderDao orderDao;          @Override     public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {         try {             // 检查本地事务状态             Order order = (Order) arg;             Order existOrder = orderDao.findById(order.getId());                          if (existOrder != null && "CREATED".equals(existOrder.getStatus())) {                 return RocketMQLocalTransactionState.COMMIT_MESSAGE;             } else {                 return RocketMQLocalTransactionState.ROLLBACK_MESSAGE;             }         } catch (Exception e) {             return RocketMQLocalTransactionState.UNKNOWN;         }     }          @Override     public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {         // 回查本地事务状态         String orderId = (String) msg.getHeaders().get("order_id");         Order order = orderDao.findById(orderId);                  if (order != null && "CREATED".equals(order.getStatus())) {             return RocketMQLocalTransactionState.COMMIT_MESSAGE;         } else {             return RocketMQLocalTransactionState.ROLLBACK_MESSAGE;         }     } } 

事务消息流程

我工作中用MQ的10种场景

总结

通过以上10个场景,我们可以总结出MQ使用的核心原则:

适用场景

  1. 异步处理:提升系统响应速度
  2. 系统解耦:降低系统间依赖
  3. 流量削峰:应对突发流量
  4. 数据同步:保证最终一致性
  5. 分布式事务:解决数据一致性问题

技术选型建议

场景 推荐MQ 原因
高吞吐 Kafka 高吞吐量,持久化存储
事务消息 RocketMQ 完整的事务消息机制
复杂路由 RabbitMQ 灵活的路由配置
延迟消息 RabbitMQ 原生支持延迟队列

最佳实践

  1. 消息幂等性:消费者必须实现幂等处理
  2. 死信队列:处理失败的消息要有兜底方案
  3. 监控告警:完善的消息堆积监控和告警
  4. 性能优化:根据业务特点调整MQ参数

最后说一句(求关注,别白嫖我)

如果这篇文章对您有所帮助,或者有所启发的话,帮忙关注一下我的同名公众号:苏三说技术,您的支持是我坚持写作最大的动力。

求一键三连:点赞、转发、在看。

关注公众号:【苏三说技术】,在公众号中回复:进大厂,可以免费获取我最近整理的10万字的面试宝典,好多小伙伴靠这个宝典拿到了多家大厂的offer。

本文收录于我的技术网站:http://www.susan.net.cn

发表评论

评论已关闭。

相关文章