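Preface
I have used several caching middleware products and frameworks, and wondered whether I could write one myself in Java. This series is the result, and I am writing up notes as I go.
Source code: weloe/Java-Distributed-Cache (github.com)
Code for this post:
Previous post: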
https://www.cnblogs.com/weloe/p/17050512.html
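Approach
A distributed cache inevitably has to deal with cache management. Cached data cannot be allowed to grow without limit, so an upper bound has to be set, and that bound is also the reason cache eviction is needed.
Managing the cache conveniently is a second concern. If several features share one cache, we would have to prefix every key with the feature name to tell them apart. That is tedious, and the longer keys also put more pressure on memory.
So the cache should be managed in groups, with a convenient public interface for each group.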
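To make the difference between key prefixes and groups concrete, here is a minimal sketch. The class name NamespaceSketch and its methods are hypothetical and do not come from the repository; it only assumes that each group owns its own map of entries.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only: not a class from the repository.
class NamespaceSketch {
    // One inner map per group, so keys stay short and carry no feature prefix
    private final Map<String, Map<String, byte[]>> groups = new HashMap<>();

    void put(String group, String key, byte[] value) {
        groups.computeIfAbsent(group, g -> new HashMap<>()).put(key, value);
    }

    byte[] get(String group, String key) {
        Map<String, byte[]> entries = groups.get(group);
        return entries == null ? null : entries.get(key);
    }

    // Clearing one feature's cache does not require scanning for a key prefix
    void clearGroup(String group) {
        groups.remove(group);
    }
}
```

With a single flat map, clearing the "user" feature would mean scanning every key for the "user:" prefix; here it is a single remove call, and the same short key can exist in two groups without colliding.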
Implementation
CacheObj
Java-Distributed-Cache/CacheObj.java at master · weloe/Java-Distributed-Cache (github.com)
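In the previous post on cache eviction, we established that the data is actually stored in a key-value structure, so the key and value types need to be defined here. The key is a String, and the value is abstracted into a CacheObj.
Note that endTime is the time at which the entry expires. Giving an entry a lifetime is a common requirement, and expiry is itself one form of cache eviction. The payload is stored as byte[] so that any kind of value can be held.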
import java.time.LocalDateTime;

public class CacheObj {
    // Time at which this entry expires; null means it never expires
    private LocalDateTime endTime;
    // Original type of the value stored in data
    private Class<?> clazz;
    // Size in bytes that this entry counts against the cache limit
    private int byteSize;
    // The actual stored data
    private byte[] data;

    public CacheObj() {
    }

    public CacheObj(LocalDateTime endTime, Class<?> clazz, int byteSize, byte[] data) {
        this.endTime = endTime;
        this.clazz = clazz;
        this.byteSize = byteSize;
        this.data = data;
    }

    public int getByteSize() {
        return byteSize;
    }

    public byte[] getData() {
        return data;
    }

    public void setEndTime(LocalDateTime endTime) {
        this.endTime = endTime;
    }

    public LocalDateTime getEndTime() {
        return endTime;
    }

    public void setClazz(Class<?> clazz) {
        this.clazz = clazz;
    }
}
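The idea of keeping the payload as byte[] next to a Class tag can be sketched as follows. This is only an illustration under assumptions: TypedBytesSketch, its factory methods and decode() are hypothetical names, and the repository may convert values differently (for example through its BytesUtil helper).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration only: not a class from the repository.
class TypedBytesSketch {
    final Class<?> type; // remembers what the bytes originally were
    final byte[] data;   // payload in a type-independent form

    private TypedBytesSketch(Class<?> type, byte[] data) {
        this.type = type;
        this.data = data;
    }

    static TypedBytesSketch of(String value) {
        return new TypedBytesSketch(String.class, value.getBytes(StandardCharsets.UTF_8));
    }

    static TypedBytesSketch of(int value) {
        return new TypedBytesSketch(Integer.class, ByteBuffer.allocate(4).putInt(value).array());
    }

    // The byte size is what would be counted against the cache limit
    int byteSize() {
        return data.length;
    }

    // Uses the type tag to turn the bytes back into a value
    Object decode() {
        if (type == String.class) {
            return new String(data, StandardCharsets.UTF_8);
        }
        if (type == Integer.class) {
            return ByteBuffer.wrap(data).getInt();
        }
        throw new IllegalStateException("unsupported type: " + type);
    }
}
```

Because the cache only ever sees bytes and a size, the same storage and eviction code works for every value type; only the encode and decode steps at the edges need to know the real type.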
Cache
Java-Distributed-Cache/Cache.java at master · weloe/Java-Distributed-Cache (github.com)
Managing groups presupposes managing a single cache, which is the job of Cache.
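import com.weloe.cache.outstrategy.CacheStrategy;

import java.time.LocalDateTime;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Cache {
    // Maximum number of bytes the cache may hold
    private int maxByteSize;
    // Number of bytes currently in use
    private int normalByteSize;
    // Eviction strategy that holds the entries
    private CacheStrategy<String, CacheObj> cacheStrategy;
    Lock readLock;
    Lock writeLock;

    public Cache(int maxByteSize, CacheStrategy<String, CacheObj> cacheStrategy) {
        this.maxByteSize = maxByteSize;
        this.normalByteSize = 0;
        this.cacheStrategy = cacheStrategy;
        // Both locks must come from the same ReentrantReadWriteLock,
        // otherwise readers and writers do not exclude each other
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        readLock = lock.readLock();
        writeLock = lock.writeLock();
    }

    public CacheObj add(String key, CacheObj cacheObj) {
        writeLock.lock();
        try {
            normalByteSize += cacheObj.getByteSize();
            // Evict entries until the cache is back under its limit
            while (normalByteSize > maxByteSize) {
                CacheObj outCache = cacheStrategy.outCache();
                if (outCache == null) {
                    break; // nothing left to evict
                }
                normalByteSize -= outCache.getByteSize();
            }
            // Store the new entry
            return cacheStrategy.put(key, cacheObj);
        } finally {
            writeLock.unlock();
        }
    }

    public CacheObj get(String key) {
        // get may remove an expired entry, and a read lock cannot be
        // upgraded to a write lock, so the write lock is taken here
        writeLock.lock();
        try {
            CacheObj v = cacheStrategy.get(key);
            // Drop the entry if it has expired
            if (v != null && v.getEndTime() != null && LocalDateTime.now().isAfter(v.getEndTime())) {
                removeEntry(key);
                return null;
            }
            return v;
        } finally {
            writeLock.unlock();
        }
    }

    public CacheObj remove(String key) {
        writeLock.lock();
        try {
            return removeEntry(key);
        } finally {
            writeLock.unlock();
        }
    }

    // Removes an entry and releases its bytes; the caller must hold writeLock
    private CacheObj removeEntry(String key) {
        CacheObj removed = cacheStrategy.outCache(key);
        if (removed != null) {
            normalByteSize -= removed.getByteSize();
        }
        return removed;
    }

    public void clear() {
        writeLock.lock();
        try {
            cacheStrategy.clear();
            normalByteSize = 0;
        } finally {
            writeLock.unlock();
        }
    }

    public void setMaxByteSize(int maxByteSize) {
        this.maxByteSize = maxByteSize;
    }

    public int getMaxByteSize() {
        return maxByteSize;
    }

    public int getNormalByteSize() {
        readLock.lock();
        try {
            return normalByteSize;
        } finally {
            readLock.unlock();
        }
    }
}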
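The byte accounting is the subtle part of Cache, so here is a self-contained sketch of the same idea. It is not the repository's implementation: ByteBudgetLruSketch is a hypothetical class that swaps the CacheStrategy for a LinkedHashMap in access order, and it assumes that an entry larger than the whole budget is simply rejected and that replacing a key releases the old entry's bytes.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only: not a class from the repository.
class ByteBudgetLruSketch {
    private final int maxBytes;
    private int usedBytes;
    // accessOrder = true: iteration starts at the least recently used entry
    private final LinkedHashMap<String, byte[]> entries = new LinkedHashMap<>(16, 0.75f, true);

    ByteBudgetLruSketch(int maxBytes) {
        this.maxBytes = maxBytes;
    }

    synchronized boolean put(String key, byte[] value) {
        if (value.length > maxBytes) {
            return false; // can never fit, so evicting others would be pointless
        }
        // Replacing a key must first give back the bytes of the old value
        byte[] old = entries.remove(key);
        if (old != null) {
            usedBytes -= old.length;
        }
        // Evict least recently used entries until the new value fits
        Iterator<Map.Entry<String, byte[]>> it = entries.entrySet().iterator();
        while (usedBytes + value.length > maxBytes && it.hasNext()) {
            usedBytes -= it.next().getValue().length;
            it.remove();
        }
        entries.put(key, value);
        usedBytes += value.length;
        return true;
    }

    // A successful get marks the entry as most recently used
    synchronized byte[] get(String key) {
        return entries.get(key);
    }

    synchronized int usedBytes() {
        return usedBytes;
    }
}
```

The invariant to keep is that usedBytes always equals the sum of the sizes of the stored entries, so every path that removes an entry (eviction, replacement, explicit removal, expiry) has to subtract its size.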
Group
Java-Distributed-Cache/Group.java at master · weloe/Java-Distributed-Cache (github.com)
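Since we want group management, we abstract a Group type. The getter field is a callback supplied by the user; it loads the data for a key that is not found in the cache.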
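public class Group {
    private String name;
    private Cache cache;
    // Callback that loads a value when the key is not in the cache
    private Getter getter;

    @FunctionalInterface
    interface Getter {
        byte[] get(String k) throws Exception;
    }

    public Group(String name, Cache cache, Getter getter) {
        this.name = name;
        this.cache = cache;
        this.getter = getter;
    }

    public String getName() {
        return name;
    }

    // The methods shown in the following sections belong here
}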
put,get
For convenient management, Group needs to provide get and put methods.
public CacheObj get(String key) {
    if (key == null || "".equals(key)) {
        throw new IllegalArgumentException("key must not be empty");
    }
    CacheObj cacheObj = cache.get(key);
    if (cacheObj != null) {
        return cacheObj;
    }
    // Cache miss: fall back to the Getter callback
    return load(key);
}

/**
 * Loads the data through the Getter callback and stores it in the cache.
 *
 * @param key key to load
 * @return the loaded entry, or null if the callback failed or returned nothing
 */
private CacheObj load(String key) {
    byte[] bytes = null;
    try {
        bytes = getter.get(key);
    } catch (Exception e) {
        e.printStackTrace();
        return null;
    }
    if (bytes == null) {
        return null;
    }
    CacheObj cacheObj = BytesUtil.bytes2CacheObj(bytes);
    cache.add(key, cacheObj);
    return cacheObj;
}

public CacheObj putCacheObj(String key, CacheObj cacheObj) {
    CacheObj obj = cache.add(key, cacheObj);
    return obj;
}
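The get and load pair above follows a read-through pattern: look in the cache first, and on a miss ask the callback and remember its answer. A minimal sketch of just that flow, with hypothetical names (ReadThroughSketch, Loader) and a plain HashMap standing in for Cache, could look like this:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only: not a class from the repository.
class ReadThroughSketch {
    @FunctionalInterface
    interface Loader {
        byte[] load(String key) throws Exception;
    }

    private final Map<String, byte[]> cache = new HashMap<>();
    private final Loader loader;

    ReadThroughSketch(Loader loader) {
        this.loader = loader;
    }

    byte[] get(String key) {
        if (key == null || key.isEmpty()) {
            throw new IllegalArgumentException("key must not be empty");
        }
        byte[] hit = cache.get(key);
        if (hit != null) {
            return hit; // served from the cache, the loader is not called
        }
        byte[] loaded;
        try {
            loaded = loader.load(key);
        } catch (Exception e) {
            return null; // a failing loader is treated as "no data"
        }
        if (loaded != null) {
            cache.put(key, loaded); // remember the value for the next call
        }
        return loaded;
    }
}
```

In this sketch a null result from the loader is not cached, so a key that has no data triggers the callback on every call; whether that is acceptable depends on how expensive the callback is.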
expire
A method that sets how long a stored entry lives.
public CacheObj expire(String key, long num, ChronoUnit timeUnit) {
    CacheObj cacheObj = cache.get(key);
    // Unknown or already expired key: there is nothing to update
    if (cacheObj == null) {
        return null;
    }
    // The entry expires num units from now
    cacheObj.setEndTime(LocalDateTime.now().plus(num, timeUnit));
    return cacheObj;
}
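How expire interacts with the expiry check on read can be sketched in isolation. TtlMapSketch is a hypothetical class, and unlike the code above it takes the current time as a parameter so that the behaviour can be checked without waiting for a real clock.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only: not a class from the repository.
class TtlMapSketch {
    // Value plus optional deadline; a null deadline means "never expires"
    private static final class Entry {
        final String value;
        LocalDateTime endTime;

        Entry(String value) {
            this.value = value;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();

    void put(String key, String value) {
        entries.put(key, new Entry(value));
    }

    // Sets the deadline to now + amount of unit; returns false for an unknown key
    boolean expire(String key, long amount, ChronoUnit unit, LocalDateTime now) {
        Entry e = entries.get(key);
        if (e == null) {
            return false;
        }
        e.endTime = now.plus(amount, unit);
        return true;
    }

    // Lazy expiry: an entry past its deadline is removed when it is next read
    String get(String key, LocalDateTime now) {
        Entry e = entries.get(key);
        if (e == null) {
            return null;
        }
        if (e.endTime != null && now.isAfter(e.endTime)) {
            entries.remove(key);
            return null;
        }
        return e.value;
    }

    int size() {
        return entries.size();
    }
}
```

Lazy expiry needs no background thread, but an expired entry that is never read again keeps occupying memory until something else evicts it.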
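setMaxSize
A method that sets the cache's size limit. It refuses a limit that is smaller than the number of bytes currently in use.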
public boolean setMaxSize(int num) {
    // Do not shrink the limit below what is already stored
    if (num < cache.getNormalByteSize()) {
        return false;
    }
    cache.setMaxByteSize(num);
    return true;
}
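delete, clear
Methods for removing entries from a group's cache. They also show the benefit of grouping: the cache of a single feature (group) can be cleared on its own.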
public CacheObj delete(String key) {
    CacheObj obj = cache.remove(key);
    return obj;
}

public void clear() {
    cache.clear();
}
GroupManager
Java-Distributed-Cache/GroupManager.java at master · weloe/Java-Distributed-Cache (github.com)
Since there are Groups, something has to manage them, which calls for corresponding put and get methods.
import java.util.Map;

public class GroupManager {
    // Groups indexed by name
    private Map<String, Group> groupMap;

    public GroupManager(Map<String, Group> groupMap) {
        this.groupMap = groupMap;
    }

    public Group getGroup(String key) {
        Group group = groupMap.get(key);
        return group;
    }

    public Group put(Group group) {
        return groupMap.put(group.getName(), group);
    }
}
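If groups may be registered from several threads, one possible variation is to back the registry with a ConcurrentHashMap. The sketch below is an assumption about how that could look, not the repository's approach; GroupRegistrySketch, register and lookup are hypothetical names, and the type parameter G stands in for Group so that the example is self-contained.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical illustration only: not a class from the repository.
class GroupRegistrySketch<G> {
    private final ConcurrentMap<String, G> groups = new ConcurrentHashMap<>();

    // Registers the group unless the name is taken; returns the group that
    // ends up registered under that name
    G register(String name, G group) {
        G existing = groups.putIfAbsent(name, group);
        return existing != null ? existing : group;
    }

    // ConcurrentHashMap rejects null keys, so a null name is handled up front
    G lookup(String name) {
        return name == null ? null : groups.get(name);
    }
}
```

Unlike Map.put, putIfAbsent never silently replaces an existing group, so two threads registering the same name end up sharing one group and one cache.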
Tests
CacheTest
Java-Distributed-Cache/CacheTest.java at master · weloe/Java-Distributed-Cache (github.com)
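package com.weloe.cache.cachemanager;

import com.weloe.cache.outstrategy.CacheStrategy;
import com.weloe.cache.outstrategy.LRUCache;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;

// BytesUtil is the repository's helper class; its import is not shown here
class CacheTest {
    Cache cache;

    @BeforeEach
    void setUp() {
        CacheStrategy<String, CacheObj> lruCache = new LRUCache<>(5);
        lruCache.setCallback((s1, s2) -> System.out.println("cache entry evicted"));
        cache = new Cache(1024 * 1024, lruCache);
    }

    @Test
    void add() {
        String s = "123";
        // Each entry counts as 512 KB, so the 1 MB limit forces evictions
        CacheObj cacheObj = new CacheObj(LocalDateTime.MAX, String.class, 512 * 1024, s.getBytes(StandardCharsets.UTF_8));
        cache.add("test", cacheObj);
        for (int i = 0; i < 5; i++) {
            cache.add("test" + i, cacheObj);
        }
    }

    @Test
    void get() {
        CacheObj cacheObj = cache.get("123");
        Assertions.assertNull(cacheObj);

        String s = "123";
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        cacheObj = new CacheObj(LocalDateTime.MAX, String.class, bytes.length, bytes);
        cache.add("test", cacheObj);

        CacheObj test = cache.get("test");
        Assertions.assertNotNull(test);
        byte[] data = test.getData();
        String s1 = BytesUtil.bytes2String(data);
        System.out.println(s1);
    }
}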
GroupTest
Java-Distributed-Cache/GroupTest.java at master · weloe/Java-Distributed-Cache (github.com)
package com.weloe.cache.cachemanager;

import com.weloe.cache.outstrategy.CacheStrategy;
import com.weloe.cache.outstrategy.LRUCache;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.nio.charset.StandardCharsets;
import java.time.temporal.ChronoUnit;

class GroupTest {
    Group group;

    @BeforeEach
    void setUp() {
        CacheStrategy<String, CacheObj> lruCache = new LRUCache<>(5);
        lruCache.setCallback((s1, s2) -> System.out.println("cache entry evicted"));
        group = new Group("group1", new Cache(1024 * 1024, lruCache), str -> {
            System.out.println("group1 callback");
            return new byte[0];
        });
    }

    @Test
    void get() {
        group.putCacheObj("1", new CacheObj());
        CacheObj cacheObj = group.get("1");
        Assertions.assertNotNull(cacheObj);
    }

    @Test
    void getName() {
        String name = group.getName();
        System.out.println(name);
    }

    @Test
    void putCacheObj() {
        String x = "132";
        byte[] bytes = x.getBytes(StandardCharsets.UTF_8);
        group.putCacheObj("cache1", new CacheObj(null, String.class, bytes.length, bytes));
    }

    @Test
    void expire() {
        String x = "132";
        byte[] bytes = x.getBytes(StandardCharsets.UTF_8);
        group.putCacheObj("cache1", new CacheObj(null, String.class, bytes.length, bytes));
        CacheObj cache1 = group.expire("cache1", 2, ChronoUnit.MINUTES);
        System.out.println(cache1.getEndTime());
        System.out.println(group.get("cache1").getEndTime());
        // expire updates the cached object itself, so both reads see the same endTime
        Assertions.assertSame(cache1.getEndTime(), group.get("cache1").getEndTime());
    }
}
GroupManagerTest
Java-Distributed-Cache/GroupManagerTest.java at master · weloe/Java-Distributed-Cache (github.com)
package com.weloe.cache.cachemanager;

import com.weloe.cache.outstrategy.LRUCache;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.HashMap;

class GroupManagerTest {
    GroupManager groupManager;

    @BeforeEach
    void setUp() {
        Group group1 = new Group("group1",
                new Cache(1024 * 1024, new LRUCache<>(5, (s1, s2) -> System.out.println("group1 cache entry evicted"))),
                str -> {
                    System.out.println("group1 callback for a cache miss");
                    return new byte[0];
                });
        Group group2 = new Group("group2",
                new Cache(1024 * 1024, new LRUCache<>(5, (s1, s2) -> System.out.println("group2 cache entry evicted"))),
                str -> {
                    System.out.println("group2 callback for a cache miss");
                    return new byte[0];
                });
        // Matches the single-argument GroupManager constructor shown above
        groupManager = new GroupManager(new HashMap<>());
        groupManager.put(group1);
        groupManager.put(group2);
    }

    @Test
    void getGroup() {
        // An unknown name yields null
        System.out.println(groupManager.getGroup(""));
        System.out.println(groupManager.getGroup("group1"));
        System.out.println(groupManager.getGroup("group2").getName());
    }

    @Test
    void put() {
        Group group3 = new Group("group3",
                new Cache(1024 * 1024, new LRUCache<>(5, (s1, s2) -> System.out.println("group3 cache entry evicted"))),
                str -> {
                    System.out.println("group3 callback for a cache miss");
                    return new byte[0];
                });
        groupManager.put(group3);
        System.out.println(groupManager.getGroup("group3").getName());
    }
}