商品中心—13.商品卖家系统的高并发文档

大纲

1.阿里云Tair接入与Jedis连接池使用

2.Redis集群连接池与通用对象连接池源码

3.商品卖家系统高并发场景与缓存预热逻辑

4.商品卖家系统缓存预热架构设计

5.商品卖家系统单机下的缓存预热的实现

6.商品卖家系统分布式下的缓存预热的实现

7.商品卖家系统定时查询DB最新数据更新缓存

8.商品中心高并发架构总结

 

1.阿里云Tair接入与Jedis连接池使用

(1)引入依赖

(2)添加配置文件

(3)配置Tair相关Bean

(4)使用Tair相关命令

 

(1)引入依赖

Tair是多线程的,Redis是单线程的。TairJedis是阿⾥云基于Jedis开发的Redis企业版专⽤客户端。TairJedis除了Jedis原有功能,还⽀持Redis企业版包含的命令。

 

(1)引入依赖

<dependency>     <groupId>redis.clients</groupId>     <artifactId>jedis</artifactId> </dependency> <dependency>     <groupId>com.aliyun.tair</groupId>     <artifactId>alibabacloud-tairjedis-sdk</artifactId>     <version>2.1.0</version> </dependency>

(2)添加配置文件

spring:     tair:         host: r-xxxxxxxxxxxxxxx.redis.rds.aliyuncs.com         port: 6379         password: password         timeout: 3000         maxIdle: 10000         maxTotal: 10000

(3)配置Tair相关Bean

@Data @Configuration public class TairConfig {     @Value("${spring.tair.maxIdle:200}")     private int maxIdle;     @Value("${spring.tair.maxTotal:300}")     private int maxTotal;     @Value("${spring.tair.host}")     private String host;     @Value("${spring.tair.port:6379}")     private int port;     @Value("${spring.tair.password}")     private String password;     @Value("${spring.tair.timeout:3000}")     private int timeout;      @Bean     @ConditionalOnClass(JedisPool.class)     public JedisPool jedisPool() {         JedisPoolConfig config = new JedisPoolConfig();         //最大空闲连接数,需自行评估,不超过Redis实例的最大连接数         config.setMaxIdle(maxIdle);         //最大连接数,需自行评估,不超过Redis实例的最大连接数         config.setMaxTotal(maxTotal);         config.setTestOnBorrow(false);           config.setTestOnReturn(false);         return new JedisPool(config, host, port, timeout, password);     }      @Bean     @ConditionalOnClass(TairCache.class)     public TairCache tairCache(JedisPool jedisPool) {         return new TairCache(jedisPool);     } }

(4)使用Tair相关命令

@Component public class TairCache {     private JedisPool jedisPool;      public TairCache(JedisPool jedisPool) {         this.jedisPool = jedisPool;     }      public Jedis getJedis() {         return jedisPool.getResource();     }      public TairString createTairString(Jedis jedis) {         return new TairString(jedis);     }      private TairHash createTairHash(Jedis jedis) {         return new TairHash(jedis);     }      //缓存存储     public void set(String key, String value, int seconds) {         log.info("tairString set key:{}, value:{}, seconds:{}", key, value, seconds);         try (Jedis jedis = getJedis()) {             TairString tairString = createTairString(jedis);             String result;             if (seconds > 0) {                 result = tairString.exset(key, value, new ExsetParams().ex(seconds));             } else {                 result = tairString.exset(key, value);             }             log.info("tairString set key:{}, value:{}, seconds:{}", key, value, seconds);         }     }      //tairString存储     public boolean exset(String key, String value, int seconds) {         try (Jedis jedis = getJedis()) {             TairString tairString = createTairString(jedis);             String result;             if (seconds > 0) {                 result = tairString.exset(key, value, new ExsetParams().ex(seconds));             } else {                 result = tairString.exset(key, value);             }             return "OK".equals(result);         }     }      //缓存获取     public String get(String key) {         try (Jedis jedis = getJedis()) {             ExgetResult<String> exget = createTairString(jedis).exget(key);             if (exget == null) {                 return null;             }             return exget.getValue();         } catch (Exception e) {             log.error("tairString get error,key{}", key, e);         }         return null;     }      //缓存自增     public Integer incr(String key) {         return this.exincrby(key, 1);     }      //缓存自增     public Integer incr(String key, Integer incrNum) {         return this.exincrby(key, incrNum);     }      //缓存自减     public Integer decr(String key, Integer decrNum) {         return this.exincrby(key, -Math.abs(decrNum));     }      //删除缓存     public Integer delete(String key) {         try (Jedis jedis = getJedis()) {             return jedis.del(key).intValue();         }     }      //删除缓存     public Integer mdelete(List<String> keyList) {         try (Jedis jedis = getJedis()) {             return jedis.del(keyList.toArray(new String[keyList.size()])).intValue();         }     }      //缓存批量获取     public List mget(List<String> keyList) {         try (Jedis jedis = getJedis()) {             TairString tairString = createTairString(jedis);             return keyList.stream().map(key -> {                 ExgetResult<String> exget = tairString.exget(key);                 if (exget != null) {                     return exget.getValue();                 }                 return null;             }).collect(Collectors.toList());         }     }      //对TairString的value进行自增自减操作     public Integer exincrby(String key, Integer incrNum) {         try (Jedis jedis = getJedis()) {             int value = createTairString(jedis).exincrBy(key, incrNum, ExincrbyParams.ExincrbyParams().min(0)).intValue();             return value;         } catch (Exception e) {             //出现自增或者自减数据溢出等异常,直接返回操作失败             return -1;         }     }      //存储hash对象     public Integer exhset(String key, String value) {         try (Jedis jedis = getJedis()) {             Map<String, Object> innerMap = JSON.parseObject(value).getInnerMap();             Map<String, String> map = Maps.transformEntries(innerMap, (k, v) -> String.valueOf(v));             String exhmset = createTairHash(jedis).exhmset(key, map);             //成功返回 OK             return 0;         }     }      //存储hash对象     public Integer exhset(String key, String field, String value) {         try (Jedis jedis = getJedis()) {             return createTairHash(jedis).exhset(key, field, value, ExhsetParams.ExhsetParams().nx()).intValue();         }     }      //获取hash对象     public String exhget(String key, String field) {         try (Jedis jedis = getJedis()) {             String exhget = createTairHash(jedis).exhget(key, field);             log.info("exhget key:{}, field:{}, value:{}", key, field, exhget);             return exhget;         }     } }

 

2.Redis集群连接池与通用对象连接池源码

(1)Spring的RedisConnectionFactory和实现JedisConnectionFactory

(2)Jedis的Pool对象是基于Apache Commons的通用对象池的

(3)Apache Commons的通用对象池的borrowObject()方法

(4)ShardedJedisFactory中创建Jedis连接对象的具体方法

(5)Redis分片对象ShardedJedis的创建

(6)ShardedJedisPool的初始化

 

这里的Redis集群在客户端使用一致性Hash进行分片,与Redis Cluster这种服务端集群是不同的。

 

(1)Spring的RedisConnectionFactory和实现JedisConnectionFactory

@Data @Configuration @ConditionalOnClass(RedisConnectionFactory.class) public class RedisConfig {     ...     @Bean     @ConditionalOnClass(RedisConnectionFactory.class)     public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {     RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();         redisTemplate.setConnectionFactory(redisConnectionFactory);         redisTemplate.setDefaultSerializer(new StringRedisSerializer());         redisTemplate.afterPropertiesSet();         return redisTemplate;     }     ... }  public interface RedisConnectionFactory extends PersistenceExceptionTranslator {     //Provides a suitable connection for interacting with Redis.     RedisConnection getConnection();      //Provides a suitable connection for interacting with Redis Cluster.     RedisClusterConnection getClusterConnection();      //Provides a suitable connection for interacting with Redis Sentinel.     RedisSentinelConnection getSentinelConnection(); }  public class JedisConnectionFactory implements InitializingBean, DisposableBean, RedisConnectionFactory {     private @Nullable Pool<Jedis> pool;     ...      public RedisConnection getConnection() {         if (isRedisClusterAware()) {             return getClusterConnection();         }          Jedis jedis = fetchJedisConnector();         JedisConnection connection = (getUsePool() ?              new JedisConnection(jedis, pool, getDatabase(), getClientName())             : new JedisConnection(jedis, null, getDatabase(), getClientName()));         connection.setConvertPipelineAndTxResults(convertPipelineAndTxResults);         return postProcessConnection(connection);     }      protected Jedis fetchJedisConnector() {         try {             if (getUsePool() && pool != null) {                 return pool.getResource();             }             Jedis jedis = createJedis();             jedis.connect();             potentiallySetClientName(jedis);             return jedis;         } catch (Exception ex) {             throw new RedisConnectionFailureException("Cannot get Jedis connection", ex);         }     }     ... }

(2)Jedis的Pool对象是基于Apache Commons的通用对象池的

public abstract class Pool<T> implements Closeable {     //internalPool就是Apache Commons的通用对象池     protected GenericObjectPool<T> internalPool;      public T getResource() {         try {             return internalPool.borrowObject();         } catch (NoSuchElementException nse) {             ...         }     } }  //假设使用ShardedJedisPool去连接多个Redis节点组成的集群,去通过getResource()方法获取连接对象 public class ShardedJedisPool extends Pool<ShardedJedis> {     ...     @Override     public ShardedJedis getResource() {         ShardedJedis jedis = super.getResource();         jedis.setDataSource(this);         return jedis;     }      //激活方法为空     @Override     public void activateObject(PooledObject<ShardedJedis> p) throws Exception {          }     ... }

(3)Apache Commons的通用对象池的borrowObject()方法

public class GenericObjectPool<T> extends BaseGenericObjectPool<T> implements ObjectPool<T>, GenericObjectPoolMXBean, UsageTracking<T> {     //存放空闲对象的队列     private final LinkedBlockingDeque<PooledObject<T>> idleObjects;     ...      @Override     public T borrowObject() throws Exception {         return borrowObject(getMaxWaitMillis());     }      public T borrowObject(final long borrowMaxWaitMillis) throws Exception {         assertOpen();         final AbandonedConfig ac = this.abandonedConfig;         if (ac != null && ac.getRemoveAbandonedOnBorrow() && (getNumIdle() < 2) && (getNumActive() > getMaxTotal() - 3) ) {             removeAbandoned(ac);         }         PooledObject<T> p = null;          //如果连接耗尽是否需要阻塞         final boolean blockWhenExhausted = getBlockWhenExhausted();          boolean create;         final long waitTime = System.currentTimeMillis();          while (p == null) {             create = false;             //从队列中获取对象             p = idleObjects.pollFirst();             if (p == null) {                 //创建对象                 p = create();                 if (p != null) {                     create = true;                 }             }             ...         }         updateStatsBorrow(p, System.currentTimeMillis() - waitTime);         return p.getObject();     }      //创建对象     private PooledObject<T> create() throws Exception {         ...         final PooledObject<T> p;         try {             //调用Factory的makeObject()方法创建对象             p = factory.makeObject();             if (getTestOnCreate() && !factory.validateObject(p)) {                 createCount.decrementAndGet();                 return null;             }         } catch (final Throwable e) {             createCount.decrementAndGet();             throw e;         } finally {             synchronized (makeObjectCountLock) {                 makeObjectCount--;                 makeObjectCountLock.notifyAll();             }         }         ...         return p;     }      @Override     public void addObject() throws Exception {         assertOpen();         if (factory == null) {             throw new IllegalStateException("Cannot add objects without a factory.");         }         final PooledObject<T> p = create();         addIdleObject(p);     }      private void addIdleObject(final PooledObject<T> p) throws Exception {         if (p != null) {             factory.passivateObject(p);             if (getLifo()) {                 idleObjects.addFirst(p);             } else {                 idleObjects.addLast(p);             }         }     }     ... }

(4)ShardedJedisFactory中创建Jedis连接对象的具体方法

一个连接会封装成一个DefaultPooledObject对象。

public class ShardedJedisPool extends Pool<ShardedJedis> {     ...     private static class ShardedJedisFactory implements PooledObjectFactory<ShardedJedis> {         private List<JedisShardInfo> shards;         ...          //创建Jedis连接对象的具体方法         @Override         public PooledObject<ShardedJedis> makeObject() throws Exception {             //创建ShardedJedis对象             ShardedJedis jedis = new ShardedJedis(shards, algo, keyTagPattern);             return new DefaultPooledObject<ShardedJedis>(jedis);         }         ...     }     ... }

(5)Redis分片对象ShardedJedis的创建

一个ShardedJedis分片可以理解成对应一个集群,一个ShardedJedis中会有各个Redis节点的连接。即一个ShardedJedis中,会连接Redis的各个节点,每个ShardedJedis分片都有多个虚拟节点。

public class ShardedJedis extends BinaryShardedJedis implements JedisCommands, Closeable {     ...     public ShardedJedis(List<JedisShardInfo> shards, Hashing algo, Pattern keyTagPattern) {         super(shards, algo, keyTagPattern);     }     ... }  public class BinaryShardedJedis extends Sharded<Jedis, JedisShardInfo> implements BinaryJedisCommands {     ...     public BinaryShardedJedis(List<JedisShardInfo> shards, Hashing algo, Pattern keyTagPattern) {         super(shards, algo, keyTagPattern);     }     ... }  public class Sharded<R, S extends ShardInfo<R>> {     public static final int DEFAULT_WEIGHT = 1;     private TreeMap<Long, S> nodes;     private final Hashing algo;     private final Map<ShardInfo<R>, R> resources = new LinkedHashMap<ShardInfo<R>, R>();     ...      public Sharded(List<S> shards, Hashing algo, Pattern tagPattern) {         this.algo = algo;         this.tagPattern = tagPattern;         initialize(shards);     }      private void initialize(List<S> shards) {         nodes = new TreeMap<Long, S>();         //每个分片都有多个虚拟节点,存放在nodes中         for (int i = 0; i != shards.size(); ++i) {             final S shardInfo = shards.get(i);             int N =  160 * shardInfo.getWeight();             if (shardInfo.getName() == null) {                 for (int n = 0; n < N; n++) {                     nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n), shardInfo);                 }             } else {                 for (int n = 0; n < N; n++) {                     nodes.put(this.algo.hash(shardInfo.getName() + "*" + n), shardInfo);                 }             }             resources.put(shardInfo, shardInfo.createResource());         }     }     ... }  public class JedisShardInfo extends ShardInfo<Jedis> {     ...     @Override     public Jedis createResource() {         //封装一个Jedis对象         return new Jedis(this);     }     ... }

(6)ShardedJedisPool的初始化

public class ShardedJedisPool extends Pool<ShardedJedis> {     ...     public ShardedJedisPool(final GenericObjectPoolConfig poolConfig, List<JedisShardInfo> shards, Hashing algo, Pattern keyTagPattern) {         super(poolConfig, new ShardedJedisFactory(shards, algo, keyTagPattern));     }     ... }  public abstract class Pool<T> implements Closeable {     protected GenericObjectPool<T> internalPool;     ...      public Pool(final GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) {         initPool(poolConfig, factory);     }      public void initPool(final GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) {         if (this.internalPool != null) {             try {                 closeInternalPool();             } catch (Exception e) {             }         }         this.internalPool = new GenericObjectPool<>(factory, poolConfig);     }     ... }  public class GenericObjectPool<T> extends BaseGenericObjectPool<T> implements ObjectPool<T>, GenericObjectPoolMXBean, UsageTracking<T> {     //存放空闲对象的队列     private final LinkedBlockingDeque<PooledObject<T>> idleObjects;     ...      public GenericObjectPool(final PooledObjectFactory<T> factory, final GenericObjectPoolConfig<T> config) {         super(config, ONAME_BASE, config.getJmxNamePrefix());         if (factory == null) {             jmxUnregister();//tidy up             throw new IllegalArgumentException("factory may not be null");         }         this.factory = factory;         //空闲对象         idleObjects = new LinkedBlockingDeque<>(config.getFairness());         setConfig(config);     }      public void setConfig(final GenericObjectPoolConfig<T> conf) {         super.setConfig(conf);         setMaxIdle(conf.getMaxIdle());//设置最大空闲数         setMinIdle(conf.getMinIdle());//设置最小空闲数         setMaxTotal(conf.getMaxTotal());     }     ... }

(7)ShardedJedisPool的一致性Hash算法

下面使用ShardedJedis的set()方法为例进行说明,这里的Redis集群在客户端使用一致性Hash进行分片,与Redis Cluster这种服务端集群是不同的。

//假设使用ShardedJedisPool去连接多个Redis节点组成的集群,去通过getResource()方法获取连接对象 public class ShardedJedisPool extends Pool<ShardedJedis> {     ...     @Override     public ShardedJedis getResource() {         ShardedJedis jedis = super.getResource();         jedis.setDataSource(this);         return jedis;     }     ... }  public class ShardedJedisPool extends Pool<ShardedJedis> {     ...     @Override     public ShardedJedis getResource() {         ShardedJedis jedis = super.getResource();         jedis.setDataSource(this);         return jedis;     }     ... }  public class ShardedJedis extends BinaryShardedJedis implements JedisCommands, Closeable {     ...     protected ShardedJedisPool dataSource = null;      @Override     public String set(final String key, final String value) {         Jedis j = getShard(key);//根据key获取分片         return j.set(key, value);     }     ... }  public class Sharded<R, S extends ShardInfo<R>> {     private TreeMap<Long, S> nodes;     private final Map<ShardInfo<R>, R> resources = new LinkedHashMap<ShardInfo<R>, R>();     ...      //根据key获取连接对象     public R getShard(String key) {         return resources.get(getShardInfo(key));     }      public S getShardInfo(String key) {         return getShardInfo(SafeEncoder.encode(getKeyTag(key)));     }      public S getShardInfo(byte[] key) {         SortedMap<Long, S> tail = nodes.tailMap(algo.hash(key));         if (tail.isEmpty()) {             return nodes.get(nodes.firstKey());         }         return tail.get(tail.firstKey());     }     ... }

 

3.商品卖家系统高并发场景与缓存预热逻辑

(1)缓存预热的必要性

(2)缓存预热方式之单机和分布式的区别

(3)缓存预热的逻辑

 

(1)缓存预热的必要性

查询卖家信息的接口会被C端⽤户频繁请求。因为C端⽤户在使⽤商品系统时,会产⽣⼤量的卖家数据请求。如果这些请求直接查询卖家数据库,则会对卖家数据库产⽣⾮常⼤的压⼒。所以查询卖家信息的接口,需要针对卖家数据进行缓存。同时由于请求量⽐较⼤,所以不能通过⽤户发起请求来进行惰性缓存,因此需要进行卖家数据的缓存预热。

 

因为卖家数据是⼀组相对固定、变化⼩的数据,所以在对外提供服务前,可先批量从DB获取数据,再设置到Redis缓存中。

商品中心—13.商品卖家系统的高并发文档

查询卖家信息的接口如下:

@DubboService(version = "1.0.0", interfaceClass = SellerAbilityApi.class, retries = 0) public class SellerAbilityApiImpl implements SellerAbilityApi {     @Autowired     private SellerInfoService sellerInfoService;     ...      //提供给商品C端调用,根据卖家ID和卖家类型获取卖家信息     @Override     public JsonResult<List<SellerInfoResponse>> getSellerInfo(SellerInfoRequest sellerInfoRequest) {         try {             List<SellerInfoResponse> sellerInfoResponseList = sellerInfoService.querySellerInfoForRPC(sellerInfoRequest);             return JsonResult.buildSuccess(sellerInfoResponseList);         } catch (ProductBizException e) {             log.error("biz error: request={}", JSON.toJSONString(sellerInfoRequest), e);             return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());         } catch (Exception e) {             log.error("system error: request={}", JSON.toJSONString(sellerInfoRequest), e);             return JsonResult.buildError(e.getMessage());         }     }     ... }  @Service public class SellerInfoServiceImpl implements SellerInfoService {     @Autowired     private SellerInfoCache sellerInfoCache;     ...      //不同系统间调用RPC接口,查询卖家     //卖家系统提供给外部系统调用的卖家查询接口,只允许通过sellerIdList或者sellerType两个参数进行查询     @Override     public List<SellerInfoResponse> querySellerInfoForRPC(SellerInfoRequest request) {         //参数校验,RPC接口根据sellerIdList和sellerType查询         checkQuerySellerInfoRequestByRPC(request);         //如果未传入sellerIdList         if (CollectionUtils.isEmpty(request.getSellerIdList())) {             //根据sellerType获取该类型下的sellerId集合             Optional<List<Long>> sellerIdListOps = getSellerIdListBySellerType(request);             //如果类型下没有sellerId数据,直接返回空卖家集合             if (!sellerIdListOps.isPresent()) {                 return Collections.emptyList();             }             //将根据sellerType查询到的sellerIdList数据set到request中             request.setSellerIdList(sellerIdListOps.get());         }          //根据sellerIdList从缓存或者数据库中查询卖家信息         Optional<List<SellerInfoResponse>> sellerInfoListOps =              sellerInfoCache.listRedisStringDataByCache(                 request.getSellerIdList(),                 SellerInfoResponse.class,                 sellerId -> SellerRedisKeyConstants.SELLER_INFO_LIST + sellerId,                 //根据sellerId从DB中获取SellerInfo数据                 sellerId -> sellerRepository.querySellerInfoBySellerId(sellerId)             );         if (!sellerInfoListOps.isPresent()) {             return Collections.emptyList();         }         //过滤类型不一致的卖家信息         return filterAccordantSellerInfo(sellerInfoListOps.get(), request);     }     ... }

(2)缓存预热方式之单机和分布式的区别

卖家系统提供了两个接口可手动触发单机的缓存预热 + 分布式的缓存预热。无论是单机还是分布式的缓存预热,都会查出全量的卖家数据来进行预热。

 

如果已预热过,可通过设置force=true来强制再进行全量卖家数据的预热。由于预热完成后,卖家系统可能还会对卖家数据进行变更,所以每隔5分钟执行一个定时任务查询DB里的数据,diff缓存数据并刷新。

 

第一次预热可使用单机的缓存预热接口、也可使用分布式的缓存预热接口。

 

一.单机的意思,就是由卖家系统的一台机器进行全量卖家数据的预热操作。

 

二.分布式的意思,就是收到分布式预热请求的一台机器,会先去DB查询出一批批的卖家数据,然后发送到MQ。接着由卖家系统的多台机器去消费这些卖家数据,从而实现分布式预热。

 

(3)缓存预热的逻辑

预热逻辑,在业务上并不复杂。主要是在尚未对外提供服务前,把所有的卖家数据,设置到Redis缓存。在对外提供服务后,能做到绝大部分的卖家信息请求,都能直接通过缓存来提供数据服务,避免对MySQL造成⽐较⼤的压⼒。

商品中心—13.商品卖家系统的高并发文档

 

4.商品卖家系统缓存预热架构设计

(1)Redis缓存结构设计

(2)单机的缓存预热方案设计

(3)分布式的缓存预热⽅案设计

 

(1)Redis缓存结构设计

商品中心—13.商品卖家系统的高并发文档

(2)单机的缓存预热方案设计

步骤一:执⾏预热操作前,先添加分布式锁,保证执行的原⼦性。

 

步骤二:判断forceFlush是否强制刷新参数是否为1。如果为1,则代表强制刷新。如果为0,则代表不强制刷新缓存。

 

步骤三:forceFlush如果为0,则从Redis中获取缓存的预热状态。如果预热状态为成功,则不预热。

 

步骤四:通过sellerType卖家类型参数判断指定需要预热缓存的卖家类型。如果传⼊为0,代表预热所有卖家数据,否则代表预热的是sellerType卖家。根据sellerType,一页一页去DB查询出卖家数据,直到查出的数据为空。

 

步骤五:将每一页查出的100条卖家数据,构建成写入缓存的数据Map。接着将数据Map通过mset写入缓存,以任务的形式,提交给线程池处理。其中的线程池会通过Semaphore进行阻塞最多提交20个写入缓存的任务。

 

步骤六:数据处理完后,在Redis中设置卖家信息的缓存预热状态为成功。

 

步骤七:如果以上步骤出现异常,则在catch中设置缓存预热状态为失败。

 

步骤八:最后在finally块中释放分布式锁。

 

(3)分布式的缓存预热⽅案设计

一.⽣产者

步骤一:执⾏预热操作前,先添加分布式锁,保证执行的原⼦性。

 

步骤二:判断forceFlush是否强制刷新参数是否为1。如果为1,则代表强制刷新。如果为0,则代表不强制刷新缓存。

 

步骤三:forceFlush如果为0,则从Redis中获取缓存的预热状态。如果预热状态为成功,则不预热。

 

步骤四:通过sellerType卖家类型参数判断指定需要预热缓存的卖家类型。如果传⼊为0,代表预热所有卖家数据;否则代表预热的是sellerType卖家。然后通过分批次,查询出满⾜sellerType类型的所有卖家数据。每批查100个卖家,会将这100个卖家数据转换为JSON字符串。接着将JSON字符串添加到集合中,将最终的查询结果组装为List。

 

步骤五:将查询出的卖家数据集合List批量或分批次发送给MQ,对应的MQ的topic为:preheat_seller_cache_bucket_topic。

 

步骤六:消息发送成功后,在Redis中设置卖家信息的缓存预热状态为成功。

 

步骤七:如果以上步骤出现异常,则在catch中设置缓存预热状态为失败。

 

步骤八:最后在finally块中释放分布式锁。

 

二.消费者

步骤一:每个消费者每次消费⼀条消息。

 

步骤二:拿到消息后,解析出需要预热的缓存数据,即构造出100个元素的sellerInfoResultMap。其中key为Redis卖家信息缓存的key,value为卖家信息JSON字符串,然后通过执行mset将数据设置到Redis缓存。

 

步骤三:接着提取出sellerIdList集合,并根据消息中卖家信息的sellerType,构造出Redis卖家类型的⻚缓存的key,然后通过set命令设置到Redis。

 

5.商品卖家系统单机下的缓存预热的实现

(1)实现细节

(2)实现代码

 

(1)实现细节

一.一页一页地查询数据,然后构建写入缓存的数据的Map,最后生成任务提交给线程池进行处理。

 

二.如果在ThreadPoolExecutor中设置一个无限队列的Queue,那么可能会让该Queue不断增长从而撑满内存。如果提交的任务不需排队,而是提交一个任务就创建一个线程来执行,那么又可能会耗尽线程。

 

所以为了线程池安全,可以使用Semaphore信号量进行阻塞。如果超过了20个任务同时要运行,会通过Semaphore信号量阻塞提交任务,从而实现安全的线程池。

 

(2)实现代码

步骤一:执⾏预热操作前,先添加分布式锁,保证执行的原⼦性。

 

步骤二:判断forceFlush是否强制刷新参数是否为1。如果为1,则代表强制刷新。如果为0,则代表不强制刷新缓存。

 

步骤三:forceFlush如果为0,则从Redis中获取缓存的预热状态。如果预热状态为成功,则不预热。

 

步骤四:通过sellerType卖家类型参数判断指定需要预热缓存的卖家类型。如果传⼊为0,代表预热所有卖家数据;否则代表预热的是sellerType卖家。根据sellerType,一页一页去DB查询出卖家数据,直到查出的数据为空。

 

步骤五:将每一页查出的100条卖家数据,构建成写入缓存的数据Map。接着将数据Map通过mset写入缓存,以任务的形式,提交给线程池处理。其中的线程池会通过Semaphore进行阻塞最多提交20个写入缓存的任务。

 

步骤六:数据处理完后,在Redis中设置卖家信息的缓存预热状态为成功。

 

步骤七:如果以上步骤出现异常,则在catch中设置缓存预热状态为失败。

 

步骤八:最后在finally块中释放分布式锁。

@RestController @RequestMapping("/sellerCache") public class SellerCacheController {     @Autowired     private SellerInfoCache sellerInfoCache;      @PostMapping("/preheat")     public JsonResult<Boolean> preheatSellerCache(@RequestBody SellerInfoCacheRequest request) {         try {             Boolean result = sellerInfoCache.preheatSellerCache(request);             return JsonResult.buildSuccess(result);         } catch (BaseBizException baseBizException) {             log.error("biz error: request={}", JSON.toJSONString(request), baseBizException);             return JsonResult.buildError(baseBizException.getMessage());         } catch (Exception e) {             log.error("system error: request={}", JSON.toJSONString(request), e);             return JsonResult.buildError(e.getMessage());         }     }     ... }  @Service("sellerInfoCache") public class SellerInfoCache {     @Resource     private SellerRepository sellerRepository;      @Resource     private RedisCache redisCache;      @Resource     private RedisLock redisLock;      //缓存预热的线程池     @Autowired     @Qualifier("preheatCacheThreadPool")     private PreheatCacheThreadPool preheatCacheThreadPool;     ...      //根据卖家类型预热卖家缓存     public Boolean preheatSellerCache(SellerInfoCacheRequest cacheRequest) {         try {             //1.先获取一把分布式锁             redisLock.lock(SELLER_INFO_CACHE_PREHEAT_LOCK);              //2.判断是否需要强制刷新缓存,如果不需要强制刷新,则判断是否预热过             //如果需要强制刷新缓存,则不用判断是否预热过             if (!NumberUtils.INTEGER_ONE.equals(cacheRequest.getForceFlush())) {                 //如果已经预热过,则直接返回true                 if (isPreheated()) {                     return true;                 }             }             //3.根据卖家类型刷新缓存,如果没有选择类型,则默认两种都进行预热             if (NumberUtils.INTEGER_ZERO.equals(cacheRequest.getSellerType())) {                 //预热卖家信息                 preheatSellerInfoToCache(SellerTypeEnum.SELF.getCode());                 //预热卖家id列表信息                 preheatSellerIdCache(SellerTypeEnum.SELF.getCode());                 preheatSellerInfoToCache(SellerTypeEnum.POP.getCode());                 preheatSellerIdCache(SellerTypeEnum.POP.getCode());             } else {                 //4.如果选择了类型,就按照类型来预热                 preheatSellerInfoToCache(cacheRequest.getSellerType());                 preheatSellerIdCache(cacheRequest.getSellerType());             }             redisCache.set(SELLER_INFO_CACHE_PREHEAT_SUCCESS, SellerCacheStatusEnum.SUCCESS.getCode().toString(), -1);             return true;         } catch (Exception e) {             redisCache.set(SELLER_INFO_CACHE_PREHEAT_SUCCESS, SellerCacheStatusEnum.FAIL.getCode().toString(), -1);             throw new BaseBizException("缓存预热失败,error:{}", JSON.toJSONString(e));         } finally {             //释放锁             redisLock.unlock(SELLER_INFO_CACHE_PREHEAT_LOCK);         }     }      //判断缓存是否预热成功     private Boolean isPreheated() {         //如果预热key不存在,则说明没有预热过         Boolean isExist = redisCache.hasKey(SELLER_INFO_CACHE_PREHEAT_SUCCESS);         if (!isExist) {             return false;         }         //如果预热key存在,且值为1,则代表预热成功         Integer success = Integer.parseInt(redisCache.get(SELLER_INFO_CACHE_PREHEAT_SUCCESS));         if (!SellerCacheStatusEnum.SUCCESS.getCode().equals(success)) {             return false;         }         return true;     }      //根据用户类型预热缓存     public Boolean preheatSellerInfoToCache(Integer type) {         SellerInfoRequest request = new SellerInfoRequest();         request.setSellerType(type);         Integer pageNum = request.getPageNo();         //设置每页数据量         request.setPageSize(CollectionSize.DEFAULT);          //一页一页地去查询数据并交给线程池进行处理         while (true) {             Optional<List<SellerInfoResponse>> sellerInfoResponses = sellerRepository.querySellerInfo(request);             if (!sellerInfoResponses.isPresent()) {                 break;             }             List<SellerInfoResponse> sellerInfoResponseList = sellerInfoResponses.get();             //查到的数据为空时跳出循环             if (CollectionUtils.isEmpty(sellerInfoResponseList)) {                 break;             }              Map<String, String> result = new HashMap<>(sellerInfoResponseList.size());             //构建写入缓存的数据的map             for (SellerInfoResponse sellerInfoResponse : sellerInfoResponseList) {                 Long sellerId = sellerInfoResponse.getSellerId();                 result.put(SellerRedisKeyConstants.SELLER_INFO_LIST + sellerId, JSON.toJSONString(sellerInfoConverter.responseToDTO(sellerInfoResponse)));             }             log.info("本批次缓存map:{}", JSON.toJSONString(result));             //把本批次的缓存预热任务提交给线程池处理             preheatCacheThreadPool.execute(() -> redisCache.mset(result));              //继续取下一页的用户数据             request.setPageNo(++pageNum);             log.info("第" + pageNum + "页数据预热完成");         }         return true;     }      //根据类型预热用户id缓存     Boolean preheatSellerIdCache(Integer type) {         //根据类型全量查卖家ID         SellerInfoRequest request = new SellerInfoRequest();         Integer pageNo = request.getPageNo();         request.setSellerType(type);         request.setPageSize(CollectionSize.DEFAULT);         String key = SellerRedisKeyConstants.SELF_TYPE_LIST;         if (SellerTypeEnum.POP.getCode().equals(type)) {             key = SellerRedisKeyConstants.POP_TYPE_LIST;         }         while (true) {             Optional<List<Long>> optionalList = sellerRepository.pageSellerIdListByType(request);             if (!optionalList.isPresent()) {                 break;             }             List<Long> sellerIdList = optionalList.get();             //查到的数据为空时跳出循环             if (CollectionUtils.isEmpty(sellerIdList)) {                 break;             }             log.info("本批次缓存list:{}", JSON.toJSONString(sellerIdList));             //存Redis             String finalKey = key + pageNo;             preheatCacheThreadPool.execute(() -> redisCache.set(finalKey, JSON.toJSONString(sellerIdList), -1));             //继续取下一页的用户数据             request.setPageNo(++pageNo);         }         return true;     }     ... }  @Repository public class SellerRepository {     ...     //根据条件分页查询出卖家列表     public Optional<List<SellerInfoResponse>> querySellerInfo(SellerInfoRequest request) {         LambdaQueryWrapper<SellerInfoDO> queryWrapper = Wrappers.lambdaQuery();         //类型         queryWrapper.eq(Objects.nonNull(request.getSellerType()), SellerInfoDO::getSellerType, request.getSellerType());         //卖家位置(层级)         queryWrapper.eq(Objects.nonNull(request.getSellerPosition()), SellerInfoDO::getSellerPosition, request.getSellerPosition());         //状态         queryWrapper.eq(Objects.nonNull(request.getSellerStatus()), SellerInfoDO::getSellerStatus, request.getSellerStatus());         //卖家ID         queryWrapper.eq(Objects.nonNull(request.getSellerId()), SellerInfoDO::getSellerId, request.getSellerId());         //卖家编码         queryWrapper.eq(StringUtils.isNotEmpty(request.getSellerCode()), SellerInfoDO::getSellerCode, request.getSellerCode());         //卖家名称         queryWrapper.like(StringUtils.isNotEmpty(request.getSellerName()), SellerInfoDO::getSellerName, request.getSellerName());         //父卖家ID         queryWrapper.eq(Objects.nonNull(request.getParentId()), SellerInfoDO::getParentId, request.getParentId());         //卖家ID集合         queryWrapper.in(CollectionUtils.isNotEmpty(request.getSellerIdList()), SellerInfoDO::getSellerId, request.getSellerIdList());          Page<SellerInfoDO> page = new Page<>(request.getPageNo(), request.getPageSize());         Page<SellerInfoDO> pageResult = sellerInfoMapper.selectPage(page, queryWrapper);         if (Objects.isNull(pageResult)) {             return Optional.of(Collections.emptyList());         }         return Optional.of(sellerInfoConverter.listEntityToResponse(pageResult.getRecords()));     }     ... }  public class PreheatCacheThreadPool {     private final Semaphore semaphore;     private final ThreadPoolExecutor threadPoolExecutor;      public PreheatCacheThreadPool(int permits) {         //如果超过了20个任务同时要运行,会通过Semaphore信号量阻塞,从而实现安全的线程池         semaphore = new Semaphore(permits);          //这里设置核心线程数为0,是因为缓存预热的任务是一次性执行         //所以没有比要保留核心线程,等线程工作完毕,线程就全部退出即可          //如果不使用Semaphore,而在ThreadPoolExecutor中设置一个无限队列的Queue,         //那么可能会让该Queue不断增长从而撑满内存;         //如果提交的任务不需排队,而是提交一个任务就创建一个线程来执行,         //那么又可能会耗尽线程;         threadPoolExecutor = new ThreadPoolExecutor(             0,             permits * 2,//乘以2是因为semaphore的释放要比线程的完成早一点点             60,             TimeUnit.SECONDS,             new SynchronousQueue<>()         );     }      public void execute(Runnable task) {         //超过了20个同步任务就阻塞住,不让它执行太多,从而实现安全的线程池         semaphore.acquireUninterruptibly();          threadPoolExecutor.submit(() -> {             try {                 task.run();             } finally {                 //semaphore的释放要比当前线程任务的结束要早一点点                 //导致Semaphore放行很多任务进来了,但是线程池的线程还没释放                 semaphore.release();             }         });     } }

 

6.商品卖家系统分布式下的缓存预热的实现

(1)发送消息

(2)消费消息

 

(1)发送消息

步骤一:执⾏预热操作前,先添加分布式锁,保证执行的原⼦性。

 

步骤二:判断forceFlush是否强制刷新参数是否为1。如果为1,则代表强制刷新;如果为0,则代表不强制刷新缓存。

 

步骤三:forceFlush如果为0,则从Redis中获取缓存的预热状态。如果预热状态为成功,则不预热。

 

步骤四:通过sellerType卖家类型参数判断指定需要预热缓存的卖家类型。如果传⼊为0,代表预热所有卖家数据,否则代表预热的是sellerType卖家。然后通过分批次,查询出满⾜sellerType类型的所有卖家数据。每批查100个卖家,会将这100个卖家数据转换为JSON字符串。接着将JSON字符串添加到集合中,将最终的查询结果组装为List。

 

步骤五:将查询出的卖家数据集合List批量或分批次发送给MQ,对应的MQ的topic为:preheat_seller_cache_bucket_topic。

 

步骤六:消息发送成功后,在Redis中设置卖家信息的缓存预热状态为成功。

 

步骤七:如果以上步骤出现异常,则在catch中设置缓存预热状态为失败。

 

步骤八:最后在finally块中释放分布式锁。

@RestController @RequestMapping("/sellerCache") public class SellerCacheController {     @Autowired     private SellerInfoCache sellerInfoCache;          @PostMapping("shardingPreheat")     public JsonResult<Boolean> shardingPreheatSellerCache(@RequestBody SellerInfoCacheRequest request) {         try {             Boolean result = sellerInfoCache.shardingPreheatSellerCache(request);             return JsonResult.buildSuccess(result);         } catch (BaseBizException baseBizException) {             log.error("biz error: request={}", JSON.toJSONString(request), baseBizException);             return JsonResult.buildError(baseBizException.getMessage());         } catch (Exception e) {             log.error("system error: request={}", JSON.toJSONString(request), e);             return JsonResult.buildError(e.getMessage());         }     } }  @Service("sellerInfoCache") public class SellerInfoCache {     @Resource     private SellerRepository sellerRepository;      @Resource     private RedisCache redisCache;      @Resource     private RedisLock redisLock;      @Autowired     private DefaultProducer defaultProducer;     ...      //卖家缓存预热,分布式     public Boolean shardingPreheatSellerCache(SellerInfoCacheRequest request) {         try {             //检查参数             checkPreheatParam(request);             //获取一把分布式锁             redisLock.lock(SELLER_INFO_CACHE_PREHEAT_LOCK);             //如果需要强制刷新缓存,则不用判断是否预热过             if (!NumberUtils.INTEGER_ONE.equals(request.getForceFlush())) {                 //如果已经预热过,则直接返回true                 if (isPreheated()) {                     return true;                 }             }             //分批次查询出卖家数据             //sellerInfoList中的每个String都是100个卖家数据组册的一个batch对应的json数组             List<String> sellerInfoList = sellerRepository.querySellerInfoByPage(request.getSellerType());             //发送到MQ             //这些json数组对应的一条String消息会被defaultProducer.sendMessages()方法批量发送             //TODO 将sellerInfoList拆分成每10个为一批次进行分批发送             defaultProducer.sendMessages(PREHEAT_SELLER_CACHE_BUCKET_TOPIC, sellerInfoList, "卖家缓存预热消息");             redisCache.set(SELLER_INFO_CACHE_PREHEAT_SUCCESS, SellerCacheStatusEnum.SUCCESS.getCode().toString(), -1);             return true;         } catch (Exception e) {             redisCache.set(SELLER_INFO_CACHE_PREHEAT_SUCCESS, SellerCacheStatusEnum.FAIL.getCode().toString(), -1);             throw new BaseBizException("缓存预热失败,error:{}", JSON.toJSONString(e));         } finally {             //释放锁             redisLock.unlock(SELLER_INFO_CACHE_PREHEAT_LOCK);             return false;         }     }     ... }  @Component public class DefaultProducer {     private final TransactionMQProducer producer;     ...     //批量发送消息     public void sendMessages(String topic, List<String> messages, String type) {         sendMessages(topic, messages, -1, type);     }      //批量发送消息     public void sendMessages(String topic, List<String> messages, Integer delayTimeLevel, String type) {         List<Message> list = new ArrayList<>();         for (String message : messages) {             Message msg = new Message(topic, message.getBytes(StandardCharsets.UTF_8));             if (delayTimeLevel > 0) {                 msg.setDelayTimeLevel(delayTimeLevel);             }             list.add(msg);         }         try {             SendResult send = producer.send(list);             if (SendStatus.SEND_OK == send.getSendStatus()) {                 log.info("发送MQ消息成功, type:{}", type);             } else {                 throw new BaseBizException(send.getSendStatus().toString());             }         } catch (Exception e) {             log.error("发送MQ消息失败:", e);             throw new BaseBizException("消息发送失败");         }     }     ... }  @Repository public class SellerRepository {     ...     //分批次查询出卖家数据     //List中的每一个元素,是每一页的卖家数据集合JSON字符串     public List<String> querySellerInfoByPage(Integer sellerType) {         LambdaQueryWrapper<SellerInfoDO> queryWrapper = Wrappers.lambdaQuery();         queryWrapper.eq(SellerInfoDO::getDelFlag, YesOrNoEnum.YES.getCode());         queryWrapper.eq(SellerInfoDO::getSellerStatus, SellerInfoStatusEnum.OPEN_STATUS.getCode());         //如果指定预热全部卖家,则查询所有卖家数据         if (sellerType.equals(SellerTypeEnum.ALL.getCode())) {             return queryTotalSellerInfoByPage();         }         //查询匹配类型的卖家         return queryBatchSellerInfoByPage(sellerType);     }      //分批次查询出所有类型的卖家信息     private List<String> queryTotalSellerInfoByPage() {         List<String> selfSellerInfoList = queryBatchSellerInfoByPage(SellerTypeEnum.SELF.getCode());         List<String> popSellerInfoList = queryBatchSellerInfoByPage(SellerTypeEnum.POP.getCode());         List<String> resultList = new ArrayList<>(selfSellerInfoList);         resultList.addAll(popSellerInfoList);         return resultList;     }      //分批次查询出指定类型的卖家信息     private List<String> queryBatchSellerInfoByPage(Integer sellerType) {         List<String> batchList = new ArrayList<>();         SellerInfoRequest request = new SellerInfoRequest();         request.setSellerType(sellerType);         Integer pageNum = request.getPageNo();         //设置每页数据量         request.setPageSize(SkuSellerRelationConstants.QUERY_MAX_PAGE_SIZE);         while (true) {             //根据条件分页查询出卖家列表             Optional<List<SellerInfoResponse>> sellerInfoResponses = querySellerInfo(request);             if (!sellerInfoResponses.isPresent()) {                 break;             }             List<SellerInfoResponse> sellerInfoResponseList = sellerInfoResponses.get();             if (CollectionUtils.isEmpty(sellerInfoResponseList)) {                 break;             }             String batchResult = getBatchResult(sellerInfoResponseList, pageNum);             batchList.add(batchResult);             //继续取下一页的用户数据             request.setPageNo(++pageNum);         }         return batchList;     }      //根据条件分页查询出卖家列表     public Optional<List<SellerInfoResponse>> querySellerInfo(SellerInfoRequest request) {         LambdaQueryWrapper<SellerInfoDO> queryWrapper = Wrappers.lambdaQuery();         //类型         queryWrapper.eq(Objects.nonNull(request.getSellerType()), SellerInfoDO::getSellerType, request.getSellerType());         //卖家位置(层级)         queryWrapper.eq(Objects.nonNull(request.getSellerPosition()), SellerInfoDO::getSellerPosition, request.getSellerPosition());         //状态         queryWrapper.eq(Objects.nonNull(request.getSellerStatus()), SellerInfoDO::getSellerStatus, request.getSellerStatus());         //卖家ID         queryWrapper.eq(Objects.nonNull(request.getSellerId()), SellerInfoDO::getSellerId, request.getSellerId());         //卖家编码         queryWrapper.eq(StringUtils.isNotEmpty(request.getSellerCode()), SellerInfoDO::getSellerCode, request.getSellerCode());         //卖家名称         queryWrapper.like(StringUtils.isNotEmpty(request.getSellerName()), SellerInfoDO::getSellerName, request.getSellerName());         //父卖家ID         queryWrapper.eq(Objects.nonNull(request.getParentId()), SellerInfoDO::getParentId, request.getParentId());         //卖家ID集合         queryWrapper.in(CollectionUtils.isNotEmpty(request.getSellerIdList()), SellerInfoDO::getSellerId, request.getSellerIdList());          Page<SellerInfoDO> page = new Page<>(request.getPageNo(), request.getPageSize());         Page<SellerInfoDO> pageResult = sellerInfoMapper.selectPage(page, queryWrapper);         if (Objects.isNull(pageResult)) {             return Optional.of(Collections.emptyList());         }          return Optional.of(sellerInfoConverter.listEntityToResponse(pageResult.getRecords()));     }      //将每页的卖家数据转换成JSON     private String getBatchResult(List<SellerInfoResponse> sellerInfoResponseList, Integer pageNum) {         List<String> batchResult = new ArrayList<>(sellerInfoResponseList.size());         for (SellerInfoResponse sellerInfoResponse : sellerInfoResponseList) {             PreheatSellerMessage message = new PreheatSellerMessage();             message.setCachePageNo(pageNum);             message.setSellerInfo(sellerInfoResponse);             batchResult.add(JsonUtil.object2Json(message));         }         return JsonUtil.object2Json(batchResult);     }     ... }

(2)消费消息

通过多机器分布式消费MQ来实现分布式预热缓存。

 

步骤一:每个消费者每次消费⼀条消息。

 

步骤二:拿到消息后,解析出需要预热的缓存数据,即构造出100个元素的sellerInfoResultMap,其中key为Redis卖家信息缓存的key,value为卖家信息JSON字符串。然后通过执行mset将数据设置到Redis缓存。

 

步骤三:接着提取出sellerIdList集合,并根据消息中卖家信息的sellerType,构造出Redis卖家类型的⻚缓存的key,然后通过执行set设置到Redis缓存。

//缓存预热消费者 @Component public class SellerPreheatCacheListener implements MessageListenerConcurrently {     @Autowired     private RedisCache redisCache;      @Override     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {         //每个消费者每次消费分批次的多页数据,也就是List<String>,每个String对应了一页的数据         //每个String包含页码和该页下的100个SellerInfo集合         try {             for (MessageExt messageExt : list) {                 //获取到分批次的多页数据List                 String msg = new String(messageExt.getBody());                 List messageList = JsonUtil.json2Object(msg, List.class);                 if (CollectionUtils.isEmpty(messageList)) {                     throw new BaseBizException(ProductErrorCodeEnum.PARAM_CHECK_ERROR, ProductErrorCodeEnum.PARAM_CHECK_ERROR.getErrorCode());                 }                  Map<String, String> sellerInfoResultMap = new HashMap<>(messageList.size());                 List<Long> sellerIdList = new ArrayList<>(messageList.size());                 //List<String>中的每个String,都包含了100个卖家数据,这些卖家类型都是一样的                 for (Object message : messageList) {                     PreheatSellerMessage preheatSellerMessage = JsonUtil.json2Object(message.toString(), PreheatSellerMessage.class);                     if (Objects.isNull(preheatSellerMessage)) {                         throw new BaseBizException(ProductErrorCodeEnum.PARAM_CHECK_ERROR, ProductErrorCodeEnum.PARAM_CHECK_ERROR.getErrorCode());                     }                     //卖家信息                     SellerInfoResponse sellerInfo = preheatSellerMessage.getSellerInfo();                     //卖家信息的Key                     String sellerInfoKey = SellerRedisKeyConstants.SELLER_INFO_LIST + sellerInfo.getSellerId();                     //卖家信息集合                     sellerInfoResultMap.put(sellerInfoKey, JsonUtil.object2Json(sellerInfo));                     //卖家ID集合                     sellerIdList.add(sellerInfo.getSellerId());                 }                  PreheatSellerMessage preheatSellerMessage = JsonUtil.json2Object(messageList.get(0).toString(), PreheatSellerMessage.class);                 if (Objects.isNull(preheatSellerMessage)) {                     throw new BaseBizException(ProductErrorCodeEnum.PARAM_CHECK_ERROR, ProductErrorCodeEnum.PARAM_CHECK_ERROR.getErrorCode());                 }                  //卖家类型                 Integer sellerType = preheatSellerMessage.getSellerInfo().getSellerType();                 //卖家类型的key                 String sellerTypeKey = "";                 if (sellerType.equals(SellerTypeEnum.SELF.getCode())) {                     sellerTypeKey = SellerRedisKeyConstants.SELF_TYPE_LIST + preheatSellerMessage.getCachePageNo();                 } else {                     sellerTypeKey = SellerRedisKeyConstants.POP_TYPE_LIST + preheatSellerMessage.getCachePageNo();                 }                  //将卖家信息set到缓存                 redisCache.mset(sellerInfoResultMap);                 //将卖家类型ID集合set到缓存                 redisCache.set(sellerTypeKey, JsonUtil.object2Json(sellerIdList), -1);             }         } catch (Exception e) {             log.error("consume error, 缓存预热消息消费失败", e);             //本次消费失败,下次重新消费             return ConsumeConcurrentlyStatus.RECONSUME_LATER;         }         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;     } }

 

7.商品卖家系统定时查询DB最新数据更新缓存

//卖家缓存数据同步 @Component public class SellerCacheSyncSchedule {     @Autowired     private RedisCache redisCache;      @Autowired     private SellerRepository sellerRepository;      //卖家缓存数据同步定时任务     //5分钟执行一次,每次查询出数据库最近5分钟的更新和新增数据,然后同步差异数据到缓存     @XxlJob("cacheSync")     public void cacheSync() {         //计算出前五分钟的时间         Date beforeTIme = getBeforeTime();         //从数据库中查询出最近5分钟内的数据         List<SellerInfoDO> sellerInfoList = sellerRepository.querySellerInfoFiveMinute(beforeTIme);         //过滤出数据库和缓存中的差异数据,并将差异数据保存到缓存         saveDiffSellerInfo(sellerInfoList);     }      //比较数据库和缓存中的差异数据,过滤出差异数据,并将差异数据保存到缓存     private void saveDiffSellerInfo(List<SellerInfoDO> sellerInfoList) {         sellerInfoList.stream().peek(sellerInfo -> {             Long sellerId = sellerInfo.getSellerId();             String redisKey = SellerRedisKeyConstants.SELLER_INFO_LIST + sellerId;             //不存在说明是差异数据             if (!redisCache.hasKey(redisKey)) {                 redisCache.set(redisKey, JsonUtil.object2Json(sellerInfo), -1);             }             //TODO 存在比较差异         }).count();     }      //获取5分钟之前的时间     private Date getBeforeTime() {         Calendar calendar = Calendar.getInstance();         calendar.add(Calendar.MINUTE, -5);         return calendar.getTime();     } }  @Repository public class SellerRepository {     ...     //查询出前5分钟之内的数据     public List<SellerInfoDO> querySellerInfoFiveMinute(Date beforeTIme) {         List<SellerInfoDO> batchList = new ArrayList<>();         int pageNum = 1;         //设置每页数据量         int pageSize = ProductConstants.QUERY_ITEM_MAX_COUNT;         while (true) {             LambdaQueryWrapper<SellerInfoDO> queryWrapper = Wrappers.lambdaQuery();             queryWrapper.ge(SellerInfoDO::getCreateTime, beforeTIme);             queryWrapper.ge(SellerInfoDO::getUpdateTime, beforeTIme);              Page<SellerInfoDO> page = new Page<>(pageNum, pageSize);             Page<SellerInfoDO> pageResult = sellerInfoMapper.selectPage(page, queryWrapper);             if (Objects.isNull(pageResult) || pageResult.getRecords().size() <= 0) {                 break;             }              batchList.addAll(pageResult.getRecords());             //进入下一页             pageNum++;         }         return batchList;     }     ... }

 

8.商品中心高并发架构总结

一.高并发C端系统(限流 + 过滤 + 实时缓存同步)

二.高并发卖家系统(缓存预热 + 定时缓存同步)

三.高并发库存系统(缓存分桶)

 

发表评论

评论已关闭。

相关文章

当前内容话题
  • 0