订单初版—4.取消订单链路中的技术问题说明文档
技术分享
8个月前 (07-11)
0
999+
大纲
1.超时自动取消订单功能的业务背景
2.超时自动取消订单和支付的并发问题
3.分布式锁解决超时订单取消和支付的并发
4.定时任务解决延时消费的消息的丢失问题
5.超时自动取消订单代码流程
6.RocketMQ延迟消息实现原理
7.取消订单务场景和接口被调用的情况
8.取消订单的业务链路
9.为什么拦截履约要同步而释放资产是异步
10.支付回调分别互斥预支付和取消订单
11.支付回调和取消订单互斥下的两种场景分析
12.拦截履约的具体业务流程
13.拦截履约和取消订单的Seata事务原理分析
14.取消订单全链路Seata回滚原理与并发分析
15.支付退款时的双异步设计原因(提升性能 + 解耦)
16.释放资产消息的高扩展性设计(多路发送消息)
17.取消订单链路中数据库事务与MQ消息不一致问题
18.释放资产多路MQ故障重试与幂等方案
19.双异步支付退款不一致问题分析
1.超时自动取消订单功能的业务背景
用户提交订单,订单系统生成订单后,由于种种原因,用户并没有立即点击去支付或者完成支付。此时默认该订单会在生成30分钟后,自动进行支付检查。也就是如果生成订单超过30分钟还没进行支付,那么就会自动取消订单。
超时自动取消订单的技术方案:RocketMQ延迟消息 + Redisson分布式锁 + XXL-JOB分布式任务调度。
2.超时自动取消订单和支付的并发问题
当用户提交的订单在30分钟后才发起支付时,此时就有可能出现取消订单和支付出现并发的问题。
3.分布式锁解决超时订单取消和支付的并发
在如下位置加同一把分布式锁即可解决超时订单取消和支付的并发问题:
一.订单系统处理预支付请求时添加分布式锁
二.订单系统处理支付回调请求时添加分布式锁
三.订单系统消费延时30m的订单消息进行超时检查时添加分布式锁
当处理预支付和消费延时订单消息进行超时检查时产生并发:如果处理预支付先获取锁,则超时检查会被阻塞,之后发现已支付不取消。如果超时检查先获取锁,则预支付被阻塞,之后预支付失败。
当处理支付回调和消费延时订单消息进行超时检查时产生并发:如果支付回调先获取锁,则超时检查会被阻塞,之后发现已支付不取消。如果超时检查先获取锁,则支付回调被阻塞,之后支付回调获取锁,发现订单已被超时检查取消,于是需要进行退款处理通知用户。
4.定时任务解决延时消费的消息的丢失问题
如果发送到MQ的延时30m消费的消息丢失,此时会通过定时任务来处理。该定时任务会对MySQL进行扫描,把创建时间超30m + 没支付的订单找出,然后再对这些超时订单进行取消。定时任务一般使用分布式调度框架XXL-JOB来实现。
以上便是超时自动取消订单的技术方案:RocketMQ延迟消息 + Redisson分布式锁 + XXL-JOB分布式任务调度。
5.超时自动取消订单代码流程
(1)生成订单时发送延时30m才能被消费的消息
(2)订单超时检查监听器
(3)对订单进行正式取消的业务逻辑
(4)定时任务检查创建时间超30m但没取消的订单
(1)生成订单时发送延时30m才能被消费的消息
@Service public class OrderServiceImpl implements OrderService { ... //提交订单/生成订单接口 //@param createOrderRequest 提交订单请求入参 @GlobalTransactional(rollbackFor = Exception.class) @Override public CreateOrderDTO createOrder(CreateOrderRequest createOrderRequest) { //1.入参检查 checkCreateOrderRequestParam(createOrderRequest); //2.风控检查 checkRisk(createOrderRequest); //3.获取商品信息 List<ProductSkuDTO> productSkuList = listProductSkus(createOrderRequest); //4.计算订单价格 CalculateOrderAmountDTO calculateOrderAmountDTO = calculateOrderAmount(createOrderRequest, productSkuList); //5.验证订单实付金额 checkRealPayAmount(createOrderRequest, calculateOrderAmountDTO); //6.锁定优惠券 lockUserCoupon(createOrderRequest); //7.锁定商品库存 lockProductStock(createOrderRequest); //8.生成订单到数据库 addNewOrder(createOrderRequest, productSkuList, calculateOrderAmountDTO); //9.发送订单延迟消息用于支付超时自动关单 sendPayOrderTimeoutDelayMessage(createOrderRequest); //返回订单信息 CreateOrderDTO createOrderDTO = new CreateOrderDTO(); createOrderDTO.setOrderId(createOrderRequest.getOrderId()); return createOrderDTO; } //发送支付订单超时延迟消息,用于支付超时自动关单 private void sendPayOrderTimeoutDelayMessage(CreateOrderRequest createOrderRequest) { PayOrderTimeoutDelayMessage message = new PayOrderTimeoutDelayMessage(); message.setOrderId(createOrderRequest.getOrderId()); message.setBusinessIdentifier(createOrderRequest.getBusinessIdentifier()); message.setCancelType(OrderCancelTypeEnum.TIMEOUT_CANCELED.getCode()); message.setUserId(createOrderRequest.getUserId()); message.setOrderType(createOrderRequest.getOrderType()); message.setOrderStatus(OrderStatusEnum.CREATED.getCode()); String msgJson = JsonUtil.object2Json(message); defaultProducer.sendMessage( RocketMqConstant.PAY_ORDER_TIMEOUT_DELAY_TOPIC, msgJson, RocketDelayedLevel.DELAYED_30m, "支付订单超时延迟消息" ); } ... } @Component public class DefaultProducer { private final DefaultMQProducer producer; @Autowired public DefaultProducer(RocketMQProperties rocketMQProperties) { producer = new DefaultMQProducer(RocketMqConstant.ORDER_DEFAULT_PRODUCER_GROUP); producer.setNamesrvAddr(rocketMQProperties.getNameServer()); start(); } //对象在使用之前必须要调用一次,只能初始化一次 public void start() { try { this.producer.start(); } catch (MQClientException e) { log.error("producer start error", e); } } ... //发送消息 public void sendMessage(String topic, String message, Integer delayTimeLevel, String type) { Message msg = new Message(topic, message.getBytes(StandardCharsets.UTF_8)); try { if (delayTimeLevel > 0) { msg.setDelayTimeLevel(delayTimeLevel); } SendResult send = producer.send(msg); if (SendStatus.SEND_OK == send.getSendStatus()) { log.info("发送MQ消息成功, type:{}, message:{}", type, message); } else { throw new OrderBizException(send.getSendStatus().toString()); } } catch (Exception e) { log.error("发送MQ消息失败:", e); throw new OrderBizException(OrderErrorCodeEnum.SEND_MQ_FAILED); } } ... }
(2)订单超时检查监听器
@Configuration public class ConsumerConfig { @Autowired private RocketMQProperties rocketMQProperties; ... //支付订单超时延迟消息消费者 @Bean("payOrderTimeoutConsumer") public DefaultMQPushConsumer payOrderTimeoutConsumer(PayOrderTimeoutListener payOrderTimeoutListener) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PAY_ORDER_TIMEOUT_DELAY_CONSUMER_GROUP); consumer.setNamesrvAddr(rocketMQProperties.getNameServer()); consumer.subscribe(PAY_ORDER_TIMEOUT_DELAY_TOPIC, "*"); consumer.registerMessageListener(payOrderTimeoutListener); consumer.start(); return consumer; } ... } //监听支付订单超时延迟消息 @Component public class PayOrderTimeoutListener implements MessageListenerConcurrently { @Autowired private OrderAfterSaleService orderAfterSaleService; @Autowired private OrderInfoDAO orderInfoDAO; @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt messageExt : list) { String message = new String(messageExt.getBody()); PayOrderTimeoutDelayMessage payOrderTimeoutDelayMessage = JSON.parseObject(message, PayOrderTimeoutDelayMessage.class); //消费延迟消息,执行关单逻辑 CancelOrderRequest cancelOrderRequest = new CancelOrderRequest(); cancelOrderRequest.setOrderId(payOrderTimeoutDelayMessage.getOrderId()); cancelOrderRequest.setBusinessIdentifier(payOrderTimeoutDelayMessage.getBusinessIdentifier()); cancelOrderRequest.setCancelType(payOrderTimeoutDelayMessage.getOrderType()); cancelOrderRequest.setUserId(payOrderTimeoutDelayMessage.getUserId()); cancelOrderRequest.setOrderType(payOrderTimeoutDelayMessage.getOrderType()); cancelOrderRequest.setOrderStatus(payOrderTimeoutDelayMessage.getOrderStatus()); //查询当前数据库的订单实时状态 OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(payOrderTimeoutDelayMessage.getOrderId()); Integer orderStatusDatabase = orderInfoDO.getOrderStatus(); if (!OrderStatusEnum.CREATED.getCode().equals(orderStatusDatabase)) { //订单实时状态不等于已创建 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } //当前时间 小于 订单实际支付截止时间 if (new Date().before(orderInfoDO.getExpireTime())) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } orderAfterSaleService.cancelOrder(cancelOrderRequest); log.info("关闭订单,orderId:{}", cancelOrderRequest.getOrderId()); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { log.error("consumer error", e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }
(3)对订单进行正式取消的业务逻辑
@Service public class OrderAfterSaleServiceImpl implements OrderAfterSaleService { ... //取消订单/超时未支付取消 @Override public JsonResult<Boolean> cancelOrder(CancelOrderRequest cancelOrderRequest) { //入参检查 checkCancelOrderRequestParam(cancelOrderRequest); //分布式锁,这里的锁和预支付及支付回调使用的分布式锁一样 String orderId = cancelOrderRequest.getOrderId(); String key = RedisLockKeyConstants.ORDER_PAY_KEY + orderId; try { boolean lock = redisLock.lock(key); if (!lock) { throw new OrderBizException(OrderErrorCodeEnum.CANCEL_ORDER_REPEAT); } //执行取消订单 executeCancelOrder(cancelOrderRequest, orderId); return JsonResult.buildSuccess(true); } catch (Exception e) { throw new OrderBizException(e.getMessage()); } finally { redisLock.unlock(key); } } @Override @GlobalTransactional(rollbackFor = Exception.class) public void executeCancelOrder(CancelOrderRequest cancelOrderRequest, String orderId) { //1.组装数据 CancelOrderAssembleRequest cancelOrderAssembleRequest = buildAssembleRequest(orderId, cancelOrderRequest); //幂等校验:防止多个线程同时操作取消同一笔订单 if (OrderStatusEnum.CANCELED.getCode().equals(cancelOrderAssembleRequest.getOrderInfoDTO().getOrderStatus())) { return; } //2.检验订单支付状态 checkOrderPayStatus(cancelOrderAssembleRequest); //3.更新订单状态和记录订单操作日志 updateOrderStatusAndSaveOperationLog(cancelOrderAssembleRequest); //超时未支付的订单不用继续再往下执行取消履约和释放资产 if (OrderStatusEnum.PAID.getCode() > cancelOrderAssembleRequest.getOrderInfoDTO().getOrderStatus()) { return; } //4.履约取消 cancelFulfill(cancelOrderAssembleRequest); //5.发送释放资产消息到MQ defaultProducer.sendMessage(RocketMqConstant.RELEASE_ASSETS_TOPIC, JSONObject.toJSONString(cancelOrderAssembleRequest), "释放资产"); } //更新订单状态和记录订单操作日志 private void updateOrderStatusAndSaveOperationLog(CancelOrderAssembleRequest cancelOrderAssembleRequest) { //更新订单表 OrderInfoDO orderInfoDO = cancelOrderAssembleRequest.getOrderInfoDTO().clone(OrderInfoDO.class); orderInfoDO.setCancelType(cancelOrderAssembleRequest.getCancelType().toString()); orderInfoDO.setOrderStatus(OrderStatusEnum.CANCELED.getCode()); orderInfoDO.setCancelTime(new Date()); orderInfoDAO.updateOrderInfo(orderInfoDO); log.info("更新订单信息OrderInfo状态: orderId:{},status:{}", orderInfoDO.getOrderId(), orderInfoDO.getOrderStatus()); //新增订单操作操作日志表 Integer cancelType = Integer.valueOf(orderInfoDO.getCancelType()); String orderId = orderInfoDO.getOrderId(); OrderOperateLogDO orderOperateLogDO = new OrderOperateLogDO(); orderOperateLogDO.setOrderId(orderId); orderOperateLogDO.setPreStatus(cancelOrderAssembleRequest.getOrderInfoDTO().getOrderStatus()); orderOperateLogDO.setCurrentStatus(OrderStatusEnum.CANCELED.getCode()); orderOperateLogDO.setOperateType(OrderOperateTypeEnum.AUTO_CANCEL_ORDER.getCode()); if (OrderCancelTypeEnum.USER_CANCELED.getCode().equals(cancelType)) { orderOperateLogDO.setOperateType(OrderOperateTypeEnum.MANUAL_CANCEL_ORDER.getCode()); orderOperateLogDO.setRemark(OrderOperateTypeEnum.MANUAL_CANCEL_ORDER.getMsg() + orderOperateLogDO.getPreStatus() + "-" + orderOperateLogDO.getCurrentStatus()); } if (OrderCancelTypeEnum.TIMEOUT_CANCELED.getCode().equals(cancelType)) { orderOperateLogDO.setOperateType(OrderOperateTypeEnum.AUTO_CANCEL_ORDER.getCode()); orderOperateLogDO.setRemark(OrderOperateTypeEnum.AUTO_CANCEL_ORDER.getMsg() + orderOperateLogDO.getPreStatus() + "-" + orderOperateLogDO.getCurrentStatus()); } orderOperateLogDAO.save(orderOperateLogDO); log.info("新增订单操作日志OrderOperateLog状态,orderId:{}, PreStatus:{},CurrentStatus:{}", orderInfoDO.getOrderId(), orderOperateLogDO.getPreStatus(), orderOperateLogDO.getCurrentStatus()); } ... }
(4)定时任务检查创建时间超30m但没取消的订单
//自动取消超时订单任务 @Component public class AutoCancelExpiredOrderTask { @Autowired private OrderInfoDAO orderInfoDAO; @Autowired private OrderAfterSaleService orderAfterSaleService; @Autowired private OrderProperties orderProperties; @Autowired private RedisLock redisLock; //执行任务逻辑 @Scheduled(fixedRate = 30 * 60 * 1000) public void execute() { //扫描当前时间 - 订单超时时间 -> 前的一小段时间范围(时间范围用配置中心配置) //比如当前时间11:40,订单超时时间是30分钟,扫描11:09:00 -> 11:10:00这一分钟的未支付订单, //缺点:有一个订单超过了30 + 1 = 31分钟,都没有被处理(取消),这笔订单就永久待支付 for (OrderInfoDO order : orderInfoDAO.listAllUnPaid()) { if (new Date().getTime() - order.getExpireTime().getTime() >= orderProperties.getExpireTime()) { //分布式锁 String orderId = order.getOrderId(); String key = RedisLockKeyConstants.ORDER_PAY_KEY + orderId; try { boolean lock = redisLock.lock(key); if (!lock) { throw new OrderBizException(OrderErrorCodeEnum.CANCEL_ORDER_REPEAT); } //超过30min未支付 CancelOrderRequest request = new CancelOrderRequest(); request.setOrderId(order.getOrderId()); request.setUserId(order.getUserId()); request.setBusinessIdentifier(order.getBusinessIdentifier()); request.setOrderType(order.getOrderType()); request.setCancelType(OrderCancelTypeEnum.TIMEOUT_CANCELED.getCode()); request.setOrderStatus(order.getOrderStatus()); try { orderAfterSaleService.cancelOrder(request); } catch (Exception e) { log.error("AutoCancelExpiredOrderTask execute error:", e); } } catch (Exception e) { throw new OrderBizException(e.getMessage()); } finally { redisLock.unlock(key); } } } } }
6.RocketMQ延迟消息实现原理
发送需要延迟消费的消息到RocketMQ的某Topic时,RocketMQ会将该消息进行改写,投递到重试Topic,而非指定的某Topic。RocketMQ将该消息转发到重试Topic对应的ConsumerQueue后,RocketMQ内部会有一个内部的延迟调度组件ScheduleService,该组件便会消费重试Topic对应的ConsumerQueue里的消息,并判断里面的消息是否到达延迟时间。如果到达延迟时间,则会重新改写消息Topic,投递到开始指定的某Topic。
7.取消订单的业务场景和接口被调用的情况
(1)发生取消订单的三种场景
(2)调用取消订单接口的三种情况
(1)发生取消订单的三种场景
场景一:用户生成订单后还没支付,就取消订单。取消订单可能是用户手动取消,也可能是超时检查自动取消,此时只需要释放锁定的优惠券和库存即可。
场景二:用户生成订单后完成支付,还没拣货出库,用户就取消订单。此时不仅需要释放锁定的优惠券和库存,还需取消履约以及退款。
场景三:用户生成订单后完成支付,已拣货出库,开始物流配送。此时用户想要取消订单,只能等收到货以后,发起售后退货申请。
(2)调用取消订单接口的三种情况
情况一:⼿动取消,订单出库状态前都可取消
情况二:正向⽣成订单后,会发送延迟30分钟消费的订单消息到MQ。订单系统在30分钟后消费到该MQ消息时,若发现仍未⽀付则取消该订单
情况三:定时扫描,超过30分钟未⽀付才取消
8.取消订单的业务链路和代码流程
(1)实现概述
(2)对取消订单的处理
(3)对取消履约+更新订单状态+新增订单日志的处理
(4)对释放资产的处理
(5)对退款的退款前处理
(6)对退款的实际退款处理
(1)实现概述
一.实现流程图
二.技术要点
要点一:更新订单状态、取消履约、发送释放资产是⼀个分布式事务,需要将Seata的AT模式替换成RocketMQ的事务消息。
要点二:超时订单的处理⽅案是⽣成订单后发送⼀个延迟30分钟后才被消费的消息到MQ。订单系统消费该延迟消息时,会验证订单是否已支付,否就调⽤取消订单的接⼝。
(2)对取消订单的处理
//订单中心-逆向售后业务接口 @DubboService(version = "1.0.0", interfaceClass = AfterSaleApi.class, retries = 0) public class AfterSaleApiImpl implements AfterSaleApi { @Autowired private OrderLackService orderLackItemService; @Autowired private OrderAfterSaleService orderAfterSaleService; //取消订单/超时未支付取消 @Override public JsonResult<Boolean> cancelOrder(CancelOrderRequest cancelOrderRequest) { try { return orderAfterSaleService.cancelOrder(cancelOrderRequest); } catch (OrderBizException e) { log.error("biz error", e); return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg()); } catch (Exception e) { log.error("system error", e); return JsonResult.buildError(e.getMessage()); } } ... } //取消订单入参 @Data public class CancelOrderRequest extends AbstractObject implements Serializable { private static final long serialVersionUID = -4035579903997700971L; private String orderId;//订单号 private Integer businessIdentifier;//订单渠道来源 private Integer cancelType;//订单取消类型 0-手动取消 1-超时未支付 private String userId;//用户id private Integer orderType;//订单类型 private Integer orderStatus;//订单状态 private Integer oldOrderStatus;//原订单状态 } @Service public class OrderAfterSaleServiceImpl implements OrderAfterSaleService { ... //取消订单/超时未支付取消 @Override public JsonResult<Boolean> cancelOrder(CancelOrderRequest cancelOrderRequest) { //入参检查 checkCancelOrderRequestParam(cancelOrderRequest); //分布式锁 String orderId = cancelOrderRequest.getOrderId(); String key = RedisLockKeyConstants.CANCEL_KEY + orderId; try { boolean lock = redisLock.lock(key); if (!lock) { throw new OrderBizException(OrderErrorCodeEnum.CANCEL_ORDER_REPEAT); } //执行取消订单 return executeCancelOrder(cancelOrderRequest, orderId); } catch (Exception e) { throw new OrderBizException(e.getMessage()); } finally { redisLock.unlock(key); } } @Override public JsonResult<Boolean> executeCancelOrder(CancelOrderRequest cancelOrderRequest, String orderId) { //1.组装数据 OrderInfoDO orderInfoDO = findOrderInfo(orderId, cancelOrderRequest.getCancelType()); CancelOrderAssembleRequest cancelOrderAssembleRequest = buildAssembleRequest(orderId, cancelOrderRequest, orderInfoDO); if (cancelOrderAssembleRequest.getOrderInfoDTO().getOrderStatus() >= OrderStatusEnum.OUT_STOCK.getCode()) { throw new OrderBizException(OrderErrorCodeEnum.CURRENT_ORDER_STATUS_CANNOT_CANCEL); } TransactionMQProducer producer = defaultProducer.getProducer(); producer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { try { //2.执行履约取消、更新订单状态、新增订单日志操作 afterSaleManager.cancelOrderFulfillmentAndUpdateOrderStatus(cancelOrderAssembleRequest); return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { log.error("system error", e); return LocalTransactionState.ROLLBACK_MESSAGE; } } @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { //查询订单状态是否已更新为"已取消" OrderInfoDO orderInfoByDatabase = orderInfoDAO.getByOrderId(orderId); if (OrderStatusEnum.CANCELED.getCode().equals(orderInfoByDatabase.getOrderStatus())) { return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.ROLLBACK_MESSAGE; } }); try { Message message = new Message(RocketMqConstant.RELEASE_ASSETS_TOPIC, JSONObject.toJSONString(cancelOrderAssembleRequest).getBytes(StandardCharsets.UTF_8)); //3.发送事务消息 释放权益资产 TransactionSendResult result = producer.sendMessageInTransaction(message, cancelOrderAssembleRequest); if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) { throw new OrderBizException(OrderErrorCodeEnum.CANCEL_ORDER_PROCESS_FAILED); } return JsonResult.buildSuccess(true); } catch (Exception e) { throw new OrderBizException(OrderErrorCodeEnum.SEND_TRANSACTION_MQ_FAILED); } } ... }
(3)对取消履约+更新订单状态+新增订单日志的处理
@Service public class AfterSaleManagerImpl implements AfterSaleManager { ... @Override public void cancelOrderFulfillmentAndUpdateOrderStatus(CancelOrderAssembleRequest cancelOrderAssembleRequest) { //履约取消 cancelFulfill(cancelOrderAssembleRequest); //更新订单状态和记录订单操作日志 updateOrderStatusAndSaveOperationLog(cancelOrderAssembleRequest); } //调用履约拦截订单 private void cancelFulfill(CancelOrderAssembleRequest cancelOrderAssembleRequest) { OrderInfoDTO orderInfoDTO = cancelOrderAssembleRequest.getOrderInfoDTO(); if (OrderStatusEnum.CREATED.getCode().equals(orderInfoDTO.getOrderStatus())) { return; } CancelFulfillRequest cancelFulfillRequest = orderInfoDTO.clone(CancelFulfillRequest.class); JsonResult<Boolean> jsonResult = fulfillApi.cancelFulfill(cancelFulfillRequest); if (!jsonResult.getSuccess()) { throw new OrderBizException(OrderErrorCodeEnum.CANCEL_ORDER_FULFILL_ERROR); } } //更新订单状态和记录订单操作日志 private void updateOrderStatusAndSaveOperationLog(CancelOrderAssembleRequest cancelOrderAssembleRequest) { //更新订单表 OrderInfoDO orderInfoDO = cancelOrderAssembleRequest.getOrderInfoDTO().clone(OrderInfoDO.class); orderInfoDO.setCancelType(cancelOrderAssembleRequest.getCancelType().toString()); orderInfoDO.setOrderStatus(OrderStatusEnum.CANCELED.getCode()); orderInfoDO.setCancelTime(new Date()); orderInfoDAO.updateOrderInfo(orderInfoDO); log.info("更新订单信息OrderInfo状态: orderId:{},status:{}", orderInfoDO.getOrderId(), orderInfoDO.getOrderStatus()); //新增订单操作操作日志表 Integer cancelType = Integer.valueOf(orderInfoDO.getCancelType()); String orderId = orderInfoDO.getOrderId(); OrderOperateLogDO orderOperateLogDO = new OrderOperateLogDO(); orderOperateLogDO.setOrderId(orderId); orderOperateLogDO.setPreStatus(cancelOrderAssembleRequest.getOrderInfoDTO().getOrderStatus()); orderOperateLogDO.setCurrentStatus(OrderStatusEnum.CANCELED.getCode()); orderOperateLogDO.setOperateType(OrderOperateTypeEnum.AUTO_CANCEL_ORDER.getCode()); if (OrderCancelTypeEnum.USER_CANCELED.getCode().equals(cancelType)) { orderOperateLogDO.setOperateType(OrderOperateTypeEnum.MANUAL_CANCEL_ORDER.getCode()); orderOperateLogDO.setRemark(OrderOperateTypeEnum.MANUAL_CANCEL_ORDER.getMsg() + orderOperateLogDO.getPreStatus() + "-" + orderOperateLogDO.getCurrentStatus()); } if (OrderCancelTypeEnum.TIMEOUT_CANCELED.getCode().equals(cancelType)) { orderOperateLogDO.setOperateType(OrderOperateTypeEnum.AUTO_CANCEL_ORDER.getCode()); orderOperateLogDO.setRemark(OrderOperateTypeEnum.AUTO_CANCEL_ORDER.getMsg() + orderOperateLogDO.getPreStatus() + "-" + orderOperateLogDO.getCurrentStatus()); } orderOperateLogDAO.save(orderOperateLogDO); log.info("新增订单操作日志OrderOperateLog状态,orderId:{}, PreStatus:{},CurrentStatus:{}", orderInfoDO.getOrderId(), orderOperateLogDO.getPreStatus(), orderOperateLogDO.getCurrentStatus()); } ... }
(4)对释放资产的处理
@Configuration public class ConsumerConfig { @Autowired private RocketMQProperties rocketMQProperties; ... //释放资产消息消费者 @Bean("releaseAssetsConsumer") public DefaultMQPushConsumer releaseAssetsConsumer(ReleaseAssetsListener releaseAssetsListener) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RELEASE_ASSETS_CONSUMER_GROUP); consumer.setNamesrvAddr(rocketMQProperties.getNameServer()); consumer.subscribe(RELEASE_ASSETS_TOPIC, "*"); consumer.registerMessageListener(releaseAssetsListener); consumer.start(); return consumer; } } //消费MQ的释放资产消息 @Component public class ReleaseAssetsListener implements MessageListenerConcurrently { @Autowired private DefaultProducer defaultProducer; @Autowired private OrderItemDAO orderItemDAO; @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt messageExt : list) { //1.消费到释放资产message String message = new String(messageExt.getBody()); log.info("ReleaseAssetsListener message:{}", message); CancelOrderAssembleRequest cancelOrderAssembleRequest = JSONObject.parseObject(message, CancelOrderAssembleRequest.class); OrderInfoDTO orderInfoDTO = cancelOrderAssembleRequest.getOrderInfoDTO(); //2.发送取消订单退款请求MQ if (orderInfoDTO.getOrderStatus() > OrderStatusEnum.CREATED.getCode()) { defaultProducer.sendMessage(RocketMqConstant.CANCEL_REFUND_REQUEST_TOPIC, JSONObject.toJSONString(cancelOrderAssembleRequest), "取消订单退款"); } //3.发送释放库存MQ ReleaseProductStockRequest releaseProductStockRequest = buildReleaseProductStock(orderInfoDTO, orderItemDAO); defaultProducer.sendMessage(RocketMqConstant.CANCEL_RELEASE_INVENTORY_TOPIC, JSONObject.toJSONString(releaseProductStockRequest), "取消订单释放库存"); //4.发送释放优惠券MQ if (!Strings.isNullOrEmpty(orderInfoDTO.getCouponId())) { ReleaseUserCouponRequest releaseUserCouponRequest = buildReleaseUserCoupon(orderInfoDTO); defaultProducer.sendMessage(RocketMqConstant.CANCEL_RELEASE_PROPERTY_TOPIC, JSONObject.toJSONString(releaseUserCouponRequest), "取消订单释放优惠券"); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { log.error("consumer error", e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } //组装释放优惠券数据 private ReleaseUserCouponRequest buildReleaseUserCoupon(OrderInfoDTO orderInfoDTO) { ReleaseUserCouponRequest releaseUserCouponRequest = new ReleaseUserCouponRequest(); releaseUserCouponRequest.setCouponId(orderInfoDTO.getCouponId()); releaseUserCouponRequest.setUserId(orderInfoDTO.getUserId()); releaseUserCouponRequest.setOrderId(orderInfoDTO.getOrderId()); return releaseUserCouponRequest; } //组装释放库存数据 private ReleaseProductStockRequest buildReleaseProductStock(OrderInfoDTO orderInfoDTO, OrderItemDAO orderItemDAO) { String orderId = orderInfoDTO.getOrderId(); List<ReleaseProductStockRequest.OrderItemRequest> orderItemRequestList = new ArrayList<>(); //查询订单条目 ReleaseProductStockRequest.OrderItemRequest orderItemRequest; List<OrderItemDO> orderItemDOList = orderItemDAO.listByOrderId(orderId); for (OrderItemDO orderItemDO : orderItemDOList) { orderItemRequest = new ReleaseProductStockRequest.OrderItemRequest(); orderItemRequest.setSkuCode(orderItemDO.getSkuCode()); orderItemRequest.setSaleQuantity(orderItemDO.getSaleQuantity()); orderItemRequestList.add(orderItemRequest); } ReleaseProductStockRequest releaseProductStockRequest = new ReleaseProductStockRequest(); releaseProductStockRequest.setOrderId(orderInfoDTO.getOrderId()); releaseProductStockRequest.setOrderItemRequestList(orderItemRequestList); return releaseProductStockRequest; } } @Configuration public class ConsumerConfig { @Autowired private RocketMQProperties rocketMQProperties; ... //消费退款请求消息 消费者 @Bean("cancelRefundConsumer") public DefaultMQPushConsumer cancelRefundConsumer(CancelRefundListener cancelRefundListener) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConstant.REQUEST_CONSUMER_GROUP); consumer.setNamesrvAddr(rocketMQProperties.getNameServer()); consumer.subscribe(RocketMqConstant.CANCEL_REFUND_REQUEST_TOPIC, "*"); consumer.registerMessageListener(cancelRefundListener); consumer.start(); return consumer; } } //消费取消订单退款请求的消息 @Component public class CancelRefundListener implements MessageListenerConcurrently { @Autowired private OrderAfterSaleService orderAfterSaleService; @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt messageExt : list) { String message = new String(messageExt.getBody()); CancelOrderAssembleRequest cancelOrderAssembleRequest = JSONObject.parseObject(message, CancelOrderAssembleRequest.class); log.info("CancelRefundConsumer message:{}", message); //执行 取消订单/超时未支付取消 前的操作 JsonResult<Boolean> jsonResult = orderAfterSaleService.processCancelOrder(cancelOrderAssembleRequest); if (!jsonResult.getSuccess()) { throw new OrderBizException(OrderErrorCodeEnum.CONSUME_MQ_FAILED); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { log.error("consumer error", e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } } @Configuration public class ConsumerConfig { @Autowired private RocketMQProperties rocketMQProperties; //释放库存消息消费者 @Bean("releaseInventoryConsumer") public DefaultMQPushConsumer releaseInventoryConsumer(ReleaseInventoryListener releaseInventoryListener) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConstant.RELEASE_INVENTORY_CONSUMER_GROUP); consumer.setNamesrvAddr(rocketMQProperties.getNameServer()); consumer.subscribe(RocketMqConstant.CANCEL_RELEASE_INVENTORY_TOPIC, "*"); consumer.registerMessageListener(releaseInventoryListener); consumer.start(); return consumer; } ... } //消费释放库存的消息 @Component public class ReleaseInventoryListener implements MessageListenerConcurrently { @DubboReference(version = "1.0.0") private InventoryApi inventoryApi; @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt msg : list) { String content = new String(msg.getBody(), StandardCharsets.UTF_8); log.info("ReleaseInventoryConsumer message:{}", content); CancelOrderReleaseProductStockRequest cancelOrderReleaseProductStockRequest = JSONObject.parseObject(content, CancelOrderReleaseProductStockRequest.class); JsonResult<Boolean> jsonResult = inventoryApi.cancelOrderReleaseProductStock(cancelOrderReleaseProductStockRequest); if (!jsonResult.getSuccess()) { throw new InventoryBizException(InventoryErrorCodeEnum.CONSUME_MQ_FAILED); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { log.error("consumer error", e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } } @Configuration public class ConsumerConfig { @Autowired private RocketMQProperties rocketMQProperties; //释放资产权益消息消费者 @Bean("releaseInventoryConsumer") public DefaultMQPushConsumer releaseInventoryConsumer(ReleasePropertyListener releasePropertyListener) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConstant.RELEASE_PROPERTY_CONSUMER_GROUP); consumer.setNamesrvAddr(rocketMQProperties.getNameServer()); consumer.subscribe(RocketMqConstant.CANCEL_RELEASE_PROPERTY_TOPIC, "*"); consumer.registerMessageListener(releasePropertyListener); consumer.start(); return consumer; } ... } //消费释放优惠券的消息 @Component public class ReleasePropertyListener implements MessageListenerConcurrently { @DubboReference(version = "1.0.0") private MarketApi marketApi; @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt msg : list) { String content = new String(msg.getBody(), StandardCharsets.UTF_8); log.info("ReleasePropertyConsumer message:{}", content); CancelOrderReleaseUserCouponRequest cancelOrderReleaseUserCouponRequest = JSONObject.parseObject(content, CancelOrderReleaseUserCouponRequest.class); JsonResult<Boolean> jsonResult = marketApi.cancelOrderReleaseCoupon(cancelOrderReleaseUserCouponRequest); if (!jsonResult.getSuccess()) { throw new MarketBizException(MarketErrorCodeEnum.CONSUME_MQ_FAILED); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { log.error("consumer error", e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }
(5)对退款的退款前处理
@Service public class OrderAfterSaleServiceImpl implements OrderAfterSaleService { ... @Override public JsonResult<Boolean> processCancelOrder(CancelOrderAssembleRequest cancelOrderAssembleRequest) { String orderId = cancelOrderAssembleRequest.getOrderId(); //分布式锁 String key = RedisLockKeyConstants.REFUND_KEY + orderId; try { boolean lock = redisLock.lock(key); if (!lock) { throw new OrderBizException(OrderErrorCodeEnum.PROCESS_REFUND_REPEAT); } //执行退款前的准备工作 //生成售后订单号 OrderInfoDTO orderInfoDTO = cancelOrderAssembleRequest.getOrderInfoDTO(); OrderInfoDO orderInfoDO = orderInfoDTO.clone(OrderInfoDO.class); String afterSaleId = orderNoManager.genOrderId(OrderNoTypeEnum.AFTER_SALE.getCode(), orderInfoDO.getUserId()); //1.计算 取消订单 退款金额 CancelOrderRefundAmountDTO cancelOrderRefundAmountDTO = calculatingCancelOrderRefundAmount(cancelOrderAssembleRequest); cancelOrderAssembleRequest.setCancelOrderRefundAmountDTO(cancelOrderRefundAmountDTO); TransactionMQProducer producer = defaultProducer.getProducer(); producer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { try { //2.取消订单操作 记录售后信息 afterSaleManager.insertCancelOrderAfterSale(cancelOrderAssembleRequest, AfterSaleStatusEnum.REVIEW_PASS.getCode(), orderInfoDO, afterSaleId); return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { log.error("system error", e); return LocalTransactionState.ROLLBACK_MESSAGE; } } @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { // 查询售后数据是否插入成功 AfterSaleInfoDO afterSaleInfoDO = afterSaleInfoDAO.getOneByAfterSaleId(Long.valueOf(afterSaleId)); List<AfterSaleItemDO> afterSaleItemDOList = afterSaleItemDAO.listByAfterSaleId(Long.valueOf(afterSaleId)); List<AfterSaleLogDO> afterSaleLogDOList = afterSaleLogDAO.listByAfterSaleId(Long.valueOf(afterSaleId)); List<AfterSaleRefundDO> afterSaleRefundDOList = afterSaleRefundDAO.listByAfterSaleId(Long.valueOf(afterSaleId)); if (afterSaleInfoDO != null && afterSaleItemDOList.isEmpty() && afterSaleLogDOList.isEmpty() && afterSaleRefundDOList.isEmpty()) { return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.ROLLBACK_MESSAGE; } }); try { //3.组装事务MQ消息 ActualRefundMessage actualRefundMessage = new ActualRefundMessage(); actualRefundMessage.setOrderId(cancelOrderAssembleRequest.getOrderId()); actualRefundMessage.setLastReturnGoods(cancelOrderAssembleRequest.isLastReturnGoods()); actualRefundMessage.setAfterSaleId(Long.valueOf(afterSaleId)); Message message = new Message(RocketMqConstant.ACTUAL_REFUND_TOPIC, JSONObject.toJSONString(actualRefundMessage).getBytes(StandardCharsets.UTF_8)); //4.发送事务MQ消息 TransactionSendResult result = producer.sendMessageInTransaction(message, actualRefundMessage); if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) { throw new OrderBizException(OrderErrorCodeEnum.PROCESS_REFUND_FAILED); } return JsonResult.buildSuccess(true); } catch (Exception e) { throw new OrderBizException(OrderErrorCodeEnum.SEND_TRANSACTION_MQ_FAILED); } } finally { redisLock.unlock(key); } } ... } @Service public class AfterSaleManagerImpl implements AfterSaleManager { ... //取消订单操作 记录售后信息 @Transactional(rollbackFor = Exception.class) @Override public void insertCancelOrderAfterSale(CancelOrderAssembleRequest cancelOrderAssembleRequest, Integer afterSaleStatus, OrderInfoDO orderInfoDO, String afterSaleId) { OrderInfoDTO orderInfoDTO = cancelOrderAssembleRequest.getOrderInfoDTO(); //取消订单过程中的 申请退款金额 和 实际退款金额 都是实付退款金额 金额相同 AfterSaleInfoDO afterSaleInfoDO = new AfterSaleInfoDO(); afterSaleInfoDO.setApplyRefundAmount(orderInfoDO.getPayAmount()); afterSaleInfoDO.setRealRefundAmount(orderInfoDO.getPayAmount()); //1.新增售后订单表 Integer cancelOrderAfterSaleStatus = AfterSaleStatusEnum.REVIEW_PASS.getCode(); insertCancelOrderAfterSaleInfoTable(orderInfoDO, cancelOrderAfterSaleStatus, afterSaleInfoDO, afterSaleId); cancelOrderAssembleRequest.setAfterSaleId(afterSaleId); //2.新增售后条目表 String orderId = cancelOrderAssembleRequest.getOrderId(); List<OrderItemDTO> orderItemDTOList = cancelOrderAssembleRequest.getOrderItemDTOList(); insertAfterSaleItemTable(orderId, orderItemDTOList, afterSaleId); //3.新增售后变更表 insertCancelOrderAfterSaleLogTable(afterSaleId, orderInfoDTO, AfterSaleStatusEnum.UN_CREATED.getCode(), afterSaleStatus); //4.新增售后支付表 AfterSaleRefundDO afterSaleRefundDO = insertAfterSaleRefundTable(orderInfoDTO, afterSaleId, afterSaleInfoDO); cancelOrderAssembleRequest.setAfterSaleRefundId(afterSaleRefundDO.getId()); } ... }
(6)对退款的实际退款处理
@Configuration public class ConsumerConfig { @Autowired private RocketMQProperties rocketMQProperties; ... @Bean("actualRefundConsumer") public DefaultMQPushConsumer actualRefundConsumer(ActualRefundListener actualRefundListener) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(ACTUAL_REFUND_CONSUMER_GROUP); consumer.setNamesrvAddr(rocketMQProperties.getNameServer()); consumer.subscribe(ACTUAL_REFUND_TOPIC, "*"); consumer.registerMessageListener(actualRefundListener); consumer.start(); return consumer; } } //实际退款消费者 @Component public class ActualRefundListener implements MessageListenerConcurrently { @Autowired private OrderAfterSaleService orderAfterSaleService; @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt messageExt : list) { String message = new String(messageExt.getBody()); ActualRefundMessage actualRefundMessage = JSONObject.parseObject(message, ActualRefundMessage.class); log.info("ActualRefundConsumer message:{}", message); JsonResult<Boolean> jsonResult = orderAfterSaleService.refundMoney(actualRefundMessage); if (!jsonResult.getSuccess()) { throw new OrderBizException(jsonResult.getErrorCode(), jsonResult.getErrorMessage()); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { log.error("consumer error", e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } } @Service public class OrderAfterSaleServiceImpl implements OrderAfterSaleService { ... //执行退款 @Override public JsonResult<Boolean> refundMoney(ActualRefundMessage actualRefundMessage) { Long afterSaleId = actualRefundMessage.getAfterSaleId(); String key = RedisLockKeyConstants.REFUND_KEY + afterSaleId; try { boolean lock = redisLock.lock(key); if (!lock) { throw new OrderBizException(OrderErrorCodeEnum.REFUND_MONEY_REPEAT); } AfterSaleInfoDO afterSaleInfoDO = afterSaleInfoDAO.getOneByAfterSaleId(actualRefundMessage.getAfterSaleId()); AfterSaleRefundDO afterSaleRefundDO = afterSaleRefundDAO.findOrderAfterSaleStatus(String.valueOf(afterSaleId)); //1.封装调用支付退款接口的数据 PayRefundRequest payRefundRequest = buildPayRefundRequest(actualRefundMessage, afterSaleRefundDO); //2.执行退款 if (!payApi.executeRefund(payRefundRequest)) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_REFUND_AMOUNT_FAILED); } //3.本次售后的订单条目是当前订单的最后一笔,发送事务MQ退优惠券,此时isLastReturnGoods标记是true if (actualRefundMessage.isLastReturnGoods()) { TransactionMQProducer producer = defaultProducer.getProducer(); //组装事务MQ消息体 ReleaseUserCouponRequest releaseUserCouponRequest = buildLastOrderReleasesCouponMessage(producer, afterSaleInfoDO, afterSaleId, actualRefundMessage); try { //4.发送事务消息 释放优惠券 Message message = new Message(RocketMqConstant.CANCEL_RELEASE_PROPERTY_TOPIC, JSONObject.toJSONString(releaseUserCouponRequest).getBytes(StandardCharsets.UTF_8)); TransactionSendResult result = producer.sendMessageInTransaction(message, releaseUserCouponRequest); if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) { throw new OrderBizException(OrderErrorCodeEnum.REFUND_MONEY_RELEASE_COUPON_FAILED); } return JsonResult.buildSuccess(true); } catch (Exception e) { throw new OrderBizException(OrderErrorCodeEnum.SEND_TRANSACTION_MQ_FAILED); } } else { //当前售后条目非本订单的最后一笔 和 取消订单,在此更新售后状态后流程结束 //更新售后单状态 updateAfterSaleStatus(afterSaleInfoDO, AfterSaleStatusEnum.REVIEW_PASS.getCode(), AfterSaleStatusEnum.REFUNDING.getCode()); return JsonResult.buildSuccess(true); } } catch (OrderBizException e) { log.error("system error", e); return JsonResult.buildError(e.getMessage()); } finally { redisLock.unlock(key); } } private ReleaseUserCouponRequest buildLastOrderReleasesCouponMessage(TransactionMQProducer producer, AfterSaleInfoDO afterSaleInfoDO, Long afterSaleId, ActualRefundMessage actualRefundMessage) { producer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { //更新售后单状态 updateAfterSaleStatus(afterSaleInfoDO, AfterSaleStatusEnum.REVIEW_PASS.getCode(), AfterSaleStatusEnum.REFUNDING.getCode()); return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { log.error("system error", e); return LocalTransactionState.ROLLBACK_MESSAGE; } } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { //查询售后单状态是"退款中" AfterSaleInfoDO afterSaleInfoDO = afterSaleInfoDAO.getOneByAfterSaleId(afterSaleId); if (AfterSaleStatusEnum.REFUNDING.getCode().equals(afterSaleInfoDO.getAfterSaleStatus())) { return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.ROLLBACK_MESSAGE; } }); //组装释放优惠券权益消息数据 String orderId = actualRefundMessage.getOrderId(); OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(orderId); ReleaseUserCouponRequest releaseUserCouponRequest = new ReleaseUserCouponRequest(); releaseUserCouponRequest.setCouponId(orderInfoDO.getCouponId()); releaseUserCouponRequest.setUserId(orderInfoDO.getUserId()); return releaseUserCouponRequest; } ... }
9.为什么拦截履约要同步而释放资产是异步
(1)在正向订单流程中触发履约采用异步柔性事务
(2)在逆向订单流程中拦截履约采用同步刚性事务
(1)在正向订单流程中触发履约采用异步柔性事务
生成订单写库、锁优惠券、锁库存这三个动作会绑定在一个刚性事务里,从而实现"生成订单写库 + 锁优惠券 + 锁库存"的强同步效果。但是当支付成功完成支付回调后,推送订单消息进行履约时,则通过MQ异步推送。也就是说:"生单 + 锁优惠券 + 锁库存"采用了同步刚性事务,而"推送订单消息触发履约"则采用了异步柔性事务。
原因一:在正向订单流程中,生单会使用到优惠券及涉及扣减库存。如果不采用同步刚性事务,则可能会出现虽然订单已经生成了,但是用户却还是能看到其优惠券的状态还处于未使用,或库存还没扣减。这样就可能对用户造成困惑,数据产生了短暂的不一致问题。严重可能会导致用户重复使用优惠券,以及库存超卖问题。
所以"生单 + 锁优惠券 + 锁库存"采用同步刚性事务,当生单操作一旦完成,用户就可以看到优惠券已使用 + 库存已扣减。
原因二:推送订单消息触发履约可以采用异步柔性事务,是因为用户支付完一个订单后,并没有必要马上看到订单已在履约处理,所以"推送订单进行履约"是天然可以被用户接受延迟的。
(2)在逆向订单流程中拦截履约采用同步刚性事务
与正向订单流程刚好相反:"修改订单状态 + 拦截订单不被履约"采用了同步刚性事务,而"释放优惠券 + 释放库存"则采用了异步柔性事务。因为取消订单的第一要务必然是拦截履约。
如果"修改订单状态 + 拦截订单不被履约"采用了异步柔性事务,那么可能出现订单状态已经改为已取消,但是由于异步拦截履约慢了,导致订单都已经被打包发货了,这样就可能发生纠纷和资损了。
如果订单还没被支付就进行了取消,此时订单状态被修改后,可以不用立刻关注库存和优惠券。在用户看来,其还没释放的优惠券最多不能马上恢复使用,影响不严重。最差也不会出现优惠券被用户重复使用的恶劣情况,以及库存超卖问题。
10.支付回调分别互斥预支付和取消订单
首先,预支付和支付回调会加同一把锁:ORDER_PAY_KEY。然后,支付回调和取消订单也会加同一把锁:CANCEL_KEY。所以支付回调会加两把锁:ORDER_PAY_KEY和CANCEL_KEY,这可以通过Redisson的MutiLock来实现。
@Service public class OrderServiceImpl implements OrderService { ... //预支付订单 @Override @Transactional(rollbackFor = Exception.class) public PrePayOrderDTO prePayOrder(PrePayOrderRequest prePayOrderRequest) { //入参检查 checkPrePayOrderRequestParam(prePayOrderRequest); String orderId = prePayOrderRequest.getOrderId(); Integer payAmount = prePayOrderRequest.getPayAmount(); //加分布式锁(与订单支付回调时加的是同一把锁) String key = RedisLockKeyConstants.ORDER_PAY_KEY + orderId; boolean lock = redisLock.lock(key); if (!lock) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_PRE_PAY_ERROR); } try { ... } finally { //释放分布式锁 redisLock.unlock(key); } } ... //支付回调 //支付回调有2把分布式锁的原因说明:同一笔订单在同一时间只能支付or取消 //不可以同时对一笔订单,既发起支付,又发起取消 @Override public void payCallback(PayCallbackRequest payCallbackRequest) { //入参检查 checkPayCallbackRequestParam(payCallbackRequest); String orderId = payCallbackRequest.getOrderId(); Integer payAmount = payCallbackRequest.getPayAmount(); Integer payType = payCallbackRequest.getPayType(); List<String> redisKeyList = Lists.newArrayList(); //加支付分布式锁避免支付系统并发回调 String orderPayKey = RedisLockKeyConstants.ORDER_PAY_KEY + orderId; //加取消订单分布式锁避免支付和取消订单同时操作同一笔订单 String cancelOrderKey = RedisLockKeyConstants.CANCEL_KEY + orderId; redisKeyList.add(orderPayKey); redisKeyList.add(cancelOrderKey); boolean lock = redisLock.multiLock(redisKeyList); if (!lock) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_PAY_CALLBACK_ERROR); } try { ... } catch (Exception e) { throw new OrderBizException(e.getMessage()); } finally { //释放分布式锁 redisLock.unMultiLock(redisKeyList); } } ... } @Service public class OrderAfterSaleServiceImpl implements OrderAfterSaleService { ... //取消订单/超时未支付取消 @Override @GlobalTransactional(rollbackFor = Exception.class) public JsonResult<Boolean> cancelOrder(CancelOrderRequest cancelOrderRequest) { //入参检查 checkCancelOrderRequestParam(cancelOrderRequest); //分布式锁 String orderId = cancelOrderRequest.getOrderId(); String key = RedisLockKeyConstants.CANCEL_KEY + orderId; boolean lock = redisLock.lock(key); if (!lock) { throw new OrderBizException(OrderErrorCodeEnum.CANCEL_ORDER_REPEAT); } try { //执行取消订单 return executeCancelOrder(cancelOrderRequest, orderId); } catch (Exception e) { log.error("biz error", e); throw new OrderBizException(e.getMessage()); } finally { redisLock.unlock(key); } } ... }
11.支付回调和取消订单互斥下的两种场景分析
支付回调的处理过程中,会对异常场景进行判断:
一.如果订单状态是"已创建"
那么会更新订单状态为已支付,并发送事务消息。
二.如果订单状态不是"已创建"
那么判断订单状态是否是取消状态,如果是取消状态,则继续判断订单是否未支付。若是未支付,则进行退款,并抛出异常。若不是未支付,则按支付方式抛出不同的异常。如果不是取消状态,则判断支付回调是否是同种支付方式。如果是,则返回。如果不是,则进行退款,并抛出异常。
@Service public class OrderServiceImpl implements OrderService { ... //支付回调 //支付回调有2把分布式锁的原因说明:同一笔订单在同一时间只能支付or取消 //不可以同时对一笔订单,既发起支付,又发起取消 @Override public void payCallback(PayCallbackRequest payCallbackRequest) { //1.入参检查 checkPayCallbackRequestParam(payCallbackRequest); String orderId = payCallbackRequest.getOrderId(); Integer payAmount = payCallbackRequest.getPayAmount(); Integer payType = payCallbackRequest.getPayType(); List<String> redisKeyList = Lists.newArrayList(); //2.加支付分布式锁避免支付系统并发回调 String orderPayKey = RedisLockKeyConstants.ORDER_PAY_KEY + orderId; //加取消订单分布式锁避免支付和取消订单同时操作同一笔订单 String cancelOrderKey = RedisLockKeyConstants.CANCEL_KEY + orderId; redisKeyList.add(orderPayKey); redisKeyList.add(cancelOrderKey); boolean lock = redisLock.multiLock(redisKeyList); if (!lock) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_PAY_CALLBACK_ERROR); } try { //从数据库中查询出当前订单信息 OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(orderId); OrderPaymentDetailDO orderPaymentDetailDO = orderPaymentDetailDAO.getPaymentDetailByOrderId(orderId); //3.校验参数 if (orderInfoDO == null || orderPaymentDetailDO == null) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_INFO_IS_NULL); } if (!payAmount.equals(orderInfoDO.getPayAmount())) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_CALLBACK_PAY_AMOUNT_ERROR); } //4.异常场景判断 Integer orderStatus = orderInfoDO.getOrderStatus(); if (OrderStatusEnum.CREATED.getCode().equals(orderStatus)) { //如果订单状态是"已创建",直接更新订单状态为已支付,并发送事务消息 TransactionMQProducer transactionMQProducer = defaultProducer.getProducer(); transactionMQProducer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { try { orderManager.updateOrderStatusPaid(payCallbackRequest, orderInfoDO, orderPaymentDetailDO); return LocalTransactionState.COMMIT_MESSAGE; } catch (BaseBizException e) { throw e; } catch (Exception e) { log.error("system error", e); return LocalTransactionState.ROLLBACK_MESSAGE; } } @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { //检查订单是否是已支付 OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(orderId); if (orderInfoDO != null && OrderStatusEnum.PAID.getCode().equals(orderInfoDO.getOrderStatus())) { return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.ROLLBACK_MESSAGE; } }); //发送 "订单已完成支付" 消息 sendPaidOrderSuccessMessage(transactionMQProducer, orderInfoDO); } else { //如果订单状态不是 "已创建" if (OrderStatusEnum.CANCELED.getCode().equals(orderStatus)) { //如果订单状态是取消状态 Integer payStatus = orderPaymentDetailDO.getPayStatus(); if (PayStatusEnum.UNPAID.getCode().equals(payStatus)) { //调用退款 executeOrderRefund(orderInfoDO, orderPaymentDetailDO); throw new OrderBizException(OrderErrorCodeEnum.ORDER_CANCEL_PAY_CALLBACK_ERROR); } else if (PayStatusEnum.PAID.getCode().equals(payStatus)) { if (payType.equals(orderPaymentDetailDO.getPayType())) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_CANCEL_PAY_CALLBACK_PAY_TYPE_SAME_ERROR); } else { throw new OrderBizException(OrderErrorCodeEnum.ORDER_CANCEL_PAY_CALLBACK_PAY_TYPE_NO_SAME_ERROR); } } } else { //如果订单状态不是取消状态 if (PayStatusEnum.PAID.getCode().equals(orderPaymentDetailDO.getPayStatus())) { if (payType.equals(orderPaymentDetailDO.getPayType())) { return; } //调用退款 executeOrderRefund(orderInfoDO, orderPaymentDetailDO); throw new OrderBizException(OrderErrorCodeEnum.ORDER_CANCEL_PAY_CALLBACK_REPEAT_ERROR); } } } } catch (Exception e) { throw new OrderBizException(e.getMessage()); } finally { //释放分布式锁 redisLock.unMultiLock(redisKeyList); } } //发送订单已完成支付消息,触发订单进行履约 private void sendPaidOrderSuccessMessage(TransactionMQProducer transactionMQProducer, OrderInfoDO orderInfoDO) throws MQClientException { String orderId = orderInfoDO.getOrderId(); PaidOrderSuccessMessage message = new PaidOrderSuccessMessage(); message.setOrderId(orderId); String topic = RocketMqConstant.PAID_ORDER_SUCCESS_TOPIC; byte[] body = JSON.toJSONString(message).getBytes(StandardCharsets.UTF_8); Message mq = new Message(topic, body); TransactionSendResult result = transactionMQProducer.sendMessageInTransaction(mq, orderInfoDO); if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_PAY_CALLBACK_SEND_MQ_ERROR); } } ... } //支付系统回调请求对象 @Data public class PayCallbackRequest extends AbstractObject implements Serializable { private static final long serialVersionUID = 3685085492927992753L; private String orderId;//订单ID private String payAccount;//支付账户 private Integer payAmount;//支付金额 private String outTradeNo;//支付系统交易单号 private Integer payType;//支付方式 private String merchantId;//商户号 private String payChannel;//支付渠道 private String appid;//微信平台 appid }
12.拦截履约的具体业务流程
流程图如下:
代码如下:
一.取消订单入口
@Service public class OrderAfterSaleServiceImpl implements OrderAfterSaleService { @Autowired private AfterSaleManager afterSaleManager; ... //取消订单/超时未支付取消 @Override @GlobalTransactional(rollbackFor = Exception.class) public JsonResult<Boolean> cancelOrder(CancelOrderRequest cancelOrderRequest) { //入参检查 checkCancelOrderRequestParam(cancelOrderRequest); //分布式锁 String orderId = cancelOrderRequest.getOrderId(); String key = RedisLockKeyConstants.CANCEL_KEY + orderId; boolean lock = redisLock.lock(key); if (!lock) { throw new OrderBizException(OrderErrorCodeEnum.CANCEL_ORDER_REPEAT); } try { //执行取消订单 return executeCancelOrder(cancelOrderRequest, orderId); } catch (Exception e) { log.error("biz error", e); throw new OrderBizException(e.getMessage()); } finally { redisLock.unlock(key); } } @Override public JsonResult<Boolean> executeCancelOrder(CancelOrderRequest cancelOrderRequest, String orderId) { //1.组装数据 OrderInfoDO orderInfoDO = findOrderInfo(orderId, cancelOrderRequest.getCancelType()); CancelOrderAssembleRequest cancelOrderAssembleRequest = buildAssembleRequest(orderId, cancelOrderRequest, orderInfoDO); if (cancelOrderAssembleRequest.getOrderInfoDTO().getOrderStatus() >= OrderStatusEnum.OUT_STOCK.getCode()) { throw new OrderBizException(OrderErrorCodeEnum.CURRENT_ORDER_STATUS_CANNOT_CANCEL); } TransactionMQProducer producer = defaultProducer.getProducer(); producer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { try { //2.执行履约取消、更新订单状态、新增订单日志操作 afterSaleManager.cancelOrderFulfillmentAndUpdateOrderStatus(cancelOrderAssembleRequest); return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { log.error("system error", e); return LocalTransactionState.ROLLBACK_MESSAGE; } } @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { //查询订单状态是否已更新为"已取消" OrderInfoDO orderInfoByDatabase = orderInfoDAO.getByOrderId(orderId); if (OrderStatusEnum.CANCELED.getCode().equals(orderInfoByDatabase.getOrderStatus())) { return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.ROLLBACK_MESSAGE; } }); try { Message message = new Message(RocketMqConstant.RELEASE_ASSETS_TOPIC, JSONObject.toJSONString(cancelOrderAssembleRequest).getBytes(StandardCharsets.UTF_8)); //3.发送事务消息 释放权益资产 TransactionSendResult result = producer.sendMessageInTransaction(message, cancelOrderAssembleRequest); if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) { throw new OrderBizException(OrderErrorCodeEnum.CANCEL_ORDER_PROCESS_FAILED); } return JsonResult.buildSuccess(true); } catch (Exception e) { throw new OrderBizException(OrderErrorCodeEnum.SEND_TRANSACTION_MQ_FAILED); } } ... }
二.拦截履约处理
@Service public class AfterSaleManagerImpl implements AfterSaleManager { @DubboReference(version = "1.0.0") private FulfillApi fulfillApi; ... @Override public void cancelOrderFulfillmentAndUpdateOrderStatus(CancelOrderAssembleRequest cancelOrderAssembleRequest) { //履约取消 cancelFulfill(cancelOrderAssembleRequest); //更新订单状态和记录订单操作日志 updateOrderStatusAndSaveOperationLog(cancelOrderAssembleRequest); } //调用履约拦截订单 private void cancelFulfill(CancelOrderAssembleRequest cancelOrderAssembleRequest) { OrderInfoDTO orderInfoDTO = cancelOrderAssembleRequest.getOrderInfoDTO(); if (OrderStatusEnum.CREATED.getCode().equals(orderInfoDTO.getOrderStatus())) { return; } CancelFulfillRequest cancelFulfillRequest = orderInfoDTO.clone(CancelFulfillRequest.class); JsonResult<Boolean> jsonResult = fulfillApi.cancelFulfill(cancelFulfillRequest); if (!jsonResult.getSuccess()) { throw new OrderBizException(OrderErrorCodeEnum.CANCEL_ORDER_FULFILL_ERROR); } } ... } @DubboService(version = "1.0.0", interfaceClass = FulfillApi.class, retries = 0) public class FulfillApiImpl implements FulfillApi { @Autowired private FulfillService fulfillService; @DubboReference(version = "1.0.0", retries = 0) private WmsApi wmsApi; @DubboReference(version = "1.0.0", retries = 0) private TmsApi tmsApi; ... @Override public JsonResult<Boolean> cancelFulfill(CancelFulfillRequest cancelFulfillRequest) { log.info("取消履约:request={}", JSONObject.toJSONString(cancelFulfillRequest)); //1.取消履约单 fulfillService.cancelFulfillOrder(request.getOrderId()); //2.取消捡货 wmsApi.cancelPickGoods(request.getOrderId()); //3.取消发货 tmsApi.cancelSendOut(request.getOrderId()); return JsonResult.buildSuccess(true); } ... } @Service public class FulfillServiceImpl implements FulfillService { @Autowired private OrderFulfillDAO orderFulfillDAO; @Autowired private OrderFulfillItemDAO orderFulfillItemDAO; ... @Override public void cancelFulfillOrder(String orderId) { //1.查询履约单 OrderFulfillDO orderFulfill = orderFulfillDAO.getOne(orderId); //2.移除履约单 if (null != orderFulfill) { orderFulfillDAO.removeById(orderFulfill.getId()); //3.查询履约单条目 List<OrderFulfillItemDO> fulfillItems = orderFulfillItemDAO.listByFulfillId(orderFulfill.getFulfillId()); //4.移除履约单条目 for (OrderFulfillItemDO item : fulfillItems) { orderFulfillItemDAO.removeById(item.getId()); } } } ... } @DubboService(version = "1.0.0", interfaceClass = WmsApi.class, retries = 0) public class WmsApiImpl implements WmsApi { ... @Transactional(rollbackFor = Exception.class) @Override public JsonResult<Boolean> cancelPickGoods(String orderId) { log.info("取消捡货,orderId={}", orderId); //1.查询出库单 List<DeliveryOrderDO> deliveryOrders = deliveryOrderDAO.listByOrderId(orderId); //2.移除出库单和条目 if (CollectionUtils.isNotEmpty(deliveryOrders)) { for (DeliveryOrderDO order : deliveryOrders) { List<DeliveryOrderItemDO> items = deliveryOrderItemDAO.listByDeliveryOrderId(order.getDeliveryOrderId()); for (DeliveryOrderItemDO item : items) { deliveryOrderItemDAO.removeById(item.getId()); } deliveryOrderDAO.removeById(order.getId()); } } return JsonResult.buildSuccess(true); } ... } @DubboService(version = "1.0.0", interfaceClass = TmsApi.class, retries = 0) public class TmsApiImpl implements TmsApi { ... @Transactional(rollbackFor = Exception.class) @Override public JsonResult<Boolean> cancelSendOut(String orderId) { log.info("取消发货,orderId={}", orderId); //1.查询物流单 List<LogisticOrderDO> logisticOrders = logisticOrderDAO.listByOrderId(orderId); //2.移除物流单 for (LogisticOrderDO order : logisticOrders) { logisticOrderDAO.removeById(order.getId()); } return JsonResult.buildSuccess(true); } ... }
13.拦截履约和取消订单的Seata事务原理分析
(1)取消订单接口使用Seata事务的AT模式
(2)Seata事务AT模式的原理
(1)取消订单接口使用Seata事务的AT模式
如果拦截履约时发现,订单已经出库配送,那么就会拦截失败。当拦截失败时,就不能更新订单状态了,可使用Seata的刚性事务实现。具体就是,在分布式事务入口添加@GlobalTransactional注解,在各分支事务添加@Transactional注解。
@Service public class OrderAfterSaleServiceImpl implements OrderAfterSaleService { ... //取消订单/超时未支付取消 @Override @GlobalTransactional(rollbackFor = Exception.class) public JsonResult<Boolean> cancelOrder(CancelOrderRequest cancelOrderRequest) { //入参检查 checkCancelOrderRequestParam(cancelOrderRequest); //分布式锁 String orderId = cancelOrderRequest.getOrderId(); String key = RedisLockKeyConstants.CANCEL_KEY + orderId; boolean lock = redisLock.lock(key); if (!lock) { throw new OrderBizException(OrderErrorCodeEnum.CANCEL_ORDER_REPEAT); } try { //执行取消订单 return executeCancelOrder(cancelOrderRequest, orderId); } catch (Exception e) { log.error("biz error", e); throw new OrderBizException(e.getMessage()); } finally { redisLock.unlock(key); } } @Override public JsonResult<Boolean> executeCancelOrder(CancelOrderRequest cancelOrderRequest, String orderId) { //1.组装数据 OrderInfoDO orderInfoDO = findOrderInfo(orderId, cancelOrderRequest.getCancelType()); CancelOrderAssembleRequest cancelOrderAssembleRequest = buildAssembleRequest(orderId, cancelOrderRequest, orderInfoDO); if (cancelOrderAssembleRequest.getOrderInfoDTO().getOrderStatus() >= OrderStatusEnum.OUT_STOCK.getCode()) { throw new OrderBizException(OrderErrorCodeEnum.CURRENT_ORDER_STATUS_CANNOT_CANCEL); } TransactionMQProducer producer = defaultProducer.getProducer(); producer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { try { //2.执行履约取消、更新订单状态、新增订单日志操作 afterSaleManager.cancelOrderFulfillmentAndUpdateOrderStatus(cancelOrderAssembleRequest); return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { log.error("system error", e); return LocalTransactionState.ROLLBACK_MESSAGE; } } @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { //查询订单状态是否已更新为"已取消" OrderInfoDO orderInfoByDatabase = orderInfoDAO.getByOrderId(orderId); if (OrderStatusEnum.CANCELED.getCode().equals(orderInfoByDatabase.getOrderStatus())) { return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.ROLLBACK_MESSAGE; } }); try { Message message = new Message(RocketMqConstant.RELEASE_ASSETS_TOPIC, JSONObject.toJSONString(cancelOrderAssembleRequest).getBytes(StandardCharsets.UTF_8)); //3.发送事务消息 释放权益资产 TransactionSendResult result = producer.sendMessageInTransaction(message, cancelOrderAssembleRequest); if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) { throw new OrderBizException(OrderErrorCodeEnum.CANCEL_ORDER_PROCESS_FAILED); } return JsonResult.buildSuccess(true); } catch (Exception e) { throw new OrderBizException(OrderErrorCodeEnum.SEND_TRANSACTION_MQ_FAILED); } } ... }
(2)Seata事务AT模式的原理
一.取消订单接口的入口会向Seata Server注册一个全局事务XID 二.调用的履约服务会向Seata Server注册一个分支事务Branch ID 三.在履约服务更新数据前,要先获取本地锁 四.在履约服务在获取本地锁成功后,会插入undo log表数据 五.然后,履约服务会向Seata Server服务器获取全局锁 六.接着,履约服务才提交本地事务并释放本地锁 七.最后,履约服务会向Seata Server上报分支事务成功
14.取消订单全链路Seata回滚原理与并发分析
(1)回滚原理
(2)并发分析
(1)回滚原理
如果分支事务出现异常,就会上报Seata Server需要回滚全局事务。如果某些分支事务还没提交,那么直接就不需要执行了。如果某些分支事务已经提交成功,那么就根据undo log进行数据恢复。
(2)并发分析
拦截履约的各分支事务并不会出现并发竞争全局锁的问题。因为取消履约、取消出库、取消配送都是针对一个订单操作,同一时刻不会出现大量对一个订单的操作。
15.支付退款时的双异步设计原因(提升性能 + 解耦)
支付退款的时候会有两个消费者(双异步):一个是退款准备消费者,一个是实际退款消费者。如果两个消费者合并成一个消费者,可能会出现两个问题。
问题一:退款速度可能会比较慢,因为一个消费者需要处理的事情多了。
问题二:如果第三方退款接口出现异常或报错,耦合比较严重特别不合理。这时计算好的数据和插入的记录,可能需要回滚或者重新消费消息。在故障情况下,第三方一直报错,那么系统就一直回滚,而且还会导致用户不能及时通过售后记录查看售后信息。
16.释放资产消息的高扩展性设计(多路发送消息)
订单系统的取消订单接口被调用时,会先发送一个释放资产的消息到MQ,然后订单系统会消费释放资产的消息。在消费释放资产的消息时,才发送具体释放哪些资产的消息到MQ,接着各个系统才能对各自监听的释放资产消息进行消费。
如果取消订单的核心业务代码,直接把具体的释放资产消息到MQ。那么当资产种类增加时,比如增加虚拟币、积分、权益等各种资产,就需要修改订单系统取消订单的核心业务代码了,扩展性会很差。
如果取消订单的核心业务代码,只发一个释放资产的消息到MQ。那么后续增加资产种类时,只需在消费释放资产的消息处进行改动即可,这样就可以大大提高了代码的扩展性。
17.取消订单链路中数据库事务与MQ消息不一致问题
(1)数据库事务与MQ消息不一致情况
(2)取消订单全链路数据不丢失的方案设计
(1)数据库事务与MQ消息不一致情况
在v1版本的取消订单接口中,由于"更新订单状态 + 取消履约 + 发送释放资产消息到MQ"属于刚性事务,所以其中任何一个操作出现Exception,都会进行全局回滚保证数据一致。但是更新数据库和发送消息到MQ,还是可能会出现数据不一致的情况。
比如代码顺序是:先发送释放资产消息,再更新订单状态,再取消履约。先发送释放资产消息到MQ成功,更新订单状态也成功,但取消履约失败。那么此时回滚只能回滚更新订单状态,发送到MQ的消息却不能回滚了。所以会存在这种不一致的风险,因此需要一种强一致的方案。
如下是v1版本的取消订单实现:
@Service public class OrderAfterSaleServiceImpl implements OrderAfterSaleService { ... //取消订单/超时未支付取消 @Override public JsonResult<Boolean> cancelOrder(CancelOrderRequest cancelOrderRequest) { //入参检查 checkCancelOrderRequestParam(cancelOrderRequest); //分布式锁 String orderId = cancelOrderRequest.getOrderId(); String key = RedisLockKeyConstants.CANCEL_KEY + orderId; try { boolean lock = redisLock.lock(key); if (!lock) { throw new OrderBizException(OrderErrorCodeEnum.CANCEL_ORDER_REPEAT); } //执行取消订单 executeCancelOrder(cancelOrderRequest, orderId); return JsonResult.buildSuccess(true); } catch (Exception e) { throw new OrderBizException(e.getMessage()); } finally { redisLock.unlock(key); } } @Override @GlobalTransactional(rollbackFor = Exception.class) public void executeCancelOrder(CancelOrderRequest cancelOrderRequest, String orderId) { //1.组装数据 CancelOrderAssembleRequest cancelOrderAssembleRequest = buildAssembleRequest(orderId, cancelOrderRequest); //幂等校验:防止多个线程同时操作取消同一笔订单 if (OrderStatusEnum.CANCELED.getCode().equals(cancelOrderAssembleRequest.getOrderInfoDTO().getOrderStatus())) { return; } //2.检验订单支付状态 checkOrderPayStatus(cancelOrderAssembleRequest); //3.更新订单状态和记录订单操作日志 updateOrderStatusAndSaveOperationLog(cancelOrderAssembleRequest); //超时未支付的订单不用继续再往下执行取消履约和释放资产 if (OrderStatusEnum.PAID.getCode() > cancelOrderAssembleRequest.getOrderInfoDTO().getOrderStatus()) { return; } //4.履约取消 cancelFulfill(cancelOrderAssembleRequest); //5.发送释放资产消息到MQ defaultProducer.sendMessage(RocketMqConstant.RELEASE_ASSETS_TOPIC, JSONObject.toJSONString(cancelOrderAssembleRequest), "释放资产"); } ... }
(2)取消订单全链路数据不丢失的方案设计
为保证取消订单的链路中,数据库的更新事务与推送给MQ的消息强一致,需要将两者包裹在一个事务中,保证它们要么一起成功,要么一起失败。于是,就需要通过RocketMQ的事务机制来实现了。
如下是v2版本的取消订单实现,保证了数据库事务与MQ消息强一致。
@Service public class OrderAfterSaleServiceImpl implements OrderAfterSaleService { ... //取消订单/超时未支付取消 @Override @GlobalTransactional(rollbackFor = Exception.class) public JsonResult<Boolean> cancelOrder(CancelOrderRequest cancelOrderRequest) { //入参检查 checkCancelOrderRequestParam(cancelOrderRequest); //分布式锁 String orderId = cancelOrderRequest.getOrderId(); String key = RedisLockKeyConstants.CANCEL_KEY + orderId; boolean lock = redisLock.lock(key); if (!lock) { throw new OrderBizException(OrderErrorCodeEnum.CANCEL_ORDER_REPEAT); } try { //执行取消订单 return executeCancelOrder(cancelOrderRequest, orderId); } catch (Exception e) { log.error("biz error", e); throw new OrderBizException(e.getMessage()); } finally { redisLock.unlock(key); } } @Override public JsonResult<Boolean> executeCancelOrder(CancelOrderRequest cancelOrderRequest, String orderId) { //1.组装数据 OrderInfoDO orderInfoDO = findOrderInfo(orderId, cancelOrderRequest.getCancelType()); CancelOrderAssembleRequest cancelOrderAssembleRequest = buildAssembleRequest(orderId, cancelOrderRequest, orderInfoDO); if (cancelOrderAssembleRequest.getOrderInfoDTO().getOrderStatus() >= OrderStatusEnum.OUT_STOCK.getCode()) { throw new OrderBizException(OrderErrorCodeEnum.CURRENT_ORDER_STATUS_CANNOT_CANCEL); } TransactionMQProducer producer = defaultProducer.getProducer(); producer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { try { //2.执行履约取消、更新订单状态、新增订单日志操作 afterSaleManager.cancelOrderFulfillmentAndUpdateOrderStatus(cancelOrderAssembleRequest); return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { log.error("system error", e); return LocalTransactionState.ROLLBACK_MESSAGE; } } @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { //查询订单状态是否已更新为"已取消" OrderInfoDO orderInfoByDatabase = orderInfoDAO.getByOrderId(orderId); if (OrderStatusEnum.CANCELED.getCode().equals(orderInfoByDatabase.getOrderStatus())) { return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.ROLLBACK_MESSAGE; } }); try { Message message = new Message(RocketMqConstant.RELEASE_ASSETS_TOPIC, JSONObject.toJSONString(cancelOrderAssembleRequest).getBytes(StandardCharsets.UTF_8)); //3.发送事务消息 释放权益资产 TransactionSendResult result = producer.sendMessageInTransaction(message, cancelOrderAssembleRequest); if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) { throw new OrderBizException(OrderErrorCodeEnum.CANCEL_ORDER_PROCESS_FAILED); } return JsonResult.buildSuccess(true); } catch (Exception e) { throw new OrderBizException(OrderErrorCodeEnum.SEND_TRANSACTION_MQ_FAILED); } } ... }
18.释放资产多路MQ故障重试与幂等方案
如果发送消息到MQ过程中出现故障,那么就会通过返回RECONSUME_LATER来进行重试。
此外,消费消息时调用的接口可能会出现同一请求多次被调用。因此,必须对调用的接口,使用分布式锁 + 状态前置校验进行幂等处理。加分布式锁是为了防止并非请求进来后可能会避开状态的前置校验。
@Configuration public class ConsumerConfig { @Autowired private RocketMQProperties rocketMQProperties; ... //释放资产消息消费者 @Bean("releaseAssetsConsumer") public DefaultMQPushConsumer releaseAssetsConsumer(ReleaseAssetsListener releaseAssetsListener) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RELEASE_ASSETS_CONSUMER_GROUP); consumer.setNamesrvAddr(rocketMQProperties.getNameServer()); consumer.subscribe(RELEASE_ASSETS_TOPIC, "*"); consumer.registerMessageListener(releaseAssetsListener); consumer.start(); return consumer; } } //监听并消费释放资产消息 @Component public class ReleaseAssetsListener implements MessageListenerConcurrently { @Autowired private DefaultProducer defaultProducer; @Autowired private OrderItemDAO orderItemDAO; @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt messageExt : list) { //1.消费到释放资产message String message = new String(messageExt.getBody()); log.info("ReleaseAssetsListener message:{}", message); CancelOrderAssembleRequest cancelOrderAssembleRequest = JSONObject.parseObject(message, CancelOrderAssembleRequest.class); OrderInfoDTO orderInfoDTO = cancelOrderAssembleRequest.getOrderInfoDTO(); //2.发送取消订单退款请求MQ if (orderInfoDTO.getOrderStatus() > OrderStatusEnum.CREATED.getCode()) { defaultProducer.sendMessage(RocketMqConstant.CANCEL_REFUND_REQUEST_TOPIC, JSONObject.toJSONString(cancelOrderAssembleRequest), "取消订单退款"); } //3.发送释放库存MQ ReleaseProductStockRequest releaseProductStockRequest = buildReleaseProductStock(orderInfoDTO, orderItemDAO); defaultProducer.sendMessage(RocketMqConstant.CANCEL_RELEASE_INVENTORY_TOPIC, JSONObject.toJSONString(releaseProductStockRequest), "取消订单释放库存"); //4.发送释放优惠券MQ if (!Strings.isNullOrEmpty(orderInfoDTO.getCouponId())) { ReleaseUserCouponRequest releaseUserCouponRequest = buildReleaseUserCoupon(orderInfoDTO); defaultProducer.sendMessage(RocketMqConstant.CANCEL_RELEASE_PROPERTY_TOPIC, JSONObject.toJSONString(releaseUserCouponRequest), "取消订单释放优惠券"); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { log.error("consumer error", e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } ... } @Configuration public class ConsumerConfig { @Autowired private RocketMQProperties rocketMQProperties; //释放优惠券消息消费者 @Bean("releaseInventoryConsumer") public DefaultMQPushConsumer releaseInventoryConsumer(ReleasePropertyListener releasePropertyListener) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConstant.RELEASE_PROPERTY_CONSUMER_GROUP); consumer.setNamesrvAddr(rocketMQProperties.getNameServer()); consumer.subscribe(RocketMqConstant.CANCEL_RELEASE_PROPERTY_TOPIC, "*"); consumer.registerMessageListener(releasePropertyListener); consumer.start(); return consumer; } } @Component public class ReleasePropertyListener implements MessageListenerConcurrently { @DubboReference(version = "1.0.0") private MarketApi marketApi; @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt msg : list) { String content = new String(msg.getBody(), StandardCharsets.UTF_8); log.info("ReleasePropertyConsumer message:{}", content); ReleaseUserCouponRequest releaseUserCouponRequest = JSONObject.parseObject(content, ReleaseUserCouponRequest.class); //释放优惠券 JsonResult<Boolean> jsonResult = marketApi.releaseUserCoupon(releaseUserCouponRequest); if (!jsonResult.getSuccess()) { throw new MarketBizException(MarketErrorCodeEnum.CONSUME_MQ_FAILED); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { log.error("consumer error", e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } } @DubboService(version = "1.0.0", interfaceClass = MarketApi.class, retries = 0) public class MarketApiImpl implements MarketApi { ... //回退用户使用的优惠券 @Override public JsonResult<Boolean> releaseUserCoupon(ReleaseUserCouponRequest releaseUserCouponRequest) { log.info("开始执行回滚优惠券,couponId:{}", releaseUserCouponRequest.getCouponId()); //分布式锁 String couponId = releaseUserCouponRequest.getCouponId(); String key = RedisLockKeyConstants.RELEASE_COUPON_KEY + couponId; boolean lock = redisLock.lock(key); if (!lock) { throw new MarketBizException(MarketErrorCodeEnum.RELEASE_COUPON_FAILED); } try { //执行释放优惠券 Boolean result = couponService.releaseUserCoupon(releaseUserCouponRequest); return JsonResult.buildSuccess(result); } catch (MarketBizException e) { log.error("biz error", e); return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg()); } catch (Exception e) { log.error("system error", e); return JsonResult.buildError(e.getMessage()); } finally { redisLock.unlock(key); } } ... } @Service public class CouponServiceImpl implements CouponService { ... //释放用户优惠券 @Override public Boolean releaseUserCoupon(ReleaseUserCouponRequest releaseUserCouponRequest) { String userId = releaseUserCouponRequest.getUserId(); String couponId = releaseUserCouponRequest.getCouponId(); CouponDO couponAchieve = couponDAO.getUserCoupon(userId, couponId); if (CouponUsedStatusEnum.UN_USED.getCode().equals(couponAchieve.getUsed())) { log.info("当前用户未使用优惠券,不用回退,userId:{},couponId:{}", userId, couponId); return true; } couponAchieve.setUsed(CouponUsedStatusEnum.UN_USED.getCode()); couponAchieve.setUsedTime(null); couponDAO.updateById(couponAchieve); return true; } ... }
19.双异步支付退款不一致问题分析
在退款准备消费者中,如果先写售后记录到数据库,再发退款消息到MQ。如果写入售后记录到数据库成功了,但发送实际退款消息到MQ却失败了,那么此时就会产生数据库与MQ的数据不一致问题。
在退款准备消费者中,如果先发退款消息到MQ,再写售后记录到数据库。如果发送退款消息到MQ成功了,但写入售后记录到数据库却失败了,那么同样会产生数据库与MQ的数据不一致问题。
此时就可以使用RocketMQ的事务消息机制;
@Configuration public class ConsumerConfig { @Autowired private RocketMQProperties rocketMQProperties; ... //消费退款准备请求消息消费者 @Bean("cancelRefundConsumer") public DefaultMQPushConsumer cancelRefundConsumer(CancelRefundListener cancelRefundListener) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConstant.REQUEST_CONSUMER_GROUP); consumer.setNamesrvAddr(rocketMQProperties.getNameServer()); consumer.subscribe(RocketMqConstant.CANCEL_REFUND_REQUEST_TOPIC, "*"); consumer.registerMessageListener(cancelRefundListener); consumer.start(); return consumer; } ... } //消费退款准备请求消息 @Component public class CancelRefundListener implements MessageListenerConcurrently { @Autowired private OrderAfterSaleService orderAfterSaleService; @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt messageExt : list) { String message = new String(messageExt.getBody()); CancelOrderAssembleRequest cancelOrderAssembleRequest = JSONObject.parseObject(message, CancelOrderAssembleRequest.class); log.info("CancelRefundConsumer message:{}", message); //执行 取消订单/超时未支付取消 前的操作 JsonResult<Boolean> jsonResult = orderAfterSaleService.processCancelOrder(cancelOrderAssembleRequest); if (!jsonResult.getSuccess()) { throw new OrderBizException(OrderErrorCodeEnum.CONSUME_MQ_FAILED); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { log.error("consumer error", e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } } @Service public class OrderAfterSaleServiceImpl implements OrderAfterSaleService { ... @Override public JsonResult<Boolean> processCancelOrder(CancelOrderAssembleRequest cancelOrderAssembleRequest) { String orderId = cancelOrderAssembleRequest.getOrderId(); //分布式锁 String key = RedisLockKeyConstants.REFUND_KEY + orderId; try { boolean lock = redisLock.lock(key); if (!lock) { throw new OrderBizException(OrderErrorCodeEnum.PROCESS_REFUND_REPEAT); } //执行退款前的准备工作 //生成售后订单号 OrderInfoDTO orderInfoDTO = cancelOrderAssembleRequest.getOrderInfoDTO(); OrderInfoDO orderInfoDO = orderInfoDTO.clone(OrderInfoDO.class); String afterSaleId = orderNoManager.genOrderId(OrderNoTypeEnum.AFTER_SALE.getCode(), orderInfoDO.getUserId()); //1.计算 取消订单 退款金额 CancelOrderRefundAmountDTO cancelOrderRefundAmountDTO = calculatingCancelOrderRefundAmount(cancelOrderAssembleRequest); cancelOrderAssembleRequest.setCancelOrderRefundAmountDTO(cancelOrderRefundAmountDTO); TransactionMQProducer producer = defaultProducer.getProducer(); producer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { try { //2.取消订单操作 记录售后信息 afterSaleManager.insertCancelOrderAfterSale(cancelOrderAssembleRequest, AfterSaleStatusEnum.REVIEW_PASS.getCode(), orderInfoDO, afterSaleId); return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { log.error("system error", e); return LocalTransactionState.ROLLBACK_MESSAGE; } } @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { //查询售后数据是否插入成功 AfterSaleInfoDO afterSaleInfoDO = afterSaleInfoDAO.getOneByAfterSaleId(Long.valueOf(afterSaleId)); List<AfterSaleItemDO> afterSaleItemDOList = afterSaleItemDAO.listByAfterSaleId(Long.valueOf(afterSaleId)); List<AfterSaleLogDO> afterSaleLogDOList = afterSaleLogDAO.listByAfterSaleId(Long.valueOf(afterSaleId)); List<AfterSaleRefundDO> afterSaleRefundDOList = afterSaleRefundDAO.listByAfterSaleId(Long.valueOf(afterSaleId)); if (afterSaleInfoDO != null && afterSaleItemDOList.isEmpty() && afterSaleLogDOList.isEmpty() && afterSaleRefundDOList.isEmpty()) { return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.ROLLBACK_MESSAGE; } }); try { //3.组装事务MQ消息 ActualRefundMessage actualRefundMessage = new ActualRefundMessage(); actualRefundMessage.setOrderId(cancelOrderAssembleRequest.getOrderId()); actualRefundMessage.setLastReturnGoods(cancelOrderAssembleRequest.isLastReturnGoods()); actualRefundMessage.setAfterSaleId(Long.valueOf(afterSaleId)); Message message = new Message(RocketMqConstant.ACTUAL_REFUND_TOPIC, JSONObject.toJSONString(actualRefundMessage).getBytes(StandardCharsets.UTF_8)); //4.发送事务MQ消息--实际退款消息 TransactionSendResult result = producer.sendMessageInTransaction(message, actualRefundMessage); if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) { throw new OrderBizException(OrderErrorCodeEnum.PROCESS_REFUND_FAILED); } return JsonResult.buildSuccess(true); } catch (Exception e) { throw new OrderBizException(OrderErrorCodeEnum.SEND_TRANSACTION_MQ_FAILED); } } finally { redisLock.unlock(key); } } ... }