@Component public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> { private final EphemeralClientOperationServiceImpl clientOperationService; public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) { this.clientOperationService = clientOperationService; } @Override @Secured(action = ActionTypes.WRITE) public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException { //根据请求信息创建一个Service对象,里面包含了:命名空间、分组名、服务名 Service service = Service.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true); switch (request.getType()) { case NamingRemoteConstants.REGISTER_INSTANCE: //注册实例 return registerInstance(service, request, meta); case NamingRemoteConstants.DE_REGISTER_INSTANCE: //注销实例 return deregisterInstance(service, request, meta); default: throw new NacosException(NacosException.INVALID_PARAM, String.format("Unsupported request type %s", request.getType())); } } private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) { //调用EphemeralClientOperationServiceImpl的注册方法registerInstance(),这里需要注意如下参数; //参数service:根据请求信息创建的一个Service对象,里面有命名空间、分组名、服务名 //参数request.getInstance():这个参数就对应了客户端的实例对象,里面包含IP、端口等信息 //参数meta.getConnectionId():这个参数很关键,它是连接ID clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId()); return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE); } private InstanceResponse deregisterInstance(Service service, InstanceRequest request, RequestMeta meta) { //调用EphemeralClientOperationServiceImpl的注销方法deregisterInstance() clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId()); return new InstanceResponse(NamingRemoteConstants.DE_REGISTER_INSTANCE); } } @Component("ephemeralClientOperationService") public class EphemeralClientOperationServiceImpl implements ClientOperationService { private final 
ClientManager clientManager; ... @Override public void deregisterInstance(Service service, Instance instance, String clientId) { if (!ServiceManager.getInstance().containSingleton(service)) { Loggers.SRV_LOG.warn("remove instance from non-exist service: {}", service); return; } //从ServiceManager中根据由请求信息创建的Service对象获取一个已注册的Service对象 Service singleton = ServiceManager.getInstance().getSingleton(service); //从ClientManagerDelegate中根据请求参数中的connectionId获取一个Client对象,即IpPortBasedClient对象 Client client = clientManager.getClient(clientId); if (!clientIsLegal(client, clientId)) { return; } //调用AbstractClient.removeServiceInstance()方法 //移除Client对象中的instance信息并发布ClientChangedEvent事件来同步集群节点 InstancePublishInfo removedInstance = client.removeServiceInstance(singleton); client.setLastUpdatedTime(); if (null != removedInstance) { //发布客户端注销服务实例的事件 NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(singleton, clientId)); NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, removedInstance.getMetadataId(), true)); } } ... } public abstract class AbstractClient implements Client { //publishers其实就是记录该客户端提供的服务和服务实例,一个客户端可提供多个服务 //存储客户端发送过来的请求中的Instance信息,当然这些信息已封装为InstancePublishInfo对象 //key为已注册的Service,value是根据请求中的instance实例信息封装的InstancePublishInfo对象 protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1); ... @Override public InstancePublishInfo removeServiceInstance(Service service) { InstancePublishInfo result = publishers.remove(service); if (null != result) { MetricsMonitor.decrementInstanceCount(); //发布客户端改变事件,用于处理集群间的数据同步 NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this)); } Loggers.SRV_LOG.info("Client remove for service {}, {}", service, getClientId()); return result; } ... }
一.处理ClientChangedEvent事件
也就是同步数据到集群节点:
/**
 * Subscriber that reacts to client events by synchronizing client data to other servers
 * through {@code distroProtocol}.
 */
public class DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor {

    ...

    /**
     * Routes verify-failed events to the single-server sync path and every other event to the
     * all-server sync path.
     *
     * @param event the received event
     */
    @Override
    public void onEvent(Event event) {
        ...
        if (!(event instanceof ClientEvent.ClientVerifyFailedEvent)) {
            syncToAllServer((ClientEvent) event);
            return;
        }
        syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
    }

    /**
     * Synchronizes the event's client to all other servers: a DELETE operation for a
     * disconnect event, a CHANGE operation for a changed event, nothing for other events.
     *
     * @param event the client event to synchronize
     */
    private void syncToAllServer(ClientEvent event) {
        Client client = event.getClient();
        // Only ephemeral data sync by Distro, persist client should sync by raft.
        // In addition, only the server responsible for this client performs the sync.
        boolean syncable = client != null && client.isEphemeral() && clientManager.isResponsibleClient(client);
        if (!syncable) {
            return;
        }
        final DataOperation operation;
        if (event instanceof ClientEvent.ClientDisconnectEvent) {
            // The client disconnected: propagate a DELETE for its data.
            operation = DataOperation.DELETE;
        } else if (event instanceof ClientEvent.ClientChangedEvent) {
            // The client's data changed: propagate a CHANGE.
            operation = DataOperation.CHANGE;
        } else {
            return;
        }
        distroProtocol.sync(new DistroKey(client.getClientId(), TYPE), operation);
    }

    ...
}

/**
 * Entry point for scheduling Distro synchronization tasks towards other cluster members.
 */
@Component
public class DistroProtocol {

    private final ServerMemberManager memberManager;

    private final DistroTaskEngineHolder distroTaskEngineHolder;

    ...

    /**
     * Start to sync by configured delay.
     *
     * @param distroKey key of the data to sync
     * @param action    the data operation to propagate
     */
    public void sync(DistroKey distroKey, DataOperation action) {
        long configuredDelay = DistroConfig.getInstance().getSyncDelayMillis();
        sync(distroKey, action, configuredDelay);
    }

    /**
     * Start to sync data to all remote server.
     *
     * @param distroKey key of the data to sync
     * @param action    the data operation to propagate
     * @param delay     delay handed to each scheduled task
     */
    public void sync(DistroKey distroKey, DataOperation action, long delay) {
        // Visit every cluster member except this node.
        for (Member member : memberManager.allMembersWithoutSelf()) {
            syncToTarget(distroKey, action, member.getAddress(), delay);
        }
    }

    /**
     * Start to sync to target server.
     *
     * @param distroKey    key of the data to sync
     * @param action       the data operation to propagate
     * @param targetServer address of the server to sync to
     * @param delay        delay handed to the scheduled task
     */
    public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
        // Derive a key that additionally carries the target server.
        DistroKey targetedKey = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(), targetServer);
        // Wrap the targeted key in a delay task ...
        DistroDelayTask syncTask = new DistroDelayTask(targetedKey, action, delay);
        // ... and hand it to the delay task execute engine.
        distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(targetedKey, syncTask);
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
        }
    }

    ...
}
二.处理ClientDeregisterServiceEvent事件
也就是移除注册表 + 订阅表的服务实例:
@Component public class ClientServiceIndexesManager extends SmartSubscriber { //注册表(服务提供者),一个Service服务对象,对应多个服务实例的clientId private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>(); //订阅者列表(服务消费者),一个Service服务对象,对应多个订阅者的clientId private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>(); ... @Override public void onEvent(Event event) { if (event instanceof ClientEvent.ClientDisconnectEvent) { handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event); } else if (event instanceof ClientOperationEvent) { handleClientOperation((ClientOperationEvent) event); } } private void handleClientOperation(ClientOperationEvent event) { Service service = event.getService(); String clientId = event.getClientId(); if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) { //处理客户端注册事件ClientRegisterServiceEvent addPublisherIndexes(service, clientId); } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) { //处理客户端注销事件ClientDeregisterServiceEvent removePublisherIndexes(service, clientId); } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) { //处理客户端订阅服务事件ClientSubscribeServiceEvent addSubscriberIndexes(service, clientId); } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) { //处理客户端取消订阅事件ClientUnsubscribeServiceEvent removeSubscriberIndexes(service, clientId); } } private void removePublisherIndexes(Service service, String clientId) { if (!publisherIndexes.containsKey(service)) { return; } //移除注册表中的服务实例 publisherIndexes.get(service).remove(clientId); //发布服务改变事件ServiceChangedEvent NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true)); } ... }
三.处理ServiceChangedEvent事件
也就是通知订阅了该服务的客户端:
/**
 * Subscriber that turns service events into push delay tasks.
 */
@org.springframework.stereotype.Service
public class NamingSubscriberServiceV2Impl extends SmartSubscriber implements NamingSubscriberService {

    ...

    /**
     * Schedules a push task for service-changed and service-subscribed events. Events are
     * ignored entirely while {@code upgradeJudgement.isUseGrpcFeatures()} is false.
     *
     * @param event the received event
     */
    @Override
    public void onEvent(Event event) {
        if (!upgradeJudgement.isUseGrpcFeatures()) {
            return;
        }
        if (event instanceof ServiceEvent.ServiceChangedEvent) {
            // If service changed, push to all subscribers.
            // The task is created without a target client id.
            Service changedService = ((ServiceEvent.ServiceChangedEvent) event).getService();
            PushDelayTask pushToAll =
                    new PushDelayTask(changedService, PushConfig.getInstance().getPushTaskDelay());
            // Queue the task on the delay task execute engine.
            delayTaskEngine.addTask(changedService, pushToAll);
        } else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
            // If service is subscribed by one client, only push this client.
            ServiceEvent.ServiceSubscribedEvent subscribed = (ServiceEvent.ServiceSubscribedEvent) event;
            Service subscribedService = subscribed.getService();
            PushDelayTask pushToOne = new PushDelayTask(subscribedService,
                    PushConfig.getInstance().getPushTaskDelay(), subscribed.getClientId());
            // Queue the task on the delay task execute engine.
            delayTaskEngine.addTask(subscribedService, pushToOne);
        }
    }

    ...
}
import com.alibaba.nacos.common.notify.Event; import com.alibaba.nacos.common.notify.NotifyCenter; import com.alibaba.nacos.common.notify.listener.SmartSubscriber; import org.springframework.stereotype.Component; import java.util.LinkedList; import java.util.List; //自定义的订阅者需要继承Nacos的SmartSubscriber抽象类 @Component public class TestSubscriber extends SmartSubscriber { //构造方法中需要将自定义的订阅者TestSubscriber注册到Nacos的通知中心NotifyCenter public TestSubscriber() { NotifyCenter.registerSubscriber(this); } //实现subscribeTypes()方法时,把自定义的事件TestEvent添加进去返回 @Override public List<Class<? extends Event>> subscribeTypes() { List<Class<? extends Event>> result = new LinkedList<>(); result.add(TestEvent.class); return result; } //实现onEvent()方法 //当Nacos的通知中心NotifyCenter发布一个TestEvent事件时,就会响应该方法处理订阅者的逻辑 @Override public void onEvent(Event event) { System.out.println("TestSubscriber onEvent"); } }
三.最后通过Nacos的通知中心NotifyCenter发布自定义事件
这样便完成了自定义事件、自定义订阅者通过Nacos实现发布订阅功能。
/**
 * Demo controller that publishes the custom {@code TestEvent} through the Nacos
 * {@code NotifyCenter}.
 */
@RestController
@RequestMapping("/sub/")
public class SubscriberController {

    /**
     * Publishes a new {@code TestEvent} on each GET request mapped to {@code /test}.
     */
    @GetMapping("/test")
    public void test() {
        TestEvent customEvent = new TestEvent();
        NotifyCenter.publishEvent(customEvent);
    }
}