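Writing a Distributed Cache in Java: RESP Server

Preface

In this installment we build a socket server that speaks RESP, which gives us a first working single-node cache.

Along the way we also need to implement dynamic routing of commands.

Source: https://github.com/weloe/Java-Distributed-Cache

Code for this installment:

https://github.com/weloe/Java-Distributed-Cache/tree/master/src/main/java/com/weloe/cache/server

Previous installment: Cache Management https://www.cnblogs.com/weloe/p/17068891.html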

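The RESP Protocol

RESP supports five data types: simple strings, errors, integers, bulk strings, and arrays.

The type is identified by the first byte, and the parts of a message are separated by \r\n (CRLF).

  • '+' : simple string
  • '-' : error
  • ':' : integer
  • '$' : bulk string
  • '*' : array

A simple string usually signals that an operation succeeded, for example +ok\r\n

Error: -error msg\r\n

Integer: :1\r\n

Bulk string:

$4\r\ntest\r\n

Here 4 is the length of the payload in bytes.

The payload is test.

Array:

*2\r\n$2\r\nab\r\n$3\r\ncde\r\n

This encodes the string array [ab, cde].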
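To make the framing concrete, here is a minimal sketch of an encoder that turns a list of arguments into a RESP array of bulk strings. The class RespEncoderSketch and its encodeArray method are hypothetical names used only for illustration; they are not part of the project.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for illustration; not part of the project. */
class RespEncoderSketch {
    static final String CRLF = "\r\n";

    /** Encodes the arguments as a RESP array of bulk strings. */
    static String encodeArray(String... args) {
        StringBuilder sb = new StringBuilder();
        // Array header: '*' followed by the element count
        sb.append('*').append(args.length).append(CRLF);
        for (String arg : args) {
            // Bulk string header: '$' followed by the payload length in bytes
            int byteLength = arg.getBytes(StandardCharsets.UTF_8).length;
            sb.append('$').append(byteLength).append(CRLF);
            sb.append(arg).append(CRLF);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Show the CRLF pairs as visible escapes
        System.out.println(encodeArray("ab", "cde").replace("\r\n", "\\r\\n"));
        // prints *2\r\n$2\r\nab\r\n$3\r\ncde\r\n
    }
}
```

The length after '$' counts bytes, not characters, which matters as soon as a value contains non-ASCII text.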

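Implementation

https://github.com/weloe/Java-Distributed-Cache/blob/master/src/main/java/com/weloe/cache/util/RESPUtil.java

With the protocol in hand, we can write the code that parses RESP messages. We factor it out into a utility class.

Code that parses the content of a response: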

/**
 * Reads the bytes of a RESP message and converts them to a String.
 *
 * @return the raw RESP text, or null if reading failed
 */
public static String parseRESPBytes(InputStream inputStream) {
    byte[] bytes;
    String result = null;
    try {
        // Busy-wait until data arrives; simple, but it spins the CPU while idle
        while (inputStream.available() == 0) {
        }
        bytes = new byte[inputStream.available()];
        inputStream.read(bytes);
        result = new String(bytes, StandardCharsets.UTF_8);
    } catch (IOException e) {
        e.printStackTrace();
    }
    return result;
}

/**
 * Parses a RESP-encoded String.
 *
 * @param raw the raw RESP text
 * @return the decoded value
 */
public static Object parseRESPString(String raw) {
    byte type = raw.getBytes()[0];
    String result = raw.substring(1);
    switch (type) {
        case '+':
            // Simple string, e.g. +ok\r\n: read the single line
            return result.replace("\r\n", "");
        case '-':
            // Error, e.g. -Error msg\r\n
            throw new RuntimeException(result.replace("\r\n", ""));
        case ':':
            // Integer
            return result.replace("\r\n", "");
        case '$':
            // Bulk string: token 0 is the length, token 1 is the payload
            return result.split("\r\n")[1];
        case '*':
            // Array of bulk strings: skip the element count, then take every second token
            String[] strList = result.substring(result.indexOf("$")).split("\r\n");
            System.out.print("Multi-bulk request: ");
            List<String> list = new LinkedList<>();
            for (int i = 1; i < strList.length; i += 2) {
                System.out.print(strList[i] + " ");
                list.add(strList[i]);
            }
            System.out.println();
            return list;
        default:
            throw new RuntimeException("Invalid data format");
    }
}
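The string-splitting parser above is enough for the simple commands in this article, but it assumes the whole message arrives in a single read and that no value contains \r\n. As a hedged alternative, the sketch below reads one value at a time from the stream and relies on the declared length of each bulk string. RespReaderSketch and all of its method names are hypothetical; this is not the project's parser.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical length-aware RESP reader for illustration; not part of the project. */
class RespReaderSketch {

    /** Reads one RESP value: String, Long, List<Object>, or null for a null bulk string/array. */
    static Object read(InputStream in) throws IOException {
        int type = in.read();
        if (type == -1) throw new EOFException("connection closed");
        String line = readLine(in);
        switch (type) {
            case '+': return line;
            case '-': throw new IOException("error reply: " + line);
            case ':': return Long.parseLong(line);
            case '$': {
                int len = Integer.parseInt(line);
                if (len < 0) return null;
                byte[] data = readFully(in, len); // exactly len payload bytes
                readFully(in, 2);                 // consume the trailing CRLF
                return new String(data, StandardCharsets.UTF_8);
            }
            case '*': {
                int count = Integer.parseInt(line);
                if (count < 0) return null;
                List<Object> items = new ArrayList<>();
                for (int i = 0; i < count; i++) items.add(read(in));
                return items;
            }
            default: throw new IOException("unknown RESP type byte: " + (char) type);
        }
    }

    /** Reads up to the next CRLF and returns the line without it. */
    static String readLine(InputStream in) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        int prev = -1, cur;
        while ((cur = in.read()) != -1) {
            if (prev == '\r' && cur == '\n') {
                byte[] b = buf.toByteArray();
                return new String(b, 0, b.length - 1, StandardCharsets.UTF_8); // drop the '\r'
            }
            buf.write(cur);
            prev = cur;
        }
        throw new EOFException("stream ended before CRLF");
    }

    /** Blocks until exactly len bytes have been read. */
    static byte[] readFully(InputStream in, int len) throws IOException {
        byte[] data = new byte[len];
        int off = 0;
        while (off < len) {
            int n = in.read(data, off, len - off);
            if (n == -1) throw new EOFException("stream ended inside a value");
            off += n;
        }
        return data;
    }

    /** Convenience wrapper for trying the reader on an in-memory string. */
    static Object parse(String raw) {
        try {
            return read(new ByteArrayInputStream(raw.getBytes(StandardCharsets.UTF_8)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("*2\r\n$2\r\nab\r\n$3\r\ncde\r\n")); // prints [ab, cde]
    }
}
```

Because read blocks on the stream, a reader like this also avoids the available() polling loop, at the cost of a little more code.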

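Code that sends a request:

/**
 * Sends a RESP request and returns the raw response bytes.
 *
 * @param host server host
 * @param port server port
 * @param args the command and its arguments
 * @return the raw response bytes
 * @throws IOException if the connection fails
 */
public static byte[] sendRequest(String host, Integer port, String... args) throws IOException {
    Socket socket = new Socket(host, port);
    RESPRequest request = new RESPRequest(socket);
    PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8));
    sendRequest(writer, args);

    byte[] param = request.getBytes();

    request.close();
    writer.close();
    socket.close();
    return param;
}

/**
 * Writes the arguments as a RESP array of bulk strings.
 *
 * @param writer the writer to send through
 * @param args the command and its arguments
 * @throws IOException declared for callers; PrintWriter itself does not throw
 */
public static void sendRequest(PrintWriter writer, String... args) throws IOException {
    // RESP lines end with \r\n; println() would emit the platform line separator instead
    writer.print("*" + args.length + "\r\n");
    for (String arg : args) {
        writer.print("$" + arg.getBytes(StandardCharsets.UTF_8).length + "\r\n");
        writer.print(arg + "\r\n");
    }
    writer.flush();
}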

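Parsing Commands

With RESP implemented, we can send commands to the cache. But how do we parse a command and dispatch it to the right operation?

Here we use a prefix tree (trie) for dynamic routing.

Node

https://github.com/weloe/Java-Distributed-Cache/blob/master/src/main/java/com/weloe/cache/server/parser/Node.java

First we build the node type of the tree.

Note that only leaf nodes carry a pattern.

isWild records whether a node is a wildcard, which is how arguments supplied by the user get matched. When registering a route, we use : as the wildcard prefix.

For example, register the route set :group :k :v

and send the command: set g1 k1 v1

:group matches g1, :k matches k1, and :v matches v1.

In the end we need to extract group = g1, k = k1, v = v1.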
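The binding step can be shown in isolation before any tree is involved. The following is a minimal sketch that matches one pattern against one command and collects the wildcard values. WildcardBindingSketch and bind are hypothetical names, and the sketch handles only the : wildcard.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of wildcard binding; not part of the project. */
class WildcardBindingSketch {

    /** Returns the bound parameters, or null when the command does not match the pattern. */
    static Map<String, String> bind(String pattern, String command) {
        String[] patternParts = pattern.split(" ");
        String[] commandParts = command.split(" ");
        if (patternParts.length != commandParts.length) {
            return null;
        }
        Map<String, String> params = new LinkedHashMap<>();
        for (int i = 0; i < patternParts.length; i++) {
            String p = patternParts[i];
            if (p.startsWith(":")) {
                // Wildcard: bind the name after ':' to the actual argument
                params.put(p.substring(1), commandParts[i]);
            } else if (!p.equals(commandParts[i])) {
                // Literal parts must match exactly
                return null;
            }
        }
        return params;
    }

    public static void main(String[] args) {
        System.out.println(bind("set :group :k :v", "set g1 k1 v1"));
        // prints {group=g1, k=k1, v=v1}
    }
}
```

Checking every registered pattern this way would cost one full comparison per route; the trie lets routes that share a prefix share the comparison work.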

import java.util.LinkedList;
import java.util.List;

/**
 * @author weloe
 */
public class Node {
    /**
     * The full pattern to match; set only on the last level, e.g. set :group :key :value
     */
    private String pattern;

    /**
     * One part of the pattern, e.g. :group in set :group :key :value
     */
    private String part;

    /**
     * Child nodes
     */
    private List<Node> children;

    /**
     * Whether this node is a wildcard node
     */
    private boolean isWild;

    public Node() {
        this.children = new LinkedList<>();
        this.part = "";
        this.pattern = "";
    }

    public Node(String part, boolean isWild) {
        this.part = part;
        this.isWild = isWild;
        this.children = new LinkedList<>();
    }
}

The method that registers a node:

/**
 * Registers a node.
 *
 * @param pattern the full pattern being registered
 * @param parts the pattern split into parts
 * @param height the current depth in the tree
 */
public void insert(String pattern, String[] parts, int height) {
    // Termination: every part has been consumed, so this is the last level
    if (parts.length == height) {
        this.pattern = pattern;
        return;
    }

    String part = parts[height];
    // Look for an existing child that matches
    Node child = matchChild(part);
    if (child == null) {
        // A part starting with ":" or "*" is a wildcard
        child = new Node(part, part.startsWith(":") || part.startsWith("*"));
        // Attach it to the current node
        children.add(child);
    }
    child.insert(pattern, parts, height + 1);
}

insert calls matchChild(), which finds a child node matching the given part:

/**
 * Matches a child node by part.
 *
 * @param part the part to match
 * @return the first matching child, or null if none matches
 */
public Node matchChild(String part) {
    for (Node child : children) {
        if (child.part.equals(part) || child.isWild) {
            return child;
        }
    }
    return null;
}

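The method that recursively matches a string array (the input command) against the tree to find the corresponding leaf node:

/**
 * Finds the node that matches parts[].
 *
 * @param parts the command split into parts
 * @param height the current depth in the tree
 * @return the matched node, or null if nothing matches
 */
public Node search(String[] parts, int height) {
    // Reached the end of the command, or a "*" node that swallows the rest
    if (parts.length == height || part.startsWith("*")) {
        if (pattern == null) {
            return null;
        }
        // Found the node
        return this;
    }

    String part = parts[height];
    // Collect every child that could match this part
    List<Node> candidates = matchChildren(part);

    for (Node child : candidates) {
        Node node = child.search(parts, height + 1);
        if (node != null) {
            return node;
        }
    }

    return null;
}

search calls matchChildren, which returns every child that matches the given part:

/**
 * Matches child nodes by part.
 *
 * @param part the part to match
 * @return all matching children
 */
public List<Node> matchChildren(String part) {
    ArrayList<Node> nodes = new ArrayList<>();
    for (Node child : children) {
        if (child.part.equals(part) || child.isWild) {
            nodes.add(child);
        }
    }
    return nodes;
}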
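Returning every candidate matters because the first matching child can lead to a dead end, in which case the search has to back up and try the next one. The sketch below is a deliberately reduced trie that shows this backtracking. RouteTrieSketch is a hypothetical class: it supports only the : wildcard, and unlike the insert shown earlier it reuses a child only on an exact part match.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical reduced routing trie for illustration; not the project's Node class. */
class RouteTrieSketch {
    final String part;
    final List<RouteTrieSketch> children = new ArrayList<>();
    String pattern; // non-null only on nodes that end a registered route

    RouteTrieSketch(String part) {
        this.part = part;
    }

    boolean isWild() {
        return part.startsWith(":");
    }

    void insert(String pattern) {
        insert(pattern, pattern.split(" "), 0);
    }

    private void insert(String pattern, String[] parts, int depth) {
        if (depth == parts.length) {
            this.pattern = pattern;
            return;
        }
        RouteTrieSketch child = null;
        for (RouteTrieSketch c : children) {
            if (c.part.equals(parts[depth])) { // exact match only when registering
                child = c;
                break;
            }
        }
        if (child == null) {
            child = new RouteTrieSketch(parts[depth]);
            children.add(child);
        }
        child.insert(pattern, parts, depth + 1);
    }

    /** Returns the matched pattern, or null when no route matches. */
    String search(String command) {
        return search(command.split(" "), 0);
    }

    private String search(String[] parts, int depth) {
        if (depth == parts.length) {
            return pattern;
        }
        for (RouteTrieSketch c : children) {
            if (c.part.equals(parts[depth]) || c.isWild()) {
                String found = c.search(parts, depth + 1);
                if (found != null) {
                    return found; // otherwise fall through and try the next candidate
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        RouteTrieSketch root = new RouteTrieSketch("");
        root.insert("config :name show");
        root.insert("config set :value");
        // ":name" matches "set" first but dead-ends on "5"; the search then tries "set"
        System.out.println(root.search("config set 5")); // prints config set :value
    }
}
```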

Router

https://github.com/weloe/Java-Distributed-Cache/blob/master/src/main/java/com/weloe/cache/server/parser/Router.java

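Router provides methods to add routes and to match a route against a user's command.

HandlerFunc is the handler function for a command; we supply it when registering a route with addRoute.

Route is what getRoute returns: node is the matched leaf node of the tree, and map holds the matched wildcard parameters.

For example, register the route set :group :k :v

and send the command: set g1 k1 v1

:group matches g1, :k matches k1, and :v matches v1.

map is then group = g1, k = k1, v = v1.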

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * @author weloe
 */
public class Router {

    @FunctionalInterface
    public interface HandlerFunc {
        Object handle(RESPContext context);
    }

    public class Route {
        private Node node;
        private Map<String, String> map;

        public Route(Node node, Map<String, String> map) {
            this.node = node;
            this.map = map;
        }

        public Object handle(RESPContext context) {
            context.setParamMap(map);
            String key = "command" + "-" + node.getPattern();
            return handlers.get(key).handle(context);
        }

        public Map<String, String> getMap() {
            return map;
        }

        public Node getNode() {
            return node;
        }
    }

    /**
     * Root nodes, one per method
     */
    private Map<String, Node> roots;

    private Map<String, HandlerFunc> handlers;

    public Router() {
        this.roots = new LinkedHashMap<>();
        this.handlers = new LinkedHashMap<>();
    }

    /**
     * Splits a pattern into parts, stopping after the first part that starts with '*'.
     *
     * @param pattern the pattern or command text
     * @return the parts
     */
    public String[] parsePattern(String pattern) {
        String[] patterns = pattern.split(" ");

        String[] parts = new String[patterns.length];
        for (int i = 0; i < patterns.length; i++) {
            parts[i] = patterns[i];
            if (patterns[i].charAt(0) == '*') {
                // Drop the unused tail so callers never see null parts
                return Arrays.copyOf(parts, i + 1);
            }
        }

        return parts;
    }

    public void addRoute(String method, String pattern, HandlerFunc handler) {
        String[] parts = parsePattern(pattern);

        String key = method + "-" + pattern;
        // Create the root node for this method if it does not exist yet
        roots.computeIfAbsent(method, m -> new Node()).insert(pattern, parts, 0);
        handlers.put(key, handler);
    }

    /**
     * Assumed overload: Launch registers routes through a two-argument, chainable
     * addRoute, which is taken here to default the method to "command".
     */
    public Router addRoute(String pattern, HandlerFunc handler) {
        addRoute("command", pattern, handler);
        return this;
    }

    public Route getRoute(String path) {
        return getRoute("command", path);
    }

    public Route getRoute(String method, String path) {
        String[] patterns = parsePattern(path);
        Map<String, String> params = new LinkedHashMap<>();

        Node root = roots.get(method);
        if (root == null) {
            return null;
        }
        Node res = root.search(patterns, 0);

        if (res == null) {
            return null;
        }
        String[] parts = parsePattern(res.getPattern());
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i];
            if (part.charAt(0) == ':') {
                params.put(part.substring(1), patterns[i]);
            }
            if (part.charAt(0) == '*' && part.length() > 1) {
                // A "*name" part captures the rest of the command
                String collect = Arrays.stream(patterns).skip(i).collect(Collectors.joining(" "));
                params.put(part.substring(1), collect);
                break;
            }
        }

        return new Route(res, params);
    }
}

Test

This is equivalent to entering delete g1 k1 v1.

The matched command is delete.

The matched parameter is key: k1.

@Test
void getRoute() {
    Router router = new Router();

    router.addRoute("command", "set :group :key :v", null);
    router.addRoute("command", "delete :group :key :v", null);
    router.addRoute("command", "expire :group :key :v", null);
    router.addRoute("command", "get :group :key :v", null);
    router.addRoute("command", "hget :key :field", null);
    router.addRoute("command", "config set maxsize :size", null);
    router.addRoute("command", "config set maxnum :num", null);
    router.addRoute("command", "config set cachestrategy :strategy", null);

    Router.Route route = router.getRoute("command", "delete g1 k1 v1");
    // assertEquals takes the expected value first
    Assertions.assertEquals("delete :group :key :v", route.getNode().getPattern());
    Assertions.assertEquals("k1", route.getMap().get("key"));
}

Server

Building on the above, we can encapsulate the RESPContext, RESPRequest, and RESPResponse classes.

RESPContext

https://github.com/weloe/Java-Distributed-Cache/blob/master/src/main/java/com/weloe/cache/server/resp/RESPContext.java

A server naturally carries context for each request, so we wrap it in a Context class.

import java.io.IOException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.Map;

/**
 * @author weloe
 */
public class RESPContext {

    private LocalDateTime startTime;

    private Socket socket;

    private RESPRequest request;

    private RESPResponse response;

    private Router.Route route;

    private Map<String, String> paramMap;

    private Group group;

    public void initServer(Socket socket, RESPRequest request, RESPResponse response) throws IOException {
        this.socket = socket;
        this.request = request;
        this.response = response;
        this.startTime = LocalDateTime.now();
    }

    /**
     * Reads the bytes of a RESP message.
     *
     * @return the raw RESP text
     */
    public String parseRESPBytes() {
        return request.parseRESPBytes();
    }

    /**
     * Parses a RESP-encoded String.
     *
     * @param raw the raw RESP text
     * @return the decoded value
     */
    public Object parseRESPString(String raw) {
        return request.parseRESPString(raw);
    }

    public void ok() {
        response.ok();
    }

    public void ok(byte[] bytes) {
        // String.valueOf(bytes) would print the array's identity, not its content
        response.ok(new String(bytes, StandardCharsets.UTF_8));
    }

    public void ok(String arg) {
        response.ok(arg);
    }

    public void ok(String... args) {
        response.ok(args);
    }

    public void error(String msg) {
        response.error(msg);
    }

    public void close() {
        if (request != null) {
            try {
                request.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        if (response != null) {
            response.close();
        }

        if (socket != null) {
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public Socket getSocket() {
        return socket;
    }

    public RESPRequest getRequest() {
        return request;
    }

    public RESPResponse getResponse() {
        return response;
    }

    public LocalDateTime getStartTime() {
        return startTime;
    }

    public void setStartTime(LocalDateTime startTime) {
        this.startTime = startTime;
    }

    public void setParamMap(Map<String, String> paramMap) {
        this.paramMap = paramMap;
    }

    public Map<String, String> getParamMap() {
        return paramMap;
    }

    public void setRoute(Router.Route route) {
        this.route = route;
    }

    public Router.Route getRoute() {
        return route;
    }

    public void setGroup(Group group) {
        this.group = group;
    }

    public Group getGroup() {
        return group;
    }

    public String getParam(String key) {
        return paramMap.get(key);
    }
}

RESPRequest

https://github.com/weloe/Java-Distributed-Cache/blob/master/src/main/java/com/weloe/cache/server/resp/RESPRequest.java

import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
 * @author weloe
 */
public class RESPRequest {
    private InputStream inputStream;

    private Map<String, String> params;

    public RESPRequest(Socket socket) throws IOException {
        this.inputStream = socket.getInputStream();
    }

    public String getParam(String key) {
        return params.get(key);
    }

    /**
     * Reads the raw bytes of a RESP message.
     *
     * @return the bytes read, or null if reading failed
     */
    public byte[] getBytes() {
        byte[] bytes = null;

        try {
            // Busy-wait until data arrives
            while (inputStream.available() == 0) {
            }
            bytes = new byte[inputStream.available()];
            inputStream.read(bytes);
        } catch (IOException e) {
            e.printStackTrace();
        }

        return bytes;
    }

    /**
     * Reads the bytes of a RESP message and converts them to a String.
     *
     * @return the raw RESP text, or null if reading failed
     */
    public String parseRESPBytes() {
        byte[] bytes;
        String result = null;
        try {
            System.out.println("Waiting for data");
            // Busy-wait until data arrives
            while (inputStream.available() == 0) {
            }
            bytes = new byte[inputStream.available()];
            inputStream.read(bytes);
            result = new String(bytes, StandardCharsets.UTF_8);
        } catch (IOException e) {
            e.printStackTrace();
        }

        return result;
    }

    /**
     * Parses a RESP-encoded String.
     *
     * @param raw the raw RESP text
     * @return the decoded value
     */
    public Object parseRESPString(String raw) {
        byte type = raw.getBytes()[0];
        String result = raw.substring(1);
        switch (type) {
            case '+':
                // Simple string, e.g. +ok\r\n: read the single line
                return result.replace("\r\n", "");
            case '-':
                // Error, e.g. -Error msg\r\n
                throw new RuntimeException(result.replace("\r\n", ""));
            case ':':
                // Integer
                return result.replace("\r\n", "");
            case '$':
                // Bulk string: token 0 is the length (any number of digits), token 1 is the payload
                return result.split("\r\n")[1];
            case '*':
                // Array of bulk strings: skip the element count, then take every second token
                String[] strList = result.substring(result.indexOf("$")).split("\r\n");
                System.out.print("Multi-bulk request: ");
                List<String> list = new LinkedList<>();
                for (int i = 1; i < strList.length; i += 2) {
                    System.out.print(strList[i] + " ");
                    list.add(strList[i]);
                }
                System.out.println();
                return list;
            default:
                throw new RuntimeException("Invalid data format");
        }
    }

    public void close() throws IOException {
        if (inputStream != null) {
            inputStream.close();
        }
    }

    public InputStream getInputStream() {
        return inputStream;
    }
}

RESPResponse

https://github.com/weloe/Java-Distributed-Cache/blob/master/src/main/java/com/weloe/cache/server/resp/RESPResponse.java

import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/**
 * @author weloe
 */
public class RESPResponse {
    // RESP lines end with \r\n; println() would emit the platform line separator instead
    private static final String CRLF = "\r\n";

    private PrintWriter writer;

    public RESPResponse(Socket socket) throws IOException {
        // Character output stream over the socket
        writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8));
    }

    public void ok() {
        writer.print(RESPStatus.OK.getMsg() + CRLF);
        writer.flush();
    }

    public void ok(Integer arg) {
        writer.print(":" + arg + CRLF);
        writer.flush();
    }

    public void ok(String arg) {
        writer.print("$" + arg.getBytes(StandardCharsets.UTF_8).length + CRLF);
        writer.print(arg + CRLF);
        writer.flush();
    }

    public void ok(String... args) {
        writer.print("*" + args.length + CRLF);
        for (String arg : args) {
            writer.print("$" + arg.getBytes(StandardCharsets.UTF_8).length + CRLF);
            writer.print(arg + CRLF);
        }
        writer.flush();
    }

    public void error(String msg) {
        writer.print(RESPStatus.ERROR.getMsg() + " " + msg + CRLF);
        writer.flush();
    }

    public void close() {
        if (writer != null) {
            writer.close();
        }
    }

    public PrintWriter getWriter() {
        return writer;
    }
}

Service

https://github.com/weloe/Java-Distributed-Cache/tree/master/src/main/java/com/weloe/cache/server/command

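Every command has a HandlerFunc, and inside that function we call the matching service, so we need to abstract out the services.

The services here are thin wrappers around the classes from the previous installment on cache management, so we won't go through them one by one.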

ConfigService

/**
 * Config operations
 * @author weloe
 */
public class ConfigService {

    public Object getNormalSize(Group group) {
        return group.getNormalSize();
    }

    public Object getMaxSize(Group group) {
        return group.getMaxSize();
    }

    public Object setMaxSize(Group group, String size) {
        group.setMaxSize(Integer.parseInt(size));
        return "";
    }

    public Object setMaxNum(Group group, String num) {
        // Not implemented yet
        return null;
    }
}

DeleteService

/**
 * Operations for deleting keys
 * @author weloe
 */
public class DeleteService {

    public Object delete(Group group, String key) {
        CacheObj delete = group.delete(key);
        if (delete == null) {
            return null;
        }
        return "";
    }

    public Object clear(Group group) {
        group.clear();
        return "";
    }
}

ExpireService

/**
 * Sets and gets a key's expiration time, in seconds
 * @author weloe
 */
public class ExpireService {

    public Object expire(Group group, String key, String value) {
        LocalDateTime time = group.expire(key, Long.parseLong(value), ChronoUnit.SECONDS);
        return time;
    }

    public Object ttl(Group group, String key) {
        long ttl = group.ttl(key);
        return ttl;
    }
}

GroupService

/**
 * Group operations
 * @author weloe
 */
public class GroupService {

    private GroupManager groupManager = GroupManager.getInstance();

    public Object add(String groupName) {
        Group group = new Group();
        group.setName(groupName);
        group.setCache(new Cache());
        group.setGetter(k -> null);
        groupManager.put(group);

        return "";
    }
}

StringService

/**
 * Cache set and get operations
 * @author weloe
 */
public class StringService {

    public String get(Group group, String key) {
        CacheObj cacheObj = group.get(key);
        if (cacheObj != null) {
            // Decode with the same charset that set() encodes with
            return new String(cacheObj.getData(), StandardCharsets.UTF_8);
        }
        return null;
    }

    public Object set(Group group, String key, String value) {
        group.putCacheObj(key, new CacheObj(value.getBytes(StandardCharsets.UTF_8)));
        return "";
    }
}

ServiceFactory

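To keep things manageable, a separate class holds the services:

import java.util.LinkedHashMap;
import java.util.Map;

public class ServiceFactory {

    Map<String, Object> map;

    public ServiceFactory() {
        map = new LinkedHashMap<>();
        map.put("str", new StringService());
        map.put("group", new GroupService());
        map.put("config", new ConfigService());
        map.put("delete", new DeleteService());
        map.put("expire", new ExpireService());
    }

    // The cast is unchecked: the caller must ask for the type registered under this name
    @SuppressWarnings("unchecked")
    public <T> T getBean(String name) {
        return (T) map.get(name);
    }
}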
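A lookup by name relies on the caller assigning the result to the right type. One possible variation, sketched here under the assumption that each service type is registered once, is to key the map by class so the lookup is checked at the call site. TypedRegistrySketch and its methods are hypothetical and not part of the project.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical type-keyed registry for illustration; not part of the project. */
class TypedRegistrySketch {
    private final Map<Class<?>, Object> services = new LinkedHashMap<>();

    /** Registers an instance under its type; returns this so calls can be chained. */
    <T> TypedRegistrySketch register(Class<T> type, T instance) {
        services.put(type, instance);
        return this;
    }

    /** Looks a service up by type; Class.cast keeps the conversion checked. */
    <T> T get(Class<T> type) {
        Object service = services.get(type);
        if (service == null) {
            throw new IllegalArgumentException("no service registered for " + type.getName());
        }
        return type.cast(service);
    }

    public static void main(String[] args) {
        // StringBuilder stands in for a real service class such as StringService
        TypedRegistrySketch registry = new TypedRegistrySketch()
                .register(StringBuilder.class, new StringBuilder("stand-in service"));
        StringBuilder service = registry.get(StringBuilder.class);
        System.out.println(service); // prints stand-in service
    }
}
```

The trade-off is that a typo in a name becomes a compile error, while the string-keyed version stays a little shorter.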

CommandQueue

https://github.com/weloe/Java-Distributed-Cache/blob/master/src/main/java/com/weloe/cache/server/parser/CommandQueue.java

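Here we use a model in which multiple threads handle I/O and parse commands, while a single thread executes them. Once an I/O thread has parsed a command and obtained its Route, it adds the Context to a blocking queue; the executor thread then consumes Contexts from the queue and runs the commands.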

import java.util.concurrent.PriorityBlockingQueue;

public class CommandQueue {

    // Oldest request first
    private static PriorityBlockingQueue<RESPContext> commandQueue = new PriorityBlockingQueue<>(5,
            (o1, o2) -> o1.getStartTime().compareTo(o2.getStartTime()));

    public boolean add(RESPContext context) {
        return commandQueue.add(context);
    }

    public void consume() {
        new Thread(() -> {
            System.out.println("Server waiting for commands...");
            while (true) {
                RESPContext respContext;
                try {
                    respContext = commandQueue.take();
                } catch (InterruptedException e) {
                    // Nothing was taken, so there is no client to reply to; stop consuming
                    Thread.currentThread().interrupt();
                    return;
                }

                System.out.println("Executing command " + respContext.getRoute().getNode().getPattern());
                try {
                    // Execute the command
                    Object handle = respContext.getRoute().handle(respContext);
                    System.out.println(handle);
                    if (handle == null) {
                        respContext.ok("nil");
                    } else if (handle.equals("")) {
                        respContext.ok();
                    } else {
                        respContext.ok(handle.toString());
                    }
                } catch (RuntimeException e) {
                    // A failing handler must not kill the only executor thread
                    respContext.error(e.getMessage());
                }
            }
        }).start();
    }

    public PriorityBlockingQueue<RESPContext> getCommandQueue() {
        return commandQueue;
    }
}
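The shape of this model, many producers and one consumer, can be tried out without sockets. The sketch below is a simplified stand-in: plain strings replace RESPContext, a FIFO LinkedBlockingQueue replaces the priority queue, and a sentinel value (an assumption of this sketch, not something the project does) stops the consumer. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical many-producers, one-consumer demo; not part of the project. */
class SingleConsumerSketch {
    static final String STOP = "__stop__"; // sentinel that ends the consumer loop

    /** Runs the demo and returns the commands in the order the consumer executed them. */
    static List<String> run(int producers, int commandsPerProducer) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> executed = new ArrayList<>(); // touched only by the consumer until join()

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String command = queue.take(); // blocks while the queue is empty
                    if (command.equals(STOP)) {
                        return;
                    }
                    executed.add(command); // "executing" happens on this one thread only
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Several I/O threads "parse" commands and enqueue them concurrently
        ExecutorService pool = Executors.newFixedThreadPool(producers);
        for (int p = 0; p < producers; p++) {
            final int id = p;
            pool.submit(() -> {
                for (int i = 0; i < commandsPerProducer; i++) {
                    queue.add("client" + id + "-cmd" + i);
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
            queue.add(STOP);
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executed;
    }

    public static void main(String[] args) {
        System.out.println(run(3, 4).size()); // prints 12
    }
}
```

Because only one thread ever executes commands, the cache itself needs no locking for command execution; the queue is the only shared structure.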

Launch: the Server Entry Point

https://github.com/weloe/Java-Distributed-Cache/blob/master/src/main/java/com/weloe/cache/Launch.java

public class Launch {
    private static final ExecutorService poolExecutor = new ThreadPoolExecutor(2, 5,
            30L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(10));

    private static final GroupManager groupManager = GroupManager.getInstance();

    private static final ServiceFactory factory = new ServiceFactory();

    private static final Router router = new Router();

    static {
        // Register the routes
        ConfigService config = factory.getBean("config");
        StringService str = factory.getBean("str");
        DeleteService delete = factory.getBean("delete");
        ExpireService expire = factory.getBean("expire");
        GroupService groupService = factory.getBean("group");

        router.addRoute("group add :name", c -> groupService.add(c.getParam("name")));

        router.addRoute("config set maxByteSize :group :size", c -> config.setMaxSize(c.getGroup(), c.getParam("size")))
              .addRoute("config get maxByteSize :group", c -> config.getMaxSize(c.getGroup()))
              .addRoute("config get normalSize :group", c -> config.getNormalSize(c.getGroup()))
              .addRoute("config set maxNum :group :num", c -> config.setMaxNum(c.getGroup(), c.getParam("num")));

        router.addRoute("expire :group :k :n", c -> expire.expire(c.getGroup(), c.getParam("k"), c.getParam("n")))
              .addRoute("ttl :group :k", c -> expire.ttl(c.getGroup(), c.getParam("k")));

        // The key is bound to the :k wildcard
        router.addRoute("delete :group :k", c -> delete.delete(c.getGroup(), c.getParam("k")))
              .addRoute("clear :group", c -> delete.clear(c.getGroup()));

        router.addRoute("set :group :k :v", c -> str.set(c.getGroup(), c.getParam("k"), c.getParam("v")))
              .addRoute("get :group :k", c -> str.get(c.getGroup(), c.getParam("k")));
    }

    public static void main(String[] args) throws IOException {
        CommandQueue commandQueue = new CommandQueue();
        commandQueue.consume();

        ServerSocket serverSocket = new ServerSocket(8081);

        while (true) {
            // Accept a connection and hand it to an I/O thread
            Socket socket = serverSocket.accept();
            System.out.println(socket.getInetAddress() + ":" + socket.getPort() + " connected");
            poolExecutor.submit(() -> task(commandQueue, socket));
        }
    }

    private static void task(CommandQueue commandQueue, Socket socket) {
        RESPContext context = null;

        System.out.println("Thread " + Thread.currentThread().getId() + " running");
        try {
            while (true) {
                context = new RESPContext();
                context.initServer(socket, new RESPRequest(socket), new RESPResponse(socket));

                Object requestData = null;
                try {
                    // Read and decode the request
                    String res = context.parseRESPBytes();
                    if (res == null) {
                        return;
                    }
                    // Show the CRLF pairs as visible escapes in the log
                    System.out.printf("%s => %s%n", "Raw format", res.replace("\r\n", "\\r\\n"));
                    requestData = context.parseRESPString(res);
                } catch (Exception e) {
                    System.out.println(e.getMessage());
                    context.error(e.getMessage());
                    continue;
                }

                List<String> commandStr = (List<String>) requestData;

                System.out.println("Received command from " + socket.getInetAddress() + ":" + socket.getPort());

                // Resolve the command to a route
                Router.Route route = router.getRoute(String.join(" ", commandStr));
                if (route == null) {
                    context.ok("Please check your command arguments");
                    continue;
                }

                Map<String, String> paramMap = route.getMap();
                String name = paramMap.get("group");
                if (name != null) {
                    if (groupManager.getGroup(name) == null) {
                        context.ok("The group does not exist");
                        continue;
                    }
                    context.setGroup(groupManager.getGroup(name));
                }
                context.setRoute(route);

                // Hand the command to the executor thread through the blocking queue
                commandQueue.add(context);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // context is still null if the first iteration failed before it was created
            if (context != null) {
                context.close();
            }
        }
    }
}

Client

https://github.com/weloe/Java-Distributed-Cache/blob/master/src/main/java/com/weloe/cache/client/Client.java

public class ClientLaunch {
    static PrintWriter writer;
    static InputStream inputStream;

    static String host = "127.0.0.1";
    static int port = 8081;

    public static void main(String[] args) {
        // try-with-resources releases the connection and the console reader on exit
        try (Socket socket = new Socket(host, port);
             Scanner console = new Scanner(System.in)) {
            // Character output stream for requests, raw input stream for responses
            writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8));
            inputStream = socket.getInputStream();

            while (console.hasNextLine()) {
                String s = console.nextLine();
                if ("exit".equals(s)) {
                    System.out.println("exit cli");
                    return;
                }
                if (!s.isEmpty()) {
                    // Send the command
                    sendRequest(s.split(" "));
                    // Print the response; error replies surface as RuntimeException
                    try {
                        System.out.println(inputStreamHandleByteResponse());
                    } catch (RuntimeException e) {
                        System.out.println("(error) " + e.getMessage());
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static void sendRequest(String... args) {
        // RESP lines end with \r\n; println() would emit the platform line separator instead
        writer.print("*" + args.length + "\r\n");
        for (String arg : args) {
            writer.print("$" + arg.getBytes(StandardCharsets.UTF_8).length + "\r\n");
            writer.print(arg + "\r\n");
        }
        writer.flush();
    }

    public static Object inputStreamHandleByteResponse() {
        String result = RESPUtil.parseRESPBytes(inputStream);
        return RESPUtil.parseRESPString(result);
    }
}

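Testing

Finally, we can start Launch and the Client and try things out.

Server startup:

Server waiting for commands...
/127.0.0.1:53779 connected
Thread 13 running
Waiting for data

The client sends the request group add test

Server response:

ok

Server output:

Raw format => *3\r\n$5\r\ngroup\r\n$3\r\nadd\r\n$4\r\ntest\r\n
Multi-bulk request: group add test
Received command from /127.0.0.1:53779
Executing command group add :name

The client sets a cache entry with set test testKey testV

Server response:

ok

Server output:

Raw format => *4\r\n$3\r\nset\r\n$4\r\ntest\r\n$7\r\ntestKey\r\n$5\r\ntestV\r\n
Multi-bulk request: set test testKey testV
Received command from /127.0.0.1:53779
Executing command set :group :k :v

The client reads the entry back with get test testKey

Server response:

testV

With that, the single-node cache is essentially complete.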
