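Introduction
The previous article walked through how the Nacos server starts up. This one looks at which handlers are loaded during startup, and at how the server handles connection requests and registration requests.
Overview
Loading the RequestHandlers
- At startup the container loads 13 RequestHandlers, which do the actual work for each request type; later chapters dig into them in detail
- Among them is the mapping InstanceRequest -> InstanceRequestHandler
How the server responds to a client connection
- Client connection requests are handled by GrpcBiStreamRequestAcceptor#requestBiStream
- A GrpcConnection object and a Client object are built and cached
How the server handles a registration request
- The registration logic is executed by InstanceRequestHandler
- It establishes the relationship between the client and its service and instance
- Three events are published: ClientEvent.ClientChangedEvent, ClientOperationEvent.ClientRegisterServiceEvent, and MetadataEvent.InstanceMetadataEvent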
Loading the RequestHandlers
RequestHandlerRegistry (in the package com.alibaba.nacos.core.remote) implements ApplicationListener. Once the Spring context has been refreshed, its onApplicationEvent callback is invoked.
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        // Note @1: collect every RequestHandler bean
        Map<String, RequestHandler> beansOfType = event.getApplicationContext().getBeansOfType(RequestHandler.class);
        Collection<RequestHandler> values = beansOfType.values();
        for (RequestHandler requestHandler : values) {

            // Note @2: walk up to the class that directly extends RequestHandler
            Class<?> clazz = requestHandler.getClass();
            boolean skip = false;
            while (!clazz.getSuperclass().equals(RequestHandler.class)) {
                if (clazz.getSuperclass().equals(Object.class)) {
                    skip = true;
                    break;
                }
                clazz = clazz.getSuperclass();
            }
            if (skip) {
                continue;
            }

            // Note @3: register a TPS monitor point for handle methods annotated with @TpsControl
            try {
                Method method = clazz.getMethod("handle", Request.class, RequestMeta.class);
                if (method.isAnnotationPresent(TpsControl.class) && TpsControlConfig.isTpsControlEnabled()) {
                    TpsControl tpsControl = method.getAnnotation(TpsControl.class);
                    String pointName = tpsControl.pointName();
                    TpsMonitorPoint tpsMonitorPoint = new TpsMonitorPoint(pointName);
                    tpsMonitorManager.registerTpsControlPoint(tpsMonitorPoint);
                }
            } catch (Exception e) {
                // body abridged in this excerpt
            }

            // Note @4: cache the handler under the simple name of its request type
            Class tClass = (Class) ((ParameterizedType) clazz.getGenericSuperclass()).getActualTypeArguments()[0];
            registryHandlers.putIfAbsent(tClass.getSimpleName(), requestHandler);
        }
    }
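Note @1: Fetch all the implementations of RequestHandler from the application context.
Note @2: Walk up the class hierarchy to the class that directly extends RequestHandler; beans whose hierarchy reaches Object first are skipped.
Note @3: Look up the handle method. If it carries the @TpsControl annotation (and TPS control is enabled), its traffic is subject to TPS control, so a monitor point is registered for it.
Note @4: Put the handler into the cache, keyed by request type, for example InstanceRequest -> InstanceRequestHandler.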
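The key used in Note @4 comes from the handler's generic superclass. The following is a minimal sketch of that lookup, not the Nacos code itself: DemoRequest, DemoRequestHandler, and DemoHandlerRegistry are hypothetical stand-ins, and the sketch assumes every handler directly extends the generic base class (the original walks up the hierarchy first).

```java
import java.lang.reflect.ParameterizedType;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-ins for the Nacos Request / RequestHandler types.
abstract class DemoRequest { }

class DemoInstanceRequest extends DemoRequest { }

abstract class DemoRequestHandler<T extends DemoRequest> {
    abstract String handle(T request);
}

class DemoInstanceRequestHandler extends DemoRequestHandler<DemoInstanceRequest> {
    @Override
    String handle(DemoInstanceRequest request) {
        return "handled instance request";
    }
}

class DemoHandlerRegistry {
    private final Map<String, DemoRequestHandler<?>> handlers = new ConcurrentHashMap<>();

    // Reads the type argument of the direct generic superclass
    // and uses its simple name as the registry key.
    void register(DemoRequestHandler<?> handler) {
        ParameterizedType superType = (ParameterizedType) handler.getClass().getGenericSuperclass();
        Class<?> requestClass = (Class<?>) superType.getActualTypeArguments()[0];
        handlers.putIfAbsent(requestClass.getSimpleName(), handler);
    }

    // Returns the handler registered for the given request type name, or null.
    DemoRequestHandler<?> getByRequestType(String type) {
        return handlers.get(type);
    }
}
```

Keying by the simple class name is what lets the server later pick a handler from nothing more than the type string carried in the request metadata.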
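Summary: During Spring startup the registry loads the RequestHandlers, and these handlers are where the real logic lives. The table below lists the 13 RequestHandlers that were loaded.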
Key | Value | Remarks
ConfigBatchListenRequest | ConfigChangeBatchListenRequestHandler |
ConfigChangeClusterSyncRequest | ConfigChangeClusterSyncRequestHandler |
ConfigPublishRequest | ConfigPublishRequestHandler |
ConfigQueryRequest | ConfigQueryRequestHandler |
ConfigRemoveRequest | ConfigRemoveRequestHandler |
HealthCheckRequest | HealthCheckRequestHandler |
ServerLoaderInfoRequest | ServerLoaderInfoRequestHandler |
ServerReloadRequest | ServerReloaderRequestHandler |
DistroDataRequest | DistroDataRequestHandler |
InstanceRequest | InstanceRequestHandler |
ServiceListRequest | ServiceListRequestHandler |
ConfigPublishRequest | ConfigPublishRequestHandler |
SubscribeServiceRequest | SubscribeServiceRequestHandler |
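How the server responds to a client connection
The client connects to the server
As shown in the earlier article "Nacos2# Service Registration and Discovery: Client Example and Source Code Analysis (Part 2)", the client establishes a connection to the server through the connectToServer() method.
The article "Nacos3# Service Registration and Discovery: Server Startup Source Code Analysis" noted that the protobuf file defines two kinds of gRPC request, one of which is the bidirectional streaming call BiRequestStream#requestBiStream.
When the client calls connectToServer(), it uses exactly this bidirectional streaming call: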
    public Connection connectToServer(ServerInfo serverInfo) {
        // ... (abridged)
        BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc
                .newStub(newChannelStubTemp.getChannel());
        // ... (abridged)
    }

    private StreamObserver<Payload> bindRequestStream(final BiRequestStreamGrpc.BiRequestStreamStub streamStub,
            final GrpcConnection grpcConn) {
        return streamStub.requestBiStream(new StreamObserver<Payload>() {
            // ... (callbacks abridged)
        });
    }
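To make the shape of a bidirectional streaming call concrete, here is a minimal sketch with no gRPC dependency. DemoObserver is a hypothetical one-method stand-in for io.grpc.stub.StreamObserver, and the two demo classes are illustrative only: each side hands the other an observer, so both can push messages at any time.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal observer; the real type is io.grpc.stub.StreamObserver.
interface DemoObserver<T> {
    void onNext(T value);
}

class DemoBiStreamServer {
    // The server receives the observer it can push to (responseObserver)
    // and returns the observer the client will push to.
    DemoObserver<String> requestBiStream(DemoObserver<String> responseObserver) {
        return message -> responseObserver.onNext("server received: " + message);
    }
}

class DemoBiStreamClient {
    // Messages pushed by the server end up here.
    final List<String> received = new ArrayList<>();

    // Opens the stream and returns the observer used to send requests.
    DemoObserver<String> connect(DemoBiStreamServer server) {
        return server.requestBiStream(received::add);
    }
}
```

In the real call the server keeps the responseObserver around after the first message, which is what later allows it to push requests to the client over the same stream.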
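So how does the Nacos server respond?
The Nacos server responds to the connection
As covered in "Nacos3# Service Registration and Discovery: Server Startup Source Code Analysis", when the server starts gRPC and builds the service definitions it exposes, it adds requestBiStream: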
    private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor... serverInterceptor) {
        // ... (abridged)
        final ServerCallHandler<Payload, Payload> biStreamHandler = ServerCalls.asyncBidiStreamingCall(
                (responseObserver) -> grpcBiStreamRequestAcceptor.requestBiStream(responseObserver));
        final ServerServiceDefinition serviceDefOfBiStream = ServerServiceDefinition
                .builder(REQUEST_BI_STREAM_SERVICE_NAME).addMethod(biStreamMethod, biStreamHandler).build();
        // ... (abridged)
    }
Next, let's look at how the connection request is handled. Location: GrpcBiStreamRequestAcceptor#requestBiStream
    @Override
    public StreamObserver<Payload> requestBiStream(StreamObserver<Payload> responseObserver) {

        StreamObserver<Payload> streamObserver = new StreamObserver<Payload>() {
            final String connectionId = CONTEXT_KEY_CONN_ID.get();
            final Integer localPort = CONTEXT_KEY_CONN_LOCAL_PORT.get();
            final int remotePort = CONTEXT_KEY_CONN_REMOTE_PORT.get();
            String remoteIp = CONTEXT_KEY_CONN_REMOTE_IP.get();
            String clientIp = "";

            @Override
            public void onNext(Payload payload) {
                clientIp = payload.getMetadata().getClientIp();
                traceDetailIfNecessary(payload);

                Object parseObj = null;
                try {
                    parseObj = GrpcUtils.parse(payload);
                } catch (Throwable throwable) {
                    // body abridged in this excerpt
                }
                if (parseObj instanceof ConnectionSetupRequest) {
                    ConnectionSetupRequest setUpRequest = (ConnectionSetupRequest) parseObj;
                    Map<String, String> labels = setUpRequest.getLabels();
                    String appName = "-";
                    if (labels != null && labels.containsKey(Constants.APPNAME)) {
                        appName = labels.get(Constants.APPNAME);
                    }

                    // Note @5: build the connection object
                    ConnectionMeta metaInfo = new ConnectionMeta(connectionId, payload.getMetadata().getClientIp(),
                            remoteIp, remotePort, localPort, ConnectionType.GRPC.getType(),
                            setUpRequest.getClientVersion(), appName, setUpRequest.getLabels());
                    metaInfo.setTenant(setUpRequest.getTenant());
                    Connection connection = new GrpcConnection(metaInfo, responseObserver, CONTEXT_KEY_CHANNEL.get());
                    connection.setAbilities(setUpRequest.getAbilities());
                    boolean rejectSdkOnStarting = metaInfo.isSdkSource() && !ApplicationUtils.isStarted();
                    // Note @6: register the connection
                    if (rejectSdkOnStarting || !connectionManager.register(connectionId, connection)) {
                        try {
                            connection.request(new ConnectResetRequest(), 3000L);
                            connection.close();
                        } catch (Exception e) {
                            if (connectionManager.traced(clientIp)) {
                                // body abridged in this excerpt
                            }
                        }
                        return;
                    }

                } else if (parseObj != null && parseObj instanceof Response) {
                    Response response = (Response) parseObj;
                    if (connectionManager.traced(clientIp)) {
                        // body abridged in this excerpt
                    }
                    RpcAckCallbackSynchronizer.ackNotify(connectionId, response);
                    connectionManager.refreshActiveTime(connectionId);
                } else {
                    return;
                }

            }

            // onError and onCompleted abridged in this excerpt
        };

        return streamObserver;
    }
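Note @5: Build the connection object.
Note @6: Register the connection. This does two things: it caches the connection in the connections map, and it triggers the client-connected notification.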
    public synchronized boolean register(String connectionId, Connection connection) {

        if (connection.isConnected()) {
            // ... (abridged)
            connections.put(connectionId, connection);
            connectionForClientIp.get(connection.getMetaInfo().clientIp).getAndIncrement();

            clientConnectionEventListenerRegistry.notifyClientConnected(connection);
            return true;

        }
        return false;

    }
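The two effects of register described in Note @6 can be sketched in isolation. This is a simplified illustration under stated assumptions, not the Nacos ConnectionManager: DemoConnection and DemoConnectionManager are hypothetical names, listeners are plain Consumer callbacks, and the per-IP counter is created lazily with computeIfAbsent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical, stripped-down connection: just an id, a client IP, and a connected flag.
class DemoConnection {
    final String connectionId;
    final String clientIp;
    final boolean connected;

    DemoConnection(String connectionId, String clientIp, boolean connected) {
        this.connectionId = connectionId;
        this.clientIp = clientIp;
        this.connected = connected;
    }
}

class DemoConnectionManager {
    private final Map<String, DemoConnection> connections = new ConcurrentHashMap<>();
    private final Map<String, AtomicInteger> connectionForClientIp = new ConcurrentHashMap<>();
    private final List<Consumer<DemoConnection>> listeners = new ArrayList<>();

    void addListener(Consumer<DemoConnection> listener) {
        listeners.add(listener);
    }

    // Caches the connection, bumps the per-IP counter, then notifies listeners.
    synchronized boolean register(String connectionId, DemoConnection connection) {
        if (!connection.connected) {
            return false;
        }
        connections.put(connectionId, connection);
        connectionForClientIp.computeIfAbsent(connection.clientIp, ip -> new AtomicInteger())
                .getAndIncrement();
        listeners.forEach(listener -> listener.accept(connection));
        return true;
    }

    // Number of registered connections seen from the given client IP.
    int countForIp(String clientIp) {
        AtomicInteger counter = connectionForClientIp.get(clientIp);
        return counter == null ? 0 : counter.get();
    }
}
```

Notifying listeners only after the connection is cached means a listener that looks the connection up by id will always find it.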
The client-connected notification is handled in ConnectionBasedClientManager#clientConnected. Its main job is to build a Client object and cache it, with clientId equal to the connectionId.
    public void clientConnected(Connection connect) {
        if (!RemoteConstants.LABEL_MODULE_NAMING.equals(connect.getMetaInfo().getLabel(RemoteConstants.LABEL_MODULE))) {
            return;
        }
        String type = connect.getMetaInfo().getConnectType();
        ClientFactory clientFactory = ClientFactoryHolder.getInstance().findClientFactory(type);
        clientConnected(clientFactory.newClient(connect.getMetaInfo().getConnectionId()));
    }

    public boolean clientConnected(Client client) {
        Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId());
        if (!clients.containsKey(client.getClientId())) {
            clients.putIfAbsent(client.getClientId(), (ConnectionBasedClient) client);
        }
        return true;
    }
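Summary: When responding to a client connection, the Nacos server does two main things: @1 it builds and caches a GrpcConnection object, which holds the ConnectionMeta, the channel, and the StreamObserver; @2 it builds and caches a Client object whose clientId is the connectionId.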
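The second step of the summary, creating one client per naming connection, can be sketched as follows. DemoClient and DemoClientManager are hypothetical, and the label key and value ("module", "naming") are assumed stand-ins for RemoteConstants.LABEL_MODULE and RemoteConstants.LABEL_MODULE_NAMING.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical client: only carries its id, which equals the connection id.
class DemoClient {
    final String clientId;

    DemoClient(String clientId) {
        this.clientId = clientId;
    }
}

class DemoClientManager {
    // Assumed values, standing in for the RemoteConstants used in the excerpt.
    static final String LABEL_MODULE = "module";
    static final String MODULE_NAMING = "naming";

    private final Map<String, DemoClient> clients = new ConcurrentHashMap<>();

    // Ignores connections from other modules; otherwise caches a client keyed by connection id.
    void clientConnected(String connectionId, Map<String, String> labels) {
        if (!MODULE_NAMING.equals(labels.get(LABEL_MODULE))) {
            return;
        }
        clients.putIfAbsent(connectionId, new DemoClient(connectionId));
    }

    DemoClient getClient(String clientId) {
        return clients.get(clientId);
    }
}
```

Because putIfAbsent is used, a repeated notification for the same connection id keeps the client that was cached first.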
ConnectionMeta properties
Property | Meaning
connectType | ConnectionType, for example GRPC
clientIp | Client IP, for example clientIp=192.168.0.110
remoteIp | Remote node IP, for example remoteIp=127.0.0.1
remotePort | Remote node port, for example remotePort=54314
localPort | Local node port, for example localPort=9848
version | Client version
connectionId | Connection identifier
createTime | Connection creation time
lastActiveTime | Time of the most recent keep-alive
appName | Application name, defaults to "-"
tenant | Tenant
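ConnectionBasedClient properties
Property | Meaning
connectionId | Connection identifier
isNative | true means the client is connected directly to this server; false means the connection was synchronized from another server
lastRenewTime | When isNative is false, the timestamp of the last verification against the source server
lastUpdatedTime | Time of the most recent refresh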
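How the server handles a registration request
The earlier article "Service Registration and Discovery: Server Startup Source Code Analysis" mentioned that these requests are handled by RequestAcceptor.request: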
    final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls
            .asyncUnaryCall((request, responseObserver) -> {
                grpcCommonRequestAcceptor.request(request, responseObserver);
            });
Now let's look at the logic of request, leaving out some of the validation:
    @Override
    public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {
        traceIfNecessary(grpcRequest, true);
        String type = grpcRequest.getMetadata().getType();

        // Note @7: pick the handler by request type
        RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
        String connectionId = CONTEXT_KEY_CONN_ID.get();
        boolean requestValid = connectionManager.checkValid(connectionId);
        // Note @8: parse the payload into a Request
        Object parseObj = null;
        try {
            parseObj = GrpcUtils.parse(grpcRequest);
        } catch (Exception e) {
            return;
        }
        if (parseObj == null) {
            // body abridged in this excerpt
        }
        if (!(parseObj instanceof Request)) {
            return;
        }
        Request request = (Request) parseObj;
        try {
            Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
            // Note @9: fill in the request metadata
            RequestMeta requestMeta = new RequestMeta();
            requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
            requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
            requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
            requestMeta.setLabels(connection.getMetaInfo().getLabels());
            // Note @10: refresh the connection's active time
            connectionManager.refreshActiveTime(requestMeta.getConnectionId());
            // Note @11: run the handler
            Response response = requestHandler.handleRequest(request, requestMeta);
            Payload payloadResponse = GrpcUtils.convert(response);
            traceIfNecessary(payloadResponse, false);
            responseObserver.onNext(payloadResponse);
            responseObserver.onCompleted();
        } catch (Throwable e) {
            return;
        }

    }
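Note @7: Look up the RequestHandler for the request type. For registration this is InstanceRequest -> InstanceRequestHandler.
Note @8: Parse the payload into a Request object.
Note @9: Populate the request metadata with the client IP, connection id, client version, and the label map.
Note @10: Refresh the connection's last-active time.
Note @11: Run the RequestHandler logic, which for a registration ends up in registerInstance: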
    @Override
    @Secured(action = ActionTypes.WRITE, parser = NamingResourceParser.class)
    public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
        Service service = Service
                .newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
        switch (request.getType()) {
            case NamingRemoteConstants.REGISTER_INSTANCE:
                return registerInstance(service, request, meta);
            case NamingRemoteConstants.DE_REGISTER_INSTANCE:
                return deregisterInstance(service, request, meta);
            default:
                throw new NacosException(NacosException.INVALID_PARAM,
                        String.format("Unsupported request type %s", request.getType()));
        }
    }
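The two-level dispatch shown so far (request type string to handler, then operation to method) can be sketched without any Nacos types. Everything here is hypothetical: DemoPayload, DemoInstanceHandler, and DemoRequestAcceptor are illustrative names, and the operation strings "registerInstance" and "deregisterInstance" are assumed values for the NamingRemoteConstants used above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical payload: the request type picks the handler, the operation picks the action.
class DemoPayload {
    final String type;
    final String operation;

    DemoPayload(String type, String operation) {
        this.type = type;
        this.operation = operation;
    }
}

class DemoInstanceHandler {
    // Second level of dispatch: switch on the operation carried by the request.
    static String handle(String operation) {
        switch (operation) {
            case "registerInstance":
                return "registered";
            case "deregisterInstance":
                return "deregistered";
            default:
                throw new IllegalArgumentException("Unsupported request type " + operation);
        }
    }
}

class DemoRequestAcceptor {
    private final Map<String, Function<String, String>> registry = new HashMap<>();

    void register(String type, Function<String, String> handler) {
        registry.put(type, handler);
    }

    // First level of dispatch: look the handler up by the request type string.
    String request(DemoPayload payload) {
        Function<String, String> handler = registry.get(payload.type);
        if (handler == null) {
            return "no handler for " + payload.type;
        }
        return handler.apply(payload.operation);
    }
}
```

With this split, adding a new request type only means registering one more handler; the acceptor itself does not change.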

So what does registration actually do? Location: EphemeralClientOperationServiceImpl#registerInstance
    @Override
    public void registerInstance(Service service, Instance instance, String clientId) {
        // Note @12
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        // Note @13
        Client client = clientManager.getClient(clientId);
        InstancePublishInfo instanceInfo = getPublishInfo(instance);
        // Note @14
        client.addServiceInstance(singleton, instanceInfo);
        client.setLastUpdatedTime();
        // Note @15
        NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
        // Note @16
        NotifyCenter
                .publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
    }
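Note @12: singletonRepository caches the Singleton (service), and namespaceSingletonMaps records the relationship between the namespace and the Singleton.
Note @13: Fetch the client from the cache. This is the client that was cached when the connection was established in the previous section.
Note @14: Establish the relationship between the client and its service and instance, and publish the event ClientEvent.ClientChangedEvent: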
    public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
        if (null == publishers.put(service, instancePublishInfo)) {
            MetricsMonitor.incrementInstanceCount();
        }
        NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
        Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
        return true;
    }
Note @15: Publish the ClientOperationEvent.ClientRegisterServiceEvent event.
Note @16: Publish the MetadataEvent.InstanceMetadataEvent event.
Summary: The registration logic is executed by InstanceRequestHandler, which establishes the relationship between the client and its service and instance. Three events are published through NotifyCenter: @1 ClientEvent.ClientChangedEvent; @2 ClientOperationEvent.ClientRegisterServiceEvent; @3 MetadataEvent.InstanceMetadataEvent.
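As a rough illustration of the order in which those three events fire, here is a minimal sketch. It is not the Nacos implementation: DemoPublishingClient and DemoRegistrationService are hypothetical, services and instances are reduced to strings, and a plain list of event names stands in for NotifyCenter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical client that remembers which instance it published for each service.
class DemoPublishingClient {
    final String clientId;
    final Map<String, String> publishers = new ConcurrentHashMap<>();

    DemoPublishingClient(String clientId) {
        this.clientId = clientId;
    }

    // Records the service -> instance relation, then reports that the client changed.
    boolean addServiceInstance(String service, String instance, List<String> events) {
        publishers.put(service, instance);
        events.add("ClientChangedEvent");
        return true;
    }
}

class DemoRegistrationService {
    // Stand-in for NotifyCenter: records event names in publication order.
    final List<String> events = new ArrayList<>();

    void registerInstance(DemoPublishingClient client, String service, String instance) {
        client.addServiceInstance(service, instance, events);
        events.add("ClientRegisterServiceEvent");
        events.add("InstanceMetadataEvent");
    }
}
```

The first event comes from inside the client when its publisher map changes; the other two are published by the registration service once the relation is in place.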
So who subscribes to these events, and what do they do with them? That is the subject of the next installment.