订单初版—6.生单链路实现的重构文档

大纲

1.库存服务的数据库与缓存双写的实现

2.如何发起异构存储下的Seata TCC分布式事务

3.异构存储下的Seata TCC分布式事务原理

4.生单链路锁定库存的Seata TCC分布式事务实现

5.库存服务异构存储双写TCC异常处理

6.库存服务TCC事务的空悬挂问题

7.库存服务TCC二阶段重试的幂等问题

8.假设使用异步锁库存方案可能会导致的几种问题

9.生单链路的AT + TCC混合事务方案流程总结

10.生单链路非分布式事务的纯补偿方案

 

1.库存服务的数据库与缓存双写的实现

(1)生单链路使用Seata AT分布式事务的原理流程图

(2)生单链路Seata AT模式下的并发问题分析

(3)生单链路如何解决库存全局锁争用问题

(4)库存服务双写数据库 + 缓存的实现

 

(1)生单链路使用Seata AT分布式事务的原理流程图

库存服务写库存时,通常都会进行数据库 + 缓存的双写处理。写缓存是库存服务写库存的一个常规性操作,因为需要支撑高并发的库存扣减。

订单初版—6.生单链路实现的重构文档

(2)生单链路Seata AT模式下的并发问题分析

生单链路中的分布式事务环节在于:锁定优惠券 + 锁定库存 + 生成订单。

 

一.在锁定优惠券环节

每个用户都会有属于自己的优惠券。日常情况下,都是不同的用户使用不同的优惠券购买商品,所以并不会出现并发获取同一优惠券数据的全局锁的情况。

 

二.在锁定库存环节

对于爆品或秒杀,大量用户可能都会基于某商品进行下单扣减库存,因此会出现并发获取同一个SKU数据的全局锁。

 

第一个获取到某SKU数据的全局锁的事务,在进行生成订单环节由于需要插入多条SQL,所以可能会比较耗时,从而导致并发等待获取该SKU数据的全局锁的其他事务等待时间过长。

 

(3)生单链路如何解决库存全局锁争用问题

一.问题分析

一个商品SKU就对应一条库存数据记录。如果大量用户同时购买一个商品SKU,必然导致:多个分布式事务都去竞争和等待同一个SKU库存数据的全局锁。

 

二.解决方案

方案一:库存分桶方案

例如对库存表进行库存分桶。一般一个SKU就一条库存数据,该方案下一个SKU会有多条库存数据。比如1万的库存可分为1000条库存数据,每条库存数据可扣库存为10。每次扣减库存时,按照一定的规则和算法,选择一个库存分桶去扣减。

 

方案二:RocketMQ柔性事务方案

通过RocketMQ柔性事务方案来替换掉Seata刚性事务方案。在互联网公司里,一般的业务系统,都是使用RocketMQ柔性事务。大多情况下,RocketMQ柔性事务都能确保数据是一致的。

 

刚性事务:分支事务出现异常或者失败,则全局回滚。柔性事务:分支事务出现异常或者失败,则不断重试消费,直到成功。使用RocketMQ柔性事务方案,需要确保消息能被投递到RocketMQ。

 

方案三:使用没有全局锁的分布式事务方案

Seata支持AT、TCC、Saga、XA这几种事务方案。生单链路的建议是:锁定营销使用AT模式 + 锁定库存使用TCC模式的混合分布式事务方案。

 

三.生单链路中锁库存的技术方案重构

为了提升生单链路的性能,避免扣减库存时出现大量的全局锁争用。锁定库存使用Seata的TCC模式,纳入到全局事务中。而且为了让写库存时双写数据库 + 缓存的数据一致性,也可以用TCC模式实现。

 

四.存在异构存储的服务解决数据一致性问题的方案

使用Seata分布式事务的TCC模式。

 

(4)库存服务双写数据库 + 缓存的实现

@RestController @RequestMapping("/inventory") public class InventoryController {     @Autowired     private InventoryService inventoryService;      //新增商品库存     @PostMapping("/addProductStock")     public JsonResult<Boolean> addProductStock(@RequestBody AddProductStockRequest request) {         inventoryService.addProductStock(request);         return JsonResult.buildSuccess(true);     }     ... }  @Service public class InventoryServiceImpl implements InventoryService {     ...     @Override     public Boolean addProductStock(AddProductStockRequest request) {         log.info("新增商品库存:request={}", JSONObject.toJSONString(request));         //1.校验入参         checkAddProductStockRequest(request);          //2.查询商品库存         ProductStockDO productStock = productStockDAO.getBySkuCode(request.getSkuCode());         ParamCheckUtil.checkObjectNull(productStock, InventoryErrorCodeEnum.PRODUCT_SKU_STOCK_EXISTED_ERROR);          //3.添加Redis锁,防并发         String lockKey = RedisLockKeyConstants.ADD_PRODUCT_STOCK_KEY + request.getSkuCode();         Boolean locked = redisLock.lock(lockKey);         if (!locked) {             throw new InventoryBizException(InventoryErrorCodeEnum.ADD_PRODUCT_SKU_STOCK_ERROR);         }          try {             //4.执行添加商品库存逻辑             addProductStockProcessor.doAdd(request);         } finally {             //5.解锁             redisLock.unlock(lockKey);         }         return true;     }     ... }  @Component public class AddProductStockProcessor {     @Autowired     private RedisCache redisCache;      @Autowired     private ProductStockDAO productStockDAO;      //执行添加商品库存逻辑     @Transactional(rollbackFor = Exception.class)     public void doAdd(AddProductStockRequest request) {         //1.构造商品库存         ProductStockDO productStock = buildProductStock(request);         //2.保存商品库存到MySQL         productStockDAO.save(productStock);         //3.保存商品库存到Redis         addStockToRedis(productStock);     }      //保存商品库存到Redis     public void addStockToRedis(ProductStockDO productStock) {         String productStockKey = CacheSupport.buildProductStockKey(productStock.getSkuCode());         Map<String, String> productStockValue = CacheSupport.buildProductStockValue(productStock.getSaleStockQuantity(), productStock.getSaledStockQuantity());         redisCache.hPutAll(productStockKey, productStockValue);     }      private ProductStockDO buildProductStock(AddProductStockRequest request) {         ProductStockDO productStock = new ProductStockDO();         productStock.setSkuCode(request.getSkuCode());         productStock.setSaleStockQuantity(request.getSaleStockQuantity());         productStock.setSaledStockQuantity(0L);         return productStock;     } }  public interface CacheSupport {     String PREFIX_PRODUCT_STOCK = "PRODUCT_STOCK:";     //可销售库存key     String SALE_STOCK = "saleStock";      //已销售库存key     String SALED_STOCK = "saledStock";      //构造缓存商品库存key     static String buildProductStockKey(String skuCode) {         return PREFIX_PRODUCT_STOCK + ":" + skuCode;     }      //构造缓存商品库存value     static Map<String, String> buildProductStockValue(Long saleStockQuantity, Long saledStockQuantity) {         Map<String, String> value = new HashMap<>();         value.put(SALE_STOCK, String.valueOf(saleStockQuantity));         value.put(SALED_STOCK, String.valueOf(saledStockQuantity));         return value;     } }

 

2.如何发起异构存储下的Seata TCC分布式事务

一.在双写数据库 + 缓存的入口添加Seata的@GlobalTransactional注解。

二.在写数据库接口和写缓存接口上添加Seata的@TwoPhaseBusinessAction注解。

三.在提供双写数据库 + 库存接口的服务上添加Seata的@LocalTCC注解。

订单初版—6.生单链路实现的重构文档

 

3.异构存储下的Seata TCC分布式事务原理

(1)TCC的核心逻辑

(2)TCC的事务流程

 

(1)TCC的核心逻辑

TCC的核心逻辑就是:try、commit、cancel。

 

一.try

会预留一些资源,但实际的动作并没有执行,当然实际应用比如写库中也可以直接执行实际的动作即提交SQL。

 

二.commit

分支事务执行try都成功后,就会让所有分支事务都执行commit,commit会执行实际的动作。

 

三.cancel

如果存在分支事务try失败了,那么所有分支事务都要执行cancel。执行cancel时会对预留的资源进行逆向补偿,取消资源预留。

 

(2)TCC的事务流程

订单初版—6.生单链路实现的重构文档

 

4.生单链路锁定库存的Seata TCC分布式事务实现

(1)生单链路中生成订单到扣减库存的实现

(2)库存服务中扣减库存的双写数据库 + 缓存实现

 

(1)生单链路中生成订单到扣减库存的实现

createOrder()时锁定优惠券 + 生成订单到数据库会使用Seata的AT模式来实现分布式事务。

@DubboService(version = "1.0.0", interfaceClass = OrderApi.class, retries = 0) public class OrderApiImpl implements OrderApi {     @Autowired     private OrderService orderService;     ...      //提交订单/生成订单接口     @Override     public JsonResult<CreateOrderDTO> createOrder(CreateOrderRequest createOrderRequest) {         try {             CreateOrderDTO createOrderDTO = orderService.createOrder(createOrderRequest);             return JsonResult.buildSuccess(createOrderDTO);         } catch (OrderBizException e) {             log.error("biz error", e);             return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());         } catch (Exception e) {             log.error("system error", e);             return JsonResult.buildError(e.getMessage());         }     }     ... }  @Service public class OrderServiceImpl implements OrderService {     @Autowired     private OrderManager orderManager;     ...      //提交订单/生成订单接口     @Override     public CreateOrderDTO createOrder(CreateOrderRequest createOrderRequest) {         //1.入参检查         checkCreateOrderRequestParam(createOrderRequest);         //2.风控检查         checkRisk(createOrderRequest);         //3.获取商品信息         List<ProductSkuDTO> productSkuList = listProductSkus(createOrderRequest);         //4.计算订单价格         CalculateOrderAmountDTO calculateOrderAmountDTO = calculateOrderAmount(createOrderRequest, productSkuList);         //5.验证订单实付金额         checkRealPayAmount(createOrderRequest, calculateOrderAmountDTO);         //6.生成订单(包含锁定优惠券、扣减库存等逻辑)         createOrder(createOrderRequest, productSkuList, calculateOrderAmountDTO);         //7.发送订单延迟消息用于支付超时自动关单         sendPayOrderTimeoutDelayMessage(createOrderRequest);         //返回订单信息         CreateOrderDTO createOrderDTO = new CreateOrderDTO();         createOrderDTO.setOrderId(createOrderRequest.getOrderId());          return createOrderDTO;     }      //插入订单到数据库     private void createOrder(CreateOrderRequest createOrderRequest, List<ProductSkuDTO> productSkuList, CalculateOrderAmountDTO calculateOrderAmountDTO) {         //插入订单到数据库         orderManager.createOrder(createOrderRequest, productSkuList, calculateOrderAmountDTO);     }     ... }  @Service public class OrderManagerImpl implements OrderManager {     @DubboReference(version = "1.0.0", retries = 0)     private InventoryApi inventoryApi;//库存服务     ...      //生成订单     //由于锁定优惠券不会出现竞争AT模式下的全局锁,所以锁定优惠券+生成订单可以一起使用Seata的AT模式     //但扣减库存继续使用Seata的AT模式则会出现竞争全局锁,所以扣减库存使用Seata的TCC模式     @Override     @GlobalTransactional(rollbackFor = Exception.class)     public void createOrder(CreateOrderRequest createOrderRequest, List<ProductSkuDTO> productSkuList, CalculateOrderAmountDTO calculateOrderAmountDTO) {         //锁定优惠券         lockUserCoupon(createOrderRequest);         //扣减库存         deductProductStock(createOrderRequest);         //生成订单到数据库         addNewOrder(createOrderRequest, productSkuList, calculateOrderAmountDTO);     }      //锁定商品库存     private void deductProductStock(CreateOrderRequest createOrderRequest) {         String orderId = createOrderRequest.getOrderId();         List<DeductProductStockRequest.OrderItemRequest> orderItemRequestList = ObjectUtil.convertList(createOrderRequest.getOrderItemRequestList(), DeductProductStockRequest.OrderItemRequest.class);         DeductProductStockRequest lockProductStockRequest = new DeductProductStockRequest();         lockProductStockRequest.setOrderId(orderId);         lockProductStockRequest.setOrderItemRequestList(orderItemRequestList);         JsonResult<Boolean> jsonResult = inventoryApi.deductProductStock(lockProductStockRequest);         //检查锁定商品库存结果         if (!jsonResult.getSuccess()) {             throw new OrderBizException(jsonResult.getErrorCode(), jsonResult.getErrorMessage());         }     }     ... }  @DubboService(version = "1.0.0", interfaceClass = InventoryApi.class, retries = 0) public class InventoryApiImpl implements InventoryApi {     @Autowired     private InventoryService inventoryService;      //扣减商品库存     @Override     public JsonResult<Boolean> deductProductStock(DeductProductStockRequest deductProductStockRequest) {         try {             Boolean result = inventoryService.deductProductStock(deductProductStockRequest);             return JsonResult.buildSuccess(result);         } catch (InventoryBizException e) {             log.error("biz error", e);             return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());         } catch (Exception e) {             log.error("system error", e);             return JsonResult.buildError(e.getMessage());         }     }     ... }

(2)库存服务中扣减库存的双写数据库 + 缓存实现

doDeduct()扣减库存时双写数据库 + 缓存会使用Seata的TCC模式来实现分布式事务。TCC模式特别适合这种多写异构存储的业务,关键的注解是@LocalTCC和@TwoPhaseBusinessAction。

@Service public class InventoryServiceImpl implements InventoryService {     ...     //扣减商品库存     @Override     public Boolean deductProductStock(DeductProductStockRequest deductProductStockRequest) {         //检查入参         checkLockProductStockRequest(deductProductStockRequest);         String orderId = deductProductStockRequest.getOrderId();         List<DeductProductStockRequest.OrderItemRequest> orderItemRequestList = deductProductStockRequest.getOrderItemRequestList();         for (DeductProductStockRequest.OrderItemRequest orderItemRequest : orderItemRequestList) {             String skuCode = orderItemRequest.getSkuCode();              //1.查询MySQL库存数据             ProductStockDO productStockDO = productStockDAO.getBySkuCode(skuCode);             if (productStockDO == null) {                 log.error("商品库存记录不存在,skuCode={}", skuCode);                 throw new InventoryBizException(InventoryErrorCodeEnum.PRODUCT_SKU_STOCK_NOT_FOUND_ERROR);             }              //2.查询Redis库存数据             String productStockKey = CacheSupport.buildProductStockKey(skuCode);             Map<String, String> productStockValue = redisCache.hGetAll(productStockKey);             if (productStockValue.isEmpty()) {                 //如果查询不到Redis库存数据,将MySQL库存数据放入Redis,以MySQL的数据为准                 addProductStockProcessor.addStockToRedis(productStockDO);             }              //3.添加Redis锁,防并发             String lockKey = MessageFormat.format(RedisLockKeyConstants.DEDUCT_PRODUCT_STOCK_KEY, orderId, skuCode);             Boolean locked = redisLock.lock(lockKey);             if (!locked) {                 log.error("无法获取扣减库存锁,orderId={},skuCode={}", orderId, skuCode);                 throw new InventoryBizException(InventoryErrorCodeEnum.DEDUCT_PRODUCT_SKU_STOCK_ERROR);             }              try {                 //4.查询库存扣减日志                 ProductStockLogDO productStockLog = productStockLogDAO.getLog(orderId, skuCode);                 if (null != productStockLog) {                     log.info("已扣减过,扣减库存日志已存在,orderId={},skuCode={}", orderId, skuCode);                     return true;                 }                 Integer saleQuantity = orderItemRequest.getSaleQuantity();                 Integer originSaleStock = productStockDO.getSaleStockQuantity().intValue();                 Integer originSaledStock = productStockDO.getSaledStockQuantity().intValue();                  //5.执行执库存扣减                 DeductStockDTO deductStock = new DeductStockDTO(orderId, skuCode, saleQuantity, originSaleStock, originSaledStock);                 deductProductStockProcessor.doDeduct(deductStock);             } finally {                 redisLock.unlock(lockKey);             }         }         return true;     }     ... }  //扣减商品库存处理器 @Component public class DeductProductStockProcessor {     @Autowired     private LockMysqlStockTccService lockMysqlStockTccService;      @Autowired     private LockRedisStockTccService lockRedisStockTccService;      @Autowired     private ProductStockLogDAO productStockLogDAO;      //执行扣减商品库存逻辑,由于createOrder()已经加了@GlobalTransactional注解,这里就不用加了     //@GlobalTransactional(rollbackFor = Exception.class)     public void doDeduct(DeductStockDTO deductStock) {         //1.执行执行MySQL库存扣减,就是执行TCC的try         boolean result = lockMysqlStockTccService.deductStock(null, deductStock);         if (!result) {             throw new InventoryBizException(InventoryErrorCodeEnum.PRODUCT_SKU_STOCK_NOT_FOUND_ERROR);         }          //2.执行Redis库存扣减,就是执行TCC的try         result = lockRedisStockTccService.deductStock(null, deductStock);         if (!result) {             throw new InventoryBizException(InventoryErrorCodeEnum.PRODUCT_SKU_STOCK_NOT_FOUND_ERROR);         }     } }  //锁定MySQL库存,Seata TCC模式的Service @LocalTCC public interface LockMysqlStockTccService {     //一阶段方法:扣减销售库存(saleStockQuantity-saleQuantity),这就是TCC中的try     //所有分支事务的try执行成功,则所有事务执行commit方法;     //存在分支事务的try执行失败,则所有事务执行rollback方法;     @TwoPhaseBusinessAction(name = "lockMysqlStockTccService", commitMethod = "commit", rollbackMethod = "rollback")     boolean deductStock(BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "deductStock") DeductStockDTO deductStock);      //二阶段方法:增加已销售库存(saledStockQuantity+saleQuantity),这就是TCC中的commit     void commit(BusinessActionContext actionContext);      //回滚:增加销售库存(saleStockQuantity+saleQuantity),这就是TCC中的cancel     void rollback(BusinessActionContext actionContext); }  //锁定Redis库存,Seata TCC模式的Service @LocalTCC public interface LockRedisStockTccService {     //一阶段方法:扣减销售库存(saleStockQuantity-saleQuantity),这就是TCC中的try     //所有分支事务的try执行成功,则所有事务执行commit方法;     //存在分支事务的try执行失败,则所有事务执行rollback方法;     @TwoPhaseBusinessAction(name = "lockRedisStockTccService", commitMethod = "commit", rollbackMethod = "rollback")     boolean deductStock(BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "deductStock") DeductStockDTO deductStock);      //二阶段方法:增加已销售库存(saledStockQuantity+saleQuantity),这就是TCC中的commit     void commit(BusinessActionContext actionContext);      //回滚:增加销售库存(saleStockQuantity+saleQuantity),这就是TCC中的cancel     void rollback(BusinessActionContext actionContext); }

其中,try操作可以是预留资源,也可以是直接执行动作(即等于commit)。比如锁库存,会在try操作中把写数据库或写缓存直接处理了。try操作具体是预留资源还是直接执行,往往会根据业务来决定。

@Service public class LockMysqlStockTccServiceImpl implements LockMysqlStockTccService {     ...     @Transactional(rollbackFor = Exception.class)     @Override     public boolean deductStock(BusinessActionContext actionContext, DeductStockDTO deductStock) {         //actionContext上下文获取全局事务xid         String xid = actionContext.getXid();         String skuCode = deductStock.getSkuCode();         Integer saleQuantity = deductStock.getSaleQuantity();         Integer originSaleStock = deductStock.getOriginSaleStock();         Integer originSaledStock = deductStock.getOriginSaledStock();          //标识try阶段开始执行         TccResultHolder.tagTryStart(getClass(), skuCode, xid);          if (isEmptyRollback()) {             return false;         }         log.info("一阶段方法:扣减MySQL销售库存,deductStock={},xid={}", JSONObject.toJSONString(deductStock), xid);          int result = productStockDAO.deductSaleStock(skuCode, saleQuantity, originSaleStock);         //标识try阶段执行成功         if (result > 0) {             TccResultHolder.tagTrySuccess(getClass(), skuCode, xid);         }         return result > 0;     }     ... }  @Service public class LockRedisStockTccServiceImpl implements LockRedisStockTccService {     ...     @Override     public boolean deductStock(BusinessActionContext actionContext, DeductStockDTO deductStock) {         String xid = actionContext.getXid();         String skuCode = deductStock.getSkuCode();         Integer saleQuantity = deductStock.getSaleQuantity();         Integer originSaleStock = deductStock.getOriginSaleStock();         Integer originSaledStock = deductStock.getOriginSaledStock();          //标识try阶段开始执行         TccResultHolder.tagTryStart(getClass(), skuCode, xid);         log.info("一阶段方法:扣减redis销售库存,deductStock={},xid={}", JSONObject.toJSONString(deductStock), xid);         if (isEmptyRollback()) {             return false;         }         String luaScript = LuaScript.DEDUCT_SALE_STOCK;         String saleStockKey = CacheSupport.SALE_STOCK;         String productStockKey = CacheSupport.buildProductStockKey(skuCode);         Long result = redisCache.execute(new DefaultRedisScript<>(luaScript, Long.class),             Arrays.asList(productStockKey, saleStockKey), String.valueOf(saleQuantity), String.valueOf(originSaleStock));          //标识try阶段执行成功         if (result > 0) {             TccResultHolder.tagTrySuccess(getClass(), skuCode, xid);         }         return result > 0;     }     ... }  //存储TCC第一阶段执行结果,用于解决TCC幂等,空回滚,悬挂问题 public class TccResultHolder {     //标识TCC try阶段开始执行的标识     private static final String TRY_START = "TRY_START";      //标识TCC try阶段执行成功的标识     private static final String TRY_SUCCESS = "TRY_SUCCESS";      //保存TCC事务执行过程的状态     private static Map<Class<?>, Map<String, String>> map = new ConcurrentHashMap<Class<?>, Map<String, String>>();     ...      //标记try阶段开始执行     public static void tagTryStart(Class<?> tccClass, String bizKey, String xid) {         setResult(tccClass, bizKey, xid, TRY_START);     }          //标记try阶段执行成功     public static void tagTrySuccess(Class<?> tccClass, String bizKey, String xid) {         setResult(tccClass, bizKey, xid, TRY_SUCCESS);     }      //一个tccClass代表了TCC的一个分支事务     public static void setResult(Class<?> tccClass, String bizKey, String xid, String v) {         Map<String, String> results = map.get(tccClass);         if (results == null) {             synchronized (map) {                 if (results == null) {                     results = new ConcurrentHashMap<>();                     map.put(tccClass, results);                 }             }         }         results.put(getTccExecution(xid, bizKey), v);//保存当前分布式事务id     }     ... }

 

5.库存服务异构存储双写TCC异常处理

只要TCC的分支事务在try过程中出现异常,都需要回滚所有分支事务。

 

如果没有出现异常,则执行commit()方法。如果出现异常,则执行rollback()方法。

//锁定MySQL库存,Seata TCC模式的Service @LocalTCC public interface LockMysqlStockTccService {     //一阶段方法:扣减销售库存(saleStockQuantity-saleQuantity),这就是TCC中的try     //所有分支事务的try执行成功,则所有事务执行commit方法;     //存在分支事务的try执行失败,则所有事务执行rollback方法;     @TwoPhaseBusinessAction(name = "lockMysqlStockTccService", commitMethod = "commit", rollbackMethod = "rollback")     boolean deductStock(BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "deductStock") DeductStockDTO deductStock);      //二阶段方法:增加已销售库存(saledStockQuantity+saleQuantity),这就是TCC中的commit     void commit(BusinessActionContext actionContext);      //回滚:增加销售库存(saleStockQuantity+saleQuantity),这就是TCC中的cancel     void rollback(BusinessActionContext actionContext); }  //锁定Redis库存,Seata TCC模式的Service @LocalTCC public interface LockRedisStockTccService {     //一阶段方法:扣减销售库存(saleStockQuantity-saleQuantity),这就是TCC中的try     //所有分支事务的try执行成功,则所有事务执行commit方法;     //存在分支事务的try执行失败,则所有事务执行rollback方法;     @TwoPhaseBusinessAction(name = "lockRedisStockTccService", commitMethod = "commit", rollbackMethod = "rollback")     boolean deductStock(BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "deductStock") DeductStockDTO deductStock);      //二阶段方法:增加已销售库存(saledStockQuantity+saleQuantity),这就是TCC中的commit     void commit(BusinessActionContext actionContext);      //回滚:增加销售库存(saleStockQuantity+saleQuantity),这就是TCC中的cancel     void rollback(BusinessActionContext actionContext); }

写数据库的commit()和rollback():

@Service public class LockMysqlStockTccServiceImpl implements LockMysqlStockTccService {     @Autowired     private ProductStockDAO productStockDAO;      @Autowired     private ProductStockLogDAO productStockLogDAO;     ...      @Override     public void commit(BusinessActionContext actionContext) {         String xid = actionContext.getXid();         DeductStockDTO deductStock = ((JSONObject) actionContext.getActionContext("deductStock")).toJavaObject(DeductStockDTO.class);         String skuCode = deductStock.getSkuCode();         Integer saleQuantity = deductStock.getSaleQuantity();         Integer originSaleStock = deductStock.getOriginSaleStock();         Integer originSaledStock = deductStock.getOriginSaledStock();         log.info("二阶段方法:增加mysql已销售库存,deductStock,xid={}", JSONObject.toJSONString(deductStock), xid);          //幂等         //当出现网络异常或者TC Server异常时,会出现重复调用commit阶段的情况,所以需要进行幂等操作         if (!TccResultHolder.isTrySuccess(getClass(), skuCode, xid)) {             return;         }          //1.增加已销售库存         productStockDAO.increaseSaledStock(skuCode, saleQuantity, originSaledStock);          //2.插入一条扣减日志表         log.info("插入一条扣减日志表");         productStockLogDAO.save(buildStockLog(deductStock));          //移除标识         TccResultHolder.removeResult(getClass(), skuCode, xid);     }      @Override     public void rollback(BusinessActionContext actionContext) {         String xid = actionContext.getXid();         DeductStockDTO deductStock = ((JSONObject) actionContext.getActionContext("deductStock")).toJavaObject(DeductStockDTO.class);         String skuCode = deductStock.getSkuCode();         Integer saleQuantity = deductStock.getSaleQuantity();         Integer originSaleStock = deductStock.getOriginSaleStock();         Integer originSaledStock = deductStock.getOriginSaledStock();         log.info("回滚:增加mysql销售库存,deductStock={},xid={}", JSONObject.toJSONString(deductStock), xid);          //空回滚处理         if (TccResultHolder.isTagNull(getClass(), skuCode, xid)) {             log.info("mysql:出现空回滚");             insertEmptyRollbackTag();             return;         }          //幂等处理         //try阶段没有完成的情况下,不必执行回滚,因为try阶段有本地事务,事务失败时已经进行了回滚         //如果try阶段成功,而其他全局事务参与者失败,这里会执行回滚         if (!TccResultHolder.isTrySuccess(getClass(), skuCode, xid)) {             log.info("mysql:无需回滚");             return;         }          //1.还原销售库存         productStockDAO.restoreSaleStock(skuCode, saleQuantity, originSaleStock - saleQuantity);          //2.删除库存扣减日志         ProductStockLogDO logDO = productStockLogDAO.getLog(deductStock.getOrderId(), skuCode);         if (null != logDO) {             productStockLogDAO.removeById(logDO.getId());         }          //移除标识         TccResultHolder.removeResult(getClass(), skuCode, xid);     }     ... }

写缓存的commit()和rollback():

@Service public class LockRedisStockTccServiceImpl implements LockRedisStockTccService {     @Autowired     private RedisCache redisCache;     ...      @Override     public void commit(BusinessActionContext actionContext) {         String xid = actionContext.getXid();         DeductStockDTO deductStock = ((JSONObject) actionContext.getActionContext("deductStock")).toJavaObject(DeductStockDTO.class);         String skuCode = deductStock.getSkuCode();         Integer saleQuantity = deductStock.getSaleQuantity();         Integer originSaleStock = deductStock.getOriginSaleStock();         Integer originSaledStock = deductStock.getOriginSaledStock();         log.info("二阶段方法:增加redis已销售库存,deductStock={},xid={}", JSONObject.toJSONString(deductStock), xid);          //幂等         //当出现网络异常或者TC Server异常时,会出现重复调用commit阶段的情况,所以需要进行幂等操作         if (!TccResultHolder.isTrySuccess(getClass(), skuCode, xid)) {             log.info("已经执行过commit阶段");             return;         }          String luaScript = LuaScript.INCREASE_SALED_STOCK;         String saledStockKey = CacheSupport.SALED_STOCK;         String productStockKey = CacheSupport.buildProductStockKey(skuCode);         redisCache.execute(new DefaultRedisScript<>(luaScript, Long.class), Arrays.asList(productStockKey, saledStockKey), String.valueOf(saleQuantity), String.valueOf(originSaledStock));          //移除标识         TccResultHolder.removeResult(getClass(), skuCode, xid);     }      @Override     public void rollback(BusinessActionContext actionContext) {         String xid = actionContext.getXid();         DeductStockDTO deductStock = ((JSONObject) actionContext.getActionContext("deductStock")).toJavaObject(DeductStockDTO.class);         String skuCode = deductStock.getSkuCode();         Integer saleQuantity = deductStock.getSaleQuantity();         Integer originSaleStock = deductStock.getOriginSaleStock();         Integer originSaledStock = deductStock.getOriginSaledStock();         log.info("回滚:增加redis销售库存,deductStock={},xid={}", JSONObject.toJSONString(deductStock), xid);          //空回滚处理         if (TccResultHolder.isTagNull(getClass(), skuCode, xid)) {             log.info("redis:出现空回滚");             insertEmptyRollbackTag();             return;         }          //幂等处理         //try阶段没有完成的情况下,不必执行回滚         //如果try阶段成功,而其他全局事务参与者失败,这里会执行回滚         if (!TccResultHolder.isTrySuccess(getClass(), skuCode, xid)) {             log.info("redis:无需回滚");             return;         }          String luaScript = LuaScript.RESTORE_SALE_STOCK;         String saleStockKey = CacheSupport.SALE_STOCK;         String productStockKey = CacheSupport.buildProductStockKey(skuCode);         redisCache.execute(new DefaultRedisScript<>(luaScript, Long.class), Arrays.asList(productStockKey, saleStockKey), String.valueOf(saleQuantity), String.valueOf(originSaleStock - saleQuantity));          //移除标识         TccResultHolder.removeResult(getClass(), skuCode, xid);     }     ... }

 

6.库存服务TCC事务的空悬挂问题

(1)空悬挂问题

(2)解决空悬挂的思路

 

(1)空悬挂问题

因为网络延迟等原因,分支事务的rollback()方法可能会比try()方法先执行,即rollback()方法进行了空回滚,然后try()方法才执行,从而导致try()方法预留的资源无法被取消。

 

(2)解决空悬挂的思路

当rollback()方法出现空回滚时,需要进行标识(如在数据库中查一条记录),然后在try()方法里会判断是否发生了空回滚。

@Service public class LockMysqlStockTccServiceImpl implements LockMysqlStockTccService {     ...     //这就是TCC的try     @Transactional(rollbackFor = Exception.class)     @Override     public boolean deductStock(BusinessActionContext actionContext, DeductStockDTO deductStock) {         //actionContext上下文获取全局事务xid         String xid = actionContext.getXid();         String skuCode = deductStock.getSkuCode();         Integer saleQuantity = deductStock.getSaleQuantity();         Integer originSaleStock = deductStock.getOriginSaleStock();         Integer originSaledStock = deductStock.getOriginSaledStock();          //标识try阶段开始执行         TccResultHolder.tagTryStart(getClass(), skuCode, xid);          //悬挂问题:rollback接口比try接口先执行,即rollback接口进行了空回滚,try接口才执行,导致try接口预留的资源无法被取消         //解决空悬挂的思路:即当rollback接口出现空回滚时,需要打一个标识(在数据库中查一条记录),在try()里判断是否发生了空回滚         if (isEmptyRollback()) {             return false;         }         log.info("一阶段方法:扣减MySQL销售库存,deductStock={},xid={}", JSONObject.toJSONString(deductStock), xid);          int result = productStockDAO.deductSaleStock(skuCode, saleQuantity, originSaleStock);         //标识try阶段执行成功         if (result > 0) {             TccResultHolder.tagTrySuccess(getClass(), skuCode, xid);         }          return result > 0;     }      //判断是否发生的空回滚     private Boolean isEmptyRollback() {         //需要查询本地数据库,看是否发生了空回滚         return false;     }      //插入空回滚标识     private void insertEmptyRollbackTag() {         //在数据库插入空回滚的标识     }      @Override     public void rollback(BusinessActionContext actionContext) {         String xid = actionContext.getXid();         DeductStockDTO deductStock = ((JSONObject) actionContext.getActionContext("deductStock")).toJavaObject(DeductStockDTO.class);         String skuCode = deductStock.getSkuCode();         Integer saleQuantity = deductStock.getSaleQuantity();         Integer originSaleStock = deductStock.getOriginSaleStock();         Integer originSaledStock = deductStock.getOriginSaledStock();         log.info("回滚:增加mysql销售库存,deductStock={},xid={}", JSONObject.toJSONString(deductStock), xid);          //空回滚处理         if (TccResultHolder.isTagNull(getClass(), skuCode, xid)) {             log.info("mysql:出现空回滚");             //插入空回滚标识             insertEmptyRollbackTag();             return;         }          //幂等处理         //try阶段没有完成的情况下,不必执行回滚,因为try阶段有本地事务,事务失败时已经进行了回滚         //如果try阶段成功,而其他全局事务参与者失败,这里会执行回滚         if (!TccResultHolder.isTrySuccess(getClass(), skuCode, xid)) {             log.info("mysql:无需回滚");             return;         }          //1.还原销售库存         productStockDAO.restoreSaleStock(skuCode, saleQuantity, originSaleStock - saleQuantity);          //2.删除库存扣减日志         ProductStockLogDO logDO = productStockLogDAO.getLog(deductStock.getOrderId(), skuCode);         if (null != logDO) {             productStockLogDAO.removeById(logDO.getId());         }          //移除标识         TccResultHolder.removeResult(getClass(), skuCode, xid);     }     ... }

 

7.库存服务TCC二阶段重试的幂等问题

如果执行commit失败,Seata Server会让分支事务不断重试commit。如果执行cancel失败,Seata Server会让分支事务不断重试cancel。只要出现重试,就需要保证重试操作的方法是幂等的。

 

当try开始执行时,会添加标识,表明开启了TCC事务。当标识被移除掉后,则说明commit或cancel执行成功。

 

重复执行commit或cancel时,通过判断标识是否为空,就能拦截掉重复执行的commit或cancel,从而实现幂等。

@Service public class LockMysqlStockTccServiceImpl implements LockMysqlStockTccService {     @Autowired     private ProductStockDAO productStockDAO;      @Autowired     private ProductStockLogDAO productStockLogDAO;     ...      @Override     public void commit(BusinessActionContext actionContext) {         String xid = actionContext.getXid();         DeductStockDTO deductStock = ((JSONObject) actionContext.getActionContext("deductStock")).toJavaObject(DeductStockDTO.class);         String skuCode = deductStock.getSkuCode();         Integer saleQuantity = deductStock.getSaleQuantity();         Integer originSaleStock = deductStock.getOriginSaleStock();         Integer originSaledStock = deductStock.getOriginSaledStock();         log.info("二阶段方法:增加mysql已销售库存,deductStock,xid={}", JSONObject.toJSONString(deductStock), xid);          //幂等         //当出现网络异常或者TC Server异常时,会出现重复调用commit阶段的情况,所以需要进行幂等操作         if (!TccResultHolder.isTrySuccess(getClass(), skuCode, xid)) {             return;         }          //1.增加已销售库存         productStockDAO.increaseSaledStock(skuCode, saleQuantity, originSaledStock);          //2.插入一条扣减日志表         log.info("插入一条扣减日志表");         productStockLogDAO.save(buildStockLog(deductStock));          //移除标识         TccResultHolder.removeResult(getClass(), skuCode, xid);     }      @Override     public void rollback(BusinessActionContext actionContext) {         String xid = actionContext.getXid();         DeductStockDTO deductStock = ((JSONObject) actionContext.getActionContext("deductStock")).toJavaObject(DeductStockDTO.class);         String skuCode = deductStock.getSkuCode();         Integer saleQuantity = deductStock.getSaleQuantity();         Integer originSaleStock = deductStock.getOriginSaleStock();         Integer originSaledStock = deductStock.getOriginSaledStock();         log.info("回滚:增加mysql销售库存,deductStock={},xid={}", JSONObject.toJSONString(deductStock), xid);          //空回滚处理         if (TccResultHolder.isTagNull(getClass(), skuCode, xid)) {             log.info("mysql:出现空回滚");             insertEmptyRollbackTag();             return;         }          //幂等处理         //try阶段没有完成的情况下,不必执行回滚,因为try阶段有本地事务,事务失败时已经进行了回滚         //如果try阶段成功,而其他全局事务参与者失败,这里会执行回滚         if (!TccResultHolder.isTrySuccess(getClass(), skuCode, xid)) {             log.info("mysql:无需回滚");             return;         }          //1.还原销售库存         productStockDAO.restoreSaleStock(skuCode, saleQuantity, originSaleStock - saleQuantity);          //2.删除库存扣减日志         ProductStockLogDO logDO = productStockLogDAO.getLog(deductStock.getOrderId(), skuCode);         if (null != logDO) {             productStockLogDAO.removeById(logDO.getId());         }          //移除标识         TccResultHolder.removeResult(getClass(), skuCode, xid);     }     ... }  //存储TCC第一阶段执行结果,用于解决TCC幂等,空回滚,悬挂问题 public class TccResultHolder {     //标识TCC try阶段开始执行的标识     private static final String TRY_START = "TRY_START";      //标识TCC try阶段执行成功的标识     private static final String TRY_SUCCESS = "TRY_SUCCESS";      //保存TCC事务执行过程的状态     private static Map<Class<?>, Map<String, String>> map = new ConcurrentHashMap<Class<?>, Map<String, String>>();     ...      //判断try阶段是否执行成功     public static boolean isTrySuccess(Class<?> tccClass, String bizKey, String xid) {         String v = getResult(tccClass, bizKey, xid);         if (StringUtils.isNotBlank(v) && TRY_SUCCESS.equals(v)) {             return true;         }         return false;     }      public static String getResult(Class<?> tccClass, String bizKey, String xid) {         Map<String, String> results = map.get(tccClass);         if (results != null) {             return results.get(getTccExecution(xid, bizKey));         }         return null;     }      public static void removeResult(Class<?> tccClass, String bizKey, String xid) {         Map<String, String> results = map.get(tccClass);         if (results != null) {             results.remove(getTccExecution(xid, bizKey));         }     }     ... }

 

8.假设使用异步锁库存方案可能会导致的几种问题

(1)发送库存扣减消息到MQ失败导致超卖

(2)消费库存扣减消息失败重试时出现重复消费

(3)大量并发扣减请求进来扣库存导致大量扣减失败

 

假如锁定优惠券 + 生成订单数据写库之后,通过发送库存扣减消息到MQ。后续再由消费者消费MQ的库存扣减消息,完成数据库 + 缓存的双写。通过这种异步锁库存的方式,来实现隔离Seata AT模式和Seata TCC模式。那么就可能会出现如下问题:

 

(1)发送库存扣减消息到MQ失败导致超卖

比如10个库存,发了11次消息,其中1次失败,那么就会造成10个库存扣完了,但是生成了11个订单。

 

(2)消费库存扣减消息失败重试时出现重复消费

比如10个库存,发了9次消息,其中一条消息重复消费了两次。这样库存扣完了,但只生成9个订单。

 

(3)大量并发扣减请求进来扣库存导致大量扣减失败

出现生成订单成功,但是后续出现大量扣库存失败,导致退款。

 

所以,异步锁库存并不科学,锁库存还是要使用同步。也就是生成订单到数据库 + 锁优惠券 + 锁库存,使用AT模式绑定成刚性事务。但由于锁库存使用了TCC模式,所以锁库存的分支事务不用再竞争全局锁,从而提高了锁库存的并发性能,而且TCC模式也保证了双写数据库 + 缓存的数据一致性。

 

9.生单链路的AT + TCC混合事务方案流程总结

一.生成订单数据 + 锁定优惠券,使用的是AT模式

订单数据和营销数据通常不需要做异构存储,使用数据库存储即可。往数据库写入订单数据 + 锁优惠券,由于都是与用户关联,所以即使并发情况下也不会出现竞争全局锁。

 

二.锁库存双写数据库 + 缓存,使用的是TCC模式

库存数据需要异构存储,所以扣减库存时,需要操作数据库 + 缓存。双写数据库 + 缓存会面临数据一致性问题,TCC模式可以保证数据一致性。

 

锁库存使用TCC模式后,即便出现大量并发请求锁库存,也不需要竞争AT模式下的全局锁了。

订单初版—6.生单链路实现的重构文档

 

10.生单链路非分布式事务的纯补偿方案

也有很多公司并没有使用Seata分布式事务这种比较复杂的技术,而是使用纯补偿方案来实现生单链路。

 

生单链路纯补偿方案需要引入操作日志来实现补偿检查,锁定优惠券需要有操作日志,锁定库存也需要有操作日志。

 

无论生单是否成功,都要发送消息到MQ,以生单请求为基础去检查锁定优惠券和锁定库存的操作日志。

 

如果生单成功,但锁定优惠券或锁定库存的操作日志缺失,则进行锁定优惠券或锁定库存的补偿操作。

 

如果生单失败,但锁定优惠券或锁定库存的操作日志显示锁定成功,则需要释放优惠券或库存的资源。

订单初版—6.生单链路实现的重构文档

 

发表评论

评论已关闭。

相关文章

当前内容话题
  • 0