————以点赞消息案例为例
一、关于RabbitMQ回调机制知识点补充: https://www.cnblogs.com/Mr-Keep/p/19140274
在 RabbitMQ 中,生产者发送消息后,有可能遇到以下几种情况:
-
消息成功投递到交换机(Exchange)
-
消息未能成功投递到交换机(Exchange)
-
消息成功进入交换机但无法路由到队列(Queue)
如果生产者端没有回调确认机制,就可能出现严重的数据不一致:
举例: Redis 已经增加点赞数,但消息并未真正进入 MQ,数据库后续也无法更新,就出现了 “缓存超前、数据库缺失” 的问题。
为了解决这种问题,Spring AMQP 提供了:
-
RabbitTemplate.setConfirmCallback()
-
RabbitTemplate.setReturnsCallback()
来捕获和处理消息投递的成功与失败。
但是在复杂系统中,不同的业务消息(例如“下单”、“扣库存”、“发积分”)在投递失败时,需要采取不同的补偿逻辑。
弊端:如果你只写一份大而全的回调逻辑,代码就会充满大量的 if else 判断,非常难维护。
二、策略模式思想引入
策略模式的核心思想是:定义一系列算法(或行为),让它们可以相互替换,且算法的变化不会影响使用算法的客户。
-
“算法” ≈ “不同的消息回调处理逻辑”
-
“客户” ≈ “RabbitTemplate 的 ConfirmCallback 回调”
操作:通过(根据业务抽象)接口 + Map 注入,在运行时动态选择。
代码实现
1、定义统一的回调处理接口
public interface ConfirmCallbackService { /** * 投递失败后的回调处理 * @param message 投递的消息对象 */ void confirmCallback(Message message); }
例:定义点赞案例的实现类(可选):
public class LikeConfirmCallback implements ConfirmCallbackService{ /** * 注入RedisTemplate */ private final RedisTemplate<String,Integer> redisTemplate; /** * 执行失败后的反向操作 * @param message 投递的消息对象 */ @Override public void confirmCallback(Message message) { byte[] bytes = message.getBody(); //反向序列化为LikeDTO对象 try { LikeDTO dto = new ObjectMapper().readValue(bytes, LikeDTO.class); if(dto.getLikeStatus()){ redisTemplate.opsForSet().add(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue()+dto.getEid(), dto.getUid()); }else{ redisTemplate.opsForSet().remove(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue()+dto.getEid(),dto.getUid()); } } catch (IOException e) { throw new RuntimeException(e); } } }
小技巧:
- 可选不单独定义类,而是让业务层本身实现ConfirmCallbackService接口,简化书写操作
- 分离成策略类则更利于模块化、解耦和扩展。
2、回调上下文: 策略分发器
@Component @RequiredArgsConstructor @Slf4j public class ConfirmCallbackContext { /** * 注入RabbitTemplate */ private final RabbitTemplate rabbitTemplate; /** * 注入所有ConfirmCallbackService的实现类 * 在不同的业务场景调用不同的实现来处理投递失败的业务逻辑 */ private final Map<String,ConfirmCallbackService> confirmCallbackServiceMap; /** * 统一调用回调处理 * 在容器初始化就执行这个方法 */ @PostConstruct public void confirmCallback(){ rabbitTemplate.setConfirmCallback((cdata,ack,cause)->{ ReturnedMessage returnedMessage = cdata.getReturned(); if(ack){ log.info("The message was delivered to the{}",returnedMessage); }else{ //获取业务实现的bean的id String beanName = returnedMessage.getReplyText(); //根据bean的名称从map中获取相应的实现类 ConfirmCallbackService callbackService = confirmCallbackServiceMap.get(beanName); callbackService.confirmCallback(returnedMessage.getMessage()); } }); } }
核心原理:
-
Spring Boot 会自动扫描所有实现 ConfirmCallbackService 的 Bean
-
Bean 名称作为 key,Bean 实例作为 value 注入到 Map<String, ConfirmCallbackService>
-
ConfirmCallbackContext 根据 replyText 动态找到对应的策略实现类
3.消息发送端封装
@Component @RequiredArgsConstructor public class RabbitManager<T> { private final RabbitTemplate rabbitTemplate; public void send(String exchange,String routingKey, String callbackBeanName,T data){ try { //创建cdata对象并设置一个id CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); //将投递的数据转换为byte[] byte[] bytes = new ObjectMapper().writeValueAsBytes(data); //将bytes封装为Message对象 Message message = new Message(bytes); //创建一个投递失败时返回的消息对象 ReturnedMessage returnedMessage = new ReturnedMessage(message, 0, callbackBeanName, exchange,routingKey); //将ReturnedMesssage保存到cdata中 correlationData.setReturned(returnedMessage); //发送 rabbitTemplate.convertAndSend(exchange,routingKey,data,correlationData); } catch (Exception e) { throw new RuntimeException(e); } } }
** 关键点:**
callbackBeanName 会被放进 replyText 中,作为“回调策略的指针”。
4.点赞业务逻辑方法
4.1简化写法
@Override public LikeDTO likeEssay(Integer uid, Integer eid) { boolean likeStatus = false; //如果缓存中存在用户id则取消点赞,不存在则添加用户id记录点赞 if(isLike(eid, uid)) { //将用户ID从set集合中移除 redisTemplate.opsForSet().remove(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue() + eid, uid); } else { likeStatus = true; //将用户ID添加到set集合中 redisTemplate.opsForSet().add(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue() + eid, uid); } //获取当前帖子在redis中的点赞总数 Long likeCount = redisTemplate.opsForSet().size(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue() + eid); //创建LikeDTO封装修改的数据并发布到消息队列 LikeDTO likeDTO = new LikeDTO(eid, uid, likeCount,likeStatus); //发送到mq异步更新到数据库 rabbitManager.send(RabbitmqConfig.EXCHANGE_NAME, RabbitmqConfig.ROUTING_KEY, "likeServiceImpl", likeDTO); return likeDTO; } /** * 消息投递失败后的处理 * @param message 失败后返回的消息 */ @Override public void confirmCallback(Message message) { byte[] bytes = message.getBody(); try { //反序列化为LikeDTO对象 LikeDTO dto = new ObjectMapper().readValue(bytes, LikeDTO.class); //执行反向操作 if(dto.getLikeStatus()) { redisTemplate.opsForSet().remove(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue() + dto.getEid(), dto.getUid()); } else { redisTemplate.opsForSet().add(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue() + dto.getEid(), dto.getUid()); } } catch (IOException e) { throw new RuntimeException(e); } }
4.2 有业务实现类时
````
public LikeDTO likeEssay(Integer uid, Integer eid) {
boolean likeStatus = false;
//如果缓存中存在用户id则取消点赞,不存在则添加用户id记录点赞 if(isLike(uid,eid)){ //取消点赞 redisTemplate.opsForSet().remove(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue()+eid,uid.toString()); likeMapper.deleteLike(eid,uid); }else{ likeStatus = true; //将用户ID添加到set集合中 redisTemplate.opsForSet().add(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue()+eid,uid.toString()); } //获取当前帖子在redis中的点赞总数 Long likeCount = redisTemplate.opsForSet().size(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue() + eid); //创建LikeDTO封装修改的数据并发布到消息队列 LikeDTO likeDTO = new LikeDTO(eid, uid, likeCount,likeStatus); //发送到mq异步更新到数据库 rabbitManager.send(RabbitmqConfig.EXCHANGE_NAME,RabbitmqConfig.ROUTING_KEY, "likeConfirmCallbackService",likeDTO); return likeDTO; }
最终目标:当点赞消息从生产者发送到 RabbitMQ 时,一旦投递失败,系统能自动执行反向补偿逻辑,确保 Redis 与数据库的一致性。