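Introduction
Nacos is a leading choice among service registries and deserves a close look. This article walks through the source code of the registration and discovery client's initialization logic to show what it actually does. Its service discovery architecture can also serve as a reference model when designing similar systems.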
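Summary
Configurable parameters
The default namespace is public; it can be set with System.setProperty(PropertyKeyConst.NAMESPACE, "").
The default web root path is /nacos/v1/ns; it can be set with System.setProperty(PropertyKeyConst.CONTEXT_PATH, "").
The default log file name is naming.log; it can be set with System.setProperty(UtilAndComs.NACOS_NAMING_LOG_NAME, "").
The server address list can be refreshed dynamically from an endpoint. The endpoint address is set with properties.setProperty(PropertyKeyConst.ENDPOINT, ""), and the list is refreshed every 30 seconds.
Server addresses can also be passed in directly with properties.setProperty(PropertyKeyConst.SERVER_ADDR, "").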
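Service discovery logic
Service discovery is the logic that notifies subscribers when instances change. In detail:
When we call subscribe, the scheduler creates an UpdateTask. Every 6 seconds (at most every 1 minute) the UpdateTask fetches the Instance list from the registry; when it detects a change in the list, it publishes an instance change event through NotifyCenter.publishEvent.
NotifyCenter is a facade over DefaultPublisher. It also maintains the mapping between event types and publishers, for example binding ChangeEvent to an EventPublisher. Publishing the instance change event above really means adding it to the DefaultPublisher's blocking queue.
DefaultPublisher maintains a set of subscribers and a blocking event queue whose default size is 16384. DefaultPublisher is also a thread: once started, it takes Events from the blocking queue in an endless for loop and calls back each subscriber with the Event.
The subscribers handle the Event through InstancesChangeNotifier#onEvent, which in turn calls back the AbstractEventListener#onEvent we supplied when subscribing. That is how our discovery logic gets invoked.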
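Failover logic
When ServiceInfoHolder is initialized, it creates the local cache directory ${user.home}/nacos/naming.
ServiceInfo is backed up to the cache files every 10 seconds.
When failover is switched on, ServiceInfo is read from the local files into the in-memory serviceMap; the switch is checked with a 5-second delay.
If the parameter "namingLoadCacheAtStart" is set to true, ServiceInfo is read from the local cache files at startup. The default is false.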
Registration and discovery examples
Service registration example

    import com.alibaba.nacos.api.naming.NamingFactory;
    import com.alibaba.nacos.api.naming.NamingService;
    import java.util.Properties;
    import org.junit.Test;

    @Test
    public void registerTest() throws Exception {
        System.setProperty("serverAddr", "127.0.0.1:8848");
        System.setProperty("namespace", "public");
        Properties properties = new Properties();
        properties.setProperty("serverAddr", System.getProperty("serverAddr"));
        properties.setProperty("namespace", System.getProperty("namespace"));

        NamingService naming = NamingFactory.createNamingService(properties);
        // Register one instance of service "nacos.test.3" in cluster "TEST1".
        naming.registerInstance("nacos.test.3", "11.11.11.11", 8888, "TEST1");
        System.out.println(naming.getAllInstances("nacos.test.3"));
        // Block so the JVM (and the registered instance) stays alive.
        System.in.read();
    }
Output

    [Instance{instanceId='null', ip='11.11.11.11', port=8888, weight=1.0, healthy=true, enabled=true, ephemeral=true, clusterName='TEST1', serviceName='DEFAULT_GROUP@@nacos.test.3', metadata={}}]
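Service discovery example

    import com.alibaba.nacos.api.naming.NamingFactory;
    import com.alibaba.nacos.api.naming.NamingService;
    import com.alibaba.nacos.api.naming.listener.AbstractEventListener;
    import com.alibaba.nacos.api.naming.listener.Event;
    import com.alibaba.nacos.api.naming.listener.NamingEvent;
    import java.util.Properties;
    import java.util.concurrent.Executor;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import org.junit.Test;

    @Test
    public void subscribeTest() throws Exception {
        System.setProperty("serverAddr", "127.0.0.1:8848");
        System.setProperty("namespace", "public");
        Properties properties = new Properties();
        properties.setProperty("serverAddr", System.getProperty("serverAddr"));
        properties.setProperty("namespace", System.getProperty("namespace"));

        NamingService naming = NamingFactory.createNamingService(properties);

        // Single-threaded executor on which the listener callback will run.
        Executor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        thread.setName("test-thread");
                        return thread;
                    }
                });

        naming.subscribe("nacos.test.3", new AbstractEventListener() {
            @Override
            public Executor getExecutor() {
                return executor;
            }

            @Override
            public void onEvent(Event event) {
                System.out.println("Subscribed service: " + ((NamingEvent) event).getServiceName());
                System.out.println("Subscribed instances: " + ((NamingEvent) event).getInstances());
            }
        });
        // Block so that change events can keep arriving.
        System.in.read();
    }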
Output

    Subscribed service: nacos.test.3
    Subscribed instances: [Instance{instanceId='null', ip='11.11.11.11', port=8888, weight=1.0, healthy=true, enabled=true, ephemeral=true, clusterName='TEST1', serviceName='DEFAULT_GROUP@@nacos.test.3', metadata={}}]
NacosNamingService initialization: source analysis
Instantiation via reflection

    Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
    Constructor constructor = driverImplClass.getConstructor(Properties.class);
    NamingService vendorImpl = (NamingService) constructor.newInstance(properties);

    public NacosNamingService(Properties properties) throws NacosException {
        init(properties);
    }

    private void init(Properties properties) throws NacosException {
        // @1 validate the init parameters
        ValidatorUtils.checkInitParam(properties);
        // @2 resolve the namespace
        this.namespace = InitUtils.initNamespaceForNaming(properties);
        InitUtils.initSerialization();
        // @3 set the web root context
        InitUtils.initWebRootContext(properties);
        // @4 resolve the log file name
        initLogName(properties);

        // @5 register a publisher and a subscriber with NotifyCenter
        this.changeNotifier = new InstancesChangeNotifier();
        NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
        NotifyCenter.registerSubscriber(changeNotifier);
        // @6 initialize ServiceInfoHolder
        this.serviceInfoHolder = new ServiceInfoHolder(namespace, properties);
        // @7 initialize the client proxy delegate
        this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);
    }
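Code notes:
Note @1: validates the contextPath for illegal characters; the default path is /nacos.
Note @2: resolves the namespace. It can be set through System.setProperty or through the Properties object; the default is public.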

    public static String initNamespaceForNaming(Properties properties) {
        String tmpNamespace = null;

        String isUseCloudNamespaceParsing = properties.getProperty(PropertyKeyConst.IS_USE_CLOUD_NAMESPACE_PARSING,
                System.getProperty(SystemPropertyKeyConst.IS_USE_CLOUD_NAMESPACE_PARSING,
                        String.valueOf(Constants.DEFAULT_USE_CLOUD_NAMESPACE_PARSING)));

        if (Boolean.parseBoolean(isUseCloudNamespaceParsing)) {
            tmpNamespace = TenantUtil.getUserTenantForAns();
            tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
                @Override
                public String call() {
                    String namespace = System.getProperty(SystemPropertyKeyConst.ANS_NAMESPACE);
                    LogUtils.NAMING_LOGGER.info("initializer namespace from System Property :" + namespace);
                    return namespace;
                }
            });

            tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
                @Override
                public String call() {
                    String namespace = System.getenv(PropertyKeyConst.SystemEnv.ALIBABA_ALIWARE_NAMESPACE);
                    LogUtils.NAMING_LOGGER.info("initializer namespace from System Environment :" + namespace);
                    return namespace;
                }
            });
        }

        // Fall back to the "namespace" system property.
        tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
            @Override
            public String call() {
                String namespace = System.getProperty(PropertyKeyConst.NAMESPACE);
                LogUtils.NAMING_LOGGER.info("initializer namespace from System Property :" + namespace);
                return namespace;
            }
        });

        // Then to the value passed in through Properties.
        if (StringUtils.isEmpty(tmpNamespace) && properties != null) {
            tmpNamespace = properties.getProperty(PropertyKeyConst.NAMESPACE);
        }

        // Finally to the default namespace.
        tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
            @Override
            public String call() {
                return UtilAndComs.DEFAULT_NAMESPACE_ID;
            }
        });
        return tmpNamespace;
    }
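The fallback order in the method above can be condensed into a small standalone sketch. This is a simplified model, not the Nacos code: the class name NamespaceResolver is hypothetical, the cloud-namespace branch is left out, and the key "namespace" is taken from the registration example earlier in the article.

```java
import java.util.Properties;

// Simplified model of the namespace fallback chain (hypothetical class, not part of Nacos).
class NamespaceResolver {
    static final String DEFAULT_NAMESPACE = "public";

    /** Returns the first non-empty value among: system property, Properties entry, default. */
    static String resolve(Properties properties) {
        String namespace = System.getProperty("namespace");
        if (isEmpty(namespace) && properties != null) {
            namespace = properties.getProperty("namespace");
        }
        return isEmpty(namespace) ? DEFAULT_NAMESPACE : namespace;
    }

    private static boolean isEmpty(String value) {
        return value == null || value.isEmpty();
    }
}
```

One consequence worth noting: in this order a system property wins over a value passed through Properties, so a JVM-wide setting overrides per-client configuration.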
Note @3: sets the web root context, where:
webContext: /nacos
nacosUrlBase: webContext + "/v1/ns", by default /nacos/v1/ns
nacosUrlInstance: nacosUrlBase + "/instance", by default /nacos/v1/ns/instance

    public static void initWebRootContext(Properties properties) {
        final String webContext = properties.getProperty(PropertyKeyConst.CONTEXT_PATH);
        // Only override the defaults when a context path was supplied.
        TemplateUtils.stringNotEmptyAndThenExecute(webContext, new Runnable() {
            @Override
            public void run() {
                UtilAndComs.webContext = ContextPathUtil.normalizeContextPath(webContext);
                UtilAndComs.nacosUrlBase = UtilAndComs.webContext + "/v1/ns";
                UtilAndComs.nacosUrlInstance = UtilAndComs.nacosUrlBase + "/instance";
            }
        });
        initWebRootContext();
    }
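To make the URL derivation concrete, here is a minimal sketch of how the three values follow from one context path. The class WebRootContext and its normalize rules are assumptions made for illustration; the real ContextPathUtil.normalizeContextPath may behave differently.

```java
// Hypothetical model of the three derived values; not the Nacos UtilAndComs class.
class WebRootContext {
    final String webContext;
    final String urlBase;
    final String urlInstance;

    WebRootContext(String contextPath) {
        this.webContext = normalize(contextPath);
        this.urlBase = webContext + "/v1/ns";
        this.urlInstance = urlBase + "/instance";
    }

    /**
     * Assumed normalization: a missing or blank value falls back to "/nacos",
     * trailing slashes are dropped, and a leading slash is added when absent.
     */
    static String normalize(String contextPath) {
        if (contextPath == null || contextPath.trim().isEmpty()) {
            return "/nacos";
        }
        String path = contextPath.trim();
        while (path.endsWith("/")) {
            path = path.substring(0, path.length() - 1);
        }
        if (!path.isEmpty() && !path.startsWith("/")) {
            path = "/" + path;
        }
        return path;
    }
}
```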
Note @4: custom log file name. It can be specified by setting com.alibaba.nacos.naming.log.filename in either the Properties object or the system properties; the default is naming.log.

    private void initLogName(Properties properties) {
        // The system property takes precedence over Properties.
        logName = System.getProperty(UtilAndComs.NACOS_NAMING_LOG_NAME);
        if (StringUtils.isEmpty(logName)) {
            if (properties != null && StringUtils
                    .isNotEmpty(properties.getProperty(UtilAndComs.NACOS_NAMING_LOG_NAME))) {
                logName = properties.getProperty(UtilAndComs.NACOS_NAMING_LOG_NAME);
            } else {
                logName = "naming.log";
            }
        }
    }
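Summary: the default namespace is public and can be set through System.setProperty(PropertyKeyConst.NAMESPACE, "") or Properties. The default web root context is /nacos/v1/ns and can be set through System.setProperty(PropertyKeyConst.CONTEXT_PATH, ""). The Nacos log file name defaults to naming.log and can be set through System.setProperty(UtilAndComs.NACOS_NAMING_LOG_NAME, "").
Note @5: a Publisher and a Subscriber are registered through NotifyCenter. This is covered in its own section below.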
NotifyCenter and DefaultPublisher: source analysis
NotifyCenter static initializer

    static {
        // Queue size of a regular publisher (default 16384).
        String ringBufferSizeProperty = "nacos.core.notify.ring-buffer-size";
        ringBufferSize = Integer.getInteger(ringBufferSizeProperty, 16384);

        // Queue size of the shared publisher for slow events (default 1024).
        String shareBufferSizeProperty = "nacos.core.notify.share-buffer-size";
        shareBufferSize = Integer.getInteger(shareBufferSizeProperty, 1024);

        // Use an EventPublisher loaded through SPI if present, otherwise DefaultPublisher.
        final Collection<EventPublisher> publishers = NacosServiceLoader.load(EventPublisher.class);
        Iterator<EventPublisher> iterator = publishers.iterator();

        if (iterator.hasNext()) {
            clazz = iterator.next().getClass();
        } else {
            clazz = DefaultPublisher.class;
        }

        publisherFactory = new BiFunction<Class<? extends Event>, Integer, EventPublisher>() {
            @Override
            public EventPublisher apply(Class<? extends Event> cls, Integer buffer) {
                try {
                    EventPublisher publisher = clazz.newInstance();
                    // @5.4 initialize the publisher
                    publisher.init(cls, buffer);
                    return publisher;
                } catch (Throwable ex) {
                    LOGGER.error("Service class newInstance has error : {}", ex);
                    throw new NacosRuntimeException(SERVER_ERROR, ex);
                }
            }
        };

        try {
            // @5.5 the shared publisher that handles SlowEvent
            INSTANCE.sharePublisher = new DefaultSharePublisher();
            INSTANCE.sharePublisher.init(SlowEvent.class, shareBufferSize);
        } catch (Throwable ex) {
            LOGGER.error("Service class newInstance has error : {}", ex);
        }

        ThreadUtils.addShutdownHook(new Runnable() {
            @Override
            public void run() {
                shutdown();
            }
        });
    }
Note @5.4: DefaultPublisher initialization. DefaultPublisher itself extends Thread and creates an ArrayBlockingQueue whose size is ringBufferSize, 16384 by default.

    @Override
    public void init(Class<? extends Event> type, int bufferSize) {
        setDaemon(true);
        setName("nacos.publisher-" + type.getName());
        this.eventType = type;
        this.queueMaxSize = bufferSize;
        this.queue = new ArrayBlockingQueue<Event>(bufferSize);
        // Start the publisher thread immediately.
        start();
    }

Now look at what the thread does once started: an endless for loop keeps taking Events from the queue and notifies each Subscriber to handle them.

    void openEventHandler() {
        try {
            // Wait up to 60 seconds for the first subscriber to be registered.
            int waitTimes = 60;
            for (; ; ) {
                if (shutdown || hasSubscriber() || waitTimes <= 0) {
                    break;
                }
                ThreadUtils.sleep(1000L);
                waitTimes--;
            }

            // Main loop: block on the queue and dispatch each event.
            for (; ; ) {
                if (shutdown) {
                    break;
                }
                final Event event = queue.take();
                receiveEvent(event);
                UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
            }
        } catch (Throwable ex) {
            LOGGER.error("Event listener exception : {}", ex);
        }
    }

    void receiveEvent(Event event) {
        final long currentEventSequence = event.sequence();

        for (Subscriber subscriber : subscribers) {
            // Skip expired events for subscribers that opted to ignore them.
            if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
                LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
                        event.getClass());
                continue;
            }
            notifySubscriber(subscriber, event);
        }
    }

    @Override
    public void notifySubscriber(final Subscriber subscriber, final Event event) {
        LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);

        final Runnable job = () -> subscriber.onEvent(event);
        final Executor executor = subscriber.executor();

        // Run on the subscriber's executor if it has one, otherwise on the publisher thread.
        if (executor != null) {
            executor.execute(job);
        } else {
            try {
                job.run();
            } catch (Throwable e) {
                LOGGER.error("Event callback exception: ", e);
            }
        }
    }
Note @5.5: DefaultSharePublisher extends DefaultPublisher and handles SlowEvent events. Its processing architecture is the same as DefaultPublisher's.
Binding ChangeEvent to a Publisher

    this.changeNotifier = new InstancesChangeNotifier();
    NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);

Registering a Publisher means establishing the relationship between InstancesChangeEvent.class and an EventPublisher.
The mapping is held in Map<String, EventPublisher> publisherMap, where the key is com.alibaba.nacos.client.naming.event.InstancesChangeEvent and the value is a DefaultPublisher instance.

    public static EventPublisher registerToPublisher(final Class<? extends Event> eventType, final int queueMaxSize) {
        // Slow events all share a single publisher.
        if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
            return INSTANCE.sharePublisher;
        }

        // The canonical class name of the event is the topic.
        final String topic = ClassUtils.getCanonicalName(eventType);
        synchronized (NotifyCenter.class) {
            MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, publisherFactory, eventType, queueMaxSize);
        }
        return INSTANCE.publisherMap.get(topic);
    }
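The registration step boils down to "one publisher per event class name, created on first use". A rough sketch of that idea follows; PublisherRegistry is a hypothetical class, and an Integer queue size stands in for the publisher object so the example stays self-contained.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry: topic (canonical class name) -> queue size standing in for a publisher.
class PublisherRegistry {
    private final Map<String, Integer> publisherMap = new ConcurrentHashMap<>();

    /** Registers the event type if absent and returns the entry that is actually in the map. */
    int register(Class<?> eventType, int queueSize) {
        return publisherMap.computeIfAbsent(eventType.getCanonicalName(), topic -> queueSize);
    }

    int size() {
        return publisherMap.size();
    }
}
```

Because the entry is only created when absent, a second registration of the same event type with a different queue size has no effect, which matches the behaviour implied by the computeIfAbsent call in the excerpt above.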
Registering the Subscriber with the Publisher

    this.changeNotifier = new InstancesChangeNotifier();
    NotifyCenter.registerSubscriber(changeNotifier);

As mentioned above, the Publisher maintains a set of subscribers. This line adds the InstancesChangeNotifier to that set; InstancesChangeNotifier extends Subscriber.

    private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType) {
        final String topic = ClassUtils.getCanonicalName(subscribeType);
        synchronized (NotifyCenter.class) {
            // Create the publisher for this topic if it does not exist yet.
            MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, publisherFactory, subscribeType, ringBufferSize);
        }
        EventPublisher publisher = INSTANCE.publisherMap.get(topic);
        publisher.addSubscriber(consumer);
    }

Summary: DefaultPublisher maintains a set of subscribers and a blocking event queue whose default size is 16384. DefaultPublisher is also a thread: once started, it takes Events from the blocking queue in an endless for loop and calls back each subscriber with the Event. NotifyCenter is the facade for operating DefaultPublisher; it binds ChangeEvent to an EventPublisher and adds the InstancesChangeNotifier to the DefaultPublisher's subscribers set.
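The publisher design summarized here (a daemon thread draining a bounded queue and fanning events out to subscribers) is easy to reuse elsewhere. The following is a minimal sketch under simplifying assumptions: MiniPublisher is a hypothetical class, subscribers are plain Consumer callbacks, and there is no event sequencing or per-subscriber executor.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical, stripped-down publisher in the style described above.
class MiniPublisher<E> extends Thread {
    private final BlockingQueue<E> queue;
    private final List<Consumer<E>> subscribers = new CopyOnWriteArrayList<>();
    private volatile boolean shutdown;

    MiniPublisher(int bufferSize) {
        this.queue = new ArrayBlockingQueue<>(bufferSize);
        setDaemon(true);
        setName("mini-publisher");
    }

    void addSubscriber(Consumer<E> subscriber) {
        subscribers.add(subscriber);
    }

    /** Enqueues the event without blocking; returns false if the queue is full. */
    boolean publish(E event) {
        return queue.offer(event);
    }

    void shutdown() {
        shutdown = true;
        interrupt(); // wake the thread if it is blocked in take()
    }

    @Override
    public void run() {
        try {
            while (!shutdown) {
                E event = queue.take();
                for (Consumer<E> subscriber : subscribers) {
                    try {
                        subscriber.accept(event);
                    } catch (RuntimeException e) {
                        // A failing subscriber must not stop delivery to the others.
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The bounded queue decouples the code that detects a change from the code that reacts to it, at the cost of having to decide what happens when the queue is full; this sketch simply reports the failure to the caller.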
Note @6: ServiceInfoHolder initialization, analyzed in its own section below.
ServiceInfoHolder initialization: source analysis

    public ServiceInfoHolder(String namespace, Properties properties) {
        // @6.1 create the cache directory
        initCacheDir(namespace);
        // @6.2 optionally load the cache from disk at startup
        if (isLoadCacheAtStart(properties)) {
            this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
        } else {
            this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
        }
        // @6.3 failover handling
        this.failoverReactor = new FailoverReactor(this, cacheDir);
        this.pushEmptyProtection = isPushEmptyProtect(properties);
    }
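Note @6.1: creates the cache directory, by default ${user.home}/nacos/naming/public. The root directory can be customized through the system property "JM.SNAPSHOT.PATH".
Note @6.2: whether to read from the cache directory at startup. The default is false; when set to true, the cache files are read.
Note @6.3: failover
Failover directory: ${user.home}/nacos/naming/public/failover
Failover switch file: ${user.home}/nacos/naming/public/failover/00-00---000-VIPSRV_FAILOVER_SWITCH-000---00-00
Failover off: the switch file does not exist, or its content is 0
Failover on: the switch file exists and its content is 1
Failover check: with a 5-second delay, the ServiceInfo in the cache files is read into memory (handled by FailoverReactor#SwitchRefresher)
When the failover switch is on, the client updates switchParams.put("failover-mode", "true") and runs FailoverFileReader to read the ServiceInfo content of the files in the failover directory, for example DEFAULT_GROUP%40%40nacos.test.3. This information is loaded into the in-memory Map<String, ServiceInfo> serviceMap.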

    {
      "name": "nacos.test.3",
      "groupName": "DEFAULT_GROUP",
      "clusters": "",
      "cacheMillis": 10000,
      "hosts": [
        {
          "ip": "11.11.11.11",
          "port": 8888,
          "weight": 1,
          "healthy": true,
          "enabled": true,
          "ephemeral": true,
          "clusterName": "TEST1",
          "serviceName": "DEFAULT_GROUP@@nacos.test.3",
          "metadata": {},
          "instanceHeartBeatTimeOut": 15000,
          "ipDeleteTimeout": 30000,
          "instanceIdGenerator": "simple",
          "instanceHeartBeatInterval": 5000
        }
      ],
      "lastRefTime": 1618601660155,
      "checksum": "",
      "allIPs": false,
      "reachProtectionThreshold": false,
      "valid": true
    }
Failover data backup: a backup runs every 10 seconds (FailoverReactor#DiskFileWriter) and writes the ServiceInfo, that is, the JSON content above, to a file.
How SwitchRefresher works

    class SwitchRefresher implements Runnable {

        long lastModifiedMillis = 0L;

        @Override
        public void run() {
            try {
                File switchFile = new File(failoverDir + UtilAndComs.FAILOVER_SWITCH);
                // No switch file: failover is off.
                if (!switchFile.exists()) {
                    switchParams.put("failover-mode", "false");
                    NAMING_LOGGER.debug("failover switch is not found, " + switchFile.getName());
                    return;
                }

                long modified = switchFile.lastModified();

                // Only re-read the file when it has been modified since the last check.
                if (lastModifiedMillis < modified) {
                    lastModifiedMillis = modified;
                    String failover = ConcurrentDiskUtil.getFileContent(failoverDir + UtilAndComs.FAILOVER_SWITCH,
                            Charset.defaultCharset().toString());
                    if (!StringUtils.isEmpty(failover)) {
                        String[] lines = failover.split(DiskCache.getLineSeparator());

                        for (String line : lines) {
                            String line1 = line.trim();
                            if ("1".equals(line1)) {
                                // "1": switch failover on and load the failover files.
                                switchParams.put("failover-mode", "true");
                                NAMING_LOGGER.info("failover-mode is on");
                                new FailoverFileReader().run();
                            } else if ("0".equals(line1)) {
                                // "0": switch failover off.
                                switchParams.put("failover-mode", "false");
                                NAMING_LOGGER.info("failover-mode is off");
                            }
                        }
                    } else {
                        switchParams.put("failover-mode", "false");
                    }
                }

            } catch (Throwable e) {
                NAMING_LOGGER.error("[NA] failed to read failover switch.", e);
            }
        }
    }
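The decision SwitchRefresher makes can be isolated into a pure function, which makes the rules easy to check. The sketch below is an illustration only: FailoverSwitch is a hypothetical class, a null argument stands for "file does not exist", and unlike the original (which leaves the previous mode untouched when no line matches) it starts from "off" on every call.

```java
// Hypothetical helper that mirrors the switch-file rules; not part of Nacos.
class FailoverSwitch {

    /**
     * @param content the switch file content, or null if the file does not exist
     * @return true if failover mode should be on
     */
    static boolean isFailoverOn(String content) {
        if (content == null || content.isEmpty()) {
            return false;
        }
        boolean on = false;
        // "\\R" matches any line break, so both \n and \r\n files work.
        for (String line : content.split("\\R")) {
            String value = line.trim();
            if ("1".equals(value)) {
                on = true;
            } else if ("0".equals(value)) {
                on = false;
            }
            // Any other line is ignored; the last "0" or "1" line wins.
        }
        return on;
    }
}
```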
How FailoverFileReader works: it mainly reads the JSON content into the cache.

    class FailoverFileReader implements Runnable {
        // Excerpt: the enclosing run() method, the loop over the failover files
        // and the variable declarations are omitted here.

        String dataString = ConcurrentDiskUtil
                .getFileContent(file, Charset.defaultCharset().toString());
        reader = new BufferedReader(new StringReader(dataString));

        String json;
        if ((json = reader.readLine()) != null) {
            try {
                dom = JacksonUtils.toObj(json, ServiceInfo.class);
            } catch (Exception e) {
                NAMING_LOGGER.error("[NA] error while parsing cached dom : " + json, e);
            }
        }

        // Only keep services that have at least one host.
        if (!CollectionUtils.isEmpty(dom.getHosts())) {
            domMap.put(dom.getKey(), dom);
        }

        // Replace the in-memory map only when something was loaded.
        if (domMap.size() > 0) {
            serviceMap = domMap;
        }
    }
How DiskFileWriter works

    class DiskFileWriter extends TimerTask {

        @Override
        public void run() {
            Map<String, ServiceInfo> map = serviceInfoHolder.getServiceInfoMap();
            for (Map.Entry<String, ServiceInfo> entry : map.entrySet()) {
                ServiceInfo serviceInfo = entry.getValue();
                // Skip internal entries that are not real services.
                if (StringUtils.equals(serviceInfo.getKey(), UtilAndComs.ALL_IPS) || StringUtils
                        .equals(serviceInfo.getName(), UtilAndComs.ENV_LIST_KEY) || StringUtils
                        .equals(serviceInfo.getName(), "00-00---000-ENV_CONFIGS-000---00-00") || StringUtils
                        .equals(serviceInfo.getName(), "vipclient.properties") || StringUtils
                        .equals(serviceInfo.getName(), "00-00---000-ALL_HOSTS-000---00-00")) {
                    continue;
                }

                DiskCache.write(serviceInfo, failoverDir);
            }
        }
    }

    public static void write(ServiceInfo dom, String dir) {

        try {
            makeSureCacheDirExists(dir);

            // One file per service, named after the encoded service key.
            File file = new File(dir, dom.getKeyEncoded());
            if (!file.exists()) {
                // add another !file.exists() to avoid conflicted creating-new-file from multi-instances
                if (!file.createNewFile() && !file.exists()) {
                    throw new IllegalStateException("failed to create cache file");
                }
            }

            StringBuilder keyContentBuffer = new StringBuilder();

            // Prefer the raw JSON received from the server; otherwise serialize the object.
            String json = dom.getJsonFromServer();

            if (StringUtils.isEmpty(json)) {
                json = JacksonUtils.toJson(dom);
            }

            keyContentBuffer.append(json);

            // Use the concurrent API to ensure consistency.
            ConcurrentDiskUtil.writeFileContent(file, keyContentBuffer.toString(), Charset.defaultCharset().toString());

        } catch (Throwable e) {
            NAMING_LOGGER.error("[NA] failed to write cache for dom:" + dom.getName(), e);
        }
    }
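As a rough illustration of the "one JSON file per service key" layout, the sketch below URL-encodes the key to get a file name and writes the JSON into it. It is a guess at the general approach rather than the Nacos implementation: MiniDiskCache and its methods are hypothetical, and the assumption that the file name is the URL-encoded key is based only on the example file name DEFAULT_GROUP%40%40nacos.test.3 mentioned earlier.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical disk cache sketch; not the Nacos DiskCache class.
class MiniDiskCache {

    /** Encodes a service key such as "DEFAULT_GROUP@@nacos.test.3" into a safe file name. */
    static String encodeKey(String serviceKey) {
        try {
            return URLEncoder.encode(serviceKey, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    /** Writes the JSON for one service into dir, creating the directory if needed. */
    static Path write(Path dir, String serviceKey, String json) throws IOException {
        Files.createDirectories(dir);
        Path file = dir.resolve(encodeKey(serviceKey));
        Files.write(file, json.getBytes(StandardCharsets.UTF_8));
        return file;
    }

    /** Writes to a temporary directory, reads the content back and cleans up. */
    static String roundTrip(String serviceKey, String json) {
        try {
            Path dir = Files.createTempDirectory("naming-cache-sketch");
            Path file = write(dir, serviceKey, json);
            String content = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
            Files.delete(file);
            Files.delete(dir);
            return content;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```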
Summary: when ServiceInfoHolder is initialized, it creates the local cache directory ${user.home}/nacos/naming. ServiceInfo is backed up to the cache files every 10 seconds. When failover is switched on, ServiceInfo is read from the local files into the in-memory serviceMap, with the switch checked after a 5-second delay. If the parameter "namingLoadCacheAtStart" is set to true, ServiceInfo is read from the local cache files at startup; the default is false.
Note @7: initialization of the naming client proxy delegate

    public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, Properties properties,
            InstancesChangeNotifier changeNotifier) throws NacosException {
        // @7.1 periodic service info updater
        this.serviceInfoUpdateService = new ServiceInfoUpdateService(properties, serviceInfoHolder, this,
                changeNotifier);
        // @7.2 server address list manager
        this.serverListManager = new ServerListManager(properties);
        this.serviceInfoHolder = serviceInfoHolder;
        // @7.3 security proxy
        this.securityProxy = new SecurityProxy(properties, NamingHttpClientManager.getInstance().getNacosRestTemplate());
        initSecurityProxy();
        this.httpClientProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties,
                serviceInfoHolder);
        this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties,
                serviceInfoHolder);
    }
Note @7.1: ServiceInfoUpdateService initialization, analyzed in its own section below.
ServiceInfoUpdateService initialization: source analysis

    public ServiceInfoUpdateService(Properties properties, ServiceInfoHolder serviceInfoHolder,
            NamingClientProxy namingClientProxy, InstancesChangeNotifier changeNotifier) {
        // @7.1.1 scheduler that runs the UpdateTasks
        this.executor = new ScheduledThreadPoolExecutor(initPollingThreadCount(properties),
                new NameThreadFactory("com.alibaba.nacos.client.naming.updater"));
        this.serviceInfoHolder = serviceInfoHolder;
        this.namingClientProxy = namingClientProxy;
        this.changeNotifier = changeNotifier;
    }

Note @7.1.1: the scheduled task executor. Its thread pool size is half the number of processor cores and can be set through the parameter "namingPollingThreadCount".
Responsibility: the scheduler runs UpdateTask, with an initial delay of 1 second.

    private synchronized ScheduledFuture<?> addTask(UpdateTask task) {
        return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
    }
UpdateTask execution logic:

    public void run() {
        long delayTime = DEFAULT_DELAY;

        try {
            // Stop when nobody is subscribed and the task is no longer tracked.
            if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) {
                NAMING_LOGGER
                        .info("update task is stopped, service:" + groupedServiceName + ", clusters:" + clusters);
                return;
            }

            ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
            // Nothing cached yet: query the registry and process the result.
            if (serviceObj == null) {
                serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
                serviceInfoHolder.processServiceInfo(serviceObj);
                lastRefTime = serviceObj.getLastRefTime();
                return;
            }

            // The cache has not been refreshed since the last run: query the registry again.
            if (serviceObj.getLastRefTime() <= lastRefTime) {
                serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
                serviceInfoHolder.processServiceInfo(serviceObj);
            }
            lastRefTime = serviceObj.getLastRefTime();
            if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
                incFailCount();
                return;
            }
            // Next refresh after cacheMillis * multiple.
            delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
            resetFailCount();
        } catch (Throwable e) {
            incFailCount();
            NAMING_LOGGER.warn("[NA] failed to update serviceName: " + groupedServiceName, e);
        } finally {
            // Reschedule itself; back off on failures, capped at DEFAULT_DELAY * 60.
            executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
        }
    }

Note: the main logic of UpdateTask is that, when the cached service's refresh time is stale, it queries the registry for the latest service information and updates the cache refresh time. The task then reschedules itself to keep the service information up to date. As the finally block shows, the next delay is delayTime << failCount, capped at DEFAULT_DELAY * 60. The update interval is therefore at least 6 seconds and at most 1 minute: 6 seconds while updates succeed, growing up to 1 minute when failures occur.
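The rescheduling rule in the finally block can be worked through with a few numbers. The sketch below reproduces only the arithmetic; UpdateBackoff is a hypothetical class, and the constants (a 1000 ms DEFAULT_DELAY and a multiple of 6) are assumptions inferred from the "1 second" and "6 seconds" figures in the text.

```java
// Hypothetical model of the UpdateTask rescheduling arithmetic.
class UpdateBackoff {
    static final long DEFAULT_DELAY_MS = 1000L;      // assumed value of DEFAULT_DELAY
    static final int CACHE_TIME_MULTIPLE = 6;        // assumed value of DEFAULT_UPDATE_CACHE_TIME_MULTIPLE
    static final long MAX_DELAY_MS = DEFAULT_DELAY_MS * 60;

    /** Doubles the base delay once per failure and caps the result at one minute. */
    static long nextDelayMillis(long baseDelayMillis, int failCount) {
        return Math.min(baseDelayMillis << failCount, MAX_DELAY_MS);
    }

    /** After a successful refresh the fail count is reset, so only cacheMillis matters. */
    static long delayAfterSuccess(long cacheMillis) {
        return nextDelayMillis(cacheMillis * CACHE_TIME_MULTIPLE, 0);
    }

    /** After a failure delayTime keeps its initial value, DEFAULT_DELAY. */
    static long delayAfterFailure(int failCount) {
        return nextDelayMillis(DEFAULT_DELAY_MS, failCount);
    }
}
```

Under these assumptions a cacheMillis of 1000 gives the 6-second interval, while a server-provided cacheMillis of 10000 (as in the JSON sample earlier) would already hit the 1-minute cap.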
When the cache is stale, the task also calls serviceInfoHolder#processServiceInfo to process the service information. Its logic is as follows:

    public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
        String serviceKey = serviceInfo.getKey();
        if (serviceKey == null) {
            return null;
        }
        ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
        if (isEmptyOrErrorPush(serviceInfo)) {
            // Empty or error push: ignore it and keep the old data.
            return oldService;
        }
        // Cache the service information.
        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
        // Check whether the instance information has changed.
        boolean changed = isChangedServiceInfo(oldService, serviceInfo);
        if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
            serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
        }
        // Expose the map size as a monitoring metric.
        MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
        if (changed) {
            NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
                    + JacksonUtils.toJson(serviceInfo.getHosts()));
            // Publish the instance change event.
            NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
                    serviceInfo.getClusters(), serviceInfo.getHosts()));
            // Persist the service information to the local cache file.
            DiskCache.write(serviceInfo, cacheDir);
        }
        return serviceInfo;
    }

Note: service instance information is cached in serviceInfoMap, keyed by "groupName@@serviceName", for example DEFAULT_GROUP@@nacos.test.3. The size of serviceInfoMap is monitored through the Prometheus simpleclient. When the service information changes, an instance change event is published through NotifyCenter.publishEvent and the subscribers of that service handle it. The cached service information is also saved to a local file for disaster recovery.
Next, how a change in service instance information is detected: a modified, removed, or newly added instance all count as a change.

    private boolean isChangedServiceInfo(ServiceInfo oldService, ServiceInfo newService) {
        if (null == oldService) {
            NAMING_LOGGER.info("init new ips(" + newService.ipCount() + ") service: " + newService.getKey() + " -> "
                    + JacksonUtils.toJson(newService.getHosts()));
            return true;
        }
        if (oldService.getLastRefTime() > newService.getLastRefTime()) {
            NAMING_LOGGER
                    .warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: " + newService
                            .getLastRefTime());
        }
        boolean changed = false;
        // Index old and new hosts by address.
        Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());
        for (Instance host : oldService.getHosts()) {
            oldHostMap.put(host.toInetAddr(), host);
        }
        Map<String, Instance> newHostMap = new HashMap<String, Instance>(newService.getHosts().size());
        for (Instance host : newService.getHosts()) {
            newHostMap.put(host.toInetAddr(), host);
        }

        Set<Instance> modHosts = new HashSet<Instance>();
        Set<Instance> newHosts = new HashSet<Instance>();
        Set<Instance> remvHosts = new HashSet<Instance>();

        List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>(
                newHostMap.entrySet());
        for (Map.Entry<String, Instance> entry : newServiceHosts) {
            Instance host = entry.getValue();
            String key = entry.getKey();
            // Present on both sides but with different content: modified.
            if (oldHostMap.containsKey(key) && !StringUtils.equals(host.toString(), oldHostMap.get(key).toString())) {
                modHosts.add(host);
                continue;
            }

            // Only present in the new list: added.
            if (!oldHostMap.containsKey(key)) {
                newHosts.add(host);
            }
        }

        for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {
            Instance host = entry.getValue();
            String key = entry.getKey();
            if (newHostMap.containsKey(key)) {
                continue;
            }

            // Only present in the old list: removed.
            if (!newHostMap.containsKey(key)) {
                remvHosts.add(host);
            }
        }

        if (newHosts.size() > 0) {
            changed = true;
            NAMING_LOGGER
                    .info("new ips(" + newHosts.size() + ") service: " + newService.getKey() + " -> " + JacksonUtils
                            .toJson(newHosts));
        }

        if (remvHosts.size() > 0) {
            changed = true;
            NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: " + newService.getKey() + " -> "
                    + JacksonUtils.toJson(remvHosts));
        }

        if (modHosts.size() > 0) {
            changed = true;
            NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: " + newService.getKey() + " -> "
                    + JacksonUtils.toJson(modHosts));
        }
        return changed;
    }
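The comparison above is a three-way diff between two address-keyed maps. A compact standalone version of the same idea is sketched below; InstanceDiff is a hypothetical class, and plain strings stand in for Instance objects (the map key plays the role of toInetAddr() and the value the role of toString()).

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical diff helper illustrating the added / removed / modified split.
class InstanceDiff {
    final Set<String> added = new TreeSet<>();
    final Set<String> removed = new TreeSet<>();
    final Set<String> modified = new TreeSet<>();

    /** Both maps go from address ("ip:port") to a description of the instance. */
    static InstanceDiff between(Map<String, String> oldHosts, Map<String, String> newHosts) {
        InstanceDiff diff = new InstanceDiff();
        for (Map.Entry<String, String> entry : newHosts.entrySet()) {
            String address = entry.getKey();
            if (!oldHosts.containsKey(address)) {
                diff.added.add(address);
            } else if (!entry.getValue().equals(oldHosts.get(address))) {
                diff.modified.add(address);
            }
        }
        for (String address : oldHosts.keySet()) {
            if (!newHosts.containsKey(address)) {
                diff.removed.add(address);
            }
        }
        return diff;
    }

    /** Any non-empty set means the service changed. */
    boolean changed() {
        return !(added.isEmpty() && removed.isEmpty() && modified.isEmpty());
    }
}
```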
Next, when is an UpdateTask added?

    public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {
        String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
        if (futureMap.get(serviceKey) != null) {
            return;
        }
        synchronized (futureMap) {
            // Double check inside the lock.
            if (futureMap.get(serviceKey) != null) {
                return;
            }
            // Create the UpdateTask and hand it to the scheduler.
            ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));
            futureMap.put(serviceKey, future);
        }
    }

    @Override
    public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
        String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
        String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
        // Look in the local cache first; otherwise subscribe through gRPC.
        ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
        if (null == result) {
            result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
        }
        // Schedule the periodic UpdateTask for this service.
        serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
        serviceInfoHolder.processServiceInfo(result);
        return result;
    }

Note: in other words, an UpdateTask is created and scheduled as soon as we call subscribe.
When the instance list changes, an instance change event is generated and the subscribers are notified to handle it. Next, how does the Subscriber handle the event?

    naming.subscribe("nacos.test.3", new AbstractEventListener() {
        @Override
        public Executor getExecutor() {
            return executor;
        }

        @Override
        public void onEvent(Event event) {
            System.out.println("Received 1: " + ((NamingEvent) event).getServiceName());
            System.out.println("Received 2: " + ((NamingEvent) event).getInstances());
        }
    });

    public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) {
        String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
        ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
        if (eventListeners == null) {
            synchronized (lock) {
                // Double check inside the lock.
                eventListeners = listenerMap.get(key);
                if (eventListeners == null) {
                    eventListeners = new ConcurrentHashSet<EventListener>();
                    listenerMap.put(key, eventListeners);
                }
            }
        }
        eventListeners.add(listener);
    }

Note: the example passes in an AbstractEventListener, and that EventListener is cached in listenerMap, keyed by "groupName@@serviceName", for example DEFAULT_GROUP@@nacos.test.3.
Change events are delivered to the subscribers by InstancesChangeNotifier#onEvent, which runs the Event on the executor returned by getExecutor() in the example.

    @Override
    public void onEvent(InstancesChangeEvent event) {
        String key = ServiceInfo
                .getKey(NamingUtils.getGroupedName(event.getServiceName(), event.getGroupName()), event.getClusters());
        ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
        if (CollectionUtils.isEmpty(eventListeners)) {
            return;
        }
        for (final EventListener listener : eventListeners) {
            final com.alibaba.nacos.api.naming.listener.Event namingEvent = transferToNamingEvent(event);
            // Use the listener's own executor if it provides one, otherwise call it directly.
            if (listener instanceof AbstractEventListener && ((AbstractEventListener) listener).getExecutor() != null) {
                ((AbstractEventListener) listener).getExecutor().execute(() -> listener.onEvent(namingEvent));
            } else {
                listener.onEvent(namingEvent);
            }
        }
    }
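The "use the listener's executor if there is one, otherwise call inline" rule is a small pattern of its own. A minimal sketch, with ListenerDispatch as a hypothetical helper and a Consumer standing in for the listener:

```java
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical helper showing the optional-executor dispatch rule.
class ListenerDispatch {

    /** Runs the listener on the given executor, or on the calling thread when executor is null. */
    static <E> void dispatch(E event, Consumer<E> listener, Executor executor) {
        if (executor != null) {
            executor.execute(() -> listener.accept(event));
        } else {
            listener.accept(event);
        }
    }
}
```

Supplying an executor keeps a slow listener from blocking the notifying thread, which is presumably why the subscription example earlier provides its own single-threaded pool.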
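Summary: when we call subscribe, the scheduler creates an UpdateTask. Every 6 seconds (at most every 1 minute) the UpdateTask fetches the Instance list from the registry; if it has changed, an instance change event is published through NotifyCenter.publishEvent and the relevant subscribers handle it, which means our own onEvent method is called back. In addition, the size of serviceInfoMap is exposed as a monitoring metric through the Prometheus simpleclient.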
ServerListManager initialization

    public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, Properties properties,
            InstancesChangeNotifier changeNotifier) throws NacosException {
        this.serviceInfoUpdateService = new ServiceInfoUpdateService(properties, serviceInfoHolder, this,
                changeNotifier);
        // @7.2 server address list manager
        this.serverListManager = new ServerListManager(properties);
        this.serviceInfoHolder = serviceInfoHolder;
        // @7.3 security proxy
        this.securityProxy = new SecurityProxy(properties, NamingHttpClientManager.getInstance().getNacosRestTemplate());
        initSecurityProxy();
        this.httpClientProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties,
                serviceInfoHolder);
        this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties,
                serviceInfoHolder);
    }
Note @7.2: there is a lot going on here, so let's continue with the initialization of ServerListManager:

    public ServerListManager(Properties properties) {
        // @7.2.1 initialize the server addresses
        initServerAddr(properties);
        // @7.2.2 pick a random starting server
        if (!serverList.isEmpty()) {
            currentIndex.set(new Random().nextInt(serverList.size()));
        }
    }

    private void initServerAddr(Properties properties) {
        this.endpoint = InitUtils.initEndpoint(properties);
        if (StringUtils.isNotEmpty(endpoint)) {
            // Endpoint configured: fetch the list from it and refresh periodically.
            this.serversFromEndpoint = getServerListFromEndpoint();
            refreshServerListExecutor = new ScheduledThreadPoolExecutor(1,
                    new NameThreadFactory("com.alibaba.nacos.client.naming.server.list.refresher"));
            refreshServerListExecutor
                    .scheduleWithFixedDelay(this::refreshServerListIfNeed, 0, refreshServerListInternal,
                            TimeUnit.MILLISECONDS);
        } else {
            // No endpoint: use the comma-separated addresses passed in directly.
            String serverListFromProps = properties.getProperty(PropertyKeyConst.SERVER_ADDR);
            if (StringUtils.isNotEmpty(serverListFromProps)) {
                this.serverList.addAll(Arrays.asList(serverListFromProps.split(",")));
                if (this.serverList.size() == 1) {
                    this.nacosDomain = serverListFromProps;
                }
            }
        }
    }
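Note @7.2.1: the Nacos server addresses can be obtained from a fixed endpoint, configured with properties.setProperty(PropertyKeyConst.ENDPOINT, ""). The endpoint can be the domain name of a service; every 30 seconds the client sends a request to "http://" + endpoint + "/nacos/serverlist" to fetch the server list and update its own. Instead of configuring an endpoint, the Nacos server addresses can also be passed to the client with properties.setProperty(PropertyKeyConst.SERVER_ADDR, "").
Note @7.2.2: the client picks one of the Nacos server addresses at random.
Summary: the Nacos server address list can either be passed in directly with properties.setProperty(PropertyKeyConst.SERVER_ADDR, "") or be refreshed dynamically from an endpoint, with a refresh interval of 30 seconds.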
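For the directly configured case, the logic amounts to splitting a comma-separated string and choosing a random starting index. A small sketch of that, with ServerPicker as a hypothetical class (trimming of whitespace and skipping of empty entries are additions of this sketch, and the Random is injected so the behaviour can be tested):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of "parse the server list, start at a random index".
class ServerPicker {
    private final List<String> servers = new ArrayList<>();
    private final AtomicInteger currentIndex = new AtomicInteger();

    ServerPicker(String serverAddr, Random random) {
        for (String server : serverAddr.split(",")) {
            String trimmed = server.trim();
            if (!trimmed.isEmpty()) {
                servers.add(trimmed);
            }
        }
        if (!servers.isEmpty()) {
            currentIndex.set(random.nextInt(servers.size()));
        }
    }

    int size() {
        return servers.size();
    }

    /** Returns the currently selected server, or null when no server is configured. */
    String current() {
        return servers.isEmpty() ? null : servers.get(currentIndex.get());
    }
}
```

Starting at a random index spreads clients across the servers instead of having every client hit the first address in the list.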
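Note @7.3: initialization of the security proxy, SecurityProxy. It parses the username and password and logs in to each server to obtain a token. This part is not analyzed in depth here.
This article is already getting long. The two remaining initializations, NamingHttpClientProxy and NamingGrpcClientProxy, will be covered in the next one.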