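Introduction

Nacos supports several health check types, including heartbeat, HTTP, TCP, and MySQL. Which scenarios does each one serve, and how is each implemented? This article walks through them.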
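Outline

Ephemeral instance renewal
- Renewal for ephemeral instances is implemented by keeping the gRPC connection alive.
- The check runs once every 5 seconds.
- A healthy result refreshes the last-active timestamp.
- An unavailable result marks the node as unhealthy.
- When an unhealthy node reconnects, it picks the next node from the server list.

Persistent instance heartbeat
- The heartbeat executor sends an HTTP request to the Nacos server every five seconds.
- If the response is "server not found", the client sends a registration request to the Nacos server to re-register.

Persistent instance liveness probing
- Nacos probes an instance only when it is registered as persistent.
- Three probe types are supported: HTTP, TCP, and MySQL.
- HTTP: the instance is marked healthy when the probe returns status code 200.
- TCP: health is determined by whether a Channel connection can be established.
- MySQL: the probe verifies that the current node is the primary, which is useful for primary/replica failover.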
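Ephemeral instance renewal

The connection health check was already covered in 《Nacos2# 服务注册与发现客户端示例与源码解析(二)》 (Nacos2# Service Registration and Discovery: Client Examples and Source Analysis, Part 2) as part of the gRPC client startup logic. The code lives in RpcClient#start(); this section takes a closer look at it.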
Client connection heartbeat check

```java
clientEventExecutor.submit(new Runnable() {
    @Override
    public void run() {
        while (true) {
            try {
                // Wait up to keepAliveTime for an explicit reconnect signal.
                ReconnectContext reconnectContext = reconnectionSignal
                        .poll(keepAliveTime, TimeUnit.MILLISECONDS);
                if (reconnectContext == null) {
                    // No signal: check whether the connection has been idle too long.
                    if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {
                        boolean isHealthy = healthCheck();
                        if (!isHealthy) {
                            if (currentConnection == null) {
                                continue;
                            }
                            LoggerUtils.printIfInfoEnabled(LOGGER,
                                    "[{}]Server healthy check fail,currentConnection={}", name,
                                    currentConnection.getConnectionId());
                            // Mark the client unhealthy and prepare a reconnect.
                            rpcClientStatus.set(RpcClientStatus.UNHEALTHY);
                            reconnectContext = new ReconnectContext(null, false);
                        } else {
                            // Healthy: refresh the last-active timestamp.
                            lastActiveTimeStamp = System.currentTimeMillis();
                            continue;
                        }
                    } else {
                        continue;
                    }
                }
                // ... reconnect handling omitted in this excerpt
            } catch (Throwable throwable) {
                // ignored in this excerpt
            }
        }
    }
});
```
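The loop above reduces to one repeated step: wait for a reconnect signal for at most `keepAliveTime`, and if none arrives and the connection has been idle that long, run a health check. The following is a minimal sketch of a single iteration under that reading. `KeepAliveSketch`, `Status`, and `runOnce` are hypothetical names for illustration, not part of the Nacos `RpcClient`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of one keep-alive iteration; not the Nacos RpcClient itself. */
class KeepAliveSketch {
    enum Status { HEALTHY, UNHEALTHY }

    private final BlockingQueue<String> reconnectSignal = new LinkedBlockingQueue<>();
    private final long keepAliveMillis;
    private final BooleanSupplier healthCheck;
    private long lastActiveTimeStamp = System.currentTimeMillis();
    private Status status = Status.HEALTHY;

    KeepAliveSketch(long keepAliveMillis, BooleanSupplier healthCheck) {
        this.keepAliveMillis = keepAliveMillis;
        this.healthCheck = healthCheck;
    }

    /** Runs one loop iteration and returns the resulting client status. */
    Status runOnce() {
        String signal;
        try {
            // Block for at most keepAliveMillis waiting for an explicit reconnect signal.
            signal = reconnectSignal.poll(keepAliveMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return status;
        }
        boolean idleTooLong = System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveMillis;
        if (signal == null && idleTooLong) {
            if (healthCheck.getAsBoolean()) {
                // Healthy: refresh the last-active timestamp.
                lastActiveTimeStamp = System.currentTimeMillis();
                status = Status.HEALTHY;
            } else {
                // Unhealthy: a real client would now try to reconnect.
                status = Status.UNHEALTHY;
            }
        }
        return status;
    }
}
```

Calling `new KeepAliveSketch(5000, () -> ping()).runOnce()` in a loop would approximate the 5-second cadence described in this article; `ping()` stands for whatever health request the client sends.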
Server response

On the server side, GrpcRequestAcceptor#request dispatches the request to the matching RequestHandler:

```java
Response response = requestHandler.handleRequest(request, requestMeta);
```

```java
@TpsControl(pointName = "HealthCheck")
public HealthCheckResponse handle(HealthCheckRequest request, RequestMeta meta) {
    return new HealthCheckResponse();
}
```

When the server receives a health check request, HealthCheckRequestHandler#handle simply returns a HealthCheckResponse.
Server selection

```java
while (startUpRetryTimes > 0 && connectToServer == null) {
    try {
        startUpRetryTimes--;
        // Pick the next server from the address list.
        ServerInfo serverInfo = nextRpcServer();
        LoggerUtils.printIfInfoEnabled(LOGGER,
                "[{}] Try to connect to server on start up, server: {}", name, serverInfo);
        connectToServer = connectToServer(serverInfo);
    } catch (Throwable e) {
        LoggerUtils.printIfWarnEnabled(LOGGER,
                "[{}]Fail to connect to server on start up, error message={}, start up retry times left: {}",
                name, e.getMessage(), startUpRetryTimes);
    }
}
```

When choosing a server, the client increments an index and takes the next entry from the server address list.

```java
@Override
public String genNextServer() {
    int index = currentIndex.incrementAndGet() % getServerList().size();
    return getServerList().get(index);
}
```
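The selection logic is a plain round-robin over the server list. The sketch below shows the same idea in a self-contained form. `RoundRobinServerList` is a hypothetical class, and the use of `Math.floorMod` is a choice made here: with the `%` operator, the index would become negative once an `int` counter overflows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical round-robin selector; illustrates the idea, not the Nacos class. */
class RoundRobinServerList {
    private final List<String> servers;
    // Start at -1 so the first call returns index 0.
    private final AtomicInteger currentIndex = new AtomicInteger(-1);

    RoundRobinServerList(List<String> servers) {
        if (servers.isEmpty()) {
            throw new IllegalArgumentException("server list must not be empty");
        }
        this.servers = new ArrayList<>(servers);
    }

    /** Returns the next server, wrapping around at the end of the list. */
    String next() {
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(currentIndex.incrementAndGet(), servers.size());
        return servers.get(index);
    }
}
```

With three servers, four consecutive `next()` calls return the first, second, third, and then the first server again.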
Cleaning up stale connections

See RpcScheduledExecutor#start(): a scheduled task cleans up stale connections.

```java
RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(new Runnable() {
    @Override
    public void run() {
        // ... abridged excerpt: stale connections are probed with a detection request
        connection.asyncRequest(clientDetectionRequest, new RequestCallBack() {
            // ... callback methods abridged
            connection.freshActiveTime();
            successConnections.add(outDateConnectionId);
        // ...
        unregister(outDateConnectionId);
    }
}, 1000L, 3000L, TimeUnit.MILLISECONDS); // first run after 1 s, then 3 s after each run completes
```
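Summary: when an ephemeral instance registers, the gRPC client starts a daemon thread for health checking during startup. The check runs once every 5 seconds. A healthy result refreshes the last-active timestamp, while an unavailable result sets the gRPC client status to unhealthy. When an unhealthy client reconnects, it picks the next entry from the server address list.

The server also periodically cleans up connections that have exceeded the keep-alive time.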
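The server-side cleanup can be pictured as follows: find connections whose last-active time is too old, probe each one, refresh those that answer, and unregister the rest. This is a minimal sketch under that reading of the excerpt. `ConnectionRegistrySketch` and its methods are hypothetical, the probe is passed in as a predicate, and time is supplied by the caller so the behavior is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

/** Hypothetical registry that evicts connections idle for longer than keepAliveMillis. */
class ConnectionRegistrySketch {
    private final Map<String, Long> lastActive = new ConcurrentHashMap<>();
    private final long keepAliveMillis;

    ConnectionRegistrySketch(long keepAliveMillis) {
        this.keepAliveMillis = keepAliveMillis;
    }

    void register(String connectionId, long now) {
        lastActive.put(connectionId, now);
    }

    boolean contains(String connectionId) {
        return lastActive.containsKey(connectionId);
    }

    /**
     * Probes every stale connection. Connections that answer the probe are refreshed;
     * the others are unregistered. Returns the ids that were removed.
     */
    List<String> evictStale(long now, Predicate<String> probe) {
        List<String> evicted = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastActive.entrySet()) {
            if (now - entry.getValue() < keepAliveMillis) {
                continue; // still fresh, nothing to do
            }
            String id = entry.getKey();
            if (probe.test(id)) {
                lastActive.put(id, now); // client answered: refresh active time
            } else {
                lastActive.remove(id);   // no answer: drop the connection
                evicted.add(id);
            }
        }
        return evicted;
    }
}
```

A scheduler would call `evictStale(System.currentTimeMillis(), probe)` at a fixed delay, in the same way the excerpt above uses `scheduleWithFixedDelay`.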
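Persistent instance heartbeat

The HTTP heartbeat for persistent instances was analyzed in detail in 《Nacos2# 服务注册与发现客户端示例与源码解析(二)》, in the section on the HTTP heartbeat checker. The key points are:

- The HTTP heartbeat applies only to instances registered as persistent; ephemeral instances go through the gRPC proxy instead.
- The heartbeat executor sends an HTTP request to the Nacos server every five seconds.
- If the response is "server not found", the client sends a registration request to the Nacos server to re-register.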
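The beat-then-re-register behavior in the list above can be sketched with an in-memory stand-in for the server. Everything here is hypothetical (`BeatSketch`, `FakeServer`, `beatOnce`); it only illustrates the control flow, not the Nacos HTTP API or its response codes.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of "send a beat, re-register if the server does not know us". */
class BeatSketch {

    /** In-memory stand-in for the server side. */
    static class FakeServer {
        private final Set<String> registered = new HashSet<>();

        /** Returns false when the instance is unknown (the "not found" case). */
        boolean beat(String instanceKey) {
            return registered.contains(instanceKey);
        }

        void register(String instanceKey) {
            registered.add(instanceKey);
        }
    }

    /** Sends one beat and returns true if the instance had to be re-registered. */
    static boolean beatOnce(FakeServer server, String instanceKey) {
        if (!server.beat(instanceKey)) {
            // The server lost the instance (for example after a restart): register again.
            server.register(instanceKey);
            return true;
        }
        return false;
    }
}
```

In a real client, `beatOnce` would run on a scheduled executor every five seconds, matching the interval described above.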
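Persistent instance liveness probing

Probing of persistent instances supports HTTP, TCP, and MySQL. The following uses HTTP as an example to walk through how it works.

Probe example

The instance is registered as unhealthy (healthy set to false). Once the server's probe succeeds, the server marks the instance as healthy (healthy set to true).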
```java
// BootApplication.java
import com.alibaba.nacos.api.naming.NamingFactory;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

@SpringBootApplication
public class BootApplication {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(BootApplication.class, args);

        System.setProperty("serverAddr", "127.0.0.1:8848");
        System.setProperty("namespace", "public");
        Properties properties = new Properties();
        properties.setProperty("serverAddr", System.getProperty("serverAddr"));
        properties.setProperty("namespace", System.getProperty("namespace"));

        Instance instance = new Instance();
        instance.setClusterName("clusterDemo3");
        instance.setHealthy(false);      // register as unhealthy; the server probe flips it
        instance.setIp(getIpAddress());
        instance.setPort(8282);
        instance.setWeight(100);
        instance.setServiceName("AppNacosDemo3");
        instance.setEphemeral(false);    // persistent instance
        Map<String, String> map = new HashMap<String, String>();
        map.put("unit", "shunit");
        instance.setMetadata(map);

        NamingService naming = NamingFactory.createNamingService(properties);
        naming.registerInstance("AppNacosDemo3", instance);
    }

    /** Returns the first IPv4 address of a non-loopback, non-virtual interface that is up. */
    private static String getIpAddress() throws SocketException {
        Enumeration<NetworkInterface> allNetInterfaces = NetworkInterface.getNetworkInterfaces();
        while (allNetInterfaces.hasMoreElements()) {
            NetworkInterface netInterface = allNetInterfaces.nextElement();
            if (!netInterface.isLoopback() && !netInterface.isVirtual() && netInterface.isUp()) {
                Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
                while (addresses.hasMoreElements()) {
                    InetAddress ip = addresses.nextElement();
                    final String hostAddress = ip.getHostAddress();
                    if (ip instanceof Inet4Address) {
                        return hostAddress;
                    }
                }
            }
        }
        return "";
    }
}

// HelloController.java
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HelloController {

    // Endpoint that receives the probe requests in this example.
    @RequestMapping(value = "/bike", method = RequestMethod.GET)
    public void hello() {
        System.out.println("receive message from server.");
    }
}
```
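Probe address configuration

Probe output

```
receive message from server.
receive message from server.
```

Summary: the instance is registered as unhealthy at startup. Once a request to the probe path returns 200, Nacos marks the instance as healthy.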
Source code analysis

When a persistent instance registers, the request arrives at InstanceController#register, which leads to the following registerInstance method.

```java
@Override
public void registerInstance(String namespaceId, String serviceName, Instance instance) {
    boolean ephemeral = instance.isEphemeral();
    String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);
    createIpPortClientIfAbsent(clientId, ephemeral);
    Service service = getService(namespaceId, serviceName, ephemeral);
    clientOperationService.registerInstance(service, instance, clientId);
}
```

The IpPortBasedClient constructor schedules a different check task depending on whether the instance is ephemeral:

```java
public IpPortBasedClient(String clientId, boolean ephemeral) {
    this.ephemeral = ephemeral;
    this.clientId = clientId;
    this.responsibleId = getResponsibleTagFromId();
    if (ephemeral) {
        // Ephemeral: check that client beats keep arriving.
        beatCheckTask = new ClientBeatCheckTaskV2(this);
        HealthCheckReactor.scheduleCheck(beatCheckTask);
    } else {
        // Persistent: the server actively probes the instance.
        healthCheckTaskV2 = new HealthCheckTaskV2(this);
        HealthCheckReactor.scheduleCheck(healthCheckTaskV2);
    }
}
```
```java
@Override
public void doHealthCheck() {
    try {
        for (Service each : client.getAllPublishedService()) {
            if (switchDomain.isHealthCheckEnabled(each.getGroupedServiceName())) {
                InstancePublishInfo instancePublishInfo = client.getInstancePublishInfo(each);
                ClusterMetadata metadata = getClusterMetadata(each, instancePublishInfo);
                // Delegate to the processor that matches the configured probe type.
                ApplicationUtils.getBean(HealthCheckProcessorV2Delegate.class).process(this, each, metadata);
            }
        }
    } catch (Throwable e) {
        // ... omitted in this excerpt
    } finally {
        if (!cancelled) {
            // Re-schedule itself so that probing continues.
            HealthCheckReactor.scheduleCheck(this);
            if (this.getCheckRtWorst() > 0) {
                // ... response-time statistics omitted in this excerpt
            }
        }
    }
}
```

Note: the task re-schedules itself after every run, so the server keeps sending probe requests to the service's registered instances.
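The pattern in doHealthCheck, where a task re-submits itself in `finally` unless it has been cancelled, can be shown with a plain `ScheduledExecutorService`. This is a hedged sketch: `ReschedulingTaskSketch` and `runUntil` are hypothetical names, and the fixed delay used here is a simplification.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical self-rescheduling task; illustrates the pattern only. */
class ReschedulingTaskSketch implements Runnable {
    private final ScheduledExecutorService scheduler;
    private final long delayMillis;
    private final Runnable probe;
    private volatile boolean cancelled;

    ReschedulingTaskSketch(ScheduledExecutorService scheduler, long delayMillis, Runnable probe) {
        this.scheduler = scheduler;
        this.delayMillis = delayMillis;
        this.probe = probe;
    }

    void start() {
        scheduler.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
    }

    void cancel() {
        cancelled = true;
    }

    @Override
    public void run() {
        try {
            probe.run();
        } catch (Throwable t) {
            // A failing probe must not stop future probes.
        } finally {
            if (!cancelled) {
                // Schedule the next round only after this one has finished.
                scheduler.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
            }
        }
    }

    /** Usage demo: lets the probe fire `wanted` times, then cancels. Returns the run count. */
    static int runUntil(int wanted, long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(wanted);
        ReschedulingTaskSketch task = new ReschedulingTaskSketch(scheduler, delayMillis, () -> {
            runs.incrementAndGet();
            latch.countDown();
        });
        task.start();
        try {
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            task.cancel();
            scheduler.shutdownNow();
        }
        return runs.get();
    }
}
```

Scheduling the next run from inside `finally` means a slow probe delays the following one instead of overlapping with it, which is one reason to prefer this over a fixed-rate schedule.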
Choosing the probe processor

Note: the runtime cache shows that three probe processor types are supported: TCP, HTTP, and MySQL.
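Selecting a processor by probe type is essentially a map lookup with a fallback. The sketch below illustrates that delegate pattern. `ProbeDelegateSketch`, the `Probe` interface, and the `"none"` fallback are assumptions made for this example and are not the Nacos HealthCheckProcessorV2Delegate API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical delegate that routes a probe request to the processor for its type. */
class ProbeDelegateSketch {

    /** Minimal stand-in for a probe processor. */
    interface Probe {
        String check(String ip, int port);
    }

    private final Map<String, Probe> probes = new HashMap<>();
    // Used when no processor is registered for the requested type.
    private final Probe fallback = (ip, port) -> "none";

    ProbeDelegateSketch register(String type, Probe probe) {
        probes.put(type, probe);
        return this;
    }

    String process(String type, String ip, int port) {
        return probes.getOrDefault(type, fallback).check(ip, port);
    }
}
```

Registering `"TCP"`, `"HTTP"`, and `"MYSQL"` processors under their type names mirrors the three cache entries mentioned in the note above.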
HTTP probing

Code location: HttpHealthCheckProcessor#process

```java
@Override
public void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) {
    HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) task.getClient()
            .getInstancePublishInfo(service);
    if (null == instance) {
        return;
    }
    try {
        // Skip if a check for this instance is already in flight.
        if (!instance.tryStartCheck()) {
            return;
        }
        Http healthChecker = (Http) metadata.getHealthChecker();
        // Probe either the instance's own port or the port configured on the cluster.
        int ckPort = metadata.isUseInstancePortForCheck() ? instance.getPort() : metadata.getHealthyCheckPort();
        URL host = new URL("http://" + instance.getIp() + ":" + ckPort);
        URL target = new URL(host, healthChecker.getPath());
        Map<String, String> customHeaders = healthChecker.getCustomHeaders();
        Header header = Header.newInstance();
        header.addAll(customHeaders);

        // Asynchronous GET; the result is handled in HttpHealthCheckCallback.
        ASYNC_REST_TEMPLATE.get(target.toString(), header, Query.EMPTY, String.class,
                new HttpHealthCheckCallback(instance, task, service));
        MetricsMonitor.getHttpHealthCheckMonitor().incrementAndGet();
    } catch (Throwable e) {
        instance.setCheckRt(switchDomain.getHttpHealthParams().getMax());
        healthCheckCommon.checkFail(task, service, "http:error:" + e.getMessage());
        healthCheckCommon.reEvaluateCheckRT(switchDomain.getHttpHealthParams().getMax(), task,
                switchDomain.getHttpHealthParams());
    }
}
```
```java
// Callback that handles the probe response
@Override
public void onReceive(RestResult<String> result) {
    instance.setCheckRt(System.currentTimeMillis() - startTime);
    int httpCode = result.getCode();
    if (HttpURLConnection.HTTP_OK == httpCode) {
        // 200: mark the instance healthy.
        healthCheckCommon.checkOk(task, service, "http:" + httpCode);
        healthCheckCommon.reEvaluateCheckRT(System.currentTimeMillis() - startTime, task,
                switchDomain.getHttpHealthParams());
    } else if (HttpURLConnection.HTTP_UNAVAILABLE == httpCode
            || HttpURLConnection.HTTP_MOVED_TEMP == httpCode) {
        // ... omitted in this excerpt
    } else {
        // ... omitted in this excerpt
    }
}

// checkOk, abridged
public void checkOk(HealthCheckTaskV2 task, Service service, String msg) {
    // ...
    healthStatusSynchronizer.instanceHealthStatusChange(true, task.getClient(), service, instance);
}
```

Note: the server sends an HTTP request to the instance. A 200 status code marks the instance as healthy; any other result marks it as unhealthy.
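The rule "200 means healthy, anything else means unhealthy" can be reproduced with the JDK alone. The following is a minimal blocking sketch. `HttpProbeSketch` and `isHealthy` are hypothetical names, and unlike the Nacos processor above, which issues an asynchronous request, this version blocks until the response or a timeout.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

/** Hypothetical blocking HTTP probe; healthy only when the endpoint answers 200. */
class HttpProbeSketch {

    static boolean isHealthy(String ip, int port, String path, int timeoutMillis) {
        try {
            // Build the target the same way as the excerpt: host URL plus probe path.
            URL host = new URL("http://" + ip + ":" + port);
            URL target = new URL(host, path);
            HttpURLConnection conn = (HttpURLConnection) target.openConnection();
            conn.setRequestMethod("GET");
            conn.setConnectTimeout(timeoutMillis);
            conn.setReadTimeout(timeoutMillis);
            try {
                return conn.getResponseCode() == HttpURLConnection.HTTP_OK;
            } finally {
                conn.disconnect();
            }
        } catch (IOException e) {
            // Unreachable host, refused connection, or timeout all count as unhealthy.
            return false;
        }
    }
}
```

For the example application in this article, `HttpProbeSketch.isHealthy(ip, 8282, "/bike", 2000)` would be the equivalent check, assuming the probe path is set to `/bike`.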
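TCP probing

The TCP probing code is in TcpHealthCheckProcessor and is not analyzed in detail here. In short, the server opens a channel to the registered instance and keeps checking whether the instance's port is reachable, and uses the result to decide whether the service is healthy.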
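The core of a TCP probe is "can a connection to ip:port be established within a timeout". The sketch below uses a blocking `Socket` to keep the example short; this is a simplification chosen here, not how TcpHealthCheckProcessor is written. `TcpProbeSketch` and `isPortOpen` are hypothetical names.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical blocking TCP probe; healthy when the port accepts a connection. */
class TcpProbeSketch {

    static boolean isPortOpen(String ip, int port, int timeoutMillis) {
        // try-with-resources closes the socket whether or not the connect succeeds.
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(ip, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: treat the instance as unhealthy.
            return false;
        }
    }
}
```

A successful connect only shows that something is listening on the port. It says nothing about whether the application behind it can serve requests, which is where the HTTP probe is stricter.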
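MySQL probing

Note: this probe mainly checks that the current node is the primary database, so that traffic does not reach a replica. It may be useful during primary/replica failover.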
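One way to tell a primary from a replica over JDBC is to read the `read_only` system variable, on the assumption that replicas run with `read_only = ON`. The sketch below follows that assumption and is not taken from the Nacos source. `MysqlPrimaryProbeSketch`, `isPrimary`, and `probe` are hypothetical names, and a MySQL JDBC driver must be on the classpath for `probe` to connect.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

/** Hypothetical MySQL probe: healthy only if the node reports read_only = OFF. */
class MysqlPrimaryProbeSketch {

    // Returns one row with the columns Variable_name and Value.
    static final String READ_ONLY_SQL = "SHOW GLOBAL VARIABLES WHERE Variable_name = 'read_only'";

    /** Assumption: a writable node (read_only = OFF) is the primary. */
    static boolean isPrimary(String readOnlyValue) {
        return "OFF".equalsIgnoreCase(readOnlyValue);
    }

    static boolean probe(String jdbcUrl, String user, String password) {
        try (Connection connection = DriverManager.getConnection(jdbcUrl, user, password);
             Statement statement = connection.createStatement();
             ResultSet rs = statement.executeQuery(READ_ONLY_SQL)) {
            // Column 2 holds the variable's value.
            return rs.next() && isPrimary(rs.getString(2));
        } catch (SQLException e) {
            // Unreachable database or failed query: treat as unhealthy.
            return false;
        }
    }
}
```

Under this scheme, promoting a replica and turning its `read_only` off would make it pass the probe, while a demoted primary would start failing it.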
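Conclusion: this article walked through the code behind ephemeral instance renewal, persistent instance heartbeats, and persistent instance liveness probing. After this code walkthrough, their use cases and implementations should no longer feel unfamiliar.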