引言
本文接着撸Distro协议,上文中分析了在Nacos server启动时会进行全量数据同步和数据校验,具体数据即客户端注册节点信息含命名空间、分组名称、服务名称、节点Instance信息等。什么时候会触发增量同步?增量同步都干了些啥,下文接着撸撸增量数据同步。
内容提要
增量数据同步
- 在Nacos节点启动时通过事件驱动模式订阅了ClientChangedEvent、ClientDisconnectEvent和ClientVerifyFailedEvent事件
- 当节点收到ClientChangedEvent事件时,会向集群中其他节点发送更新Client信息请求,其他节点收到后更新缓存
- 当节点收到ClientVerifyFailedEvent事件时,向该Event指定的目标节点发起新增该Event指定的Client信息请求,目标节点收到后更新到自己缓存中
- 当节点收到ClientDisconnectEvent事件时,会向集群中其他节点发送删除Client信息请求,其他节点收到后将该Client缓存删除
增量事件触发
- 当有服务注册或者注销时会触发ClientEvent.ClientChangedEvent事件,即客户端调用naming.registerInstance或者naming.deregisterInstance
- 定时任务每隔3秒钟定时检查缓存中的所有连接,如果超过保鲜期20秒则再次发起连接请求,连接未成功则注销关闭该连接并发布ClientEvent.ClientDisconnectEvent事件
- Nacos集群之间通过每5秒发送心跳校验数据请求(具体为本节点负责Client信息),其他节点接受到校验请求,如果缓存中存在该client表示校验成功,同时更新保鲜时间;否则校验失败,回调返回失败Response,请求节点收到失败的Response后会发布ClientVerifyFailedEvent事件
增量数据同步
将代码翻到DistroClientDataProcessor类中,该类继承了SmartSubscriber,遵循Subscriber/Notify模式,即事件驱动模式。该模式前面文章中分析过,当有订阅的事件时会进行回调通知。
订阅的事件
DistroClientDataProcessor订阅了ClientChangedEvent、ClientDisconnectEvent和ClientVerifyFailedEvent事件。
1 2 3 4 5 6 7 8
| @Override public List<Class<? extends Event>> subscribeTypes() { List<Class<? extends Event>> result = new LinkedList<>(); result.add(ClientEvent.ClientChangedEvent.class); result.add(ClientEvent.ClientDisconnectEvent.class); result.add(ClientEvent.ClientVerifyFailedEvent.class); return result; }
|
当有上述三个事件产生时,DefaultPublisher回调onEvent方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public void onEvent(Event event) { if (EnvUtil.getStandaloneMode()) { return; } if (!upgradeJudgement.isUseGrpcFeatures()) { return; } if (event instanceof ClientEvent.ClientVerifyFailedEvent) { syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event); } else { syncToAllServer((ClientEvent) event); } }
|
注解@1 将ClientVerifyFailedEvent同步给校验失败的节点,操作类型为ADD
注解@2 将同步给集群中的其他节
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| private void syncToAllServer(ClientEvent event) { Client client = event.getClient(); if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) { return; } if (event instanceof ClientEvent.ClientDisconnectEvent) { DistroKey distroKey = new DistroKey(client.getClientId(), TYPE); distroProtocol.sync(distroKey, DataOperation.DELETE); } else if (event instanceof ClientEvent.ClientChangedEvent) { DistroKey distroKey = new DistroKey(client.getClientId(), TYPE); distroProtocol.sync(distroKey, DataOperation.CHANGE); } }
|
注解@3 当客户端断开连接事件ClientDisconnectEvent时,向其他节点同步DELETE操作
注解@4 当客户端变更事件ClientChangedEvent时,向其他节点同步CHANGE操作
接着看下不同操作类型的处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @Override public boolean process(NacosTask task) { if (!(task instanceof DistroDelayTask)) { return true; } DistroDelayTask distroDelayTask = (DistroDelayTask) task; DistroKey distroKey = distroDelayTask.getDistroKey(); switch (distroDelayTask.getAction()) { case DELETE: DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder); distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask); return true; case CHANGE: case ADD: DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder); distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask); return true; default: return false; } }
|
向指定的集群节点同步更新数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @Override public boolean syncData(DistroData data, String targetServer) { if (isNoExistTarget(targetServer)) { return true; } DistroDataRequest request = new DistroDataRequest(data, data.getType()); Member member = memberManager.find(targetServer); if (checkTargetServerStatusUnhealthy(member)) { Loggers.DISTRO.warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy", targetServer); return false; } try { Response response = clusterRpcClientProxy.sendRequest(member, request); return checkResponse(response); } catch (NacosException e) { Loggers.DISTRO.error("[DISTRO-FAILED] Sync distro data failed! ", e); } return false; }
|
异步更新操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Override public void syncData(DistroData data, String targetServer, DistroCallback callback) { if (isNoExistTarget(targetServer)) { callback.onSuccess(); } DistroDataRequest request = new DistroDataRequest(data, data.getType()); Member member = memberManager.find(targetServer); try { clusterRpcClientProxy.asyncRequest(member, request, new DistroRpcCallbackWrapper(callback, member)); } catch (NacosException nacosException) { callback.onFailed(nacosException); } }
|
节点收到这些操作请求如何处理呢?
代码翻到DistroDataRequestHandler#handle(),集群中节点收到请求后处理逻辑在这里:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| @Override public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException { try { switch (request.getDataOperation()) { case VERIFY: return handleVerify(request.getDistroData(), meta); case SNAPSHOT: return handleSnapshot(); case ADD: case CHANGE: case DELETE: return handleSyncData(request.getDistroData()); case QUERY: return handleQueryData(request.getDistroData()); default: return new DistroDataResponse(); } } catch (Exception e) { Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e); DistroDataResponse result = new DistroDataResponse(); result.setErrorCode(ResponseCode.FAIL.getCode()); result.setMessage("handle distro request with exception"); return result; } }
|
可以看出ADD、CHANGE和DELETE均由handleSyncData处理。
1 2 3 4 5 6 7 8
| private DistroDataResponse handleSyncData(DistroData distroData) { DistroDataResponse result = new DistroDataResponse(); if (!distroProtocol.onReceive(distroData)) { result.setErrorCode(ResponseCode.FAIL.getCode()); result.setMessage("[DISTRO-FAILED] distro data handle failed"); } return result; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Override public boolean processData(DistroData distroData) { switch (distroData.getType()) { case ADD: case CHANGE: ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class) .deserialize(distroData.getContent(), ClientSyncData.class); handlerClientSyncData(clientSyncData); return true; case DELETE: String deleteClientId = distroData.getDistroKey().getResourceKey(); Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId); clientManager.clientDisconnected(deleteClientId); return true; default: return false; } }
|
注解@5 将同步过来的Client信息进行缓存
1 2 3 4 5 6 7
| private void handlerClientSyncData(ClientSyncData clientSyncData) { Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId()); clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes()); Client client = clientManager.getClient(clientSyncData.getClientId()); upgradeClient(client, clientSyncData); }
|
需要的是从其他节点通过过来的Client信息,ConnectionBasedClient属性isNative为false表示该连接时从其他节点同步过来的;true表示该连接客户端直接连接的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public boolean syncClientConnected(String clientId, ClientSyncAttributes attributes) { String type = attributes.getClientAttribute(ClientConstants.CONNECTION_TYPE); ClientFactory clientFactory = ClientFactoryHolder.getInstance().findClientFactory(type); return clientConnected(clientFactory.newSyncedClient(clientId, attributes)); }
@Override public ConnectionBasedClient newSyncedClient(String clientId, ClientSyncAttributes attributes) { return new ConnectionBasedClient(clientId, false); }
@Override public boolean clientConnected(Client client) { Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId()); if (!clients.containsKey(client.getClientId())) { clients.putIfAbsent(client.getClientId(), (ConnectionBasedClient) client); } return true; }
|
注解@5.1 更新Client的Service以及Instance信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| private void upgradeClient(Client client, ClientSyncData clientSyncData) {
List<String> namespaces = clientSyncData.getNamespaces(); List<String> groupNames = clientSyncData.getGroupNames(); List<String> serviceNames = clientSyncData.getServiceNames(); List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos(); Set<Service> syncedService = new HashSet<>(); for (int i = 0; i < namespaces.size(); i++) { Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i)); Service singleton = ServiceManager.getInstance().getSingleton(service); syncedService.add(singleton); InstancePublishInfo instancePublishInfo = instances.get(i); if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) { client.addServiceInstance(singleton, instancePublishInfo); NotifyCenter.publishEvent( new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId())); } } for (Service each : client.getAllPublishedService()) { if (!syncedService.contains(each)) { client.removeServiceInstance(each); NotifyCenter.publishEvent( new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId())); } } }
|
注解@6 响应删除操作,从clients缓存中移除。
1 2 3 4 5 6 7 8 9 10 11
| @Override public boolean clientDisconnected(String clientId) { Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId); ConnectionBasedClient client = clients.remove(clientId); if (null == client) { return true; } client.release(); NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client)); return true; }
|
小结: 增量同步的逻辑如下:当本节点DistroClientDataProcessor收到ClientChangedEvent、ClientDisconnectEvent和ClientVerifyFailedEvent事件时,会向Nacos集群的其他节点同步Client信息;集群中其他节点收到同步信息后更新或者删除本地缓存的Client信息;通过增量同步的Client信息isNative为false表示不是由客户端直连的。
增量事件触发
在Nacos server启动时从运行时内存信息可以看出,总共缓存了17个事件类型。当然也包括ClientChangedEvent、ClientDisconnectEvent和ClientVerifyFailedEvent。

ClientChangedEvent事件触发
当处理服务注册和注销事件时会触发ClientChangeEvent事件,详见InstanceRequestHandler#handle处理逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException { Service service = Service .newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true); switch (request.getType()) { case NamingRemoteConstants.REGISTER_INSTANCE: return registerInstance(service, request, meta); case NamingRemoteConstants.DE_REGISTER_INSTANCE: return deregisterInstance(service, request, meta); default: throw new NacosException(NacosException.INVALID_PARAM, String.format("Unsupported request type %s", request.getType())); } }
|
注解@7 处理注册请求,会调用到addServiceInstance方法,该方法中发布了ClientEvent.ClientChangedEvent事件。
1 2 3 4 5 6 7 8
| public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) { if (null == publishers.put(service, instancePublishInfo)) { MetricsMonitor.incrementInstanceCount(); } NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this)); Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId()); return true; }
|
注解@8 处理注销请求,会调用到removeServiceInstance方法,该方法中发布了ClientEvent.ClientChangedEvent事件
1 2 3 4 5 6 7 8 9
| public InstancePublishInfo removeServiceInstance(Service service) { InstancePublishInfo result = publishers.remove(service); if (null != result) { MetricsMonitor.decrementInstanceCount(); NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this)); } Loggers.SRV_LOG.info("Client remove for service {}, {}", service, getClientId()); return result; }
|
小结: 当有服务注册或者注销时会触发ClientEvent.ClientChangedEvent事件。
ClientDisconnectEvent事件触发
下面一段代码通过检测连接是否超过保鲜期,超过保鲜期的会被注销关闭,翻到代码ConnectionManager#start()。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193
| @PostConstruct public void start() { RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { int totalCount = connections.size(); Loggers.REMOTE_DIGEST.info("Connection check task start"); MetricsMonitor.getLongConnectionMonitor().set(totalCount); Set<Map.Entry<String, Connection>> entries = connections.entrySet(); int currentSdkClientCount = currentSdkClientCount(); boolean isLoaderClient = loadClient >= 0; int currentMaxClient = isLoaderClient ? loadClient : connectionLimitRule.countLimit;
int expelCount = currentMaxClient < 0 ? 0 : Math.max(currentSdkClientCount - currentMaxClient, 0);
List<String> expelClient = new LinkedList<>();
Map<String, AtomicInteger> expelForIp = new HashMap<>(16);
for (Map.Entry<String, Connection> entry : entries) {
Connection client = entry.getValue(); String appName = client.getMetaInfo().getAppName(); String clientIp = client.getMetaInfo().getClientIp(); if (client.getMetaInfo().isSdkSource() && !expelForIp.containsKey(clientIp)) { int countLimitOfIp = connectionLimitRule.getCountLimitOfIp(clientIp); if (countLimitOfIp < 0) { int countLimitOfApp = connectionLimitRule.getCountLimitOfApp(appName); countLimitOfIp = countLimitOfApp < 0 ? countLimitOfIp : countLimitOfApp; } if (countLimitOfIp < 0) { countLimitOfIp = connectionLimitRule.getCountLimitPerClientIpDefault(); }
if (countLimitOfIp >= 0 && connectionForClientIp.containsKey(clientIp)) { AtomicInteger currentCountIp = connectionForClientIp.get(clientIp); if (currentCountIp != null && currentCountIp.get() > countLimitOfIp) { expelForIp.put(clientIp, new AtomicInteger(currentCountIp.get() - countLimitOfIp)); } } } }
if (expelForIp.size() > 0) { Loggers.REMOTE_DIGEST.info("Over limit ip expel info,", expelForIp); }
Set<String> outDatedConnections = new HashSet<>(); long now = System.currentTimeMillis(); for (Map.Entry<String, Connection> entry : entries) { Connection client = entry.getValue(); String clientIp = client.getMetaInfo().getClientIp(); AtomicInteger integer = expelForIp.get(clientIp); if (integer != null && integer.intValue() > 0) { integer.decrementAndGet(); expelClient.add(client.getMetaInfo().getConnectionId()); expelCount--; } else if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) { outDatedConnections.add(client.getMetaInfo().getConnectionId()); }
}
if (expelCount > 0) { for (Map.Entry<String, Connection> entry : entries) { Connection client = entry.getValue(); if (!expelForIp.containsKey(client.getMetaInfo().clientIp) && client.getMetaInfo() .isSdkSource() && expelCount > 0) { expelClient.add(client.getMetaInfo().getConnectionId()); expelCount--; outDatedConnections.remove(client.getMetaInfo().getConnectionId()); } } }
String serverIp = null; String serverPort = null; if (StringUtils.isNotBlank(redirectAddress) && redirectAddress.contains(Constants.COLON)) { String[] split = redirectAddress.split(Constants.COLON); serverIp = split[0]; serverPort = split[1]; }
for (String expelledClientId : expelClient) { try { Connection connection = getConnection(expelledClientId); if (connection != null) { ConnectResetRequest connectResetRequest = new ConnectResetRequest(); connectResetRequest.setServerIp(serverIp); connectResetRequest.setServerPort(serverPort); connection.asyncRequest(connectResetRequest, null); }
} catch (ConnectionAlreadyClosedException e) { unregister(expelledClientId); } catch (Exception e) { Loggers.REMOTE_DIGEST.error("Error occurs when expel connection :", expelledClientId, e); } }
Loggers.REMOTE_DIGEST.info("Out dated connection ,size={}", outDatedConnections.size()); if (CollectionUtils.isNotEmpty(outDatedConnections)) { Set<String> successConnections = new HashSet<>(); final CountDownLatch latch = new CountDownLatch(outDatedConnections.size()); for (String outDateConnectionId : outDatedConnections) { try { Connection connection = getConnection(outDateConnectionId); if (connection != null) { ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest(); connection.asyncRequest(clientDetectionRequest, new RequestCallBack() { @Override public Executor getExecutor() { return null; }
@Override public long getTimeout() { return 1000L; }
@Override public void onResponse(Response response) { latch.countDown(); if (response != null && response.isSuccess()) { connection.freshActiveTime(); successConnections.add(outDateConnectionId); } }
@Override public void onException(Throwable e) { latch.countDown(); } }); } else { latch.countDown(); }
} catch (ConnectionAlreadyClosedException e) { latch.countDown(); } catch (Exception e) { latch.countDown(); } }
latch.await(3000L, TimeUnit.MILLISECONDS); Loggers.REMOTE_DIGEST .info("Out dated connection check successCount={}", successConnections.size());
for (String outDateConnectionId : outDatedConnections) { if (!successConnections.contains(outDateConnectionId)) { Loggers.REMOTE_DIGEST .info("[{}]Unregister Out dated connection....", outDateConnectionId); unregister(outDateConnectionId); } } }
if (isLoaderClient) { loadClient = -1; redirectAddress = null; }
} catch (Throwable e) { } } }, 1000L, 3000L, TimeUnit.MILLISECONDS);
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| public synchronized void unregister(String connectionId) { Connection remove = this.connections.remove(connectionId); if (remove != null) { String clientIp = remove.getMetaInfo().clientIp; AtomicInteger atomicInteger = connectionForClientIp.get(clientIp); if (atomicInteger != null) { int count = atomicInteger.decrementAndGet(); if (count <= 0) { connectionForClientIp.remove(clientIp); } } remove.close(); Loggers.REMOTE_DIGEST.info("[{}]Connection unregistered successfully. ", connectionId); clientConnectionEventListenerRegistry.notifyClientDisConnected(remove); } }
public void notifyClientDisConnected(final Connection connection) { for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) { try { clientConnectionEventListener.clientDisConnected(connection); } catch (Throwable throwable) { Loggers.REMOTE.info("[NotifyClientDisConnected] failed for listener {}", clientConnectionEventListener.getName(), throwable); } } }
@Override public boolean clientDisconnected(String clientId) { Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId); ConnectionBasedClient client = clients.remove(clientId); if (null == client) { return true; } client.release(); NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client)); return true; }
|
小结: 连接可以配置限制规则具体在${usr.home}/nacos/data/loader/limitRule文件配置,默认无限制;通过定时任务每隔3秒钟定时检查缓存中的所有连接包括通过来源sdk的连接和集群的连接;如果连接超过保鲜期20秒,并再次发起连接请求,未能连接成功则注销关闭该连接;注销关闭时发布ClientEvent.ClientDisconnectEvent事件。
ClientVerifyFailedEvent事件触发
上一篇文章中梳理了Nacos集群中,每个节点会对集群中其他节点每隔5秒发送校验数据,也就是心跳。当校验的结果会进行回调(gRPC为例),我们翻着看看这部分。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public void syncVerifyData(DistroData verifyData, String targetServer, DistroCallback callback) { if (isNoExistTarget(targetServer)) { callback.onSuccess(); } DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY); Member member = memberManager.find(targetServer); try { DistroVerifyCallbackWrapper wrapper = new DistroVerifyCallbackWrapper(targetServer, verifyData.getDistroKey().getResourceKey(), callback, member); clusterRpcClientProxy.asyncRequest(member, request, wrapper); } catch (NacosException nacosException) { callback.onFailed(nacosException); } }
|
重点看下DistroVerifyCallbackWrapper部分,校验失败发布ClientVerifyFailedEvent事件。
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Override public void onResponse(Response response) { if (checkResponse(response)) { NamingTpsMonitor.distroVerifySuccess(member.getAddress(), member.getIp()); distroCallback.onSuccess(); } else { Loggers.DISTRO.info("Target {} verify client {} failed, sync new client", targetServer, clientId); NotifyCenter.publishEvent(new ClientEvent.ClientVerifyFailedEvent(clientId, targetServer)); NamingTpsMonitor.distroVerifyFail(member.getAddress(), member.getIp()); distroCallback.onFailed(null); } }
|
最后看下ClientVerifyFailedEvent这个类,关注下成员变量包含了clientId和targetServer。当收到ClientVerifyFailedEvent时用于向targetServer目标节点添加客户端clientId信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public static class ClientVerifyFailedEvent extends ClientEvent {
private static final long serialVersionUID = 2023951686223780851L;
private final String clientId; private final String targetServer; public ClientVerifyFailedEvent(String clientId, String targetServer) { super(null); this.clientId = clientId; this.targetServer = targetServer; } public String getClientId() { return clientId; } public String getTargetServer() { return targetServer; } }
|
小结: Nacos集群之间通过每5秒发送心跳校验数据请求(具体为本节点负责Client信息),其他节点接受到校验请求,如果缓存中存在该client表示校验成功,同时更新保鲜时间;否则校验失败,回调返回失败Response,请求节点收到失败的Response后会发布ClientVerifyFailedEvent事件。