Dubbo源码—1.服务发布的主要流程

大纲

1.Dubbo 2.7和3.x版本的区别

2.Dubbo服务的基本流程和启动入口

3.Dubbo服务发布的主流程

4.服务发布时执行相关组件的初始化

5.服务发布时执行的服务实例刷新操作

6.服务发布时执行的服务实例初始化操作

7.服务发布时执行的服务实例发布操作

8.执行服务实例发布操作时的主流程

9.服务发布过程中ProxyFactory生成Invoker

10.服务发布过程中Protocol组件发布Invoker

11.服务发布过程中NettyServer的构造流程

12.服务发布过程中RegistryProtocol的服务注册

13.Dubbo服务发布的完整流程总结

 

1.Dubbo 2.7和3.x版本的区别

区别一:后者引入了ModuleDeployer组件专门做服务启动时的初始化工作。将原来的注册中心拆分为三大中心:注册中心、配置中心、元数据中心。

 

区别二:后者很多地方使用了Double Check来代替前者对方法加Synchronized锁,大量采用了Double Check + Volatile + Static来实现单例模式。

 

区别三:后者引入了MigrationRuleListener、MigrationRuleHandler、MigrationInvoker,引入DynamicDirectory代替RegistryDirectory。

 

2.Dubbo服务的基本流程和启动入口

(1)Dubbo服务的基本流程

(2)Provider启动入口

(3)Consumer启动入口

 

(1)Dubbo服务的基本流程

Dubbo源码—1.服务发布的主要流程

(2)Provider启动入口

public class Application {     public static void main(String[] args) throws Exception {         //Service和ServiceConfig是什么         //Service是一个服务,每个服务可能会包含多个接口         //ServiceConfig便是针对这个服务的一些配置         //下面传入的泛型DemoServiceImpl便是服务接口的实现         ServiceConfig<DemoServiceImpl> service = new ServiceConfig<>();          //设置服务暴露出去的接口         service.setInterface(DemoService.class);          //设置暴露出去的接口的实现类         service.setRef(new DemoServiceImpl());          //服务名称,可以在服务框架里进行定位         service.setApplication(new ApplicationConfig("dubbo-demo-api-provider"));          //所有的RPC框架,必须要和注册中心配合使用,服务启动后必须向注册中心进行注册         //注册中心可以知道每个服务有几个实例,每个实例在哪台服务器上         //进行服务调用时,要先找注册中心咨询要调用的服务有几个实例,分别都在什么机器上         //下面便是设置ZooKeeper作为注册中心的地址         service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));                  //设置元数据上报的地方         //Dubbo服务实例启动后,会有自己的元数据,需要上报到一个地方进行管理,比如zookeeper         service.setMetadataReportConfig(new MetadataReportConfig("zookeeper://127.0.0.1:2181"));          //配置完毕后,调用ServiceConfig的export()方法启动网络监听程序         //当接收到调用请求时,该网络监听程序会建立网络连接进行通信         //接收按照协议封装的请求数据,该网络监听程序会执行RPC调用         //此外,ServiceConfig的export()方法还会把自己作为一个服务实例注册到zk里         service.export();          System.out.println("dubbo service started");         new CountDownLatch(1).await();     } }

(3)Consumer启动入口

public class Application {     public static void main(String[] args) throws Exception {         //Reference和ReferenceConfig是什么         //Reference是一个引用,是对Provider端的一个服务实例的引用         //ReferenceConfig这个服务实例的引用的一些配置         //通过泛型传递了这个服务实例对外暴露的接口         ReferenceConfig<DemoService> reference = new ReferenceConfig<>();                  //设置应用名称         reference.setApplication(new ApplicationConfig("dubbo-demo-api-consumer"));          //设置注册中心的地址,默认是ZooKeeper         reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));          //设置元数据上报地址         reference.setMetadataReportConfig(new MetadataReportConfig("zookeeper://127.0.0.1:2181"));          //设置要调用的服务的接口         reference.setInterface(DemoService.class);          //直接通过ReferenceConfig的get()方法来拿到一个DemoService接口         //它是一个动态代理接口,只要被调用,便会通过底层调用Provider服务实例的对应接口         DemoService service = reference.get();         String message = service.sayHello("dubbo");         System.out.println(message);         Thread.sleep(10000000L);     } }

 

3.Dubbo服务发布的主流程

ServiceConfig的export()方法在进行服务发布时,首先会初始化相关组件,然后刷新服务实例,接着初始化服务实例,最后发布服务实例。

public class ServiceConfig<T> extends ServiceConfigBase<T> {     ...     @Override     public void export() {         if (this.exported) {             return;         }          //prepare for export         //对比Dubbo 2.6.x和2.7.x源码,Dubbo 3.0的变动还是有点大的,比如这里使用了ModuleDeployer组件         //Dubbo服务实例内部会有很多的代码组件,通过ModuleDeployer便可以避免零零散散的去调用和初始化          //1.执行相关组件的初始化         //通过获取到的ModuleDeployer来对相关组件进行初始化         //比如会对MetadataReport元数据上报组件进行构建和初始化,以及启动(建立跟zk的连接)         getScopeModel().getDeployer().start();          synchronized (this) {             if (this.exported) {                 return;             }              //2.执行服务实例的刷新操作             //也就是刷新ProviderConfig->MethodConfig->ArgumentConfig             if (!this.isRefreshed()) {                 this.refresh();             }             if (this.shouldExport()) {                 //3.执行服务实例的初始化                 //也就是会把Metadata元数据给准备好,后续可以进行元数据上报                 this.init();                  //这是Dubbo服务实例的延迟发布的特性                 //如果设置了Dubbo服务实例是延迟发布的,当调用了export()方法后,就会进入这里                 //在延迟指定的时间后,再去进行服务的发布                 if (shouldDelay()) {                     //延迟发布                     doDelayExport();                 } else {                     //立即发布                     //4.执行服务实例的发布                     //这里可以作为服务发布的直接入口                     doExport();                 }             }         }     }     ... }

 

4.服务发布时执行相关组件的初始化

public class DefaultModuleDeployer extends AbstractDeployer<ModuleModel> implements ModuleDeployer {     //已经完成发布的服务实例集合     private List<ServiceConfigBase<?>> exportedServices = new ArrayList<>();      //下面这些组件,本身都是跟model组件体系强关联的     private ModuleModel moduleModel;      //父级ApplicationDeployer组件     private ApplicationDeployer applicationDeployer;     ...      @Override     public Future start() throws IllegalStateException {         //initialize,maybe deadlock applicationDeployer lock & moduleDeployer lock         //调用DefaultApplicationDeployer的initialize()方法         applicationDeployer.initialize();         return startSync();     }      @Override     public void prepare() {         //module这个层级是application层级的下层         //application层级是framework层级的下层         applicationDeployer.initialize();         this.initialize();     }     ... }  public class DefaultApplicationDeployer extends AbstractDeployer<ApplicationModel> implements ApplicationDeployer {     private final DubboShutdownHook dubboShutdownHook;     ...      @Override     public void initialize() {         if (initialized) {             return;         }         //ApplicationDeployer组件,可能会被多线程并发访问         //Ensure that the initialization is completed when concurrent calls         synchronized (startLock) {             if (initialized) {                 return;             }             //注册退出时需要进行资源销毁的ShutdownHook             //register shutdown hook             registerShutdownHook();              //启动配置中心ConfigCenter             //动配置中心是专门存放配置信息的             startConfigCenter();              //加载应用配置             loadApplicationConfigs();              //初始化ModuleDeployer             initModuleDeployers();              //启动元数据中心MetadataCenter             //元数据中心是专门存放发布d的服务实例信息的             startMetadataCenter();              initialized = true;              if (logger.isInfoEnabled()) {                 logger.info(getIdentifier() + " has been initialized!");             }         }          //老的Dubbo版本只有注册中心的概念         //后来随着版本的迭代和演进,出现了配置中心、元数据中心等这些用于解耦的概念         //虽然Dubbo的服务实例信息、配置信息、元数据信息,都可以放在zk里         //但如果把一个服务实例的各种数据和信息都存储在zk里,那么这些数据和信息会严重耦合在一起         //下面从扩展性和可用性两个层面来分析,如果把各种数据都耦合放在一个zk里可能会出现的问题         //(1)扩展性         //在一个大规模的微服务系统里,服务本身可能都有上百个,服务实例可能有几百上千个,所以服务实例数据可能就很多         //但每个服务实例关联的配置数据,可能不是太多,尤其是元数据可能也不是太多         //而这里的扩展性不是指功能上的扩展,而是指数据上的扩展         //随着服务实例数据在膨胀,配置数据和元数据虽然可能也在增加,但是增加的速度可能比不上服务实例数据         //此时就需要对注册中心进行扩容或者更换一个系统去存储         //而在这个过程之中,由于所有数据耦合在一起了,牵一发而动全身,不好扩展了         //这就是多种不同类型的数据耦合在一起时会出现的痛点,也就是数据耦合导致数据扩展性差的问题         //(2)可用性         //由于注册数据、配置数据、元数据都放在一个地方比如zk         //那么一旦zk出现了故障,这三种数据就一起没了,这就是可用性问题         //因此,Dubbo3的架构设计会对不同类型的数据进行分离,形成了三种数据:注册数据、元数据、配置数据         //于是就有了三大中心:注册中心、配置中心、元数据中心,这样就可以把三种不同类型的数据,放到不同的地方去         //在扩展性方面,当服务实例数据太多时要进行扩容或者切换存储技术,此时对另外两种数据是没有直接影响的         //在可用性方面,一旦作为注册中心的zk突然挂了,此时配置中心可能是Nacos,对它来说也没有直接影响的     }      private void registerShutdownHook() {         dubboShutdownHook.register();     }     ... }
Dubbo源码—1.服务发布的主要流程
public class DefaultApplicationDeployer extends AbstractDeployer<ApplicationModel> implements ApplicationDeployer {     ...     //启动元数据中心     private void startMetadataCenter() {         //先分析元数据中心是否需要用注册中心来做元数据中心         useRegistryAsMetadataCenterIfNecessary();         ApplicationConfig applicationConfig = getApplication();          //获取元数据类型metadataType         String metadataType = applicationConfig.getMetadataType();          //进行元数据的配置,里面包含了MetadataReport的配置         Collection<MetadataReportConfig> metadataReportConfigs = configManager.getMetadataConfigs();         if (CollectionUtils.isEmpty(metadataReportConfigs)) {             return;         }          //从applicationModel中获取一个BeanFactory         //再根据BeanFactory获取一个元数据上报组件的实例MetadataReportInstance         MetadataReportInstance metadataReportInstance = applicationModel.getBeanFactory().getBean(MetadataReportInstance.class);         List<MetadataReportConfig> validMetadataReportConfigs = new ArrayList<>(metadataReportConfigs.size());         for (MetadataReportConfig metadataReportConfig : metadataReportConfigs) {             ConfigValidationUtils.validateMetadataConfig(metadataReportConfig);             validMetadataReportConfigs.add(metadataReportConfig);         }          //对于唯一的一个MetadataReport,会在这里进行初始化         //把配置的metadataReport地址和config传递进init()方法进行初始化         metadataReportInstance.init(validMetadataReportConfigs);         if (!metadataReportInstance.inited()) {             throw new IllegalStateException(String.format("%s MetadataConfigs found, but none of them is valid.", metadataReportConfigs.size()));         }         //所以MetadataReport的启动         //其实就是根据配置去获取对应的BeanFactory,然后通过BeanFactory创建出对应的MetadataReport实例         //最后根据配置对元数据上报组件的实例MetadataReportInstance进行初始化     }     ... }  public class MetadataReportInstance implements Disposable {     ...     public void init(List<MetadataReportConfig> metadataReportConfigs) {         if (!init.compareAndSet(false, true)) {             return;         }          this.metadataType = applicationModel.getApplicationConfigManager().getApplicationOrElseThrow().getMetadataType();         if (metadataType == null) {             this.metadataType = DEFAULT_METADATA_STORAGE_TYPE;         }          //在这里会通过SPI机制的adaptive自适应,生成一个代理类         //底层会通过自适应的机制,根据url里的参数去拿到对应的实现类,来调用它的方法         //如果使用zk作为元数据中心,那么拿到的应该是一个ZooKeeperMetadataReportFactory         MetadataReportFactory metadataReportFactory = applicationModel.getExtensionLoader(MetadataReportFactory.class).getAdaptiveExtension();         for (MetadataReportConfig metadataReportConfig : metadataReportConfigs) {             init(metadataReportConfig, metadataReportFactory);         }     }      private void init(MetadataReportConfig config, MetadataReportFactory metadataReportFactory) {         ...         //这种url一般来说是针对zk的url地址         MetadataReport metadataReport = metadataReportFactory.getMetadataReport(url);         if (metadataReport != null) {             metadataReports.put(relatedRegistryId, metadataReport);         }     }     ... }  public abstract class AbstractMetadataReportFactory implements MetadataReportFactory {     ...     @Override     public MetadataReport getMetadataReport(URL url) {         ...         metadataReport = createMetadataReport(url);         ...     }      protected abstract MetadataReport createMetadataReport(URL url);     ... }  public class ZookeeperMetadataReportFactory extends AbstractMetadataReportFactory {     ...     @Override     public MetadataReport createMetadataReport(URL url) {         return new ZookeeperMetadataReport(url, zookeeperTransporter);     }     ... }  public class ZookeeperMetadataReport extends AbstractMetadataReport {     private final String root;     ZookeeperClient zkClient;     ...      public ZookeeperMetadataReport(URL url, ZookeeperTransporter zookeeperTransporter) {         super(url);         ...         zkClient = zookeeperTransporter.connect(url);     }     ... }

 

5.服务发布时执行的服务实例刷新操作

完成相关组件的初始化后,便会调用ServiceConfig的refresh()方法执行服务实例的刷新操作。

public class ServiceConfig<T> extends ServiceConfigBase<T> {     ...     @Override     public void export() {         ...         //1.执行相关组件的初始化         //通过获取到的ModuleDeployer来对相关组件进行初始化         //比如会对MetadataReport元数据上报组件进行构建和初始化,以及启动(建立跟zk的连接)         getScopeModel().getDeployer().start();         synchronized (this) {             if (this.exported) {                 return;             }             //2.执行服务实例的刷新操作             //也就是刷新ProviderConfig->MethodConfig->ArgumentConfig             if (!this.isRefreshed()) {                 //执行AbstractConfig的refresh()方法                 this.refresh();             }             ...         }     }     ... }  public abstract class AbstractConfig implements Serializable {     ...     public void refresh() {         //check and init before do refresh         //调用AbstractConfig的子类ServiceConfigBase的preProcessRefresh()方法         //初始化一个ProviderConfig,也就是Provider服务实例         preProcessRefresh();          //Model组件体系对Dubbo的运行很关键,可以认为它是SPI机制使用的入口         //而ScopeModel是Model组件体系的一个基础,ScopeModel类型是可以转换为ModuleModel、ApplicationModel         //比如像ModuleServiceRepository、ModelEnvironment、BeanFactory等很多通用的组件都可以通过ScopeModel去获取         Environment environment = getScopeModel().getModelEnvironment();//获取Environment对象         List<Map<String, String>> configurationMaps = environment.getConfigurationMaps();          //Search props starts with PREFIX in order         //接下来要获取和拼接preferredPrefix         String preferredPrefix = null;         List<String> prefixes = getPrefixes();         for (String prefix : prefixes) {             if (ConfigurationUtils.hasSubProperties(configurationMaps, prefix)) {                 preferredPrefix = prefix;                 break;             }         }         if (preferredPrefix == null) {             preferredPrefix = prefixes.get(0);         }          ...         //使用反射注入需要的方法         assignProperties(this, environment, subProperties, subPropsConfiguration);          //process extra refresh of subclass, e.g. refresh method configs         //调用AbstractInterfaceConfig的processExtraRefresh()方法         //该方法中preferredPrefix是关键,它的值可能是:dubbo.service.org.apache.dubbo.demo.DemoService         //其中dubbo.service代表dubbo服务名称的一个固定前缀,属于固定拼接的         //而中间的org.apache.dubbo.demo,则是从服务接口所在包名里截取出来的,并且最后会加上服务接口的接口名         //所以preferredPrefix会作为当前dubbo服务的全限定的名字         //而这段refresh的代码的作用,就是处理这个preferredPrefix以及其他相关的配置信息         processExtraRefresh(preferredPrefix, subPropsConfiguration);          postProcessRefresh();         refreshed.set(true);     }      protected void preProcessRefresh() {         // pre-process refresh     }      protected void processExtraRefresh(String preferredPrefix, InmemoryConfiguration subPropsConfiguration) {         // process extra refresh     }            protected void postProcessRefresh() {         // post-process refresh         checkDefault();     }     ... }  public abstract class AbstractInterfaceConfig extends AbstractMethodConfig {     private List<MethodConfig> methods;     ...      //该方法会通过反射技术,对需要发布的服务的接口方法和参数封装成MethodConfig、ArgumentConfig     @Override     protected void processExtraRefresh(String preferredPrefix, InmemoryConfiguration subPropsConfiguration) {         if (StringUtils.hasText(interfaceName)) {             //通过反射技术获取需要发布的服务的接口             Class<?> interfaceClass;             interfaceClass = ClassUtils.forName(interfaceName);             ...              //Auto create MethodConfig/ArgumentConfig according to config props             Map<String, String> configProperties = subPropsConfiguration.getProperties();             //获取需要发布的服务的接口的所有方法             Method[] methods;             methods = interfaceClass.getMethods();              //接下来对需要发布的服务的接口方法进行处理             //整理出MethodConfig对象及其对应的ArgumentConfig对象             //接口里的每个方法都要创建一个MethodConfig             //方法里的每一个参数都要创建一个ArgumentConfig             for (Method method : methods) {                 //因为服务端每次处理客户端调用时,不可能都通过反射来获取method和args的情况                 //所以在刚开始启动时就需要对接口进行解析,将所有的method和args整理到methods属性中                 if (ConfigurationUtils.hasSubProperties(configProperties, method.getName())) {                     MethodConfig methodConfig = getMethodByName(method.getName());                     //Add method config if not found                     if (methodConfig == null) {                         //需要发布的的服务的每个方法,都创建一个MethodConfig对象                         methodConfig = new MethodConfig();                         methodConfig.setName(method.getName());                         //将MethodConfig对象添加到methods属性中                         this.addMethod(methodConfig);                     }                     //Add argument config                     //dubbo.service.{interfaceName}.{methodName}.{arg-index}.xxx=xxx                     java.lang.reflect.Parameter[] arguments = method.getParameters();                     for (int i = 0; i < arguments.length; i++) {                         if (getArgumentByIndex(methodConfig, i) == null && hasArgumentConfigProps(configProperties, methodConfig.getName(), i)) {                             //方法里的每个args参数,都创建一个ArgumentConfig对象                             ArgumentConfig argumentConfig = new ArgumentConfig();                             argumentConfig.setIndex(i);                             //将ArgumentConfig对象添加到MethodConfig对象中                             methodConfig.addArgument(argumentConfig);                         }                     }                 }             }              //refresh MethodConfigs,刷新刚才解析出来的MethodConfig对象             List<MethodConfig> methodConfigs = this.getMethods();             if (methodConfigs != null && methodConfigs.size() > 0) {                 //whether ignore invalid method config                 Object ignoreInvalidMethodConfigVal = getEnvironment().getConfiguration()                     .getProperty(ConfigKeys.DUBBO_CONFIG_IGNORE_INVALID_METHOD_CONFIG, "false");                 boolean ignoreInvalidMethodConfig = Boolean.parseBoolean(ignoreInvalidMethodConfigVal.toString());                 Class<?> finalInterfaceClass = interfaceClass;                 List<MethodConfig> validMethodConfigs = methodConfigs.stream().filter(methodConfig -> {                     methodConfig.setParentPrefix(preferredPrefix);                     //关联Model组件                     methodConfig.setScopeModel(getScopeModel());                     methodConfig.refresh();                     //verify method config                     return verifyMethodConfig(methodConfig, finalInterfaceClass, ignoreInvalidMethodConfig);                 }).collect(Collectors.toList());                 this.setMethods(validMethodConfigs);             }         }     }      public void addMethod(MethodConfig methodConfig) {         if (this.methods == null) {             this.methods = new ArrayList<>();         }         this.methods.add(methodConfig);     }     ... }
Dubbo源码—1.服务发布的主要流程

 

6.服务发布时执行的服务实例初始化操作

public class ServiceConfig<T> extends ServiceConfigBase<T> {     ...     @Override     public void export() {         ...         synchronized (this) {             ...             if (this.shouldExport()) {                 //3.执行服务实例的初始化                 //也就是会把Metadata元数据给准备好,后续可以进行元数据上报                 this.init();                 ...             }         }     }      public void init() {         //通过SPI机制获取ServiceListener扩展点的所有实现类实例         //然后添加到ServiceConfig的serviceListeners字段里         if (this.initialized.compareAndSet(false, true)) {             //load ServiceListeners from extension             ExtensionLoader<ServiceListener> extensionLoader = this.getExtensionLoader(ServiceListener.class);             this.serviceListeners.addAll(extensionLoader.getSupportedExtensionInstances());         }          //初始化ServiceMetadata,也就是服务元数据         //这需要与前面设置的MetadataCenter元数据中心配合起来看         //ServiceMetadata作为服务实例的元数据,会对服务实例做一些描述,比如版本号、实现类等         initServiceMetadata(provider);         serviceMetadata.setServiceType(getInterfaceClass());         serviceMetadata.setTarget(getRef());         serviceMetadata.generateServiceKey();     }     ... }
Dubbo源码—1.服务发布的主要流程

 

7.服务发布时执行的服务实例发布操作

首先调用ServiceConfig的doExportUrls()方法发布服务,然后再调用其exported()方法进行服务发布后的处理,比如打印日志和回调监听器。

public class ServiceConfig<T> extends ServiceConfigBase<T> {     ...     @Override     public void export() {         ...         //4.执行服务实例的发布         //进行debug时,可以通过控制台打印的日志去分析运行流程         //比如通过log日志可以发现服务发布的流程可能涉及:         //一.export Dubbo Service,发布dubbo服务实例         //二.register Dubbo Service,往zk进行注册         //三.启动NettyServer,监听端口和请求处理         //四.服务发现注册         //五.MetadataReport:服务实例上报         //六.关闭JVM时的逆向处理过程         //这里可以作为一个服务发布的直接入口         doExport();     }      protected synchronized void doExport() {         ...         //发布服务         doExportUrls();         //服务发布完成后的处理,比如打印日志和回调监听器         exported();     }     ... }

 

8.执行服务实例发布操作时的主流程

首先通过ScopeModel组件体系获取服务数据存储组件,然后将要发布的服务注册到服务数据存储组件里,接着把相关信息封装成一个服务提供者,并将该服务提供者也注册到服务数据存储组件中,然后生成注册的URL,最后根据协议和生成的注册的URL来发布服务。

Dubbo源码—1.服务发布的主要流程
public class ServiceConfig<T> extends ServiceConfigBase<T> {     ...       @SuppressWarnings({"unchecked", "rawtypes"})     private void doExportUrls() {         //所谓的ScopeModel,真实的类型是ModuleModel         //下面这行代码会通过AbstractMethodConfig的getScopeModel()方法获取ScopeModel         //接着会通过ModuleModel的getServiceRepository()方法去获取ServiceRepository         //事实上,Dubbo会把它的各个组件都集中在ScopeModel(ModuleModel)里,而ScopeModel就类似于设计模式里的门面模式         //ScopeModel、ModuleModel、ApplicationModel、FrameworkModel等多个Model会组成一个Model组件体系          //1.通过ScopeModel组件体系获取服务数据存储组件ModuleServiceRepository         ModuleServiceRepository repository = getScopeModel().getServiceRepository();          //ServiceRepository是Dubbo服务的数据存储组件         //一个系统可以发布多个Dubbo服务         //每个Dubbo服务的核心就是一个接口和一个实现类          //2.把当前要发布的服务注册到Dubbo的服务数据存储组件中         ServiceDescriptor serviceDescriptor;         final boolean serverService = ref instanceof ServerService;         if (serverService) {             serviceDescriptor = ((ServerService) ref).getServiceDescriptor();             repository.registerService(serviceDescriptor);         } else {             serviceDescriptor = repository.registerService(getInterfaceClass());         }          //ProviderModel也就是服务提供者,由于这里是暴露服务出去的,所以属于Provider         //3.把所有相关的信息封装成一个服务提供者ProviderModel         providerModel = new ProviderModel(             serviceMetadata.getServiceKey(),             ref,//ref代表的是实际实现的类,通过泛型传入             serviceDescriptor,//表示服务实例相关的信息             getScopeModel(),             serviceMetadata,              interfaceClassLoader         );         providerModel.setConfig(this);         providerModel.setDestroyRunner(getDestroyRunner());         //3.将服务提供者ProviderModel注册到服务数据存储组件中         repository.registerProvider(providerModel);          //4.生成注册的URL:包含2181的端口号、注册到zk中                 //service-discovery-registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&pid=989®istry=zookeeper×tamp=1724302222103        //registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&pid=989®istry=zookeeper×tamp=1724302222103         List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);          for (ProtocolConfig protocolConfig : protocols) {             String pathKey = URL.buildKey(                 getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path),                  group,                  version             );             //stub service will use generated service name             if (!serverService) {                 //In case user specified path, register service one more time to map it to path.                 //将接口注册到服务数据存储组件中                 repository.registerService(pathKey, interfaceClass);             }             //5.调用doExportUrlsFor1Protocol()方法,根据协议和注册的URL来发布服务             doExportUrlsFor1Protocol(protocolConfig, registryURLs);         }         providerModel.setServiceUrls(urls);     }      private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {         Map<String, String> map = buildAttributes(protocolConfig);         //remove null key and null value         map.keySet().removeIf(key -> StringUtils.isEmpty(key) || StringUtils.isEmpty(map.get(key)));         //init serviceMetadata attachments         //将map数据放入serviceMetadata中,这与元数据相关         serviceMetadata.getAttachments().putAll(map);         //根据ProtocolConfig构建URL         URL url = buildUrl(protocolConfig, map);         //发布服务         exportUrl(url, registryURLs);     }     ... }  //ScopeModel、ModuleModel、ApplicationModel、FrameworkModel等多个Model会组成一个Model组件体系 public class ModuleModel extends ScopeModel {     public static final String NAME = "ModuleModel";     //ApplicationModel内部封装了其他很多组件,在这里是一个引用关系,通过构造方法传入进来     private final ApplicationModel applicationModel;     //包含了ServiceModule环境相关的数据,里面封装的都是各种各样的配置信息     private ModuleEnvironment moduleEnvironment;     //serviceRepository是一个服务仓储组件,存储了一些服务相关的数据     private ModuleServiceRepository serviceRepository;     //这是module配置管理器,用于存放一些服务相关的配置数据     private ModuleConfigManager moduleConfigManager;     //这是ModuleDeployer组件,用于管理其他的一些组件和模块的生命周期     private ModuleDeployer deployer;     ... }  public class ApplicationModel extends ScopeModel {     ...     //包含了多个ModuleModel     private final List<ModuleModel> moduleModels = new CopyOnWriteArrayList<>();     private final List<ModuleModel> pubModuleModels = new CopyOnWriteArrayList<>();     //环境变量、配置信息     private Environment environment;     //服务配置相关的一些信息     private ConfigManager configManager;     //服务数据相关的一些存储     private ServiceRepository serviceRepository;     //属于application层级的一些组件的生命周期管理     private ApplicationDeployer deployer;     //父级组件     private final FrameworkModel frameworkModel;     //内部的一个ModuleModel组件     private ModuleModel internalModule;     //默认的一个ModuleModel组件     private volatile ModuleModel defaultModule;     //internal module index is 0, default module index is 1     private AtomicInteger moduleIndex = new AtomicInteger(0);     //是一个锁     private Object moduleLock = new Object();     ... }  public class FrameworkModel extends ScopeModel {     ...     //它没有父层级了,所以只能通过static静态变量,类级别去引用自己的FrameworkModel集合     private static List<FrameworkModel> allInstances = new CopyOnWriteArrayList<>();     //包含了多个ApplicationModel     private List<ApplicationModel> applicationModels = new CopyOnWriteArrayList<>();     //通过Framework、Application、Module各个层级都可以获取到service相关的配置和数据     private FrameworkServiceRepository serviceRepository;     ... }  public class ModuleServiceRepository {     private final ModuleModel moduleModel;     //services,代表服务相关的数据,StubServiceDescriptor     private final ConcurrentMap<String, List<ServiceDescriptor>> services = new ConcurrentHashMap<>();     //consumers(key - group/interface:version, value - consumerModel list)     //代表服务的调用方(consumer即消费方/调用方)     private final ConcurrentMap<String, List<ConsumerModel>> consumers = new ConcurrentHashMap<>();     //providers,代表服务提供方     private final ConcurrentMap<String, ProviderModel> providers = new ConcurrentHashMap<>();     //FrameworkServiceRepository存储的也是一些服务相关的数据     private final FrameworkServiceRepository frameworkServiceRepository;     ... }  public class ModuleServiceRepository {     //services,代表服务相关的数据,StubServiceDescriptor     private final ConcurrentMap<String, List<ServiceDescriptor>> services = new ConcurrentHashMap<>();     ...      public ServiceDescriptor registerService(ServiceDescriptor serviceDescriptor) {         return registerService(serviceDescriptor.getServiceInterfaceClass(), serviceDescriptor);     }      public ServiceDescriptor registerService(Class<?> interfaceClazz) {         ServiceDescriptor serviceDescriptor = new ReflectionServiceDescriptor(interfaceClazz);         return registerService(interfaceClazz, serviceDescriptor);     }      public ServiceDescriptor registerService(Class<?> interfaceClazz, ServiceDescriptor serviceDescriptor) {         List<ServiceDescriptor> serviceDescriptors = services.computeIfAbsent(interfaceClazz.getName(), k -> new CopyOnWriteArrayList<>());         synchronized (serviceDescriptors) {             Optional<ServiceDescriptor> previous = serviceDescriptors.stream()                 .filter(s -> s.getServiceInterfaceClass().equals(interfaceClazz)).findFirst();             if (previous.isPresent()) {                 return previous.get();             } else {                 serviceDescriptors.add(serviceDescriptor);                 return serviceDescriptor;             }         }     }     ... }

 

9.服务发布过程中ProxyFactory生成Invoker

Dubbo源码—1.服务发布的主要流程
public class ServiceConfig<T> extends ServiceConfigBase<T> {     ...     private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {         Map<String, String> map = buildAttributes(protocolConfig);         //remove null key and null value         map.keySet().removeIf(key -> StringUtils.isEmpty(key) || StringUtils.isEmpty(map.get(key)));         //init serviceMetadata attachments         //将map数据放入serviceMetadata中,这与元数据相关         serviceMetadata.getAttachments().putAll(map);         //根据ProtocolConfig构建URL         URL url = buildUrl(protocolConfig, map);         //发布服务         exportUrl(url, registryURLs);     }      private URL buildUrl(ProtocolConfig protocolConfig, Map<String, String> params) {         //获取协议名称         String name = protocolConfig.getName();         if (StringUtils.isEmpty(name)) {             //默认使用Dubbo协议             name = DUBBO;         }         //获取host值         String host = findConfiguredHosts(protocolConfig, provider, params);         //获取port值         Integer port = findConfiguredPort(protocolConfig, provider, this.getExtensionLoader(Protocol.class), name, params);         //根据上面获取的host、port以及前文获取的map集合组装URL         URL url = new ServiceConfigURL(name, null, null, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), params);         //通过Configurator覆盖或添加新的参数         if (this.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) {             url = this.getExtensionLoader(ConfiguratorFactory.class).getExtension(url.getProtocol()).getConfigurator(url).configure(url);         }         url = url.setScopeModel(getScopeModel());         url = url.setServiceModel(providerModel);         return url;     }      private void exportUrl(URL url, List<URL> registryURLs) {         //从URL中获取scope参数,其中可选值有none、remote、local三个,分别代表不发布、发布到本地以及发布到远端         String scope = url.getParameter(SCOPE_KEY);         //scope不为none,才进行发布         if (!SCOPE_NONE.equalsIgnoreCase(scope)) {             //scope为local,只发布到本地             if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {                 exportLocal(url);             }             //export to remote if the config is not local (export to local only when config is local)             //scope为remote,发布到远端的注册中心             if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {                 //进行远程发布                 url = exportRemote(url, registryURLs);                 if (!isGeneric(generic) && !getScopeModel().isInternal()) {                     //通过MetadataUtils推送这个服务实例的元数据到元数据中心                     //元数据中心是一个动态的配置中心,可以从里面获取参数,也可以添加监听器监听配置项的变更                     MetadataUtils.publishServiceDefinition(url, providerModel.getServiceModel(), getApplicationModel());                 }             }         }         this.urls.add(url);     }      private void exportLocal(URL url) {         //创建新URL         URL local = URLBuilder.from(url)             .setProtocol(LOCAL_PROTOCOL)             .setHost(LOCALHOST_VALUE)             .setPort(0)             .build();         local = local.setScopeModel(getScopeModel()).setServiceModel(providerModel);         //本地发布         doExportUrl(local, false);         //exportLocal,指的是发布到本地         //具体就是在jvm内部完成了组件之间的一些交互关系和发布         logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);     }      private URL exportRemote(URL url, List<URL> registryURLs) {         //如果当前配置了至少一个注册中心         if (CollectionUtils.isNotEmpty(registryURLs)) {             //URL里有很多的信息,比如协议、各种参数等             //URL可以在后续代码运行过程中提供配置和信息             //接下来会向每个注册中心发布服务             for (URL registryURL : registryURLs) {                 //registryURL.getProtocol()会获取协议                 if (SERVICE_REGISTRY_PROTOCOL.equals(registryURL.getProtocol())) {                     url = url.addParameterIfAbsent(SERVICE_NAME_MAPPING_KEY, "true");                 }                 //injvm协议只在exportLocal()中有用,不会将服务发布到注册中心,所以这里忽略injvm协议                 if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {                     continue;                 }                 //设置服务URL的dynamic参数                 url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));                 //创建monitorUrl,并作为monitor参数添加到服务URL中                 URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);                 if (monitorUrl != null) {                     url = url.putAttribute(MONITOR_KEY, monitorUrl);                 }                 //For providers, this is used to enable custom proxy to generate invoker                 //设置服务URL的proxy参数,即生成动态代理方式(jdk或是javassist),作为参数添加到RegistryURL中                 String proxy = url.getParameter(PROXY_KEY);                 if (StringUtils.isNotEmpty(proxy)) {                     registryURL = registryURL.addParameter(PROXY_KEY, proxy);                 }                 doExportUrl(registryURL.putAttribute(EXPORT_KEY, url), true);             }         } else {             //不存在注册中心,仅发布服务,不会将服务信息发布到注册中心             doExportUrl(url, true);         }         return url;     }      private void doExportUrl(URL url, boolean withMetaData) {         //动态代理技术有很多种,比如cglib,jdk         //而动态代理就是:面向一个接口,动态生成该接口的一个实现类,然后根据这个实现类再动态生成对应的对象         //这个对象就是动态代理的对象,所以该对象会代理自己背后的一个实现类         //当这个对象被调用时,背后的实现类也会被调用         //ProxyFactory,Proxy就是动态代理         //下面传入的ref指的是实现类         //下面传入的interfaceClass指的是接口         //下面传入的url就是服务实例对外暴露出去的一些核心信息         //proxyFactory.getInvoker()获取到的是Invoker调用组件         //当Dubbo的NettyServer监听到网络连接进行请求处理时,需要有一个调用组件去根据请求进行调用         //Invoker调用组件可以认为是ProxyFactory基于DemoService接口生成的动态代理         //当需要根据请求调用接口时,底层就会回调自己写的实现类DemoServiceImpl         //proxyFactory.getInvoker()会封装一个AbstractProxyInvoker,对本地实现类进行代理         //默认情况下,会通过Javassist技术生成Wrapper,该Wrapper会将本地实现类包装进去         //调用AbstractProxyInvoker的invoke方法时,最终就会基于Javassist动态生成的Wrapper进行调用         //下面这一行代码是为服务实现类的对象创建相应的Invoker         //其中传入的服务url会作为export参数添加到RegistryURL中         //这里的proxyFactory就是ProxyFactory接口的适配器,会通过SPI机制进行初始化         //比如下面就会调用JavassistProxyFactory.getInvoker()方法         //proxyFactory.getInvoker()会获取到Invoker调用组件(生成Invoker动态代理)         //所以下面这一行代码会为本地实现类的对象创建相应的Invoker(封装着AbstractProxyInvoker)         Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);         if (withMetaData) {             //DelegateProviderMetaDataInvoker是个装饰类             //它会将当前ServiceConfig和Invoker关联起来             invoker = new DelegateProviderMetaDataInvoker(invoker, this);         }          //调用Protocol的实现进行发布,protocolSPI是Protocol接口的适配器         //进行本地发布时,使用的是InjvmProtocol + InjvmExporter         //进行远程发布时,使用的是RegistryProtocol,它会对DubboProtocol进行包装和装饰         //RegistryProtocol会先执行来处理服务注册的一些事情         //DubboProtocol会后执行来启动NettyServer网络服务器         Exporter<?> exporter = protocolSPI.export(invoker);         exporters.add(exporter);     }     ... }  public class JavassistProxyFactory extends AbstractProxyFactory {     ...     @Override     public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException {         //下面会通过Wrapper创建一个包装类对象         //该对象是动态构建出来的,它属于Wrapper的一个子类,里面会拼接一个关键的方法invokeMethod(),拼接代码由javassist动态生成         final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);          //下面会创建一个实现了AbstractProxyInvoker的匿名内部类         //其doInvoker()方法会直接委托给Wrapper对象的invokeMethod()方法         return new AbstractProxyInvoker<T>(proxy, type, url) {             @Override             protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable {                 //当AbstractProxyInvoker.invoke()方法被调用时,便会执行到这里                 //这里会通过类似于JDK反射的技术,调用本地实现类如DemoServiceImpl.sayHello()                 //这个wrapper对象是由javassist技术动态生成的,已经对本地实现类进行包装                 //这个动态生成的wrapper对象会通过javassist技术自己特有的方法                 //在invokerMethod()方法被调用时执行本地实现类的目标方法                 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);             }         };     }     ... }  public abstract class AbstractProxyInvoker<T> implements Invoker<T> {     ...     //当Netty Server接受到了请求后,经过解析就会知道是要调用什么     //然后会把解析出来的数据放入Invocation中,通过AbstractProxyInvoker的invoke()方法来进行调用     @Override     public Result invoke(Invocation invocation) throws RpcException {         ...         //执行doInvoke()方法,调用业务实现         Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());         ...         //将value值封装成CompletableFuture对象         CompletableFuture<Object> future = wrapWithFuture(value, invocation);         //再次转换,转换为CompletableFuture<AppResponse>类型         CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {         AppResponse result = new AppResponse(invocation);         ...         //将CompletableFuture封装成AsyncRpcResult返回         return new AsyncRpcResult(appResponseFuture, invocation);     }      protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;     ... }

 

10.服务发布过程中Protocol组件发布Invoker

(1)Protocol协议接口

(2)Protocol组件发布Invoker

 

(1)Protocol协议接口

@SPI(value = "dubbo", scope = ExtensionScope.FRAMEWORK) public interface Protocol {     //默认端口     int getDefaultPort();      //Protocol接收到一个请求之后,必须要记录请求的源地址     //对同一个服务实例(url)发布一次和发布多次,是没有任何区别的     //export()方法会将一个Invoker发布出去     //export()方法的实现需要是幂等的,即同一个服务暴露多次和暴露一次的效果是相同的     @Adaptive     <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;      //Protocol必须根据一个url和接口类型,获取到对应的Invoker     //refer()方法会引用一个Invoker     //refer()方法会根据参数返回一个Invoker对象,Consumer端可以通过这个Invoker请求到Provider端的服务     @Adaptive     <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;      //销毁export()方法以及refer()方法使用到的Invoker对象,释放当前Protocol对象底层占用的资源     void destroy();      //返回当前Protocol底层的全部ProtocolServer     default List<ProtocolServer> getServers() {         return Collections.emptyList();     } }
Dubbo源码—1.服务发布的主要流程

(2)Protocol组件发布Invoker

本地发布时使用InjvmProtocol + InjvmExporter,远程发布时使用RegistryProtocol + DestroyableExporter。

 

RegistryProtocol的export()方法被远程发布调用的时候,会调用到DubboProtocol的export()方法,并最终调用到HeaderExchanger的bind()方法执行NettyTransporter的bind()方法构建Netty服务器。

Dubbo源码—1.服务发布的主要流程
public class ServiceConfig<T> extends ServiceConfigBase<T> {     ...     private void doExportUrl(URL url, boolean withMetaData) {         //动态代理技术有很多种,比如cglib,jdk         //而动态代理就是:面向一个接口,动态生成该接口的一个实现类,然后根据这个实现类再动态生成对应的对象         //这个对象就是动态代理的对象,所以该对象会代理自己背后的一个实现类         //当这个对象被调用时,背后的实现类也会被调用          //ProxyFactory,Proxy就是动态代理         //下面传入的ref指的是实现类         //下面传入的interfaceClass指的是接口         //下面传入的url就是服务实例对外暴露出去的一些核心信息          //proxyFactory.getInvoker()获取到的是Invoker调用组件         //当Dubbo的NettyServer监听到网络连接进行请求处理时,需要有一个调用组件去根据请求进行调用         //Invoker调用组件可以认为是ProxyFactory基于DemoService接口生成的动态代理         //当需要根据请求调用接口时,底层就会回调自己写的实现类DemoServiceImpl          //proxyFactory.getInvoker()会封装一个AbstractProxyInvoker,对本地实现类进行代理         //默认情况下,会通过Javassist技术生成Wrapper,该Wrapper会将本地实现类包装进去         //调用AbstractProxyInvoker的invoke方法时,最终就会基于Javassist动态生成的Wrapper进行调用          //下面这一行代码是为服务实现类的对象创建相应的Invoker         //其中传入的服务url会作为export参数添加到RegistryURL中         //这里的proxyFactory就是ProxyFactory接口的适配器,会通过SPI机制进行初始化         //比如下面就会调用JavassistProxyFactory.getInvoker()方法         //proxyFactory.getInvoker()会获取到Invoker调用组件(生成Invoker动态代理)         //所以下面这一行代码会为本地实现类的对象创建相应的Invoker(封装着AbstractProxyInvoker)         Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);         if (withMetaData) {             //DelegateProviderMetaDataInvoker是个装饰类             //它会将当前ServiceConfig和Invoker关联起来             invoker = new DelegateProviderMetaDataInvoker(invoker, this);         }          //调用Protocol的实现进行发布,protocolSPI是Protocol接口的适配器         //进行本地发布时,使用的是InjvmProtocol + InjvmExporter         //进行远程发布时,使用的是RegistryProtocol,它会对DubboProtocol进行包装和装饰         //RegistryProtocol会先执行来处理服务注册的一些事情         //DubboProtocol会后执行来启动NettyServer网络服务器         Exporter<?> exporter = protocolSPI.export(invoker);         exporters.add(exporter);     }     ... }
//-> Protocol$Adaptive.export() //-> ProtocolSerializationWrapper.export() //-> ProtocolFilterWrapper.export() //-> ProtocolListenerWrapper.export() //-> InjvmProtocol.export() public class InjvmProtocol extends AbstractProtocol implements Protocol {     ...     @Override     public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {         return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);     }     ... }  public class InjvmExporter<T> extends AbstractExporter<T> {     private final String key;     //这就是在JVM里存放了     private final Map<String, Exporter<?>> exporterMap;      InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {         super(invoker);         this.key = key;         this.exporterMap = exporterMap;         exporterMap.put(key, this);     }     ... }
//-> Protocol$Adaptive.export() //-> ProtocolSerializationWrapper.export() //-> ProtocolFilterWrapper.export() //-> ProtocolListenerWrapper.export() //-> RegistryProtocol.export() //-> Protocol$Adaptive.export() //-> ProtocolSerializationWrapper.export() //-> ProtocolFilterWrapper.export() //-> ProtocolListenerWrapper.export() //-> DubboProtocol.export() //-> DubboProtocol.openServer() //-> DubboProtocol.createServer() //-> Exchangers.bind() //-> HeaderExchanger.bind() //-> Transporters.bind() //-> NettyTransporter.bind() //-> NettyServer  @Activate public class ProtocolSerializationWrapper implements Protocol {     private Protocol protocol;      public ProtocolSerializationWrapper(Protocol protocol) {         this.protocol = protocol;     }      @Override     public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {         getFrameworkModel(invoker.getUrl().getScopeModel()).getServiceRepository().registerProviderUrl(invoker.getUrl());         //下面会调用ProtocolFilterWrapper.export()方法         return protocol.export(invoker);     }     ... }  @Activate(order = 100) public class ProtocolFilterWrapper implements Protocol {     private final Protocol protocol;      public ProtocolFilterWrapper(Protocol protocol) {         if (protocol == null) {             throw new IllegalArgumentException("protocol == null");         }         this.protocol = protocol;     }      @Override     public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {         if (UrlUtils.isRegistry(invoker.getUrl())) {             return protocol.export(invoker);         }          FilterChainBuilder builder = getFilterChainBuilder(invoker.getUrl());         //下面会调用ProtocolListenerWrapper.export()方法         return protocol.export(builder.buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));     }      private <T> FilterChainBuilder getFilterChainBuilder(URL url) {         return ScopeModelUtil.getExtensionLoader(FilterChainBuilder.class, url.getScopeModel()).getDefaultExtension();     }     ... }  @Activate public class DefaultFilterChainBuilder implements FilterChainBuilder {     //build consumer/provider filter chain     @Override     public <T> Invoker<T> buildInvokerChain(final Invoker<T> originalInvoker, String key, String group) {         Invoker<T> last = originalInvoker;         URL url = originalInvoker.getUrl();         List<ModuleModel> moduleModels = getModuleModelsFromUrl(url);         List<Filter> filters;          //通过SPI机制来获取Filter         if (moduleModels != null && moduleModels.size() == 1) {             filters = ScopeModelUtil.getExtensionLoader(Filter.class, moduleModels.get(0)).getActivateExtension(url, key, group);         } else if (moduleModels != null && moduleModels.size() > 1) {             filters = new ArrayList<>();             List<ExtensionDirector> directors = new ArrayList<>();             for (ModuleModel moduleModel : moduleModels) {                 List<Filter> tempFilters = ScopeModelUtil.getExtensionLoader(Filter.class, moduleModel).getActivateExtension(url, key, group);                 filters.addAll(tempFilters);                 directors.add(moduleModel.getExtensionDirector());             }             filters = sortingAndDeduplication(filters, directors);         } else {             filters = ScopeModelUtil.getExtensionLoader(Filter.class, null).getActivateExtension(url, key, group);         }          //构建Filter链条         if (!CollectionUtils.isEmpty(filters)) {             for (int i = filters.size() - 1; i >= 0; i--) {                 final Filter filter = filters.get(i);                 final Invoker<T> next = last;                 last = new CopyOfFilterChainNode<>(originalInvoker, next, filter);             }             return new CallbackRegistrationInvoker<>(last, filters);         }         return last;     }     ... }  @Activate(order = 200) public class ProtocolListenerWrapper implements Protocol {     private final Protocol protocol;      public ProtocolListenerWrapper(Protocol protocol) {         if (protocol == null) {             throw new IllegalArgumentException("protocol == null");         }         this.protocol = protocol;     }      @Override     public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {         if (UrlUtils.isRegistry(invoker.getUrl())) {             return protocol.export(invoker);         }          //下面会调用RegistryProtocol.export()         return new ListenerExporterWrapper<T>(protocol.export(invoker),             Collections.unmodifiableList(ScopeModelUtil.getExtensionLoader(ExporterListener.class, invoker.getUrl().getScopeModel())             .getActivateExtension(invoker.getUrl(), EXPORTER_LISTENER_KEY)));     }     ... }  public class RegistryProtocol implements Protocol, ScopeModelAware {     ...     @Override     public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {         ...         //export invoker         //下面进行导出服务,底层会通过会执行DubboProtocol.export()方法,启动对应的Server         //也就是会涉及到对另外一个protocol组件的调用,远程发布服务时其实就是执行DubboProtocol的export方法         final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);         ...     }      private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {         String key = getCacheKey(originInvoker);         return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {             Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);             //下面会调用DubboProtocol的export()方法             return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);         });     }     ... }  public class DubboProtocol extends AbstractProtocol {     ...     @Override     public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {         checkDestroyed();         URL url = invoker.getUrl();          //export service.         //创建ServiceKey         String key = serviceKey(url);         //exporter组件,代表了指定的invoker被发布出去         //下面代码会将上层传入的Invoker对象封装成DubboExporter对象,然后记录到exporterMap集合中         DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);         ...         //启动ProtocolServer,这个就是打开对外的网络服务器,可以对外提供网络请求处理         openServer(url);         //序列化的优化处理         optimizeSerialization(url);          return exporter;     }      private void openServer(URL url) {         checkDestroyed();         //find server.         //获取host:port这个地址         String key = url.getAddress();         //client can export a service which only for server to invoke         boolean isServer = url.getParameter(IS_SERVER_KEY, true);          //只有Server端才能启动Server对象         if (isServer) {             ProtocolServer server = serverMap.get(key);             //无ProtocolServer监听该地址             if (server == null) {                 //DoubleCheck,防止并发问题                 synchronized (this) {                     server = serverMap.get(key);                     if (server == null) {                         //调用createServer()方法创建ProtocolServer对象                         serverMap.put(key, createServer(url));                         return;                     }                 }             }             //server supports reset, use together with override             server.reset(url);         }     }      private ProtocolServer createServer(URL url) {         ...         ExchangeServer server;         //通过Exchangers门面类,创建ExchangeServer对象         server = Exchangers.bind(url, requestHandler);         ...         //将ExchangeServer封装成DubboProtocolServer返回         DubboProtocolServer protocolServer = new DubboProtocolServer(server);         loadServerProperties(protocolServer);         return protocolServer;     }     ... }  public class DubboExporter<T> extends AbstractExporter<T> {     private final String key;     private final Map<String, Exporter<?>> exporterMap;      public DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {         super(invoker);         this.key = key;         this.exporterMap = exporterMap;         exporterMap.put(key, this);     }     ... }  public class Exchangers {     ...     public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {         url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");         //先获取到一个Exchanger组件,再用这个Exchanger组件去进行bind,拿到对应的ExchangeServer         //getExchanger()会通过SPI机制,获取到HeaderExchanger,然后将DubboProtocol的requestHandler传入bind()方法中         return getExchanger(url).bind(url, handler);     }      public static Exchanger getExchanger(URL url) {         String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);         //根据SPI机制,通过model组件体系去拿到对应的SPI扩展实现类实例         return url.getOrDefaultFrameworkModel().getExtensionLoader(Exchanger.class).getExtension(type);     }     ... }  public class HeaderExchanger implements Exchanger {     ...     @Override     public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {         //Exchanger这一层的代码可以理解为是位于上层的代码,它会把一些RpcInvocation调用转为请求/响应的模型,以及进行同步转异步的处理         //从Exchanger这层开始,便进入网络模型的范围,引入了请求的概念,并最终会通过底层的网络框架把请求发送出去         //因此需要获取到网络框架底层的Server和Client,并将它们封装到Exchanger组件如HeaderExchangeServer/HeaderExchangeClient中          //为什么需要不同的Transporter?         //在Exchanger这一层里其实是可以使用不同的网络技术的,比如Netty、Mina这些网络通信框架         //由于Netty、Mina这些不同的框架,它们的用法和API都是不同的,所以在Exchanger这一层,不能把Netty、Mina的API直接提供过来         //为了把这些不同的网络框架技术进行统一的封装,需要做一层Transporter,由Transporter来实现抽象统一的底层网络框架的使用标准         //所以Exchanger这一层是基于Transporter这一层提供的标准模型来实现请求/响应处理          //下面的Transporters.bind()会返回一个NettyServer         return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));     }     ... }  public class Transporters {     ...      public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {         ChannelHandler handler;         if (handlers.length == 1) {             handler = handlers[0];         } else {             handler = new ChannelHandlerDispatcher(handlers);         }         return getTransporter(url).bind(url, handler);     }      public static Transporter getTransporter(URL url) {         //下面使用了getAdaptiveExtension()的自适应机制,针对接口动态生成代码然后创建代理类         //代理类的方法,会根据url的参数动态提取对应的实现类的name名称,以及获取真正的需要使用的实现类         //有了真正的实现类后,就可以去调用实现类的extension实例的方法了         //比如下面会获取到一个NettyTransporter实例         return url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).getAdaptiveExtension();     }     ... }  //不同的框架可以有不同的Transporter //每个框架对应的Transporter可以创建自己的Server和Client public class NettyTransporter implements Transporter {     public static final String NAME = "netty";      @Override     public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {         return new NettyServer(url, handler);     }      @Override     public Client connect(URL url, ChannelHandler handler) throws RemotingException {         //传入的handler装饰了DubboProtocol的requestHandler,返回一个NettyClient         return new NettyClient(url, handler);     } }

 

11.服务发布过程中NettyServer的构造流程

public abstract class AbstractServer extends AbstractEndpoint implements RemotingServer {     ...     //业务线程池     private Set<ExecutorService> executors = new ConcurrentHashSet<>();     private InetSocketAddress localAddress;     private InetSocketAddress bindAddress;     private int accepts;     private ExecutorRepository executorRepository;      public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {         //调用父类的构造方法         super(url, handler);          //通过使用SPI机制,从applicationModel组件中根据扩展接口ExecutorRepository去获取ExtensionLoader,然后拿到其默认实现类         this.executorRepository = url.getOrDefaultApplicationModel().getExtensionLoader(ExecutorRepository.class).getDefaultExtension();          //根据传入的URL初始化localAddress和bindAddress         this.localAddress = getUrl().toInetSocketAddress();         String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());         int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());         if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {             bindIp = ANYHOST_VALUE;         }         this.bindAddress = new InetSocketAddress(bindIp, bindPort);          //初始化accepts等字段         this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);          //调用doOpen()这个抽象方法,启动该Server         doOpen();          //获取该Server关联的线程池,通过DefaultExecutorRepository创建一个FixedThreadPool线程池出来         this.executors.add(executorRepository.createExecutorIfAbsent(url));     }      protected abstract void doOpen() throws Throwable;     ... }  public class NettyServer extends AbstractServer {     ...     private Map<String, Channel> channels;     private ServerBootstrap bootstrap;     private io.netty.channel.Channel channel;     private EventLoopGroup bossGroup;     private EventLoopGroup workerGroup;     private final int serverShutdownTimeoutMills;      //NettyServer在构建的过程中,会构建和打开真正的网络服务器     //这里是基于netty4技术去实现了网络服务器构建和打开的     //一旦打开后,Netty Server就开始监听指定的端口号     //当发现有请求过来就可以去进行处理,也就是通过ProxyInvoker去调用本地实现类的目标方法     //入参handler其实就是DubboProtocol中的requestHandler     public NettyServer(URL url, ChannelHandler handler) throws RemotingException {         //you can customize name and type of client thread pool by THREAD_NAME_KEY and THREAD_POOL_KEY in CommonConstants.         //the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler         super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));         //read config before destroy         serverShutdownTimeoutMills = ConfigurationUtils.getServerShutdownTimeout(getUrl().getOrDefaultModuleModel());     }      //Init and start netty server     @Override     protected void doOpen() throws Throwable {         //创建ServerBootstrap         bootstrap = new ServerBootstrap();         //EventLoop,也可以理解为网络服务器,它会监听一个本地的端口号         //外部系统针对本地服务器端口号发起连接、通信、网络事件时,监听的端口号就会不停的产生网络事件         //EventLoop网络服务器,还会不停轮询监听到的网络事件         //boss的意思是负责监听端口号是否有外部系统的连接请求,它是一个EventLoopGroup线程池         //如果发现了网络事件,就需要进行请求处理,可以通过workerGroup里的多个线程进行并发处理         //创建boss EventLoopGroup,线程数是1         bossGroup = createBossGroup();          //创建worker EventLoopGroup,线程数是CPU核数+1,但最多不会超过32个线程         workerGroup = createWorkerGroup();         //创建NettyServerHandler         //它是一个Netty中的ChannelHandler实现,不是Dubbo Remoting层的ChannelHandler接口的实现         final NettyServerHandler nettyServerHandler = createNettyServerHandler();          //获取当前NettyServer创建的所有Channel         //channels集合中的Channel不是Netty中的Channel对象,而是Dubbo Remoting层的Channel对象         channels = nettyServerHandler.getChannels();         //初始化ServerBootstrap,指定boss和worker EventLoopGroup         initServerBootstrap(nettyServerHandler);         //绑定指定的地址和端口         ChannelFuture channelFuture = bootstrap.bind(getBindAddress());          //等待bind操作完成         channelFuture.syncUninterruptibly();         channel = channelFuture.channel();     }      protected EventLoopGroup createBossGroup() {         return NettyEventLoopFactory.eventLoopGroup(1, EVENT_LOOP_BOSS_POOL_NAME);     }      protected EventLoopGroup createWorkerGroup() {         return NettyEventLoopFactory.eventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), EVENT_LOOP_WORKER_POOL_NAME);     }      protected NettyServerHandler createNettyServerHandler() {         return new NettyServerHandler(getUrl(), this);     }      protected void initServerBootstrap(NettyServerHandler nettyServerHandler) {         boolean keepalive = getUrl().getParameter(KEEP_ALIVE_KEY, Boolean.FALSE);         bootstrap.group(bossGroup, workerGroup)         .channel(NettyEventLoopFactory.serverSocketChannelClass())         .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)         .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)         .childOption(ChannelOption.SO_KEEPALIVE, keepalive)         .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)         .childHandler(new ChannelInitializer<SocketChannel>() {             @Override             protected void initChannel(SocketChannel ch) throws Exception {                 //连接空闲超时时间                 int idleTimeout = UrlUtils.getIdleTimeout(getUrl());                 //NettyCodecAdapter中会创建Decoder和Encoder                 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);                 if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {                     ch.pipeline().addLast("negotiation", new SslServerTlsHandler(getUrl()));                 }                 ch.pipeline()                 //注册Decoder和Encoder                 .addLast("decoder", adapter.getDecoder())                 .addLast("encoder", adapter.getEncoder())                 //注册IdleStateHandler                 .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))                 //注册NettyServerHandler                 .addLast("handler", nettyServerHandler);             }         });     }     ... }

 

12.服务发布过程中RegistryProtocol的服务注册

(1)首先注册service-discovery-registry的URL

(2)然后注册registry的URL

 

服务发布过程中需要进行注册的URL有两个:

service-discovery-registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&pid=989®istry=zookeeper×tamp=1724302222103 registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&pid=989®istry=zookeeper×tamp=1724302222103
Dubbo源码—1.服务发布的主要流程

(1)首先注册service-discovery-registry的URL

Dubbo源码—1.服务发布的主要流程

此时,在RegistryProtocol.export()方法中,进行服务注册时使用的Registry其实是一个ListenerRegistryWrapper装饰器,装饰着使用了ZookeeperServiceDiscovery的ServiceDiscoveryRegistry。

 

ZookeeperServiceDiscovery.doRegister()处理的URL注册其实就是在本地进行注册。

//-> RegistryProtocol.export() //-> RegistryProtocol.getRegistry() //-> RegistryFactory$Adaptive.getRegistry() //-> RegistryFactoryWrapper.getRegistry() //-> ServiceDiscoveryRegistryFactory.getRegistry() //-> ServiceDiscoveryRegistryFactory.createRegistry() //-> new ServiceDiscoveryRegistry() //-> ServiceDiscoveryRegistry.createServiceDiscovery() //=> ListenerRegistryWrapper、ServiceDiscoveryRegistry、ZookeeperServiceDiscovery  //-> RegistryProtocol.register() //-> ListenerRegistryWrapper.register() //-> ServiceDiscoveryRegistry.register() //-> AbstractServiceDiscovery.register() //-> ZookeeperServiceDiscovery.doRegister()
public class RegistryProtocol implements Protocol, ScopeModelAware {     ...     @Override     public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {         //将"registry://"协议(Remote URL)转换成"zookeeper://"协议(Registry URL)         URL registryUrl = getRegistryUrl(originInvoker);          //url to export locally         //获取export参数,其中存储了一个"dubbo://"协议的Provider URL         URL providerUrl = getProviderUrl(originInvoker);          //Subscribe the override data         //FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service.         //Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.         final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);         final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);          Map<URL, NotifyListener> overrideListeners = getProviderConfigurationListener(providerUrl).getOverrideListeners();         overrideListeners.put(registryUrl, overrideSubscribeListener);         providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);          //1.export invoker         //下面进行导出服务,底层会通过会执行DubboProtocol.export()方法,启动对应的Server         //也就是会涉及到对另外一个Protocol组件的调用,远程发布服务时其实就是执行DubboProtocol的export方法         final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);          //2.url to registry 完成服务注册的事情         //下面会根据RegistryURL获取对应的注册中心Registry对象,其中会依赖RegistryFactory         //远程发布时,下面的registry其实是一个ListenerRegistryWrapper装饰器,装饰着使用了ZookeeperServiceDiscovery的ServiceDiscoveryRegistry         //在基于注册中心的url地址去构建对应的注册中心组件时,默认是基于zk的         //而构建一个基于zk的注册中心组件,同时跟zk完成连接的建立,则由curator5框架来实现         final Registry registry = getRegistry(registryUrl);          //获取将要发布到注册中心上的Provider URL,其中会删除一些多余的参数信息         final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);          //decide if we need to delay publish (provider itself and registry should both need to register)         //根据register参数值决定是否注册服务         boolean register = providerUrl.getParameter(REGISTER_KEY, true) && registryUrl.getParameter(REGISTER_KEY, true);         if (register) {             //调用Registry.register()方法将registeredProviderUrl发布到注册中心             register(registry, registeredProviderUrl);         }          //register stated url on provider model         //将Provider相关信息记录到的ProviderModel中         registerStatedUrl(registryUrl, registeredProviderUrl, register);         exporter.setRegisterUrl(registeredProviderUrl);         exporter.setSubscribeUrl(overrideSubscribeUrl);         if (!registry.isServiceDiscovery()) {             //Deprecated! Subscribe to override rules in 2.6.x or before.             //向注册中心进行订阅override数据,主要是监听该服务的configurators节点             registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);         }          //触发RegistryProtocolListener监听器         notifyExport(exporter);         //Ensure that a new exporter instance is returned every time export         return new DestroyableExporter<>(exporter);     }      protected Registry getRegistry(final URL registryUrl) {         //通过SPI自适应机制,去拿到对应的extension实例         //这里的registryFactory为RegistryFactory$Adaptive         RegistryFactory registryFactory = ScopeModelUtil.getExtensionLoader(RegistryFactory.class, registryUrl.getScopeModel()).getAdaptiveExtension();         //调用RegistryFactory$Adaptive的getRegistry()方法         return registryFactory.getRegistry(registryUrl);     }      private void register(Registry registry, URL registeredProviderUrl) {         //下面会调用ListenerRegistryWrapper.register()方法         registry.register(registeredProviderUrl);     }      private void registerStatedUrl(URL registryUrl, URL registeredProviderUrl, boolean registered) {         ProviderModel model = (ProviderModel) registeredProviderUrl.getServiceModel();         model.addStatedUrl(             new ProviderModel.RegisterStatedURL(                 registeredProviderUrl,                 registryUrl,                 registered             )         );     }     ... }  public class RegistryFactory$Adaptive implements org.apache.dubbo.registry.RegistryFactory {     public org.apache.dubbo.registry.Registry getRegistry(org.apache.dubbo.common.URL arg0)  {         if (arg0 == null) throw new IllegalArgumentException("url == null");         org.apache.dubbo.common.URL url = arg0;         String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );         //注册service-discovery-registry的URL时,extName=service-discovery-registry         if (extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.registry.RegistryFactory) name from url (" + url.toString() + ") use keys([protocol])");         ScopeModel scopeModel = ScopeModelUtil.getOrDefault(url.getScopeModel(), org.apache.dubbo.registry.RegistryFactory.class);         //此时获取到的extension为RegistryFactoryWrapper         //RegistryFactoryWrapper的registryFactory为ServiceDiscoveryRegistryFactory         org.apache.dubbo.registry.RegistryFactory extension = (org.apache.dubbo.registry.RegistryFactory)scopeModel.getExtensionLoader(org.apache.dubbo.registry.RegistryFactory.class).getExtension(extName);         //调用RegistryFactoryWrapper的getRegistry()方法         return extension.getRegistry(arg0);     } }  public class RegistryFactoryWrapper implements RegistryFactory {     //此时的registryFactory为ServiceDiscoveryRegistryFactory     private RegistryFactory registryFactory;      public RegistryFactoryWrapper(RegistryFactory registryFactory) {         this.registryFactory = registryFactory;     }      @Override     public Registry getRegistry(URL url) {         //调用ServiceDiscoveryRegistryFactory的getRegistry()方法获取一个ServiceDiscoveryRegistry         //所以这里会返回一个封装了ServiceDiscoveryRegistry的ListenerRegistryWrapper         return new ListenerRegistryWrapper(registryFactory.getRegistry(url),             Collections.unmodifiableList(                 url.getOrDefaultApplicationModel()                 .getExtensionLoader(RegistryServiceListener.class)                 .getActivateExtension(url, "registry.listeners")             )         );     } }  public abstract class AbstractRegistryFactory implements RegistryFactory, ScopeModelAware {     ...     @Override     public Registry getRegistry(URL url) {         ...         registry = createRegistry(url);         ...         return registry;     }     ... }  public class ServiceDiscoveryRegistryFactory extends AbstractRegistryFactory {     @Override     protected Registry createRegistry(URL url) {         if (UrlUtils.hasServiceDiscoveryRegistryProtocol(url)) {             //将"service-discovery-registry://"协议替换为"zookeeper://"             String protocol = url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY);             url = url.setProtocol(protocol).removeParameter(REGISTRY_KEY);         }         return new ServiceDiscoveryRegistry(url, applicationModel);     } }  public class ServiceDiscoveryRegistry extends FailbackRegistry {     private final ServiceDiscovery serviceDiscovery;     ...      public ServiceDiscoveryRegistry(URL registryURL, ApplicationModel applicationModel) {         super(registryURL);         this.serviceDiscovery = createServiceDiscovery(registryURL);         this.serviceNameMapping = (AbstractServiceNameMapping) ServiceNameMapping.getDefaultExtension(registryURL.getScopeModel());         super.applicationModel = applicationModel;     }      protected ServiceDiscovery createServiceDiscovery(URL registryURL) {         //根据registryURL获取对应的ServiceDiscovery实现         //此时由于url的协议已经由"service-discovery-registry://"变为"zookeeper://"         //所以会获取一个ZookeeperServiceDiscovery进行返回         return getServiceDiscovery(registryURL.addParameter(INTERFACE_KEY, ServiceDiscovery.class.getName())             .removeParameter(REGISTRY_TYPE_KEY));     }     ... }  public class ListenerRegistryWrapper implements Registry {     private final Registry registry;     ...      public void register(URL url) {         ...         //此时下面会调用ServiceDiscoveryRegistry.register()方法         registry.register(url);     } }  public class ServiceDiscoveryRegistry extends FailbackRegistry {     //RegistryProtocol.export()方法中获取Registry时,这里会是一个ZookeeperServiceDiscovery     private final ServiceDiscovery serviceDiscovery;     ...      public final void register(URL url) {         //检测URL中的side参数是否为provider         if (!shouldRegister(url)) {             return;         }         doRegister(url);     }      public void doRegister(URL url) {         //将元数据发布到MetadataService         url = addRegistryClusterKey(url);         //下面会调用ZookeeperServiceDiscovery.register()方法         //此时只是将url添加到对象的属性中而已         serviceDiscovery.register(url);     } }  public abstract class AbstractServiceDiscovery implements ServiceDiscovery {     protected volatile ServiceInstance serviceInstance;     protected volatile MetadataInfo metadataInfo;     ...      @Override     public void register(URL url) {         //将要注册的url添加到metadataInfo属性         metadataInfo.addService(url);     }     ... }  public class ZookeeperServiceDiscovery extends AbstractServiceDiscovery {     ...     ... }  public class MetadataInfo implements Serializable {     //key format is '{group}/{interface name}:{version}:{protocol}'     //map里存放了多个服务实例的ServiceInfo     private final Map<String, ServiceInfo> services;     ...      public synchronized void addService(URL url) {         if (this.loader == null) {             this.loader = url.getOrDefaultApplicationModel().getExtensionLoader(MetadataParamsFilter.class);         }         List<MetadataParamsFilter> filters = loader.getActivateExtension(url, "params-filter");         //generate service level metadata         ServiceInfo serviceInfo = new ServiceInfo(url, filters);         this.services.put(serviceInfo.getMatchKey(), serviceInfo);         //extract common instance level params         extractInstanceParams(url, filters);         if (exportedServiceURLs == null) {             exportedServiceURLs = new ConcurrentSkipListMap<>();         }         addURL(exportedServiceURLs, url);         updated = true;     }     ... }

(2)然后注册registry的URL

Dubbo源码—1.服务发布的主要流程

此时,在RegistryProtocol.export()方法中,进行服务注册时使用的Registry其实是一个ListenerRegistryWrapper装饰器,装饰着一个ZooKeeperRegistry。

 

ZooKeeperRegistry.doRegister()处理的URL注册其实就是往注册中心注册。

//-> RegistryProtocol.export() //-> RegistryProtocol.getRegistry() //-> RegistryFactory$Adaptive.getRegistry() //-> RegistryFactoryWrapper.getRegistry() //-> ZookeeperRegistryFactory.getRegistry() //-> ZookeeperRegistryFactory.createRegistry() //-> new ZookeeperRegistry() //=> ListenerRegistryWrapper、ZookeeperRegistry  //-> RegistryProtocol.register() //-> ListenerRegistryWrapper.register() //-> FailbackRegistry.register() //-> ZookeeperRegistry.doRegister()
public class RegistryProtocol implements Protocol, ScopeModelAware {     ...     @Override     public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {         //将"registry://"协议(Remote URL)转换成"zookeeper://"协议(Registry URL)         URL registryUrl = getRegistryUrl(originInvoker);          //url to export locally         //获取export参数,其中存储了一个"dubbo://"协议的Provider URL         URL providerUrl = getProviderUrl(originInvoker);          //Subscribe the override data         //FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service.         //Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.         final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);         final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);          Map<URL, NotifyListener> overrideListeners = getProviderConfigurationListener(providerUrl).getOverrideListeners();         overrideListeners.put(registryUrl, overrideSubscribeListener);         providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);          //1.export invoker         //下面进行导出服务,底层会通过会执行DubboProtocol.export()方法,启动对应的Server         //也就是会涉及到对另外一个Protocol组件的调用,远程发布服务时其实就是执行DubboProtocol的export方法         final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);          //2.url to registry 完成服务注册的事情         //下面会根据RegistryURL获取对应的注册中心Registry对象,其中会依赖RegistryFactory         //远程发布时,下面的registry其实是一个ListenerRegistryWrapper装饰器,装饰着使用了ZookeeperServiceDiscovery的ServiceDiscoveryRegistry         //在基于注册中心的url地址去构建对应的注册中心组件时,默认是基于zk的         //而构建一个基于zk的注册中心组件,同时跟zk完成连接的建立,则由curator5框架来实现         final Registry registry = getRegistry(registryUrl);          //获取将要发布到注册中心上的Provider URL,其中会删除一些多余的参数信息         final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);          //decide if we need to delay publish (provider itself and registry should both need to register)         //根据register参数值决定是否注册服务         boolean register = providerUrl.getParameter(REGISTER_KEY, true) && registryUrl.getParameter(REGISTER_KEY, true);         if (register) {             //调用Registry.register()方法将registeredProviderUrl发布到注册中心             register(registry, registeredProviderUrl);         }          //register stated url on provider model         //将Provider相关信息记录到的ProviderModel中         registerStatedUrl(registryUrl, registeredProviderUrl, register);         exporter.setRegisterUrl(registeredProviderUrl);         exporter.setSubscribeUrl(overrideSubscribeUrl);         if (!registry.isServiceDiscovery()) {             //Deprecated! Subscribe to override rules in 2.6.x or before.             //向注册中心进行订阅override数据,主要是监听该服务的configurators节点             registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);         }          //触发RegistryProtocolListener监听器         notifyExport(exporter);          //Ensure that a new exporter instance is returned every time export         return new DestroyableExporter<>(exporter);     }      protected Registry getRegistry(final URL registryUrl) {         //通过SPI自适应机制,去拿到对应的extension实例         //这里的registryFactory为RegistryFactory$Adaptive         RegistryFactory registryFactory = ScopeModelUtil.getExtensionLoader(RegistryFactory.class, registryUrl.getScopeModel()).getAdaptiveExtension();         //调用RegistryFactory$Adaptive的getRegistry()方法         return registryFactory.getRegistry(registryUrl);     }      private void register(Registry registry, URL registeredProviderUrl) {         //下面会调用ListenerRegistryWrapper.register()方法         registry.register(registeredProviderUrl);     }      private void registerStatedUrl(URL registryUrl, URL registeredProviderUrl, boolean registered) {         ProviderModel model = (ProviderModel) registeredProviderUrl.getServiceModel();         model.addStatedUrl(new ProviderModel.RegisterStatedURL(             registeredProviderUrl,             registryUrl,             registered));     }     ... }  public class RegistryFactory$Adaptive implements org.apache.dubbo.registry.RegistryFactory {     public org.apache.dubbo.registry.Registry getRegistry(org.apache.dubbo.common.URL arg0)  {         if (arg0 == null) throw new IllegalArgumentException("url == null");         org.apache.dubbo.common.URL url = arg0;         String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );         //注册registry的URL时,由于RegistryProtocol.export()方法会将"registry://"协议替换为"zookeeper://"协议         //所以此时extName=zookeeper         if (extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.registry.RegistryFactory) name from url (" + url.toString() + ") use keys([protocol])");         ScopeModel scopeModel = ScopeModelUtil.getOrDefault(url.getScopeModel(), org.apache.dubbo.registry.RegistryFactory.class);         //此时获取到的extension为RegistryFactoryWrapper         //RegistryFactoryWrapper的registryFactory为ZookeeperRegistryFactory         org.apache.dubbo.registry.RegistryFactory extension = (org.apache.dubbo.registry.RegistryFactory)scopeModel.getExtensionLoader(org.apache.dubbo.registry.RegistryFactory.class).getExtension(extName);         //调用RegistryFactoryWrapper的getRegistry()方法         return extension.getRegistry(arg0);     } }  public class RegistryFactoryWrapper implements RegistryFactory {     //此时的registryFactory为ZookeeperRegistryFactory     private RegistryFactory registryFactory;      public RegistryFactoryWrapper(RegistryFactory registryFactory) {         this.registryFactory = registryFactory;     }      @Override     public Registry getRegistry(URL url) {         //调用ZookeeperRegistryFactory的getRegistry()方法获取一个ZookeeperRegistry         //所以这里会返回一个封装了ZookeeperRegistry的ListenerRegistryWrapper         return new ListenerRegistryWrapper(registryFactory.getRegistry(url),             Collections.unmodifiableList(                 url.getOrDefaultApplicationModel()                     .getExtensionLoader(RegistryServiceListener.class)                     .getActivateExtension(url, "registry.listeners")             )         );     } }  public abstract class AbstractRegistryFactory implements RegistryFactory, ScopeModelAware {     ...     @Override     public Registry getRegistry(URL url) {         ...         registry = createRegistry(url);         ...         return registry;     }     ... }  public class ZookeeperRegistryFactory extends AbstractRegistryFactory {     private ZookeeperTransporter zookeeperTransporter;      public ZookeeperRegistryFactory() {         this(ApplicationModel.defaultModel());     }      public ZookeeperRegistryFactory(ApplicationModel applicationModel) {         this.applicationModel = applicationModel;         this.zookeeperTransporter = ZookeeperTransporter.getExtension(applicationModel);     }      @Override     public Registry createRegistry(URL url) {         return new ZookeeperRegistry(url, zookeeperTransporter);     }     ... }  public class ListenerRegistryWrapper implements Registry {     private final Registry registry;     ...      public void register(URL url) {         ...         //此时下面会调用ZookeeperRegistry.register()方法         //由于ZookeeperRegistry继承自FailbackRegistry         //所以会调用FailbackRegistry.register()方法         registry.register(url);     } }  public abstract class FailbackRegistry extends AbstractRegistry {     ...     @Override     public void register(URL url) {         super.register(url);         removeFailedRegistered(url);         removeFailedUnregistered(url);         doRegister(url);     }      public abstract void doRegister(URL url);     ... }  public class ZookeeperRegistry extends CacheableFailbackRegistry {     //跟zk建立网络连接的客户端     private ZookeeperClient zkClient;      //刚开始构建这个zookeeper registry,核心的就是去连接zk,与zk建立连接     public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {         //首先根据传入的url执行父类的构造函数         //该url就是zk的连接地址,比如zookeeper://localhost:2181/         super(url);         ...         //基于zk的API去构建与zk之间的连接         this.zkClient = zookeeperTransporter.connect(url);         ...     }      @Override     public void doRegister(URL url) {         //一个url代表了一个Provider服务实例的所有信息、配置和属性         //或者说一个url就代表了一个Provider服务实例         //服务注册中的核心注册,即Dubbo往zk进行服务实例注册的方法就在此处         try {             checkDestroyed();             //对于一个应用而言,它的注册其实就是往zk中创建的一个znode节点             zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));         } catch (Throwable e) {             throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);         }     }     ... }
Dubbo源码—1.服务发布的主要流程

 

13.Dubbo服务发布的完整流程总结

Dubbo源码—1.服务发布的主要流程
Dubbo源码—1.服务发布的主要流程

 

举报
发表评论

评论已关闭。

相关文章

当前内容话题
  • 0