大纲
1.阿里云Tair接入与Jedis连接池使用
2.Redis集群连接池与通用对象连接池源码
3.商品卖家系统高并发场景与缓存预热逻辑
4.商品卖家系统缓存预热架构设计
5.商品卖家系统单机下的缓存预热的实现
6.商品卖家系统分布式下的缓存预热的实现
7.商品卖家系统定时查询DB最新数据更新缓存
8.商品中心高并发架构总结
1.阿里云Tair接入与Jedis连接池使用
(1)引入依赖
(2)添加配置文件
(3)配置Tair相关Bean
(4)使用Tair相关命令
(1)引入依赖
Tair采用多线程模型处理请求,而开源Redis执行命令的主流程是单线程的。TairJedis是阿里云基于Jedis开发的Redis企业版专用客户端。TairJedis除了Jedis原有功能,还支持Redis企业版特有的命令。
(1)引入依赖
<!-- Jedis client. No <version> is declared here, so it is presumably managed by a parent POM/BOM; verify. -->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
</dependency>
<!-- TairJedis SDK: the Jedis-based client that adds the Tair (Redis Enterprise Edition) commands. -->
<dependency>
    <groupId>com.aliyun.tair</groupId>
    <artifactId>alibabacloud-tairjedis-sdk</artifactId>
    <version>2.1.0</version>
</dependency>
(2)添加配置文件
# Connection settings injected into TairConfig through @Value("${spring.tair.*}").
spring:
  tair:
    # host and password have no default in TairConfig and must be set.
    host: r-xxxxxxxxxxxxxxx.redis.rds.aliyuncs.com
    port: 6379
    password: password
    timeout: 3000
    # Pool sizing: evaluate for your workload; must not exceed the instance's max connections.
    maxIdle: 10000
    maxTotal: 10000
(3)配置Tair相关Bean
@Data @Configuration public class TairConfig { @Value("${spring.tair.maxIdle:200}") private int maxIdle; @Value("${spring.tair.maxTotal:300}") private int maxTotal; @Value("${spring.tair.host}") private String host; @Value("${spring.tair.port:6379}") private int port; @Value("${spring.tair.password}") private String password; @Value("${spring.tair.timeout:3000}") private int timeout; @Bean @ConditionalOnClass(JedisPool.class) public JedisPool jedisPool() { JedisPoolConfig config = new JedisPoolConfig(); //最大空闲连接数,需自行评估,不超过Redis实例的最大连接数 config.setMaxIdle(maxIdle); //最大连接数,需自行评估,不超过Redis实例的最大连接数 config.setMaxTotal(maxTotal); config.setTestOnBorrow(false); config.setTestOnReturn(false); return new JedisPool(config, host, port, timeout, password); } @Bean @ConditionalOnClass(TairCache.class) public TairCache tairCache(JedisPool jedisPool) { return new TairCache(jedisPool); } }
(4)使用Tair相关命令
@Component public class TairCache { private JedisPool jedisPool; public TairCache(JedisPool jedisPool) { this.jedisPool = jedisPool; } public Jedis getJedis() { return jedisPool.getResource(); } public TairString createTairString(Jedis jedis) { return new TairString(jedis); } private TairHash createTairHash(Jedis jedis) { return new TairHash(jedis); } //缓存存储 public void set(String key, String value, int seconds) { log.info("tairString set key:{}, value:{}, seconds:{}", key, value, seconds); try (Jedis jedis = getJedis()) { TairString tairString = createTairString(jedis); String result; if (seconds > 0) { result = tairString.exset(key, value, new ExsetParams().ex(seconds)); } else { result = tairString.exset(key, value); } log.info("tairString set key:{}, value:{}, seconds:{}", key, value, seconds); } } //tairString存储 public boolean exset(String key, String value, int seconds) { try (Jedis jedis = getJedis()) { TairString tairString = createTairString(jedis); String result; if (seconds > 0) { result = tairString.exset(key, value, new ExsetParams().ex(seconds)); } else { result = tairString.exset(key, value); } return "OK".equals(result); } } //缓存获取 public String get(String key) { try (Jedis jedis = getJedis()) { ExgetResult<String> exget = createTairString(jedis).exget(key); if (exget == null) { return null; } return exget.getValue(); } catch (Exception e) { log.error("tairString get error,key{}", key, e); } return null; } //缓存自增 public Integer incr(String key) { return this.exincrby(key, 1); } //缓存自增 public Integer incr(String key, Integer incrNum) { return this.exincrby(key, incrNum); } //缓存自减 public Integer decr(String key, Integer decrNum) { return this.exincrby(key, -Math.abs(decrNum)); } //删除缓存 public Integer delete(String key) { try (Jedis jedis = getJedis()) { return jedis.del(key).intValue(); } } //删除缓存 public Integer mdelete(List<String> keyList) { try (Jedis jedis = getJedis()) { return jedis.del(keyList.toArray(new String[keyList.size()])).intValue(); } 
} //缓存批量获取 public List mget(List<String> keyList) { try (Jedis jedis = getJedis()) { TairString tairString = createTairString(jedis); return keyList.stream().map(key -> { ExgetResult<String> exget = tairString.exget(key); if (exget != null) { return exget.getValue(); } return null; }).collect(Collectors.toList()); } } //对TairString的value进行自增自减操作 public Integer exincrby(String key, Integer incrNum) { try (Jedis jedis = getJedis()) { int value = createTairString(jedis).exincrBy(key, incrNum, ExincrbyParams.ExincrbyParams().min(0)).intValue(); return value; } catch (Exception e) { //出现自增或者自减数据溢出等异常,直接返回操作失败 return -1; } } //存储hash对象 public Integer exhset(String key, String value) { try (Jedis jedis = getJedis()) { Map<String, Object> innerMap = JSON.parseObject(value).getInnerMap(); Map<String, String> map = Maps.transformEntries(innerMap, (k, v) -> String.valueOf(v)); String exhmset = createTairHash(jedis).exhmset(key, map); //成功返回 OK return 0; } } //存储hash对象 public Integer exhset(String key, String field, String value) { try (Jedis jedis = getJedis()) { return createTairHash(jedis).exhset(key, field, value, ExhsetParams.ExhsetParams().nx()).intValue(); } } //获取hash对象 public String exhget(String key, String field) { try (Jedis jedis = getJedis()) { String exhget = createTairHash(jedis).exhget(key, field); log.info("exhget key:{}, field:{}, value:{}", key, field, exhget); return exhget; } } }
2.Redis集群连接池与通用对象连接池源码
(1)Spring的RedisConnectionFactory和实现JedisConnectionFactory
(2)Jedis的Pool对象是基于Apache Commons的通用对象池的
(3)Apache Commons的通用对象池的borrowObject()方法
(4)ShardedJedisFactory中创建Jedis连接对象的具体方法
(5)Redis分片对象ShardedJedis的创建
(6)ShardedJedisPool的初始化
(7)ShardedJedisPool的一致性Hash算法
这里的Redis集群在客户端使用一致性Hash进行分片,与Redis Cluster这种服务端集群是不同的。
(1)Spring的RedisConnectionFactory和实现JedisConnectionFactory
// Application configuration: builds a RedisTemplate on top of whichever RedisConnectionFactory is injected.
@Data
@Configuration
@ConditionalOnClass(RedisConnectionFactory.class)
public class RedisConfig {
    ...

    // Creates a RedisTemplate that uses the injected connection factory and
    // StringRedisSerializer as its default serializer.
    @Bean
    @ConditionalOnClass(RedisConnectionFactory.class)
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        redisTemplate.setDefaultSerializer(new StringRedisSerializer());
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

    ...
}

// Excerpt of the connection-factory interface: one accessor per connection flavour.
public interface RedisConnectionFactory extends PersistenceExceptionTranslator {
    //Provides a suitable connection for interacting with Redis.
    RedisConnection getConnection();

    //Provides a suitable connection for interacting with Redis Cluster.
    RedisClusterConnection getClusterConnection();

    //Provides a suitable connection for interacting with Redis Sentinel.
    RedisSentinelConnection getSentinelConnection();
}

// Excerpt of the Jedis-backed implementation of RedisConnectionFactory.
public class JedisConnectionFactory implements InitializingBean, DisposableBean, RedisConnectionFactory {
    // Jedis connection pool; nullable.
    private @Nullable Pool<Jedis> pool;
    ...

    // Returns a cluster connection when cluster-aware; otherwise wraps a Jedis obtained from
    // fetchJedisConnector() in a JedisConnection.
    public RedisConnection getConnection() {
        if (isRedisClusterAware()) {
            return getClusterConnection();
        }
        Jedis jedis = fetchJedisConnector();
        // The pool reference is handed to the connection only when pooling is enabled.
        JedisConnection connection = (getUsePool() ? new JedisConnection(jedis, pool, getDatabase(), getClientName())
            : new JedisConnection(jedis, null, getDatabase(), getClientName()));
        connection.setConvertPipelineAndTxResults(convertPipelineAndTxResults);
        return postProcessConnection(connection);
    }

    // Obtains a Jedis: from the pool when pooling is enabled and a pool exists, otherwise by
    // creating and connecting a new instance. Any failure is rethrown as
    // RedisConnectionFailureException with the original exception as cause.
    protected Jedis fetchJedisConnector() {
        try {
            if (getUsePool() && pool != null) {
                return pool.getResource();
            }
            Jedis jedis = createJedis();
            jedis.connect();
            potentiallySetClientName(jedis);
            return jedis;
        } catch (Exception ex) {
            throw new RedisConnectionFailureException("Cannot get Jedis connection", ex);
        }
    }

    ...
}
(2)Jedis的Pool对象是基于Apache Commons的通用对象池的
// Excerpt of the Jedis Pool base class: a thin wrapper that delegates to a generic object pool.
public abstract class Pool<T> implements Closeable {
    //internalPool is the Apache Commons generic object pool
    protected GenericObjectPool<T> internalPool;

    // Borrows an object from the underlying pool; handling of NoSuchElementException is elided here.
    public T getResource() {
        try {
            return internalPool.borrowObject();
        } catch (NoSuchElementException nse) {
            ...
        }
    }
}

//Assume ShardedJedisPool is used to connect to a cluster made up of several Redis nodes, and getResource() is called to obtain a connection object
public class ShardedJedisPool extends Pool<ShardedJedis> {
    ...

    // Borrows a ShardedJedis from the base pool and records this pool on it as its data source.
    @Override
    public ShardedJedis getResource() {
        ShardedJedis jedis = super.getResource();
        jedis.setDataSource(this);
        return jedis;
    }

    //The activate method is empty
    // NOTE(review): the Pool class shown above declares no activateObject, and the signature
    // matches a PooledObjectFactory callback; presumably this method belongs to the nested
    // ShardedJedisFactory rather than to ShardedJedisPool itself. Confirm against the Jedis source.
    @Override
    public void activateObject(PooledObject<ShardedJedis> p) throws Exception {
    }

    ...
}
(3)Apache Commons的通用对象池的borrowObject()方法
// Excerpt of the Apache Commons generic object pool: borrowing, creating and adding pooled objects.
public class GenericObjectPool<T> extends BaseGenericObjectPool<T> implements ObjectPool<T>, GenericObjectPoolMXBean, UsageTracking<T> {
    //Queue that holds the idle objects
    private final LinkedBlockingDeque<PooledObject<T>> idleObjects;
    ...

    // Borrows an object using the pool's configured maximum wait time.
    @Override
    public T borrowObject() throws Exception {
        return borrowObject(getMaxWaitMillis());
    }

    // Borrows an object: takes the first idle one if available, otherwise tries to create one.
    // The remainder of the loop body is elided in this excerpt.
    public T borrowObject(final long borrowMaxWaitMillis) throws Exception {
        assertOpen();
        final AbandonedConfig ac = this.abandonedConfig;
        // Abandoned objects are removed first when that is enabled and the pool is nearly
        // exhausted (fewer than 2 idle, more than maxTotal - 3 active).
        if (ac != null && ac.getRemoveAbandonedOnBorrow() && (getNumIdle() < 2) && (getNumActive() > getMaxTotal() - 3) ) {
            removeAbandoned(ac);
        }
        PooledObject<T> p = null;
        //Whether to block when the connections are exhausted
        final boolean blockWhenExhausted = getBlockWhenExhausted();
        boolean create;
        final long waitTime = System.currentTimeMillis();
        while (p == null) {
            create = false;
            //Take an object from the queue
            p = idleObjects.pollFirst();
            if (p == null) {
                //Create an object
                p = create();
                if (p != null) {
                    create = true;
                }
            }
            ...
        }
        // Record borrow statistics with the time spent in this call.
        updateStatsBorrow(p, System.currentTimeMillis() - waitTime);
        return p.getObject();
    }

    //Create an object
    // Returns null when test-on-create is enabled and validation of the new object fails.
    private PooledObject<T> create() throws Exception {
        ...
        final PooledObject<T> p;
        try {
            //Call the factory's makeObject() method to create the object
            p = factory.makeObject();
            if (getTestOnCreate() && !factory.validateObject(p)) {
                createCount.decrementAndGet();
                return null;
            }
        } catch (final Throwable e) {
            // Creation failed: roll back the count and rethrow.
            createCount.decrementAndGet();
            throw e;
        } finally {
            // Always decrement the in-flight makeObject count and notify waiters on the lock.
            synchronized (makeObjectCountLock) {
                makeObjectCount--;
                makeObjectCountLock.notifyAll();
            }
        }
        ...
        return p;
    }

    // Creates one object and places it into the idle queue; requires a non-null factory.
    @Override
    public void addObject() throws Exception {
        assertOpen();
        if (factory == null) {
            throw new IllegalStateException("Cannot add objects without a factory.");
        }
        final PooledObject<T> p = create();
        addIdleObject(p);
    }

    // Passivates a non-null object and enqueues it: at the head when LIFO, at the tail otherwise.
    private void addIdleObject(final PooledObject<T> p) throws Exception {
        if (p != null) {
            factory.passivateObject(p);
            if (getLifo()) {
                idleObjects.addFirst(p);
            } else {
                idleObjects.addLast(p);
            }
        }
    }

    ...
}
(4)ShardedJedisFactory中创建Jedis连接对象的具体方法
一个连接会封装成一个DefaultPooledObject对象。
// Excerpt of ShardedJedisPool showing the nested factory that the object pool calls to create instances.
public class ShardedJedisPool extends Pool<ShardedJedis> {
    ...

    private static class ShardedJedisFactory implements PooledObjectFactory<ShardedJedis> {
        // Shard descriptors passed to every ShardedJedis this factory creates.
        private List<JedisShardInfo> shards;
        ...

        //The concrete method that creates the Jedis connection object
        @Override
        public PooledObject<ShardedJedis> makeObject() throws Exception {
            //Create the ShardedJedis object
            ShardedJedis jedis = new ShardedJedis(shards, algo, keyTagPattern);
            // Wrap it in a DefaultPooledObject before returning it.
            return new DefaultPooledObject<ShardedJedis>(jedis);
        }

        ...
    }

    ...
}
(5)Redis分片对象ShardedJedis的创建
一个ShardedJedis对象可以理解成对应整个集群:它内部持有到各个Redis节点(即各个分片)的Jedis连接。每个分片(JedisShardInfo)会按"160 × 权重"生成多个虚拟节点,这些虚拟节点存放在nodes这个TreeMap中,用于一致性Hash。
// Excerpt: ShardedJedis only forwards its constructor arguments up the hierarchy.
public class ShardedJedis extends BinaryShardedJedis implements JedisCommands, Closeable {
    ...

    public ShardedJedis(List<JedisShardInfo> shards, Hashing algo, Pattern keyTagPattern) {
        super(shards, algo, keyTagPattern);
    }

    ...
}

// Excerpt: BinaryShardedJedis likewise delegates construction to Sharded.
public class BinaryShardedJedis extends Sharded<Jedis, JedisShardInfo> implements BinaryJedisCommands {
    ...

    public BinaryShardedJedis(List<JedisShardInfo> shards, Hashing algo, Pattern keyTagPattern) {
        super(shards, algo, keyTagPattern);
    }

    ...
}

// Excerpt of Sharded: builds the virtual-node map and one resource per shard.
public class Sharded<R, S extends ShardInfo<R>> {
    public static final int DEFAULT_WEIGHT = 1;
    // Virtual nodes: hash value -> shard info, kept sorted by hash.
    private TreeMap<Long, S> nodes;
    private final Hashing algo;
    // One resource per shard, created through ShardInfo.createResource().
    private final Map<ShardInfo<R>, R> resources = new LinkedHashMap<ShardInfo<R>, R>();
    ...

    public Sharded(List<S> shards, Hashing algo, Pattern tagPattern) {
        this.algo = algo;
        this.tagPattern = tagPattern;
        initialize(shards);
    }

    // Registers 160 * weight virtual nodes for every shard and creates the shard's resource.
    private void initialize(List<S> shards) {
        nodes = new TreeMap<Long, S>();
        //Every shard has several virtual nodes, stored in nodes
        for (int i = 0; i != shards.size(); ++i) {
            final S shardInfo = shards.get(i);
            int N = 160 * shardInfo.getWeight();
            if (shardInfo.getName() == null) {
                // Unnamed shard: the virtual-node key is derived from the shard's list index.
                for (int n = 0; n < N; n++) {
                    nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n), shardInfo);
                }
            } else {
                // Named shard: the virtual-node key is derived from the shard name.
                for (int n = 0; n < N; n++) {
                    nodes.put(this.algo.hash(shardInfo.getName() + "*" + n), shardInfo);
                }
            }
            resources.put(shardInfo, shardInfo.createResource());
        }
    }

    ...
}

// Excerpt: the shard descriptor creates the Jedis instance for its shard.
public class JedisShardInfo extends ShardInfo<Jedis> {
    ...

    @Override
    public Jedis createResource() {
        //Wrap this shard info in a Jedis object
        return new Jedis(this);
    }

    ...
}
(6)ShardedJedisPool的初始化
// Excerpt: constructing a ShardedJedisPool creates a ShardedJedisFactory and passes it to the base Pool.
public class ShardedJedisPool extends Pool<ShardedJedis> {
    ...

    public ShardedJedisPool(final GenericObjectPoolConfig poolConfig, List<JedisShardInfo> shards, Hashing algo, Pattern keyTagPattern) {
        super(poolConfig, new ShardedJedisFactory(shards, algo, keyTagPattern));
    }

    ...
}

// Excerpt: the base Pool creates the underlying GenericObjectPool.
public abstract class Pool<T> implements Closeable {
    protected GenericObjectPool<T> internalPool;
    ...

    public Pool(final GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) {
        initPool(poolConfig, factory);
    }

    // (Re)initialises the pool: an existing internal pool is closed first, and any exception
    // thrown while closing it is ignored.
    public void initPool(final GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) {
        if (this.internalPool != null) {
            try {
                closeInternalPool();
            } catch (Exception e) {
            }
        }
        this.internalPool = new GenericObjectPool<>(factory, poolConfig);
    }

    ...
}

// Excerpt: GenericObjectPool construction and configuration.
public class GenericObjectPool<T> extends BaseGenericObjectPool<T> implements ObjectPool<T>, GenericObjectPoolMXBean, UsageTracking<T> {
    //Queue that holds the idle objects
    private final LinkedBlockingDeque<PooledObject<T>> idleObjects;
    ...

    // Rejects a null factory, then creates the idle queue and applies the configuration.
    public GenericObjectPool(final PooledObjectFactory<T> factory, final GenericObjectPoolConfig<T> config) {
        super(config, ONAME_BASE, config.getJmxNamePrefix());
        if (factory == null) {
            jmxUnregister();//tidy up
            throw new IllegalArgumentException("factory may not be null");
        }
        this.factory = factory;
        //Idle objects
        idleObjects = new LinkedBlockingDeque<>(config.getFairness());
        setConfig(config);
    }

    // Copies the idle and total limits from the supplied configuration.
    public void setConfig(final GenericObjectPoolConfig<T> conf) {
        super.setConfig(conf);
        setMaxIdle(conf.getMaxIdle());//Set the maximum number of idle objects
        setMinIdle(conf.getMinIdle());//Set the minimum number of idle objects
        setMaxTotal(conf.getMaxTotal());
    }

    ...
}
(7)ShardedJedisPool的一致性Hash算法
下面使用ShardedJedis的set()方法为例进行说明,这里的Redis集群在客户端使用一致性Hash进行分片,与Redis Cluster这种服务端集群是不同的。
//假设使用ShardedJedisPool去连接多个Redis节点组成的集群,去通过getResource()方法获取连接对象 public class ShardedJedisPool extends Pool<ShardedJedis> { ... @Override public ShardedJedis getResource() { ShardedJedis jedis = super.getResource(); jedis.setDataSource(this); return jedis; } ... } public class ShardedJedisPool extends Pool<ShardedJedis> { ... @Override public ShardedJedis getResource() { ShardedJedis jedis = super.getResource(); jedis.setDataSource(this); return jedis; } ... } public class ShardedJedis extends BinaryShardedJedis implements JedisCommands, Closeable { ... protected ShardedJedisPool dataSource = null; @Override public String set(final String key, final String value) { Jedis j = getShard(key);//根据key获取分片 return j.set(key, value); } ... } public class Sharded<R, S extends ShardInfo<R>> { private TreeMap<Long, S> nodes; private final Map<ShardInfo<R>, R> resources = new LinkedHashMap<ShardInfo<R>, R>(); ... //根据key获取连接对象 public R getShard(String key) { return resources.get(getShardInfo(key)); } public S getShardInfo(String key) { return getShardInfo(SafeEncoder.encode(getKeyTag(key))); } public S getShardInfo(byte[] key) { SortedMap<Long, S> tail = nodes.tailMap(algo.hash(key)); if (tail.isEmpty()) { return nodes.get(nodes.firstKey()); } return tail.get(tail.firstKey()); } ... }
3.商品卖家系统高并发场景与缓存预热逻辑
(1)缓存预热的必要性
(2)缓存预热方式之单机和分布式的区别
(3)缓存预热的逻辑
(1)缓存预热的必要性
查询卖家信息的接口会被C端用户频繁请求。因为C端用户在使用商品系统时,会产生大量的卖家数据请求。如果这些请求直接查询卖家数据库,则会对卖家数据库产生非常大的压力。所以查询卖家信息的接口,需要针对卖家数据进行缓存。同时由于请求量比较大,所以不能通过用户发起请求来进行惰性缓存,因此需要进行卖家数据的缓存预热。
因为卖家数据是一组相对固定、变化小的数据,所以在对外提供服务前,可先批量从DB获取数据,再设置到Redis缓存中。
