商品中心—7.自研缓存框架的技术文档

大纲

1.商品C端系统监听商品变更及刷新缓存

2.自研缓存框架的数据表缓存组件

3.自研缓存框架的通用缓存读写组件与DB操作组件

 

1.商品C端系统监听商品变更及刷新缓存

FlushRedisCache的flushRedisStringData()方法刷新缓存的逻辑是:首先从DB查询最新的数据 -> 然后删除旧缓存 -> 最后更新缓存。

@Configuration public class ConsumerBeanConfig {     //配置内容对象     @Autowired     private RocketMQProperties rocketMQProperties;          //监听商品修改的MQ消息     @Bean("productUpdateTopic")     public DefaultMQPushConsumer productUpdateTopic(ProductUpdateListener productUpdateListener) throws MQClientException {         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConstant.PRODUCT_UPDATE_CONSUMER_GROUP);         consumer.setNamesrvAddr(rocketMQProperties.getNameServer());         consumer.subscribe(RocketMqConstant.PRODUCT_UPDATE_TOPIC, "*");         consumer.registerMessageListener(productUpdateListener);         consumer.start();         return consumer;     } }  //商品变更时的缓存处理 @Component public class ProductUpdateListener implements MessageListenerConcurrently {     @DubboReference(version = "1.0.0")     private TableDataUpdateApi tableDataUpdateApi;          @Override     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {         try {             for (MessageExt messageExt : list) {                 //消息处理这里,涉及到sku的缓存更新以及对应的整个商品明细的缓存更新                 String msg = new String(messageExt.getBody());                 log.info("执行商品缓存数据更新逻辑,消息内容:{}", msg);                    TableDataChangeDTO tableDataChangeMessage = JsonUtil.json2Object(msg, TableDataChangeDTO.class);                 //更新sku对应的商品缓存信息                 tableDataUpdateApi.tableDataChange(tableDataChangeMessage);                 //发送回调消息通知                 tableDataUpdateApi.sendCallbackMessage(tableDataChangeMessage);             }         } catch (Exception e) {             log.error("consume error, 商品缓存更新失败", e);             //本次消费失败,下次重新消费             return ConsumeConcurrentlyStatus.RECONSUME_LATER;         }         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;     } }  //商品信息变更服务 @DubboService(version = "1.0.0", interfaceClass = TableDataUpdateApi.class, retries = 0) public class TableDataUpdateApiImpl implements TableDataUpdateApi {     @Resource     private FlushRedisCache flushRedisCache;          private ExecutorService executorService = Executors.newFixedThreadPool(10);         //商品表数据变更逆向更新缓存     @Override     public JsonResult tableDataChange(TableDataChangeDTO tableDataChangeDTO) {         executorService.execute(() -> {             try {                 //刷新缓存数据                 flushRedisCache.flushRedisStringData(false, tableDataChangeDTO.getTableName(), Sets.newHashSet(tableDataChangeDTO.getKeyId()));             } catch (Exception e) {                 log.error("刷新string类型缓存异常,入参为tableDataChangeDTO={}", tableDataChangeDTO, e);             }             try {                 flushRedisCache.flushRedisSetData(false, tableDataChangeDTO.getTableName(), Sets.newHashSet(tableDataChangeDTO.getKeyId()));             } catch (Exception e) {                 log.error("刷新set类型缓存异常,入参为tableDataChangeDTO={}", tableDataChangeDTO, e);             }             try {                 flushRedisCache.flushRedisSortedSetData(false, tableDataChangeDTO.getTableName(), Sets.newHashSet(tableDataChangeDTO.getKeyId()));             } catch (Exception e) {                 log.error("刷新sortedset类型缓存异常,入参为tableDataChangeDTO={}", tableDataChangeDTO, e);             }         });         return JsonResult.buildSuccess();     }     ... }  //数据变更—刷新缓存 @Component public class FlushRedisCache {     //继承了AbstractRedisStringCache的缓存实例会被注入到abstractRedisStringCacheMap这个map中     //例如CategoryBaseCache、FrontCategoryCache、ItemCollectCache、ProductDetailCache、SkuCollectCache等     @Autowired     private Map<String, AbstractRedisStringCache> abstractRedisStringCacheMap;          //更新string类型缓存     //@param flushAll  是否全量刷新     //@param tableName 表名     //@param idSet     主键ID集合     public void flushRedisStringData(boolean flushAll, String tableName, Set<Long> idSet) {         for (Map.Entry<String, AbstractRedisStringCache> entry : abstractRedisStringCacheMap.entrySet()) {             AbstractRedisStringCache stringCache = entry.getValue();             if (flushAll) {                 stringCache.flushRedisStringDataByTableUpdateData();                 continue;             }             //继承AbstractRedisStringCache的每个缓存实例都指定来表名,如下用于匹配出对应的缓存实例             if (stringCache.getTableName().contains(tableName)) {                 stringCache.flushRedisStringDataByTableAndIdSet(tableName, idSet);             }         }     }     ... }  //Redis(string)缓存抽象类 public abstract class AbstractRedisStringCache<DO, BO> {     @Resource     private RedisReadWriteManager redisReadWriteManager;     ...          //刷新缓存—根据主表ID集合(关联表变更需要查询主表)     public void flushRedisStringDataByTableAndIdSet(String tableName, Set<Long> idSet) {         Optional<Set<Long>> idSetOpt = getStringDatabase().getTableIdSetByRelationTableIdSet(tableName, idSet);         if (!idSetOpt.isPresent()) {             return;         }         flushRedisStringDataByIdSet(idSetOpt.get());     }          //刷新缓存—根据主键ID集合     //@param idSet 数据表主键ID     private void flushRedisStringDataByIdSet(Set<Long> idSet) {         //根据id集合从数据库中查询出数据         Optional<RedisStringCache<DO>> stringSourceOpt = getStringDatabase().listTableDataByIdSet(idSet, queryType());         if (!stringSourceOpt.isPresent()) {             return;         }         RedisStringCache<DO> redisStringCache = stringSourceOpt.get();         if (!CollectionUtils.isEmpty(redisStringCache.getDeleteSet())) {             //通过缓存读写组件删除缓存             redisReadWriteManager.delete(redisStringCache.getDeleteSet().stream().map(this::getRedisKey).collect(toSet()).toArray(new String[]{}));         }         if (CollectionUtils.isEmpty(redisStringCache.getAddList())) {             return;         }         List<BO> boList = convertDO2BO(redisStringCache.getAddList());         Map<String, BO> redisMap = convertBO2Map(boList);         //通过缓存读写组件写入缓存         redisReadWriteManager.batchWriteRedisString(redisMap);     }     ... }  //缓存读写管理 @Service public class RedisReadWriteManager {     @Resource     private RedisCache redisCache;     ...          //删除指定的key     public void delete(String... keys) {         Arrays.asList(keys).stream().forEach(key -> redisCache.delete(key));     }          //批量添加string缓存     public <T> void batchWriteRedisString(Map<String, T> redisMap) {         List<Map.Entry<String, T>> list = Lists.newArrayList(redisMap.entrySet());         try {             for (List<Map.Entry<String, T>> entries : Lists.partition(list, PAGE_SIZE_100)) {                 for (Map.Entry<String, T> entry : entries) {                     redisCache.setex(true, entry.getKey(), JSON.toJSONString(entry.getValue()), RedisKeyUtils.redisKeyRandomTime(INT_EXPIRED_SEVEN_DAYS));                 }                 try {                     Thread.sleep(SLEEP_100);                 } catch (InterruptedException e) {                     e.printStackTrace();                 }             }         } catch (Exception e) {             log.error("批量添加string缓存异常 redisMap={}", redisMap, e);         }     }     ... }

 

2.自研缓存框架的数据表缓存组件

(1)自研缓存框架的目录结构

(2)通过注解实现继承同一个抽象类的数据表缓存组件实例自动注入Map

 

(1)自研缓存框架的目录结构

商品中心—7.自研缓存框架的技术文档

(2)通过注解实现继承同一个抽象类的数据表缓存组件实例自动注入Map

继承AbstractRedisStringCache的数据表缓存组件实例会被注入到Map中,之后通过Map便可以根据表名来获取对应的数据表缓存组件实例了。数据表缓存组件实例中会封装一些获取DB数据表数据、获取缓存key等方法,而其继承的抽象类会提供一些根据DB获取缓存、刷新缓存等模版方法。

@Component public class FlushRedisCache {     //继承了AbstractRedisStringCache的缓存实例会被注入到abstractRedisStringCacheMap这个map中     //例如CategoryBaseCache、FrontCategoryCache、ItemCollectCache、ProductDetailCache、SkuCollectCache等     @Autowired     private Map<String, AbstractRedisStringCache> abstractRedisStringCacheMap;      //继承了AbstractRedisSortedSetCache的缓存实例会被注入到abstractRedisSortedSetCacheMap这个map中     //例如SkuSellerCache     @Autowired     private Map<String, AbstractRedisSortedSetCache> abstractRedisSortedSetCacheMap;     ... }  @Service("productDetailCache") public class ProductDetailCache extends AbstractRedisStringCache<ProductDetailDO, ProductDetailBO> {     //DB操作组件     @Resource     private ProductDetailStringDatabase productDetailStringDatabase;          @Resource     private ProductDetailConverter productDetailConverter;          @Override     protected RedisStringDatabase<ProductDetailDO> getStringDatabase() {         return productDetailStringDatabase;     }          @Override     protected String getPendingRedisKey() {         return AbstractRedisKeyConstants.PRODUCT_SKU_INFO_STRING;     }          @Override     protected List<ProductDetailBO> convertDO2BO(Collection<ProductDetailDO> doList) {         return productDetailConverter.converterDetailList(doList);     }          @Override     protected Map<String, Object> getTableFieldsMap(String key) {         Map<String, Object> dbInputMap = Maps.newHashMap();         dbInputMap.put(SkuCollectStringDatabase.SKU_ID, key);         return dbInputMap;     }          @Override     protected Class<ProductDetailBO> getBOClass() {         return ProductDetailBO.class;     }     ... }  //Redis(string)缓存抽象类 //@param <DO> 数据对象 //@param <BO> 缓存对象 public abstract class AbstractRedisStringCache<DO, BO> {     @Resource     private RedisReadWriteManager redisReadWriteManager;          //获取redis key     //@param key 需要替换的关键字     protected String getRedisKey(String key) {         return String.format(getPendingRedisKey(), key);     }          //单个获取数据库表名     public String getTableName() {         return getStringDatabase().getTableName();     }          //获取DB读取对象     protected abstract RedisStringDatabase<DO> getStringDatabase();          //获取待处理的Redis key     protected abstract String getPendingRedisKey();          //DO转BO     protected abstract List<BO> convertDO2BO(Collection<DO> doList);          //关联表字段值     protected abstract Map<String, Object> getTableFieldsMap(String key);          //获取BO对象的class     protected abstract Class<BO> getBOClass();     ...          //模版方法:根据关键字批量获取数据     //@param useLocalCache 是否使用本地缓存     //@param keyList       入参关键字     public Optional<List<BO>> listRedisStringData(Boolean useLocalCache, List<String> keyList) {         if (CollectionUtils.isEmpty(keyList)) {             return Optional.empty();         }         Optional<List<BO>> boListOpt = redisReadWriteManager.listRedisStringDataByCache(useLocalCache, keyList,             getBloomKey(), getStringDatabase()::getTableSingleFiled, getStringDatabase().getBloomField(),             getBOClass(), this::getRedisKey, (key) -> {                 Map<String, Object> tableFieldsMap = getTableFieldsMap(key);                 Optional<DO> doOpt;                 try {                     doOpt = getStringDatabase().getTableData(tableFieldsMap, queryType());                 } catch (Exception e) {                     log.error("根据关键字批量获取数据异常 key={},paramMap={}", key, tableFieldsMap, e);                     return Optional.empty();                 }                 if (!doOpt.isPresent()) {                     return Optional.empty();                 }                 List<BO> boList = convertDO2BO(Arrays.asList(doOpt.get()));                 if (CollectionUtils.isEmpty(boList)) {                     return Optional.empty();                 }                 return Optional.of(boList.get(0));             }         );         return boListOpt;     }          //模版方法:根据多关键字批量获取集合数据     //@param keyList    入参关键字     //@param requestMap key 数据库表字段,value 字段值,该map中的字段不要与getTableFieldsMap(key)获取的字段重复     public Optional<List<BO>> listRedisStringData(List<String> keyList, Map<String, Object> requestMap) {         if (CollectionUtils.isEmpty(keyList)) {             return Optional.empty();         }         Optional<List<BO>> boListOpt = redisReadWriteManager.listRedisStringDataByBatchCache(keyList, getBOClass(), this::getRedisKey, (key) -> {             Map<String, Object> tableFieldsMap = getTableFieldsMap(key);             if (MapUtils.isNotEmpty(requestMap)) {                 tableFieldsMap.putAll(requestMap);             }             Optional<List<DO>> doOpt;             try {                 doOpt = getStringDatabase().listTableData(tableFieldsMap, queryType());             } catch (Exception e) {                 log.error("根据关键字批量获取数据异常 key={},paramMap={}", key, tableFieldsMap, e);                 return Optional.empty();             }             if (!doOpt.isPresent()) {                 return Optional.empty();             }             List<BO> boList = convertDO2BO(doOpt.get());             if (CollectionUtils.isEmpty(boList)) {                 return Optional.empty();             }             return Optional.of(boList);         });         return boListOpt;     }          //模版方法:刷新缓存—根据主表ID集合(关联表变更需要查询主表)     //@param tableName 关联表名     //@param idSet     关联表主键集合     public void flushRedisStringDataByTableAndIdSet(String tableName, Set<Long> idSet) {         Optional<Set<Long>> idSetOpt = getStringDatabase().getTableIdSetByRelationTableIdSet(tableName, idSet);         if (!idSetOpt.isPresent()) {             return;         }         flushRedisStringDataByIdSet(idSetOpt.get());     }          //模版方法:刷新缓存—根据表变更数据ID集合     public void flushRedisStringDataByTableUpdateData() {         Optional<Set<Long>> updateIdSetOpt = getStringDatabase().getTableUpdateIdSet();         if (!updateIdSetOpt.isPresent()) {             return;         }         flushRedisStringDataByIdSet(updateIdSetOpt.get());     }          //模版方法:刷新缓存—根据主键ID集合     //@param idSet 数据表主键ID     private void flushRedisStringDataByIdSet(Set<Long> idSet) {         //根据id集合从数据库中查询出数据         Optional<RedisStringCache<DO>> stringSourceOpt = getStringDatabase().listTableDataByIdSet(idSet, queryType());         if (!stringSourceOpt.isPresent()) {             return;         }         RedisStringCache<DO> redisStringCache = stringSourceOpt.get();         if (!CollectionUtils.isEmpty(redisStringCache.getDeleteSet())) {             //通过缓存读写组件删除缓存             redisReadWriteManager.delete(redisStringCache.getDeleteSet().stream().map(this::getRedisKey).collect(toSet()).toArray(new String[]{}));         }         if (CollectionUtils.isEmpty(redisStringCache.getAddList())) {             return;         }         List<BO> boList = convertDO2BO(redisStringCache.getAddList());         Map<String, BO> redisMap = convertBO2Map(boList);         //通过缓存读写组件写入缓存         redisReadWriteManager.batchWriteRedisString(redisMap);     }     ... }

 

3.自研缓存框架的通用缓存读写组件与DB操作组件

(1)通用缓存读写组件

(2)数据表的DB操作组件

 

每个数据表缓存组件都会有至少两个必备组件:一个通用缓存读写组件 + 一个数据表的DB操作组件。

 

(1)通用缓存读写组件

通用缓存读写组件也包含两个必备组件:一个操作缓存的RedisCache组件 + 一个操作分布式锁的RedisLock组件。通用缓存读写组件封装了大量基础的缓存读写操作,这些基础的缓存读写操作会结合DB读库 + 缓存问题等解决方案来进行实现。

//缓存读写管理 @Service public class RedisReadWriteManager {     @Resource     private RedisCache redisCache;          @Resource     private RedisLock redisLock;     ...          //删除指定的key     public void delete(String... keys) {         Arrays.asList(keys).stream().forEach(key -> redisCache.delete(key));     }          //批量添加string缓存     public <T> void batchWriteRedisString(Map<String, T> redisMap) {         List<Map.Entry<String, T>> list = Lists.newArrayList(redisMap.entrySet());         try {             for (List<Map.Entry<String, T>> entries : Lists.partition(list, PAGE_SIZE_100)) {                 for (Map.Entry<String, T> entry : entries) {                     redisCache.setex(true, entry.getKey(), JSON.toJSONString(entry.getValue()), RedisKeyUtils.redisKeyRandomTime(INT_EXPIRED_SEVEN_DAYS));                 }                 try {                     Thread.sleep(SLEEP_100);                 } catch (InterruptedException e) {                     e.printStackTrace();                 }             }         } catch (Exception e) {             log.error("批量添加string缓存异常 redisMap={}", redisMap, e);         }     }          //批量获取多缓存数据     public <T> Optional<List<T>> listRedisStringDataByBatchCache(List<String> keyList, Class<T> clazz, Function<String, String> getRedisKeyFunction, Function<String, Optional<List<T>>> getDbFuction) {         try {             List<T> list = Lists.newArrayList();             List<String> pendingKeyList = keyList.stream().distinct().collect(toList());             List<String> redisKeyList = pendingKeyList.stream().map(getRedisKeyFunction).distinct().collect(toList());             List<String> cacheList = redisCache.mget(redisKeyList);             for (int i = 0; i < cacheList.size(); i++) {                 String cache = cacheList.get(i);                 //过滤无效缓存                 if (EMPTY_OBJECT_STRING.equals(cache)) {                     continue;                 }                 if (StringUtils.isNotBlank(cache)) {                     List<T> tList = JSON.parseArray(cache, clazz);                     list.addAll(tList);                     continue;                 }                 //缓存没有则读库                 Optional<List<T>> optional = listRedisStringDataByDb(pendingKeyList.get(i), getRedisKeyFunction, getDbFuction);                 if (optional.isPresent()) {                     list.addAll(optional.get());                 }             }             return CollectionUtils.isEmpty(list) ? Optional.empty() : Optional.of(list);         } catch (Exception e) {             log.error("批量获取缓存数据异常 keyList={},clazz={}", keyList, clazz, e);             throw e;         }     }          //读取数据库表多数据赋值到Redis     public <T> Optional<List<T>> listRedisStringDataByDb(String key, Function<String, String> getRedisKeyFunction, Function<String, Optional<List<T>>> getDbFuction) {         if (StringUtils.isEmpty(key) || Objects.isNull(getDbFuction)) {             return Optional.empty();         }         try {             if (!redisLock.lock(key)) {                 return Optional.empty();             }             String redisKey = getRedisKeyFunction.apply(key);             Optional<List<T>> optional = getDbFuction.apply(key);             if (!optional.isPresent()) {                 //把空对象暂存到redis                 redisCache.setex(true, redisKey, EMPTY_OBJECT_STRING, RedisKeyUtils.redisKeyRandomTime(INT_EXPIRED_ONE_DAY, TimeUnit.HOURS, NUMBER_24));                 log.warn("发生缓存穿透 redisKey={}", redisKey);                 return optional;             }             //把表数据对象存到redis             redisCache.setex(true, redisKey, JSON.toJSONString(optional.get()), RedisKeyUtils.redisKeyRandomTime(INT_EXPIRED_SEVEN_DAYS));             log.info("表数据对象存到Redis redisKey={}, data={}", redisKey, optional.get());             return optional;         } finally {             redisLock.unlock(key);         }     }     ... }

(2)数据表的DB操作组件

该组件主要提供对数据表数据的查询方法。

@Service("productDetailStringDatabase") public class ProductDetailStringDatabase extends AbstractRedisStringDatabase<ProductDetailDO> {     public static final String SKU_ID = "skuId";     private static final String TABLE_NAME = "sku_info";          @Resource     private ProductRepository productRepository;          @Override     public String getTableName() {         return TABLE_NAME;     }          @Override     public Set<String> getTableNameSet() {         return Sets.newHashSet(getTableName());     }          @Override     public Optional<ProductDetailDO> getTableData(Map<String, Object> tableFieldsMap, String queryType) {         if (tableFieldsMap.containsKey(SKU_ID)) {             String skuId = (String) tableFieldsMap.get(SKU_ID);             ProductDetailDO productDetailDO = productRepository.queryProductDetail(skuId);             if (!Objects.isNull(productDetailDO) && DelFlagEnum.EFFECTIVE.getCode().equals(productDetailDO.getDelFlag())) {                 return Optional.of(productDetailDO);             }             return Optional.empty();         }         return Optional.empty();     }          @Override     public Optional<List<ProductDetailDO>> listTableData(Map<String, Object> tableFieldsMap, String queryType) {         return Optional.empty();     }          @Override     public Optional<RedisStringCache<ProductDetailDO>> listTableDataByIdSet(Set<Long> idSet, String queryType) {         RedisStringCache redisStringCache = new RedisStringCache();         List<ProductDetailDO> addList = new ArrayList<>();         for (Long skuId : idSet) {             ProductDetailDO productDetailDO = productRepository.queryProductDetail(String.valueOf(skuId));             if (!Objects.isNull(productDetailDO)) {                 addList.add(productDetailDO);             }         }         redisStringCache.setAddList(addList);         return Optional.of(redisStringCache);     }          @Override     public Optional<RedisStringCache<ProductDetailDO>> listTableDataByIdSet(List<Long> idSet, String queryType) {         Long skuId = idSet.get(0);         ProductDetailDO productDetailDO = productRepository.queryProductDetail(String.valueOf(skuId));         RedisStringCache redisStringCache = new RedisStringCache();         redisStringCache.setAddList(Arrays.asList(productDetailDO));         return Optional.of(redisStringCache);     } }

 

发表评论

评论已关闭。

相关文章

  • 0