Rpc-实现Zookeeper注册中心

1.前言

本文章是笔主在声哥的手写RPC框架的学习下,对注册中心的一个拓展。因为声哥某些部分没有保留拓展性,所以本文章的项目与声哥的工程有部分区别,核心内容在Curator的注册发现与注销,思想看准即可。

本文章Git仓库:zko0/zko0-rpc

声哥的RPC项目写的确实很详细,跟学一遍受益匪浅:

何人听我楚狂声的博客

在声哥的项目里使用Nacos作为了服务注册中心。本人拓展添加了ZooKeeper实现服务注册。

Nacos的服务注册和发现,设计的不是非常好,每次服务的发现都需要去注册中心拉取。本人实现ZooKeeper注册中心时,参考了Dubbo的设计原理,结合本人自身想法,添加了本地缓存:

  • Client发现服务后缓存在本地,维护一个服务——实例列表
  • 当监听到注册中心的服务列表发生了变化,Client更新本地列表
  • 当注册中心宕机,Client能够依靠本地的服务列表继续提供服务

问题:

  1. 实现服务注册的本地缓存,还需要实现注册中心的监听,当注册中心的服务发生更改时能够实现动态更新。或者用轮训的方式,定时更新,不过这种方式的服务实时性较差
  2. 当Server宕机,非临时节点注册容易出现服务残留无法清除的问题。所以我建议全部使用临时节点去注册。

2.内容

zookeeper需要简单学一下,知识内容非常简单,搭建也很简单,在此跳过。

如果你感兴趣,可以参考我的ZooKeeper的文章:Zookeeper学习笔记 - zko0

①添加依赖

Curator:(简化ZooKeeper客户端使用)(Netfix研发,捐给Apache,是Apache顶级项目)

这里排除slf4j依赖,因为笔主使用的slf4j存在冲突

<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes --> <dependency>     <groupId>org.apache.curator</groupId>     <artifactId>curator-recipes</artifactId>     <version>5.2.0</version>     <exclusions>         <exclusion>             <groupId>org.slf4j</groupId>             <artifactId>slf4j-api</artifactId>         </exclusion>     </exclusions> </dependency> 

②代码编写

1.首先创建一个连接类:

@Slf4j public class ZookeeperUtil {      //内部化构造方法     private ZookeeperUtil(){     }      private static final String SERVER_HOSTNAME= RegisterCenterConfig.getHostName();      private static final Integer SERVER_PORT=RegisterCenterConfig.getServerPort();      private static CuratorFramework zookeeperClient;      public static CuratorFramework getZookeeperClient(){         if (zookeeperClient==null){             synchronized (ZookeeperUtil.class){                 if (zookeeperClient==null){                     RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);                     zookeeperClient = CuratorFrameworkFactory.builder()                             .connectString(SERVER_HOSTNAME+":"+SERVER_PORT)                             .retryPolicy(retryPolic)                             //  zookeeper根目录为/serviceRegister,不为/                             .namespace("serviceRegister")                             .build();                     zookeeperClient.start();                 }             }         }         return zookeeperClient;     }      public static String getServerHostname(){         return SERVER_HOSTNAME;     }      public static Integer getServerPort(){         return SERVER_PORT;     }  } 

其中HOST,PORT信息我保存在regiserCenter.properties配置文件夹中,使用类读取:

public class RpcConfig {       //注册中心类型     private static String registerCenterType;      //序列化类型     private static String serializerType;      //负载均衡类型     private static String loadBalanceType;      //配置Nacos地址     private static String registerCenterHost;      private static Integer registerCenterPort;       private static boolean zookeeperDestoryIsEphemeral;      private static String serverHostName;      private static Integer serverPort;      static {         ResourceBundle bundle = ResourceBundle.getBundle("rpc");         registerCenterType=bundle.getString("registerCenter.type");         loadBalanceType=bundle.getString("loadBalance.type");         registerCenterHost=bundle.getString("registerCenter.host");         registerCenterPort = Integer.parseInt(bundle.getString("registerCenter.port"));         try {             zookeeperDestoryIsEphemeral="true".equals(bundle.getString("registerCenter.destory.isEphemeral"));         } catch (Exception e) {             zookeeperDestoryIsEphemeral=false;         }         serializerType=bundle.getString("serializer.type");         serverHostName=bundle.getString("server.hostName");         serverPort=Integer.parseInt(bundle.getString("server.port"));     }      public static String getRegisterCenterType() {         return registerCenterType;     }      public static String getSerializerType() {         return serializerType;     }      public static String getLoadBalanceType() {         return loadBalanceType;     }      public static String getRegisterCenterHost() {         return registerCenterHost;     }      public static Integer getRegisterCenterPort() {         return registerCenterPort;     }       public static String getServerHostName() {         return serverHostName;     }      public static Integer getServerPort() {         return serverPort;     }      public static boolean isZookeeperDestoryIsEphemeral() {         return zookeeperDestoryIsEphemeral;     }  } 

下面的代码我和声哥有些不同,我将服务注册,注销方法放在ServerUtils中,服务发现方法放在ClientUtils中:

服务的高一致性存在两种做法:

  • 因为ZooKeeper存在临时节点,注册中心可以实现Client(RPC的Server)断开,注册服务信息的自动丢失
  • 不设置为临时节点,手动的服务注册清除

我这里两种都实现了,虽然做两种方式不同但是功能相同的代码放在一起看起来很奇怪,这里只是做演示。选择其中一种即可。(我建议使用临时节点,当Server宕机,残留的服务信息也能及时清除)

注册实现原理图:

Rpc-实现Zookeeper注册中心

接口:

public interface ServiceDiscovery {     InetSocketAddress searchService(String serviceName);      void cleanLoaclCache(String serviceName); } 
public interface ServiceRegistry {     //服务注册     void register(String serviceName, InetSocketAddress inetAddress);      void cleanRegistry(); } 

ZooKeeper接口实现:

public class ZookeeperServiceDiscovery implements ServiceDiscovery{      private final LoadBalancer loadBalancer;      public ZookeeperServiceDiscovery(LoadBalancer loadBalancer) {         this.loadBalancer = loadBalancer;     }          @Override     public InetSocketAddress searchService(String serviceName) {         return ZookeeperClientUtils.searchService(serviceName,loadBalancer);     }      @Override     public void cleanLoaclCache(String serviceName) {         ZookeeperClientUtils.cleanLocalCache(serviceName);     } } 
public class ZookeeperServiceRegistry implements ServiceRegistry{     @Override     public void register(String serviceName, InetSocketAddress inetAddress) {         ZookeeperServerUitls.register(serviceName,inetAddress);     }      @Override     public void cleanRegistry() {         ZookeeperServerUitls.cleanRegistry();     } } 

Factory工厂:

public class ServiceFactory {      private static String center = RpcConfig.getRegisterCenterType();     private static String lb= RpcConfig.getLoadBalanceType();      private static  ServiceRegistry registry;      private static  ServiceDiscovery discovery;      private static Object registerLock=new Object();      private static Object discoveryLock=new Object();      public static ServiceDiscovery getServiceDiscovery(){         if (discovery==null){             synchronized (discoveryLock){                 if (discovery==null){                     if ("nacos".equalsIgnoreCase(center)){                         discovery= new NacosServiceDiscovery(LoadBalancerFactory.getLoadBalancer(lb));                     }else if ("zookeeper".equalsIgnoreCase(center)){                         discovery= new ZookeeperServiceDiscovery(LoadBalancerFactory.getLoadBalancer(lb));                     }                 }             }         }         return discovery;     }      public static ServiceRegistry getServiceRegistry(){         if (registry==null){             synchronized (registerLock){                 if (registry==null){                     if ("nacos".equalsIgnoreCase(center)){                         registry=  new NacosServiceRegistry();                     }else if ("zookeeper".equalsIgnoreCase(center)){                         registry= new ZookeeperServiceRegistry();                     }                 }             }         }         return registry;     }  } 

使用Gson序列化InetSocketAddress存在问题,编写Util:

public class InetSocketAddressSerializerUtil {     public static String getJsonByInetSockerAddress(InetSocketAddress address){         HashMap<String, String> map = new HashMap<>();         map.put("host",address.getHostName());         map.put("port",address.getPort()+"");         return new Gson().toJson(map);     }      public static InetSocketAddress getInetSocketAddressByJson(String json){         HashMap<String,String> hashMap = new Gson().fromJson(json, HashMap.class);         String host = hashMap.get("host");         Integer port=Integer.parseInt(hashMap.get("port"));         return new InetSocketAddress(host,port);     }  } 

上面主要是注册,发现的逻辑,我把主要方法写在了Utils中:

@Slf4j public class ZookeeperServerUitls {      private static CuratorFramework client = ZookeeperUtil.getZookeeperClient();      private static final Set<String> instances=new ConcurrentHashSet<>();      public static void register(String serviceName, InetSocketAddress inetSocketAddress){          serviceName=ZookeeperUtil.serviceName2Path(serviceName);;         String uuid = UUID.randomUUID().toString();         serviceName=serviceName+"/"+uuid;         String json = InetSocketAddressSerializerUtil.getJsonByInetSockerAddress(inetSocketAddress);         try {             if (RpcConfig.isZookeeperDestoryIsEphemeral()){                 //会话结束节点,创建消失                 client.create()                         .creatingParentsIfNeeded()                         .withMode(CreateMode.EPHEMERAL)                         .forPath(serviceName,json.getBytes());             } else {                 client.create()                         .creatingParentsIfNeeded()                         .forPath(serviceName,json.getBytes());             }         }             catch (Exception e) {             log.error("服务注册失败");             throw  new RpcException(RpcError.REGISTER_SERVICE_FAILED);         }         //放入map         instances.add(serviceName);     }      public static void cleanRegistry(){         log.info("注销所有注册的服务");         //如果自动销毁,不需要清除         if (RpcConfig.isZookeeperDestoryIsEphemeral()) return;         if (ZookeeperUtil.getServerHostname()!=null&&ZookeeperUtil.getServerPort()!=null&&!instances.isEmpty()){             for (String path:instances) {                 try {                     client.delete().forPath(path);                 } catch (Exception e) {                     log.error("服务注销失败");                     throw new RpcException(RpcError.DESTORY_REGISTER_FALL);                 }             }         }     } } 
@Slf4j public class ZookeeperClientUtils {      private static CuratorFramework client = ZookeeperUtil.getZookeeperClient();      private static final Map<String,  List<InetSocketAddress>> instances=new ConcurrentHashMap<>();      public static InetSocketAddress searchService(String serviceName, LoadBalancer loadBalancer) {         InetSocketAddress address;         //本地缓存查询         if (instances.containsKey(serviceName)){             List<InetSocketAddress> addressList = instances.get(serviceName);             if (!addressList.isEmpty()){                 //使用lb进行负载均衡                 return loadBalancer.select(addressList);             }         }         try {             String path = ZookeeperUtil.serviceName2Path(serviceName);             //获取路径下所有的实现             List<String> instancePaths = client.getChildren().forPath(path);             List<InetSocketAddress> addressList = new ArrayList<>();             for (String instancePath : instancePaths) {                 byte[] bytes = client.getData().forPath(path+"/"+instancePath);                 String json = new String(bytes);                 InetSocketAddress instance = InetSocketAddressSerializerUtil.getInetSocketAddressByJson(json);                 addressList.add(instance);             }             addLocalCache(serviceName,addressList);             return loadBalancer.select(addressList);         } catch (Exception e) {             log.error("服务获取失败====>{}",e);             throw new RpcException(RpcError.SERVICE_NONE_INSTANCE);         }     }      public static void cleanLocalCache(String serviceName){         log.info("服务调用失败,清除本地缓存,重新获取实例===>{}",serviceName);         instances.remove(serviceName);     }      public static void addLocalCache(String serviceName,List<InetSocketAddress> addressList){         //直接替换原本的缓存         instances.put(serviceName,addressList);     } } 

③配置文件

rpc.properties放在resources下

#nacos    zookeeper #registerCenter.type=nacos registerCenter.type=zookeeper  #registerCenter.host=127.0.0.1 registerCenter.host=101.43.244.40  #zookeeper port default 2181 #registerCenter.port=9000 registerCenter.port=2181  registerCenter.destory.isEphemeral=false  #??random?roundRobin loadBalance.type=random  #kryo json jdk serializer.type=kryo  server.hostName=127.0.0.1 server.port=9999 

④更多

声哥的代码我做了很多修改,如果上述代码和你参考的项目代码出入比较大,可以查看本文章的工程阅读。

发表评论

评论已关闭。

相关文章