数据并发安全校验处理工具类

一、项目现存问题描述

  当前系统项目中,存在一些并发安全风险问题(虽然并发量较小)。特别是在处理审批状态修改和涉及金额数量的操作,由于缺乏有效的并发控制,可能会导致业务逻辑重复执行和数据不一致。例如 并发场景下,多个线程同时尝试更新同一笔交易状态或金额,这不仅会导致数据不一致,还可能引发更严重的相关业务逻辑错误。

二、一般处理方案概述

乐观锁:

通过在表中添加一个版本号字段来实现,当更新记录时,检查版本号是否与读取时相同,否则表示数据已被其他事务修改,需要重试。PS:需要现行表增加字段并修改代码支持,改动稍大

悲观锁:

使用数据库提供的锁机制,在查询时即锁定记录。PS:应避免表级锁,查询条件应使用到索引字段

分布式锁:

对于跨服调用的场景,可以采用redis等缓存技术实现分布式锁,确保在同一时刻只有一个服务实例能够对共享资源进行操作。PS:我们的项目开发规则不支持服务层使用redis组件,固开发了这个工具类

事务管理:

合理配置事务隔离级别,确保事务间的可见性服务预期,避免脏读、不可重复读等问题。PS:不便于后期维护,容易造成事务的未知风险

三、基于现行项目的工具类设计方案

结合项目实际情况,设计了一个专用于解决此类并发问题的工具类。该工具类采用了悲观锁方案,使用便捷,以下是不分测试样例与工具类源码。

1、使用测试样例:

    //修改金额     @Transactional(rollbackFor = Exception.class)     public void addUserAccountAmount() {         ConcurrentDataUtils.updateAmount(UserInfo::getAccountAmount, 600, this,Pair.of(UserAccount::getId, 10));     }   //判断是否与预期值一致 eg:判断审批状态 是否为待审批,否则应拦截     @Transactional(rollbackFor = Exception.class)     public void isEqual() {         ConcurrentDataUtils.isEqual(Approve::getStatus, 0, this,Pair.of(Approve::getForeignId, 1122334));     }     //判断数据是否已存在 eg:同步、保存等场景     @Transactional(rollbackFor = Exception.class)     public void isExist() {         ConcurrentDataUtils.isExist(userInfoService,Pair.of(UserInfo::getIdCard, 1122334),Pair.of(UserInfo::getIsDelete,0));     }

2、工具类源码

数据并发安全校验处理工具类

package com.example.dlock_demo.utils;  import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.toolkit.support.SFunction; import com.baomidou.mybatisplus.extension.service.IService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair;  import java.io.Serializable; import java.lang.invoke.SerializedLambda; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.math.BigDecimal; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap;  /**  * 并发校验、处理数据工具类  *  * @author: shf  * @date: 2025年02月14日 11:13  */ @Slf4j public class ConcurrentDataUtils {      private static final String DEFAULT_LAST_JOIN_SQL = "ORDER BY id DESC Limit 1 FOR UPDATE";      private static Map<Class<?>, SerializedLambda> CLASS_LAMDBA_CACHE = new ConcurrentHashMap<>();      /**      * 校验数据是否已存在      * <p>      * 根据查询条件默认查询的是满足条件的最后一条数据做判断或更新,查询条件入参优先传ID(行锁)、业务编号等表中唯一标识字段;      *      * @param <T>            实体类型      * @param <C>            查询条件列值的类型      * @param service        数据库操作Service      * @param conditionPairs 数据库查询条件(key:条件查询列字段 eg:删除状态;value:对应值)      */     @SafeVarargs     public static <T, C> boolean isExist(IService<T> service, Pair<SFunction<T, C>, C>... conditionPairs) {         if (service == null || conditionPairs == null || conditionPairs.length == 0) {             throw new RuntimeException("并发校验处理数据ConcurrentDataUtils-isExist查询条件为空");         }         return lockAndGet(service, conditionPairs) != null;     }      /**      * 校验入参与数据库字段值是否一致      * <p>      * 根据查询条件默认查询的是满足条件的最后一条数据做判断或更新,查询条件入参优先传ID(行锁)、业务编号等表中唯一标识字段;      * 期望值入参类型需要与实体字段数据类型一致;      *      * @param <T>            实体类型      * @param <R>            待验证值的类型      * @param <C>            查询条件列值的类型      * @param targetColumn   实体类查询字段      * @param expectVal      期望值 需要与实体字段数据类型一致      * @param service        数据库操作Service      * @param conditionPairs 数据库查询条件(key:条件查询列字段 eg:删除状态;value:对应值)      */     @SafeVarargs     public static <T, R, C> boolean isEqual(SFunction<T, R> targetColumn, R expectVal, IService<T> service, Pair<SFunction<T, C>, C>... conditionPairs) {         if (targetColumn == null || service == null || conditionPairs == null || conditionPairs.length == 0) {             throw new RuntimeException("并发校验处理数据ConcurrentDataUtils-isEqual查询条件为空");         }         T t = lockAndGet(service, conditionPairs);         if (t == null) {             log.warn("并发校验处理数据ConcurrentDataUtils-expectEqual查询为空");             return false;         }         R columnVal = targetColumn.apply(t);         return Objects.equals(expectVal, columnVal);     }      /**      * 处理增加或减少金额、数量      * <p>      * 根据查询条件默认查询的是满足条件的最后一条数据做判断或更新,查询条件入参优先传ID(行锁)、业务编号等表中唯一标识字段;      * 新增值入参类型需要与实体字段数据类型一致;      * 支持BigDecimal、long、int      *       * @param <T>            实体类型      * @param <R>            待验证值的类型      * @param <C>            查询条件列值的类型      * @param targetColumn   实体类查询字段      * @param thisVal        本次新增值 需要与实体字段数据类型一致      * @param service        数据库操作Service      * @param conditionPairs 数据库查询条件(key:条件查询列字段 eg:删除状态;value:对应值)      */     @SafeVarargs     public static <T, R, C> boolean updateAmount(SFunction<T, R> targetColumn, R thisVal, IService<T> service, Pair<SFunction<T, C>, C>... conditionPairs) {         if (targetColumn == null || thisVal == null || service == null || conditionPairs == null || conditionPairs.length == 0) {             throw new RuntimeException("并发校验处理数据ConcurrentDataUtils-updateAmount查询条件为空");         }         try {             T t = lockAndGet(service, conditionPairs);             if (t == null) {                 log.warn("并发校验处理数据ConcurrentDataUtils-update未查询到有效数据");                 return false;             }             R columnVal = targetColumn.apply(t);             R compute = compute(columnVal, thisVal);             //获取字段             String fieldName = getFieldName(targetColumn);             Field field = t.getClass().getDeclaredField(fieldName);             field.setAccessible(true);             field.set(t, compute);             return service.updateById(t);         } catch (Throwable e) {             log.error("并发校验处理数据ConcurrentDataUtils-update异常:", e);             throw new RuntimeException("并发校验处理数据ConcurrentDataUtils-update异常");         }     }      /**      * 根据查询条件默认查询的是满足条件的最后一条数据做判断或更新,查询条件入参优先传ID(行锁)、业务编号等表中唯一标识字段;      *      * @param <T>            实体类型      * @param <R>            查询字段值的类型      * @param <C>            查询条件列值的类型      * @param service        数据库操作Service      * @param conditionPairs 数据库查询条件(key:条件查询列字段 eg:删除状态;value:对应值)      * @return 实体数据      */     @SafeVarargs     private static <T, R, C> T lockAndGet(IService<T> service, Pair<SFunction<T, C>, C>... conditionPairs) {         if (service == null || conditionPairs == null) {             return null;         }         //加锁数据查询         LambdaQueryWrapper<T> wrapper = new LambdaQueryWrapper<>();         //动态拼接参数对         for (Pair<SFunction<T, C>, C> pair : conditionPairs) {             if (Objects.nonNull(pair.getValue())) {                 wrapper.eq(pair.getKey(), pair.getValue());             }         }         wrapper.last(DEFAULT_LAST_JOIN_SQL);         return service.getOne(wrapper);     }      /**      * 计算增减结果      *      * @param columnVal 字段原值      * @param thisVal   本次变动值      * @param <R>       计算结果类型      * @return 增减计算结果      */     private static <R> R compute(R columnVal, R thisVal) {         if (columnVal instanceof BigDecimal) {             BigDecimal original = Optional.of((BigDecimal) columnVal).orElse(BigDecimal.ZERO);             BigDecimal addVal = (BigDecimal) thisVal;             return (R) original.add(addVal);         } else if (columnVal instanceof Integer) {             Integer original = Optional.of((Integer) columnVal).orElse(0);             Integer addVal = (Integer) thisVal;             Integer i = original + addVal;             return (R) i;         } else if (columnVal instanceof Long) {             Long original = Optional.of((Long) columnVal).orElse(0L);             Long addVal = (Long) thisVal;             Long l = original + addVal;             return (R) l;         } else {             throw new RuntimeException("并发校验处理数据ConcurrentDataUtils-compute不支持的数据类型");         }     }      /***      * 转换方法引用为属性名      * @param fn      * @return      */     public static <T, R> String getFieldName(SFunction<T, R> fn) {         SerializedLambda lambda = getSerializedLambda(fn);         String methodName = lambda.getImplMethodName();         String prefix = null;         if (methodName.startsWith("get")) {             prefix = "get";         }         // 截取get之后的字符串并转换首字母为小写         return toLowerCaseFirstOne(methodName.replace(prefix, ""));     }      /**      * 首字母转小写      *      * @param s      */     private static String toLowerCaseFirstOne(String s) {         if (Character.isLowerCase(s.charAt(0))) {             return s;         } else {             return (new StringBuilder()).append(Character.toLowerCase(s.charAt(0))).append(s.substring(1)).toString();         }     }      private static SerializedLambda getSerializedLambda(Serializable fn) {         SerializedLambda lambda = CLASS_LAMDBA_CACHE.get(fn.getClass());         if (lambda == null) {             try {                 Method method = fn.getClass().getDeclaredMethod("writeReplace");                 method.setAccessible(Boolean.TRUE);                 lambda = (SerializedLambda) method.invoke(fn);                 CLASS_LAMDBA_CACHE.put(fn.getClass(), lambda);             } catch (Exception e) {                 e.printStackTrace();             }         }         return lambda;     }   }

ConcurrentDataUtils工具类

3、使用注意事项

  • 使用该工具类需要放在事务中,如果没有加事务, 则只是做了一层基本的查询判断,不能彻底解决并发问题;
  • 传参查询条件,优先传ID主键或业务唯一标识编号(建议设计表时将业务编号字段设置为普通索引,由于有逻辑删除字段固不能设置为唯一索引),走行级锁;
  • 实际使用时,查询条件需要传入删除状态字段(如果表设计规则该字段一致,则可优化省略,在工具类源码中加入非删除查询即可);
  • 仅支持单表获取单条数据的校验或更新操作且查询条件为等量查询(wrapper.eq(...)),查询条件必传否则抛出异常;
  • 避免大事务问题,如果除了使用该工具类,还有大量其他耗时业务处理,可将该工具类使用部分以及涉及到该表的数据处理独立成一个方法, 单独放在一个小事务中(需兼顾数据一致性,尽量将所有数据表操作放在一起)。
发表评论

评论已关闭。

相关文章