订单初版—7.支付和履约实现的重构文档

大纲

1.预支付到完成支付的业务流程

2.支付回调到推送履约的代码流程

3.支付回调到推送履约的双异步设计

4.发送订单履约的RocketMQ事务消息的原理

5.RocketMQ事务消息的实现原理

6.支付回调到发送履约的RocketMQ事务消息代码

7.履约场景引入Saga模式 + 状态机

8.履约流程的Seata Saga状态节点定义 + 节点流转

9.履约流程的Seata Saga失败补偿流程

10.Seata Saga状态机流转原理

11.履约系统的Seata Saga代码实现

12.履约系统的Seata Saga空回滚和悬挂问题

 

1.预支付到完成支付的业务流程

订单正向链路有三个核心环节:生成订单 + 支付 + 履约。

订单初版—7.支付和履约实现的重构文档

 

2.支付回调到推送履约的代码流程

(1)支付回调成功,更新订单状态 + 发送订单已完成支付的事务消息

(2)订单系统消费已完成支付的消息,更新订单履约状态 + 发送订单履约的消息

(3)履约系统消费触发订单履约的消息,进行具体的订单履约处理

订单初版—7.支付和履约实现的重构文档

(1)支付回调成功,更新订单状态 + 发送订单已完成支付的事务消息

@DubboService(version = "1.0.0", interfaceClass = OrderApi.class, retries = 0) public class OrderApiImpl implements OrderApi {     @Autowired     private OrderService orderService;     ...      //支付回调接口     @Override     public JsonResult<Boolean> payCallback(PayCallbackRequest payCallbackRequest) {         try {             orderService.payCallback(payCallbackRequest);             return JsonResult.buildSuccess(true);         } catch (OrderBizException e) {             log.error("biz error", e);             return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());         } catch (Exception e) {             log.error("error", e);             return JsonResult.buildError(e.getMessage());         }     }     ... }  @Service public class OrderServiceImpl implements OrderService {     @Autowired     private OrderInfoDAO orderInfoDAO;      @Autowired     private OrderNoManager orderNoManager;      @Autowired     private DefaultProducer defaultProducer;      @Autowired     private RedisLock redisLock;     ...      //支付回调     //支付回调有2把分布式锁的原因说明:同一笔订单在同一时间只能支付or取消     //不可以同时对一笔订单,既发起支付,又发起取消     @Override     public void payCallback(PayCallbackRequest payCallbackRequest) {         //1.入参检查         checkPayCallbackRequestParam(payCallbackRequest);         String orderId = payCallbackRequest.getOrderId();         Integer payAmount = payCallbackRequest.getPayAmount();         Integer payType = payCallbackRequest.getPayType();         List<String> redisKeyList = Lists.newArrayList();          //2.加支付分布式锁避免支付系统并发回调         String orderPayKey = RedisLockKeyConstants.ORDER_PAY_KEY + orderId;         //加取消订单分布式锁避免支付和取消订单同时操作同一笔订单         String cancelOrderKey = RedisLockKeyConstants.CANCEL_KEY + orderId;         redisKeyList.add(orderPayKey);         redisKeyList.add(cancelOrderKey);         boolean lock = redisLock.multiLock(redisKeyList);         if (!lock) {             throw new OrderBizException(OrderErrorCodeEnum.ORDER_PAY_CALLBACK_ERROR);         }          try {             //从数据库中查询出当前订单信息             OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(orderId);             OrderPaymentDetailDO orderPaymentDetailDO = orderPaymentDetailDAO.getPaymentDetailByOrderId(orderId);              //3.校验参数             if (orderInfoDO == null || orderPaymentDetailDO == null) {                 throw new OrderBizException(OrderErrorCodeEnum.ORDER_INFO_IS_NULL);             }             if (!payAmount.equals(orderInfoDO.getPayAmount())) {                 throw new OrderBizException(OrderErrorCodeEnum.ORDER_CALLBACK_PAY_AMOUNT_ERROR);             }              //4.异常场景判断             Integer orderStatus = orderInfoDO.getOrderStatus();             if (OrderStatusEnum.CREATED.getCode().equals(orderStatus)) {                 //如果订单状态是"已创建",直接更新订单状态为已支付,并发送事务消息                 TransactionMQProducer transactionMQProducer = defaultProducer.getProducer();                 transactionMQProducer.setTransactionListener(new TransactionListener() {                     @Override                     public LocalTransactionState executeLocalTransaction(Message message, Object o) {                         try {                             orderManager.updateOrderStatusPaid(payCallbackRequest, orderInfoDO, orderPaymentDetailDO);                             return LocalTransactionState.COMMIT_MESSAGE;                         } catch (BaseBizException e) {                             throw e;                         } catch (Exception e) {                             log.error("system error", e);                             return LocalTransactionState.ROLLBACK_MESSAGE;                         }                     }                      //回查接口                     @Override                     public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {                         //检查订单是否是已支付                         OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(orderId);                         if (orderInfoDO != null && OrderStatusEnum.PAID.getCode().equals(orderInfoDO.getOrderStatus())) {                             return LocalTransactionState.COMMIT_MESSAGE;                         }                         return LocalTransactionState.ROLLBACK_MESSAGE;                     }                 });                 //发送 "订单已完成支付" 消息                 sendPaidOrderSuccessMessage(transactionMQProducer, orderInfoDO);             } else {                 //如果订单状态不是 "已创建"                 if (OrderStatusEnum.CANCELED.getCode().equals(orderStatus)) {                     //如果订单那状态是取消状态                     Integer payStatus = orderPaymentDetailDO.getPayStatus();                     if (PayStatusEnum.UNPAID.getCode().equals(payStatus)) {                         //调用退款                         executeOrderRefund(orderInfoDO, orderPaymentDetailDO);                         throw new OrderBizException(OrderErrorCodeEnum.ORDER_CANCEL_PAY_CALLBACK_ERROR);                     } else if (PayStatusEnum.PAID.getCode().equals(payStatus)) {                         if (payType.equals(orderPaymentDetailDO.getPayType())) {                             throw new OrderBizException(OrderErrorCodeEnum.ORDER_CANCEL_PAY_CALLBACK_PAY_TYPE_SAME_ERROR);                         } else {                             throw new OrderBizException(OrderErrorCodeEnum.ORDER_CANCEL_PAY_CALLBACK_PAY_TYPE_NO_SAME_ERROR);                         }                     }                 } else {                     //如果订单状态不是取消状态                     if (PayStatusEnum.PAID.getCode().equals(orderPaymentDetailDO.getPayStatus())) {                         if (payType.equals(orderPaymentDetailDO.getPayType())) {                             return;                         }                         //调用退款                         executeOrderRefund(orderInfoDO, orderPaymentDetailDO);                         throw new OrderBizException(OrderErrorCodeEnum.ORDER_CANCEL_PAY_CALLBACK_REPEAT_ERROR);                     }                 }             }         } catch (Exception e) {             throw new OrderBizException(e.getMessage());         } finally {             //释放分布式锁             redisLock.unMultiLock(redisKeyList);         }     }      //发送订单已完成支付消息,触发订单进行履约     private void sendPaidOrderSuccessMessage(TransactionMQProducer transactionMQProducer, OrderInfoDO orderInfoDO) throws MQClientException {         String orderId = orderInfoDO.getOrderId();         PaidOrderSuccessMessage message = new PaidOrderSuccessMessage();         message.setOrderId(orderId);         String topic = RocketMqConstant.PAID_ORDER_SUCCESS_TOPIC;         byte[] body = JSON.toJSONString(message).getBytes(StandardCharsets.UTF_8);         Message mq = new Message(topic, body);         TransactionSendResult result = transactionMQProducer.sendMessageInTransaction(mq, orderInfoDO);         if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) {             throw new OrderBizException(OrderErrorCodeEnum.ORDER_PAY_CALLBACK_SEND_MQ_ERROR);         }     }     ... }  @Service public class OrderManagerImpl implements OrderManager {     ...     //支付回调更新订单状态     @Transactional(rollbackFor = Exception.class)     @Override     public void updateOrderStatusPaid(PayCallbackRequest payCallbackRequest, OrderInfoDO orderInfoDO, OrderPaymentDetailDO orderPaymentDetailDO) {         //主单信息         String orderId = payCallbackRequest.getOrderId();         Integer preOrderStatus = orderInfoDO.getOrderStatus();         orderInfoDO.setOrderStatus(OrderStatusEnum.PAID.getCode());         orderInfoDAO.updateById(orderInfoDO);          //主单支付信息         orderPaymentDetailDO.setPayStatus(PayStatusEnum.PAID.getCode());         orderPaymentDetailDAO.updateById(orderPaymentDetailDO);          //新增订单状态变更日志         OrderOperateLogDO orderOperateLogDO = new OrderOperateLogDO();         orderOperateLogDO.setOrderId(orderId);         orderOperateLogDO.setOperateType(OrderOperateTypeEnum.PAID_ORDER.getCode());         orderOperateLogDO.setPreStatus(preOrderStatus);         orderOperateLogDO.setCurrentStatus(orderInfoDO.getOrderStatus());         orderOperateLogDO.setRemark("订单支付回调操作" + orderOperateLogDO.getPreStatus() + "-" + orderOperateLogDO.getCurrentStatus());         orderOperateLogDAO.save(orderOperateLogDO);          //判断是否存在子订单         List<OrderInfoDO> subOrderInfoDOList = orderInfoDAO.listByParentOrderId(orderId);         if (subOrderInfoDOList != null && !subOrderInfoDOList.isEmpty()) {             //先将主订单状态设置为无效订单             Integer newPreOrderStatus = orderInfoDO.getOrderStatus();             orderInfoDO.setOrderStatus(OrderStatusEnum.INVALID.getCode());             orderInfoDAO.updateById(orderInfoDO);              //新增订单状态变更日志             OrderOperateLogDO newOrderOperateLogDO = new OrderOperateLogDO();             newOrderOperateLogDO.setOrderId(orderId);             newOrderOperateLogDO.setOperateType(OrderOperateTypeEnum.PAID_ORDER.getCode());             newOrderOperateLogDO.setPreStatus(newPreOrderStatus);             newOrderOperateLogDO.setCurrentStatus(OrderStatusEnum.INVALID.getCode());             orderOperateLogDO.setRemark("订单支付回调操作,主订单状态变更" + newOrderOperateLogDO.getPreStatus() + "-" + newOrderOperateLogDO.getCurrentStatus());             orderOperateLogDAO.save(newOrderOperateLogDO);              //再更新子订单的状态             for (OrderInfoDO subOrderInfo : subOrderInfoDOList) {                 Integer subPreOrderStatus = subOrderInfo.getOrderStatus();                 subOrderInfo.setOrderStatus(OrderStatusEnum.PAID.getCode());                 orderInfoDAO.updateById(subOrderInfo);                 //更新子订单的支付明细状态                 String subOrderId = subOrderInfo.getOrderId();                 OrderPaymentDetailDO subOrderPaymentDetailDO = orderPaymentDetailDAO.getPaymentDetailByOrderId(subOrderId);                 if (subOrderPaymentDetailDO != null) {                     subOrderPaymentDetailDO.setPayStatus(PayStatusEnum.PAID.getCode());                     orderPaymentDetailDAO.updateById(subOrderPaymentDetailDO);                 }                  //新增订单状态变更日志                 OrderOperateLogDO subOrderOperateLogDO = new OrderOperateLogDO();                 subOrderOperateLogDO.setOrderId(subOrderId);                 subOrderOperateLogDO.setOperateType(OrderOperateTypeEnum.PAID_ORDER.getCode());                 subOrderOperateLogDO.setPreStatus(subPreOrderStatus);                 subOrderOperateLogDO.setCurrentStatus(OrderStatusEnum.PAID.getCode());                 orderOperateLogDO.setRemark("订单支付回调操作,子订单状态变更" + subOrderOperateLogDO.getPreStatus() + "-" + subOrderOperateLogDO.getCurrentStatus());                 orderOperateLogDAO.save(subOrderOperateLogDO);             }         }     }     ... }

(2)订单系统消费已完成支付的消息,更新订单履约状态 + 发送订单履约的消息

//订单系统进行消费 @Configuration public class ConsumerConfig {     @Autowired     private RocketMQProperties rocketMQProperties;      //订单完成支付消息消费者     @Bean("paidOrderSuccessConsumer")     public DefaultMQPushConsumer paidOrderSuccessConsumer(PaidOrderSuccessListener paidOrderSuccessListener) throws MQClientException {         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PAID_ORDER_SUCCESS_CONSUMER_GROUP);         consumer.setNamesrvAddr(rocketMQProperties.getNameServer());         consumer.subscribe(PAID_ORDER_SUCCESS_TOPIC, "*");         consumer.registerMessageListener(paidOrderSuccessListener);         consumer.start();         return consumer;     }     ... }  //订单系统监听订单支付完成的消息 @Component public class PaidOrderSuccessListener implements MessageListenerConcurrently {     @Autowired     private OrderInfoDAO orderInfoDAO;      @Autowired     private OrderFulFillService orderFulFillService;      @Autowired     private RedisLock redisLock;      @Autowired     private DefaultProducer defaultProducer;      @Override     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {         try {             for (MessageExt messageExt : list) {                 String message = new String(messageExt.getBody());                 PaidOrderSuccessMessage paidOrderSuccessMessage = JSON.parseObject(message, PaidOrderSuccessMessage.class);                 String orderId = paidOrderSuccessMessage.getOrderId();                 log.info("触发订单履约,orderId:{}", orderId);                 OrderInfoDO order = orderInfoDAO.getByOrderId(orderId);                 if (Objects.isNull(order)) {                     throw new OrderBizException(OrderErrorCodeEnum.ORDER_INFO_IS_NULL);                 }                  //1.加分布式锁 + 里面的履约前置状态校验防止消息重复消费                 String key = RedisLockKeyConstants.ORDER_FULFILL_KEY + orderId;                 if (!redisLock.lock(key)) {                     log.error("order has not acquired lock,cannot fulfill, orderId={}", orderId);                     throw new BaseBizException(OrderErrorCodeEnum.ORDER_FULFILL_ERROR);                 }                  try {                     //2.进行订单履约逻辑                     TransactionMQProducer producer = defaultProducer.getProducer();                     producer.setTransactionListener(new TransactionListener() {                         @Override                         public LocalTransactionState executeLocalTransaction(Message message, Object o) {                             try {                                 orderFulFillService.triggerOrderFulFill(orderId);                                 return LocalTransactionState.COMMIT_MESSAGE;                             } catch (BaseBizException e) {                                 throw e;                             } catch (Exception e) {                                 log.error("system error", e);                                 return LocalTransactionState.ROLLBACK_MESSAGE;                             }                         }                          //回查接口                         @Override                         public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {                             //检查订单是否"已履约"状态                             OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(orderId);                             if (orderInfoDO != null && OrderStatusEnum.FULFILL.getCode().equals(orderInfoDO.getOrderStatus())) {                                 return LocalTransactionState.COMMIT_MESSAGE;                             }                             return LocalTransactionState.ROLLBACK_MESSAGE;                         }                     });                     ReceiveFulfillRequest receiveFulfillRequest = orderFulFillService.buildReceiveFulFillRequest(order);                     String topic = TRIGGER_ORDER_FULFILL_TOPIC;                     byte[] body = JSON.toJSONString(receiveFulfillRequest).getBytes(StandardCharsets.UTF_8);                     Message mq = new Message(topic, body);                     producer.sendMessageInTransaction(mq, order);                 } finally {                     redisLock.unlock(key);                 }             }             return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;         } catch (Exception e) {             log.error("consumer error", e);             //本地业务逻辑执行失败,触发消息重新消费             return ConsumeConcurrentlyStatus.RECONSUME_LATER;         }     } }  @Service public class OrderFulFillServiceImpl implements OrderFulFillService {     @Autowired     private OrderInfoDAO orderInfoDAO;      @Autowired     private OrderOperateLogDAO orderOperateLogDAO;     ...      @Transactional(rollbackFor = Exception.class)     @Override     public void triggerOrderFulFill(String orderId) throws OrderBizException {         //1.查询订单         OrderInfoDO order = orderInfoDAO.getByOrderId(orderId);         if (Objects.isNull(order)) {             return;         }          //2.校验订单是否已支付         OrderStatusEnum orderStatus = OrderStatusEnum.getByCode(order.getOrderStatus());         if (!OrderStatusEnum.PAID.equals(orderStatus)) {             log.info("order has not been paid,cannot fulfill, orderId={}", order.getOrderId());             return;         }          //3.更新订单状态为:已履约         orderInfoDAO.updateOrderStatus(orderId, OrderStatusEnum.PAID.getCode(), OrderStatusEnum.FULFILL.getCode());          //4.并插入一条订单变更记录         orderOperateLogDAO.save(orderOperateLogFactory.get(order, OrderStatusChangeEnum.ORDER_FULFILLED));     }     ... }

(3)履约系统消费触发订单履约的消息,进行具体的订单履约处理

//履约系统消费 @Configuration public class ConsumerConfig {     @Autowired     private RocketMQProperties rocketMQProperties;      //触发订单履约消息消费者     @Bean("triggerOrderFulfillConsumer")     public DefaultMQPushConsumer triggerOrderFulfillConsumer(TriggerOrderFulfillTopicListener triggerOrderFulfillTopicListener) throws MQClientException {         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(TRIGGER_ORDER_FULFILL_CONSUMER_GROUP);         consumer.setNamesrvAddr(rocketMQProperties.getNameServer());         consumer.subscribe(TRIGGER_ORDER_FULFILL_TOPIC, "*");         consumer.registerMessageListener(triggerOrderFulfillTopicListener);         consumer.start();         return consumer;     }     ... }  //监听并消费订单履约消息 @Component public class TriggerOrderFulfillTopicListener implements MessageListenerConcurrently {     @Autowired     private FulfillService fulfillService;//履约服务      @Override     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {         try {             for (MessageExt messageExt : list) {                 String message = new String(messageExt.getBody());                 ReceiveFulfillRequest request = JSON.parseObject(message, ReceiveFulfillRequest.class);                 fulfillService.receiveOrderFulFill(request);             }             return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;         } catch (Exception e) {             log.error("consumer error", e);             return ConsumeConcurrentlyStatus.RECONSUME_LATER;         }     } }  @Service public class FulfillServiceImpl implements FulfillService {     ...     @Override     public Boolean receiveOrderFulFill(ReceiveFulfillRequest request) {         log.info("接受订单履约成功,request={}", JSONObject.toJSONString(request));         String orderId = request.getOrderId();          //加分布式锁(防止重复触发履约)         String key = RedisLockKeyConstants.ORDER_FULFILL_KEY + orderId;         boolean lock = redisLock.lock(key);         if (!lock) {             throw new FulfillBizException(FulfillErrorCodeEnum.ORDER_FULFILL_ERROR);         }          try {             //1.幂等:校验orderId是否已经履约过             if (orderFulfilled(request.getOrderId())) {                 log.info("该订单已履约!!!,orderId={}", request.getOrderId());                 return true;             }              //2.saga状态机,触发wms捡货和tms发货             StateMachineEngine stateMachineEngine = (StateMachineEngine) springApplicationContext.getBean("stateMachineEngine");             Map<String, Object> startParams = new HashMap<>(3);             startParams.put("receiveFulfillRequest", request);              //配置的saga状态机 json的name             //位于/resources/statelang/order_fulfull.json             String stateMachineName = "order_fulfill";             log.info("开始触发saga流程,stateMachineName={}", stateMachineName);             StateMachineInstance inst = stateMachineEngine.startWithBusinessKey(stateMachineName, null, null, startParams);             if (ExecutionStatus.SU.equals(inst.getStatus())) {                 log.info("订单履约流程执行完毕. xid={}", inst.getId());             } else {                 log.error("订单履约流程执行异常. xid={}", inst.getId());                 throw new FulfillBizException(FulfillErrorCodeEnum.ORDER_FULFILL_IS_ERROR);             }             return true;         } finally {             redisLock.unlock(key);         }     }     ... }

 

3.支付回调到推送履约的双异步设计

(1)双异步设计的原因

(2)履约系统通过MQ对订单进行异步履约

(3)双异步设计的目的总结

 

(1)双异步设计的原因

第一个异步:订单支付回调后,订单系统先发一个支付完成的消息到MQ。

 

第二个异步:订单系统消费到支付完成消息后进行更新,再发消息到MQ。

 

为什么需要双异步:为什么订单系统收到支付回调后要先发一个消息到MQ,然后再由自己监听消费,并发送真正触发履约的消息到MQ。

 

由于更新订单状态和更新订单履约状态都是由订单系统执行的,为什么不按如下处理:订单系统收到支付回调后,先把订单状态更新为已完成支付,然后紧接着把订单履约状态更新为已履约,最后再发送订单履约消息到MQ让履约系统消费该消息进行发货。

 

原因一:从业务语义来说,支付回调后的主要操作其实是更新订单状态为支付完成,而对订单进行履约并不属于支付回调的主要工作。支付回调最多是触发履约系统进行订单履约,所以支付回调成功后,在支付回调中更新订单履约状态并不合适。因此从业务语义来说,支付回调后发送一个订单支付完成的消息到MQ即可,不应有更多其他操作。

 

原因二:从扩展性角度来说,由于订单支付完成是一个非常关键的消息,所以可能以后会在订单支付完成后,需要进行更多的业务处理。也就是说,可能以后会有更多系统需要监听和消费订单支付完成消息,如会员系统、营销系统、数据分析系统等都需要消费订单支付完成的消息。

 

(2)履约系统通过MQ对订单进行异步履约

因为对订单进行履约,涉及到仓库、仓储、库存的调度,需要通知物流公司进行配送等,这个过程非常耗时及复杂,所以触发履约系统对订单进行履约不能使用同步来实现。

订单初版—7.支付和履约实现的重构文档

(3)双异步设计的目的总结

第一个异步是为了可扩展

第二个异步是为了提升性能

 

4.发送订单履约的RocketMQ事务消息的原理

(1)如何让更新数据库 + 发送消息到MQ是一致的

(2)发送订单履约的RocketMQ事务消息的原理

 

(1)如何让更新数据库 + 发送消息到MQ是一致的

问题一:如何保证更新订单状态为已完成 + 发送订单支付完成消息是一致的。也就是更新数据库 + 发送消息到MQ,要么同时成功,要么同时失败。

 

问题二:如何保证更新订单履约状态为已履约 + 发送触发订单履约消息是一致的。同样是更新数据库 + 发送消息到MQ,要么同时成功,要么同时失败。

 

为了解决更新数据库 + 发送消息到MQ是一致的,可以使用RocketMQ的事务机制。

 

(2)发送订单履约的RocketMQ事务消息的原理

订单初版—7.支付和履约实现的重构文档

 

5.RocketMQ事务消息的实现原理

订单初版—7.支付和履约实现的重构文档

 

6.支付回调到发送履约的RocketMQ事务消息代码

(1)支付回调成功后发送支付完成的事务消息

(2)消费订单支付完成消息时发送触发订单履约的事务消息

 

(1)支付回调成功后发送支付完成的事务消息

@Service public class OrderServiceImpl implements OrderService {     ...     //支付回调     //支付回调有2把分布式锁的原因说明:同一笔订单在同一时间只能支付or取消     //不可以同时对一笔订单,既发起支付,又发起取消     @Override     public void payCallback(PayCallbackRequest payCallbackRequest) {         ...         try {             //从数据库中查询出当前订单信息             OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(orderId);             OrderPaymentDetailDO orderPaymentDetailDO = orderPaymentDetailDAO.getPaymentDetailByOrderId(orderId);              //3.校验参数             if (orderInfoDO == null || orderPaymentDetailDO == null) {                 throw new OrderBizException(OrderErrorCodeEnum.ORDER_INFO_IS_NULL);             }             if (!payAmount.equals(orderInfoDO.getPayAmount())) {                 throw new OrderBizException(OrderErrorCodeEnum.ORDER_CALLBACK_PAY_AMOUNT_ERROR);             }              //4.异常场景判断             Integer orderStatus = orderInfoDO.getOrderStatus();             if (OrderStatusEnum.CREATED.getCode().equals(orderStatus)) {                 //如果订单状态是"已创建",直接更新订单状态为已支付,并发送事务消息                 TransactionMQProducer transactionMQProducer = defaultProducer.getProducer();                 transactionMQProducer.setTransactionListener(new TransactionListener() {                     @Override                     public LocalTransactionState executeLocalTransaction(Message message, Object o) {                         try {                             orderManager.updateOrderStatusPaid(payCallbackRequest, orderInfoDO, orderPaymentDetailDO);                             return LocalTransactionState.COMMIT_MESSAGE;                         } catch (BaseBizException e) {                             throw e;                         } catch (Exception e) {                             log.error("system error", e);                             return LocalTransactionState.ROLLBACK_MESSAGE;                         }                     }                      //回查                     @Override                     public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {                         //检查订单是否是已支付                         OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(orderId);                         if (orderInfoDO != null && OrderStatusEnum.PAID.getCode().equals(orderInfoDO.getOrderStatus())) {                             return LocalTransactionState.COMMIT_MESSAGE;                         }                         return LocalTransactionState.ROLLBACK_MESSAGE;                     }                 });                 //发送 "订单已完成支付" 消息                 sendPaidOrderSuccessMessage(transactionMQProducer, orderInfoDO);             } else {                 ...             }         } catch (Exception e) {             throw new OrderBizException(e.getMessage());         } finally {             //释放分布式锁             redisLock.unMultiLock(redisKeyList);         }     }      //发送订单已完成支付消息,触发订单进行履约     private void sendPaidOrderSuccessMessage(TransactionMQProducer transactionMQProducer, OrderInfoDO orderInfoDO) throws MQClientException {         String orderId = orderInfoDO.getOrderId();         PaidOrderSuccessMessage message = new PaidOrderSuccessMessage();         message.setOrderId(orderId);         String topic = RocketMqConstant.PAID_ORDER_SUCCESS_TOPIC;         byte[] body = JSON.toJSONString(message).getBytes(StandardCharsets.UTF_8);         Message mq = new Message(topic, body);         TransactionSendResult result = transactionMQProducer.sendMessageInTransaction(mq, orderInfoDO);         if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) {             throw new OrderBizException(OrderErrorCodeEnum.ORDER_PAY_CALLBACK_SEND_MQ_ERROR);         }     }     ... }

(2)消费订单支付完成消息时发送触发订单履约的事务消息

//订单系统监听订单支付完成后的消息 @Component public class PaidOrderSuccessListener implements MessageListenerConcurrently {     @Autowired     private OrderInfoDAO orderInfoDAO;      @Autowired     private OrderFulFillService orderFulFillService;      @Autowired     private RedisLock redisLock;      @Autowired     private DefaultProducer defaultProducer;      @Override     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {         try {             for (MessageExt messageExt : list) {                 String message = new String(messageExt.getBody());                 PaidOrderSuccessMessage paidOrderSuccessMessage = JSON.parseObject(message, PaidOrderSuccessMessage.class);                 String orderId = paidOrderSuccessMessage.getOrderId();                 log.info("触发订单履约,orderId:{}", orderId);                  OrderInfoDO order = orderInfoDAO.getByOrderId(orderId);                 if (Objects.isNull(order)) {                     throw new OrderBizException(OrderErrorCodeEnum.ORDER_INFO_IS_NULL);                 }                  //1.加分布式锁 + 里面的履约前置状态校验防止消息重复消费                 String key = RedisLockKeyConstants.ORDER_FULFILL_KEY + orderId;                 if (!redisLock.lock(key)) {                     log.error("order has not acquired lock,cannot fulfill, orderId={}", orderId);                     throw new BaseBizException(OrderErrorCodeEnum.ORDER_FULFILL_ERROR);                 }                                try {                     //2.进行订单履约逻辑                     TransactionMQProducer producer = defaultProducer.getProducer();                     producer.setTransactionListener(new TransactionListener() {                         @Override                         public LocalTransactionState executeLocalTransaction(Message message, Object o) {                             try {                                 orderFulFillService.triggerOrderFulFill(orderId);                                 return LocalTransactionState.COMMIT_MESSAGE;                             } catch (BaseBizException e) {                                 throw e;                             } catch (Exception e) {                                 log.error("system error", e);                                 return LocalTransactionState.ROLLBACK_MESSAGE;                             }                         }                          @Override                         public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {                             //检查订单是否"已履约"状态                             OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(orderId);                             if (orderInfoDO != null && OrderStatusEnum.FULFILL.getCode().equals(orderInfoDO.getOrderStatus())) {                                 return LocalTransactionState.COMMIT_MESSAGE;                             }                             return LocalTransactionState.ROLLBACK_MESSAGE;                         }                     });                      ReceiveFulfillRequest receiveFulfillRequest = orderFulFillService.buildReceiveFulFillRequest(order);                     String topic = TRIGGER_ORDER_FULFILL_TOPIC;                     byte[] body = JSON.toJSONString(receiveFulfillRequest).getBytes(StandardCharsets.UTF_8);                     Message mq = new Message(topic, body);                     producer.sendMessageInTransaction(mq, order);                 } finally {                     redisLock.unlock(key);                 }             }             return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;         } catch (Exception e) {             log.error("consumer error", e);             //本地业务逻辑执行失败,触发消息重新消费             return ConsumeConcurrentlyStatus.RECONSUME_LATER;         }     } }

 

7.履约场景引入Saga模式 + 状态机

(1)进行订单履约时使用Saga模式的流程简介

(2)Saga模式的流程定义文件和状态机

(3)Saga模式需要自定义补偿逻辑

(4)Saga模式的优点和缺点及适用场景

 

(1)进行订单履约时使用Saga模式的流程简介

Saga模式运行时,会按照一个顺序流程去运行长事务。比如第一步更新订单履约数据,第二步调度发货,第三步物流配送。如果运行到某一步时出现异常,则会从这一步开始往回顺序执行补偿逻辑。比如先补偿第三步,再补偿第二步,最后补偿第一步。

订单初版—7.支付和履约实现的重构文档

(2)Saga模式的流程定义文件和状态机

使用Saga模式的分布式事务在启动时,需要有一个Saga模式的流程定义文件。比如履约系统需要有一个本地文件:Saga模式的流程定义文件。

 

在该文件中,需定义好整个长事务的流程:第一步执行的逻辑是A,第二步执行的逻辑是B,第三步执行的逻辑是C。第一步回滚的逻辑是a,第二步回滚的逻辑是b,第三步回滚的逻辑是c。

 

Saga模式启动时,也需要向Seata Server注册全局事务XID。Saga模式启动后,便会根据状态机来判断流程定义文件中的每一步,应该执行正向还是执行逆向,也就是会通过状态机来控制正向和逆向。

订单初版—7.支付和履约实现的重构文档

(3)Saga模式需要自定义补偿逻辑

一.AT模式下的补偿逻辑

分支事务写入本地事务的数据前,会先生成undo log。当某分支事务出现异常,已提交分支事务需要进行回滚补偿时,会根据生成的undo log来进行回滚补偿。

 

二.TCC模式下的补偿逻辑

先执行try逻辑,如果try逻辑执行成功则执行commit逻辑来进行提交,如果try逻辑执行失败则执行cancel逻辑进行回滚补偿。

 

三.Saga模式的补偿逻辑

每个分支事务都有正向的逻辑,每一个正向逻辑都会搭配一个逆向补偿逻辑,Saga模式需要我们自定义实现这些逆向补偿逻辑。

 

(4)Saga模式的优点和缺点及适用场景

优点是:执行正向链路的分支事务时,不再需要获取全局锁。缺点是:由于没有全局锁,当多个分布式事务并发执行时,不能写隔离。Saga模式的应用场景是:一个业务链路里,需要很多系统层层调用。

 

一.长事务使用AT模式可能导致持有全局锁时间长

这种长事务如果使用AT模式,由于需要获取全局锁,而长事务链路又过长,从而会导致持有全局锁的时间也过长。即便对某数据的全局锁竞争并不激烈,但也需要过长时间才能释放,所以其并发能力特别低。

 

二.长事务使用TCC模式需要改造工作量较大

这种长事务如果使用TCC模式,则会对代码产生很大的侵入性。因为需要对长事务接口改造成三个接口:try、commit、cancel。而且长事务的业务链路比较长,改造业务链路的各接口工作量过大了。所以常见于异构存储的场景,不太常见于长事务。

 

三.Saga模式特别适用于老系统下的长事务

因此Saga模式适用于长事务的场景,特别是老系统下的长事务。如果不希望对老系统有较大侵入性的代码改造,而且希望稍微修改一点代码就能让老系统支持分布式事务,那么就可以使用Saga模式。因为Saga模式可以独立地对老系统定义补偿逻辑,不需要像TCC模式那样去改造接口,也不用像AT模式那样侵入性添加undo log表。

 

8.履约流程的Seata Saga状态节点定义 + 节点流转

(1)流程定义文件分析

(2)流程定义文件中对应的方法

 

(1)流程定义文件分析

在履约系统的resources/statelang目录下,有一个JSON文件,该JSON文件order_fullfill.json便是Saga模式流程定义文件。

{     "Name": "order_fulfill",     "Comment": "订单履约流程",     "StartState": "FulfillService",     "Version": "1.0.0",     "States": {         //第一个节点         "FulfillService": {             "Type": "ServiceTask",             "ServiceName": "fulfillSagaService",//所在服务             "ServiceMethod": "createFulfillOrder",//正向执行方法             "CompensateState": "CreateFulfillOrderCompensate",//逆向补偿方法             "Next": "ChoiceWmsState",//下一个节点             "Input": [//请求参数                 "$.[receiveFulfillRequest]"             ],             "Output": {//返回结果                 "CreateFulfillOrderResult": "$.#root"             },             "Status": {                 "#root == true": "SU",//正向执行方法的返回结果是true,那么状态就是SU                 "#root == false": "FA",//正向执行方法的返回结果是false,那么状态就是FA                 "$Exception{java.lang.Throwable}": "UN"//抛异常,那么状态就是UN             },             "Catch": [                 {                     "Exceptions": [                         "java.lang.Throwable"                     ],                     "Next": "CompensationTrigger"//抛异常时执行的下一个节点                 }             ]         },         //状态机,判断节点         "ChoiceWmsState": {             "Type": "Choice",             "Choices": [                 {                     "Expression": "[CreateFulfillOrderResult] == true",                     "Next": "WmsService"//上一个节点返回true,就执行下一个节点                 }             ],             "Default": "Fail"         },         //第二个节点         "WmsService": {             "Type": "ServiceTask",             "ServiceName": "wmsSageService",//所在服务             "ServiceMethod": "pickGoods",//正向执行方法             "CompensateState": "WmsPickGoodsCompensate",//逆向补偿方法             "Next": "ChoiceTmsState",//下一个节点             "Input": [//请求参数                 "$.[receiveFulfillRequest]"             ],             "Output": {//返回结果                 "WmsPickGoodsResult": "$.#root"             },             "Status": {                 "#root == true": "SU",//正向执行方法的返回结果是true,那么状态就是SU                 "#root == false": "FA",//正向执行方法的返回结果是false,那么状态就是FA                 "$Exception{java.lang.Throwable}": "UN"//抛异常,那么状态就是UN             },             "Catch": [                 {                     "Exceptions": [                       "java.lang.Throwable"                     ],                     "Next": "CompensationTrigger"//抛异常时执行的下一个节点                 }             ]         },         //状态机,判断节点         "ChoiceTmsState": {             "Type": "Choice",             "Choices": [                 {                     "Expression": "[WmsPickGoodsResult] == true",                     "Next": "TmsService"//上一个节点返回true,就执行下一个节点                 }             ],             "Default": "Fail"         },         //第三个节点         "TmsService": {             "Type": "ServiceTask",             "ServiceName": "tmsSagaService",//所在服务             "ServiceMethod": "sendOut",//正向执行方法             "CompensateState": "TmsSendOutCompensate",//逆向补偿方法             "Input": [//请求参数                 "$.[receiveFulfillRequest]"             ],             "Output": {//返回结果                 "TmsSendOutResult": "$.#root"             },             "Status": {                 "#root == true": "SU",//正向执行方法的返回结果是true,那么状态就是SU                 "#root == false": "FA",//正向执行方法的返回结果是false,那么状态就是FA                 "$Exception{java.lang.Throwable}": "UN"//抛异常,那么状态就是UN             },             "Catch": [                 {                     "Exceptions": [                         "java.lang.Throwable"                     ],                     "Next": "CompensationTrigger"//抛异常时执行的下一个节点                 }             ],             "Next": "Succeed"//下一个节点是Succeed节点         },         //第一个节点的补偿逻辑         "CreateFulfillOrderCompensate": {             "Type": "ServiceTask",             "ServiceName": "fulfillSagaService",             "ServiceMethod": "createFulfillOrderCompensate",             "Input": [                 "$.[receiveFulfillRequest]"             ]         },         //第二个节点的补偿逻辑         "WmsPickGoodsCompensate": {             "Type": "ServiceTask",             "ServiceName": "wmsSageService",             "ServiceMethod": "pickGoodsCompensate",             "Input": [                 "$.[receiveFulfillRequest]"             ]         },         //第三个节点的补偿逻辑         "TmsSendOutCompensate": {             "Type": "ServiceTask",             "ServiceName": "tmsSagaService",             "ServiceMethod": "sendOutCompensate",             "Input": [                 "$.[receiveFulfillRequest]"             ]         },         //补偿触发后的下一个节点         "CompensationTrigger": {             "Type": "CompensationTrigger",             "Next": "Fail"//下一个节点是Fail节点         },         //成功节点         "Succeed": {             "Type": "Succeed"         },         //失败节点         "Fail": {             "Type": "Fail",             "ErrorCode": "500",             "Message": "订单履约异常!!"         }     } }

(2)流程定义文件中对应的方法

需要注意的是:触发调用各节点补偿方法时,都是由履约系统通过本地或者RPC来调用的。

@Service("fulfillSagaService") public class FulfillSagaServiceImpl implements FulfillSagaService {     @Autowired     private FulfillService fulfillService;      @Override     public Boolean createFulfillOrder(ReceiveFulfillRequest request) {         log.info("创建履约单,request={}", JSONObject.toJSONString(request));         String fulfillException = request.getFulfillException();         if (StringUtils.isNotBlank(fulfillException) && fulfillException.equals("true")) {             throw new FulfillBizException("创建履约单异常!");         }         //创建履约单         fulfillService.createFulfillOrder(request);         return true;     }      @Override     public Boolean createFulfillOrderCompensate(ReceiveFulfillRequest request) {         log.info("补偿创建履约单,request={}", JSONObject.toJSONString(request));         //取消履约单         fulfillService.cancelFulfillOrder(request.getOrderId());         log.info("补偿创建履约单结束,request={}", JSONObject.toJSONString(request));         return true;     } }  @Service("wmsSageService") public class WmsSageServiceImpl implements WmsSagaService {     @DubboReference(version = "1.0.0", retries = 0)     private WmsApi wmsApi;      @Override     public Boolean pickGoods(ReceiveFulfillRequest request) {         log.info("捡货,request={}", JSONObject.toJSONString(request));         //调用wms系统进行捡货         JsonResult<PickDTO> jsonResult = wmsApi.pickGoods(buildPickGoodsRequest(request));         log.info("捡货结果,jsonResult={}", JSONObject.toJSONString(jsonResult));         if (!jsonResult.getSuccess()) {             throw new FulfillBizException(FulfillErrorCodeEnum.WMS_IS_ERROR);         }         return true;     }      @Override     public Boolean pickGoodsCompensate(ReceiveFulfillRequest request) {         log.info("补偿捡货,request={}", JSONObject.toJSONString(request));         //调用wms系统进行捡货         JsonResult<Boolean> jsonResult = wmsApi.cancelPickGoods(request.getOrderId());         log.info("补偿捡货结果,jsonResult={}", JSONObject.toJSONString(jsonResult));         if (!jsonResult.getSuccess()) {             throw new FulfillBizException(FulfillErrorCodeEnum.WMS_IS_ERROR);         }         return true;     }     ... }  @Service("tmsSagaService") public class TmsSagaServiceImpl implements TmsSagaService {     @DubboReference(version = "1.0.0", retries = 0)     private TmsApi tmsApi;      @Autowired     private OrderFulfillDAO orderFulfillDAO;      @Override     public Boolean sendOut(ReceiveFulfillRequest request) {         log.info("发货,request={}", JSONObject.toJSONString(request));          //1.调用tms进行发货         JsonResult<SendOutDTO> jsonResult = tmsApi.sendOut(buildSendOutRequest(request));         if (!jsonResult.getSuccess()) {             throw new FulfillBizException(FulfillErrorCodeEnum.TMS_IS_ERROR);         }         log.info("发货结果,jsonResult={}", JSONObject.toJSONString(jsonResult));         if (!jsonResult.getSuccess()) {             throw new FulfillBizException(FulfillErrorCodeEnum.WMS_IS_ERROR);         }          //2.查询履约单         OrderFulfillDO orderFulfill = orderFulfillDAO.getOne(request.getOrderId());          //3.存储物流单号         String logisticsCode = jsonResult.getData().getLogisticsCode();         orderFulfillDAO.saveLogisticsCode(orderFulfill.getFulfillId(), logisticsCode);         return true;     }      @Override     public Boolean sendOutCompensate(ReceiveFulfillRequest request) {         log.info("补偿发货,request={}", JSONObject.toJSONString(request));          //调用tms进行补偿发货         JsonResult<Boolean> jsonResult = tmsApi.cancelSendOut(request.getOrderId());         if (!jsonResult.getSuccess()) {             throw new FulfillBizException(FulfillErrorCodeEnum.TMS_IS_ERROR);         }         log.info("补偿发货结果,jsonResult={}", JSONObject.toJSONString(jsonResult));          if (!jsonResult.getSuccess()) {             throw new FulfillBizException(FulfillErrorCodeEnum.TMS_IS_ERROR);         }         return true;     }     ... }

 

9.履约流程的Seata Saga失败补偿流程

订单初版—7.支付和履约实现的重构文档

 

10.Seata Saga状态机流转原理

订单初版—7.支付和履约实现的重构文档

 

11.履约系统的Seata Saga代码实现

(1)进行流程定义

(2)配置状态机

(3)实现正向逆向方法

(4)驱动Saga状态机开始执行长事务

 

(1)进行流程定义

{     "Name": "order_fulfill",     "Comment": "订单履约流程",     "StartState": "FulfillService",     "Version": "1.0.0",     "States": {         //第一个节点         "FulfillService": {             "Type": "ServiceTask",             "ServiceName": "fulfillSagaService",//所在服务             "ServiceMethod": "createFulfillOrder",//正向执行方法             "CompensateState": "CreateFulfillOrderCompensate",//逆向补偿方法             "Next": "ChoiceWmsState",//下一个节点             "Input": [//请求参数                 "$.[receiveFulfillRequest]"             ],             "Output": {//返回结果                 "CreateFulfillOrderResult": "$.#root"             },             "Status": {                 "#root == true": "SU",//正向执行方法的返回结果是true,那么状态就是SU                 "#root == false": "FA",//正向执行方法的返回结果是false,那么状态就是FA                 "$Exception{java.lang.Throwable}": "UN"//抛异常,那么状态就是UN             },             "Catch": [                 {                     "Exceptions": [                         "java.lang.Throwable"                     ],                     "Next": "CompensationTrigger"//抛异常时执行的下一个节点                 }             ]         },         //状态机,判断节点         "ChoiceWmsState": {             "Type": "Choice",             "Choices": [                 {                     "Expression": "[CreateFulfillOrderResult] == true",                     "Next": "WmsService"//上一个节点返回true,就执行下一个节点                 }             ],             "Default": "Fail"         },         //第二个节点         "WmsService": {             "Type": "ServiceTask",             "ServiceName": "wmsSageService",//所在服务             "ServiceMethod": "pickGoods",//正向执行方法             "CompensateState": "WmsPickGoodsCompensate",//逆向补偿方法             "Next": "ChoiceTmsState",//下一个节点             "Input": [//请求参数                 "$.[receiveFulfillRequest]"             ],             "Output": {//返回结果                 "WmsPickGoodsResult": "$.#root"             },             "Status": {                 "#root == true": "SU",//正向执行方法的返回结果是true,那么状态就是SU                 "#root == false": "FA",//正向执行方法的返回结果是false,那么状态就是FA                 "$Exception{java.lang.Throwable}": "UN"//抛异常,那么状态就是UN             },             "Catch": [                 {                     "Exceptions": [                         "java.lang.Throwable"                     ],                     "Next": "CompensationTrigger"//抛异常时执行的下一个节点                 }             ]         },         //状态机,判断节点         "ChoiceTmsState": {             "Type": "Choice",             "Choices": [                 {                     "Expression": "[WmsPickGoodsResult] == true",                     "Next": "TmsService"//上一个节点返回true,就执行下一个节点                 }             ],             "Default": "Fail"         },         //第三个节点         "TmsService": {             "Type": "ServiceTask",             "ServiceName": "tmsSagaService",//所在服务             "ServiceMethod": "sendOut",//正向执行方法             "CompensateState": "TmsSendOutCompensate",//逆向补偿方法             "Input": [//请求参数                 "$.[receiveFulfillRequest]"             ],             "Output": {//返回结果                 "TmsSendOutResult": "$.#root"             },             "Status": {                 "#root == true": "SU",//正向执行方法的返回结果是true,那么状态就是SU                 "#root == false": "FA",//正向执行方法的返回结果是false,那么状态就是FA                 "$Exception{java.lang.Throwable}": "UN"//抛异常,那么状态就是UN             },             "Catch": [                 {                     "Exceptions": [                         "java.lang.Throwable"                     ],                     "Next": "CompensationTrigger"//抛异常时执行的下一个节点                 }             ],             "Next": "Succeed"//下一个节点是Succeed节点         },         //第一个节点的补偿逻辑         "CreateFulfillOrderCompensate": {             "Type": "ServiceTask",             "ServiceName": "fulfillSagaService",             "ServiceMethod": "createFulfillOrderCompensate",             "Input": [                 "$.[receiveFulfillRequest]"             ]         },         //第二个节点的补偿逻辑         "WmsPickGoodsCompensate": {             "Type": "ServiceTask",             "ServiceName": "wmsSageService",             "ServiceMethod": "pickGoodsCompensate",             "Input": [                 "$.[receiveFulfillRequest]"             ]         },         //第三个节点的补偿逻辑         "TmsSendOutCompensate": {             "Type": "ServiceTask",             "ServiceName": "tmsSagaService",             "ServiceMethod": "sendOutCompensate",             "Input": [                 "$.[receiveFulfillRequest]"             ]         },         //补偿触发后的下一个节点         "CompensationTrigger": {             "Type": "CompensationTrigger",             "Next": "Fail"//下一个节点是Fail节点         },         //成功节点         "Succeed": {             "Type": "Succeed"         },         //失败节点         "Fail": {             "Type": "Fail",             "ErrorCode": "500",             "Message": "订单履约异常!!"         }     } }

(2)配置状态机

//配置数据源 @Configuration public class DataSourceConfiguration {     @ConfigurationProperties(prefix = "spring.datasource")     @Bean     public DruidDataSource druidDataSource() {         return new DruidDataSource();     }      @Bean(name = "transactionManager")     @Primary     public DataSourceTransactionManager transactionManager(@Qualifier("druidDataSource") DruidDataSource druidDataSource) {         return new DataSourceTransactionManager(druidDataSource);     } }  //配置Saga状态机 @Configuration public class StateMachineConfiguration {     @Bean     public ThreadPoolExecutorFactoryBean threadExecutor() {         ThreadPoolExecutorFactoryBean threadExecutor = new ThreadPoolExecutorFactoryBean();         threadExecutor.setThreadNamePrefix("SAGA_ASYNC_EXE_");         threadExecutor.setCorePoolSize(1);         threadExecutor.setMaxPoolSize(20);         return threadExecutor;     }      @Bean     public DbStateMachineConfig dbStateMachineConfig(ThreadPoolExecutorFactoryBean threadExecutor, DruidDataSource druidDataSource) throws IOException {         DbStateMachineConfig dbStateMachineConfig = new DbStateMachineConfig();         //设置数据源         dbStateMachineConfig.setDataSource(druidDataSource);         //设置状态机的线程池         dbStateMachineConfig.setThreadPoolExecutor((ThreadPoolExecutor) threadExecutor.getObject());         //设置状态机的配置文件         dbStateMachineConfig.setResources(new PathMatchingResourcePatternResolver().getResources("classpath*:statelang/*.json"));         //设置开启异步化         dbStateMachineConfig.setEnableAsync(true);         //设置当前Saga长事务所属的分组         dbStateMachineConfig.setTxServiceGroup("demo-eshop-fulfill-group");         return dbStateMachineConfig;     }      //Saga状态机实例     @Bean     public ProcessCtrlStateMachineEngine stateMachineEngine(DbStateMachineConfig dbStateMachineConfig) {         ProcessCtrlStateMachineEngine stateMachineEngine = new ProcessCtrlStateMachineEngine();         stateMachineEngine.setStateMachineConfig(dbStateMachineConfig);         return stateMachineEngine;     }      @Bean     public StateMachineEngineHolder stateMachineEngineHolder(ProcessCtrlStateMachineEngine stateMachineEngine) {         StateMachineEngineHolder stateMachineEngineHolder = new StateMachineEngineHolder();         stateMachineEngineHolder.setStateMachineEngine(stateMachineEngine);         return stateMachineEngineHolder;     } }

(3)实现正向逆向方法

@Service("fulfillSagaService") public class FulfillSagaServiceImpl implements FulfillSagaService {     @Autowired     private FulfillService fulfillService;      @Override     public Boolean createFulfillOrder(ReceiveFulfillRequest request) {         log.info("创建履约单,request={}", JSONObject.toJSONString(request));         String fulfillException = request.getFulfillException();         if (StringUtils.isNotBlank(fulfillException) && fulfillException.equals("true")) {             throw new FulfillBizException("创建履约单异常!");         }         //创建履约单         fulfillService.createFulfillOrder(request);         return true;     }      @Override     public Boolean createFulfillOrderCompensate(ReceiveFulfillRequest request) {         log.info("补偿创建履约单,request={}", JSONObject.toJSONString(request));         //取消履约单         fulfillService.cancelFulfillOrder(request.getOrderId());         log.info("补偿创建履约单结束,request={}", JSONObject.toJSONString(request));         return true;     } }  @Service("wmsSageService") public class WmsSageServiceImpl implements WmsSagaService {     @DubboReference(version = "1.0.0", retries = 0)     private WmsApi wmsApi;      @Override     public Boolean pickGoods(ReceiveFulfillRequest request) {         log.info("捡货,request={}", JSONObject.toJSONString(request));         //调用wms系统进行捡货         JsonResult<PickDTO> jsonResult = wmsApi.pickGoods(buildPickGoodsRequest(request));         log.info("捡货结果,jsonResult={}", JSONObject.toJSONString(jsonResult));         if (!jsonResult.getSuccess()) {             throw new FulfillBizException(FulfillErrorCodeEnum.WMS_IS_ERROR);         }         return true;     }      @Override     public Boolean pickGoodsCompensate(ReceiveFulfillRequest request) {         log.info("补偿捡货,request={}", JSONObject.toJSONString(request));         //调用wms系统进行捡货         JsonResult<Boolean> jsonResult = wmsApi.cancelPickGoods(request.getOrderId());         log.info("补偿捡货结果,jsonResult={}", JSONObject.toJSONString(jsonResult));         if (!jsonResult.getSuccess()) {             throw new FulfillBizException(FulfillErrorCodeEnum.WMS_IS_ERROR);         }         return true;     }     ... }  @Service("tmsSagaService") public class TmsSagaServiceImpl implements TmsSagaService {     @DubboReference(version = "1.0.0", retries = 0)     private TmsApi tmsApi;      @Autowired     private OrderFulfillDAO orderFulfillDAO;      @Override     public Boolean sendOut(ReceiveFulfillRequest request) {         log.info("发货,request={}", JSONObject.toJSONString(request));         //1.调用tms进行发货         JsonResult<SendOutDTO> jsonResult = tmsApi.sendOut(buildSendOutRequest(request));         if (!jsonResult.getSuccess()) {             throw new FulfillBizException(FulfillErrorCodeEnum.TMS_IS_ERROR);         }         log.info("发货结果,jsonResult={}", JSONObject.toJSONString(jsonResult));         if (!jsonResult.getSuccess()) {             throw new FulfillBizException(FulfillErrorCodeEnum.WMS_IS_ERROR);         }         //2.查询履约单         OrderFulfillDO orderFulfill = orderFulfillDAO.getOne(request.getOrderId());         //3.存储物流单号         String logisticsCode = jsonResult.getData().getLogisticsCode();         orderFulfillDAO.saveLogisticsCode(orderFulfill.getFulfillId(), logisticsCode);         return true;     }      @Override     public Boolean sendOutCompensate(ReceiveFulfillRequest request) {         log.info("补偿发货,request={}", JSONObject.toJSONString(request));         //调用tms进行补偿发货         JsonResult<Boolean> jsonResult = tmsApi.cancelSendOut(request.getOrderId());         if (!jsonResult.getSuccess()) {             throw new FulfillBizException(FulfillErrorCodeEnum.TMS_IS_ERROR);         }         log.info("补偿发货结果,jsonResult={}", JSONObject.toJSONString(jsonResult));         if (!jsonResult.getSuccess()) {             throw new FulfillBizException(FulfillErrorCodeEnum.TMS_IS_ERROR);         }         return true;     }     ... }

(4)驱动Saga状态机开始执行长事务

//监听并消费订单履约消息 @Component public class TriggerOrderFulfillTopicListener implements MessageListenerConcurrently {     @Autowired     private FulfillService fulfillService;//履约服务      @Override     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {         try {             for (MessageExt messageExt : list) {                 String message = new String(messageExt.getBody());                 ReceiveFulfillRequest request = JSON.parseObject(message, ReceiveFulfillRequest.class);                 fulfillService.receiveOrderFulFill(request);             }             return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;         } catch (Exception e) {             log.error("consumer error", e);             return ConsumeConcurrentlyStatus.RECONSUME_LATER;         }     } }  @Service public class FulfillServiceImpl implements FulfillService {     ...     @Override     public Boolean receiveOrderFulFill(ReceiveFulfillRequest request) {         log.info("接受订单履约成功,request={}", JSONObject.toJSONString(request));         String orderId = request.getOrderId();          //加分布式锁(防止重复触发履约)         String key = RedisLockKeyConstants.ORDER_FULFILL_KEY + orderId;         boolean lock = redisLock.lock(key);         if (!lock) {             throw new FulfillBizException(FulfillErrorCodeEnum.ORDER_FULFILL_ERROR);         }          try {             //1.幂等:校验orderId是否已经履约过             if (orderFulfilled(request.getOrderId())) {                 log.info("该订单已履约!!!,orderId={}", request.getOrderId());                 return true;             }              //2.获取Saga状态机,触发wms捡货和tms发货             StateMachineEngine stateMachineEngine = (StateMachineEngine) springApplicationContext.getBean("stateMachineEngine");             Map<String, Object> startParams = new HashMap<>(3);             startParams.put("receiveFulfillRequest", request);              //配置的Saga状态机json的name,位于/resources/statelang/order_fulfull.json             String stateMachineName = "order_fulfill";             log.info("开始触发saga流程,stateMachineName={}", stateMachineName);              //通过状态机启动长事务流程             StateMachineInstance inst = stateMachineEngine.startWithBusinessKey(stateMachineName, null, null, startParams);             if (ExecutionStatus.SU.equals(inst.getStatus())) {                 log.info("订单履约流程执行完毕. xid={}", inst.getId());             } else {                 log.error("订单履约流程执行异常. xid={}", inst.getId());                 throw new FulfillBizException(FulfillErrorCodeEnum.ORDER_FULFILL_IS_ERROR);             }             return true;         } finally {             redisLock.unlock(key);         }     }     ... }

 

12.履约系统的Seata Saga空回滚和悬挂问题

(1)空回滚问题

(2)空悬挂问题

 

(1)空回滚问题

正向操作没有执行成功,但也进行了逆向操作。解决方案是:通过自定义Holder把正向操作记录下来,如果发现空回滚,则进行记录,且不能进行回滚。

 

(2)空悬挂问题

逆向操作先执行,之后才执行正向操作。解决方案是:通过自定义Holder把正向操作记录下来,确保执行逆向操作时,已经执行过正向操作。

 

Saga的空回滚和空悬挂与TCC的解决思路一样。

 

举报
发表评论

评论已关闭。

相关文章

当前内容话题
  • 0