引言
Nacos注册中心的主要流程基本上撸完了,下面开始撸配置中心。本文从示例入手走查了客户端的初始化流程,Listener的注册逻辑和执行逻辑。
内容提要
示例
- 通过示例构建ConfigService、注册了Listener分析其流程
Client初始化概览
- 支持多种获取server地址方式(本地、endpoint)
- 支持多种namespace设置(本地、阿里云)
- 支持超时时间、重试时间等参数设置
- 支持用户名和密码验证
- 长轮询会从BlockingQueue中获取元素,队列有元素立即执行executeConfigListen,队列无元素阻塞5秒钟执行executeConfigListen()
Listener注册逻辑
- client添加Listener后会在cacheMap中缓存CacheData
- cacheMap中key由「dataId+group+tenant」拼接而成
- 每个CacheData会绑定注册的Listener列表
- 每个CacheData会绑定taskId,3000个不同的CacheData对应一个taskId
- 设置isSyncWithServer=false表示 cache md5 data不是来自server同步
- BlockingQueue中添加new Object() 供长轮询判断立即执行使用
配置变更执行逻辑
- 执行逻辑由executeConfigListen方法实现
- 当CacheData从Server同步后,会校验md5是否变更了,变更则回调注册的Listener完成通知
- 注册Listener后会构建与server的RPC通道rpcClient
- 向server发起变更查询请求configChangeListenRequest
- Server端通过比较缓存的md5值,返回client变更的key列表
- Client通过变更的key列表向server发起配置查询请求ConfigQueryRequest
- 获取变更内容,并回调注册的Listener完成通知
- 回调注册的Listener是通过线程池异步执行Runnble Job实现的
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Test public void test01() throws Exception { String serverAddr = "localhost:8848"; String dataId = "test"; String group = "DEFAULT_GROUP"; Properties properties = new Properties(); properties.put("serverAddr", serverAddr); ConfigService configService = NacosFactory.createConfigService(properties); configService.addListener(dataId, group, new Listener() { @Override public void receiveConfigInfo(String configInfo) { System.out.println("receive:" + configInfo); }
@Override public Executor getExecutor() { return null; } }); System.in.read(); }
|
备注: 示例中构建了ConfigService,注入Listener接受server配置变更通知。
Client初始化概览
NacosConfigService构造方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public NacosConfigService(Properties properties) throws NacosException { ValidatorUtils.checkInitParam(properties); initNamespace(properties); ServerListManager serverListManager = new ServerListManager(properties); serverListManager.start(); this.worker = new ClientWorker(this.configFilterChainManager, serverListManager, properties); agent = new ServerHttpAgent(serverListManager); }
|
注解@1 设置namespace可以通过properties.setProperty(PropertyKeyConst.NAMESPACE),代码中会兼容阿里云环境,在此忽略,默认为空。
注解@2 初始化namespace、server地址等信息
注解@3 启动主要用于endpoint方式定时获取server地址,当本地传入isFixed=true
注解@4 clientWorker初始化
ClientWorker初始化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public ClientWorker(final ConfigFilterChainManager configFilterChainManager, ServerListManager serverListManager, final Properties properties) throws NacosException {
this.configFilterChainManager = configFilterChainManager;
init(properties);
agent = new ConfigRpcTransportClient(properties, serverListManager);
ScheduledExecutorService executorService = Executors .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker"); t.setDaemon(true); return t; } }); agent.setExecutor(executorService);
agent.start();
}
|
注解@5 初始化超时时间、重试时间等
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| private void init(Properties properties) {
timeout = Math.max(ConvertUtils.toInt(properties.getProperty(PropertyKeyConst.CONFIG_LONG_POLL_TIMEOUT), Constants.CONFIG_LONG_POLL_TIMEOUT), Constants.MIN_CONFIG_LONG_POLL_TIMEOUT);
taskPenaltyTime = ConvertUtils .toInt(properties.getProperty(PropertyKeyConst.CONFIG_RETRY_TIME), Constants.CONFIG_RETRY_TIME);
this.enableRemoteSyncConfig = Boolean .parseBoolean(properties.getProperty(PropertyKeyConst.ENABLE_REMOTE_SYNC_CONFIG)); }
|
注解@6 gRPC config agent初始化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public ConfigTransportClient(Properties properties, ServerListManager serverListManager) {
String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE); if (StringUtils.isBlank(encodeTmp)) { this.encode = Constants.ENCODE; } else { this.encode = encodeTmp.trim(); } this.tenant = properties.getProperty(PropertyKeyConst.NAMESPACE);
this.serverListManager = serverListManager;
this.securityProxy = new SecurityProxy(properties, ConfigHttpClientManager.getInstance().getNacosRestTemplate());
}
|
注解@7 gRPC agent启动
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public void start() throws NacosException { if (securityProxy.isEnabled()) { securityProxy.login(serverListManager.getServerUrls());
this.executor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { securityProxy.login(serverListManager.getServerUrls()); } }, 0, this.securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS);
}
startInternal(); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Override public void startInternal() throws NacosException { executor.schedule(new Runnable() { @Override public void run() { while (true) { try { listenExecutebell.poll(5L, TimeUnit.SECONDS); executeConfigListen(); } catch (Exception e) { LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e); } } } }, 0L, TimeUnit.MILLISECONDS);
}
|
小结: 线程会一直运行,从BlockingQueue中获取元素。队里不为空,获取后立即执行executeConfigListen();队列为空等待5秒后执行
executeConfigListen()。
Listener注册逻辑
executeConfigListen的逻辑有点复杂,先看示例代码中的添加Listener部分。
1 2 3 4 5 6 7 8 9 10 11
| configService.addListener(dataId, group, new Listener() { @Override public void receiveConfigInfo(String configInfo) { System.out.println("receive:" + configInfo); }
@Override public Executor getExecutor() { return null; } });
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners) throws NacosException {
group = null2defaultGroup(group);
String tenant = agent.getTenant();
CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
synchronized (cache) { for (Listener listener : listeners) { cache.addListener(listener); } cache.setSyncWithServer(false); agent.notifyListenConfig(); } }
|
注解@8 构建缓存数据CacheData并放入cacheMap中,缓存的key为 「dataId+group+tenant」例如:test+DEFAULT_GROUP。
每个CacheData会绑定对应的taskId,每3000个CacheData对应一个taskId。其实从后面的代码中可以看出,每个taskId会对应一个gRPC Client。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException {
CacheData cache = getCache(dataId, group, tenant); if (null != cache) { return cache; } String key = GroupKey.getKeyTenant(dataId, group, tenant); synchronized (cacheMap) { CacheData cacheFromMap = getCache(dataId, group, tenant); if (null != cacheFromMap) { cache = cacheFromMap; cache.setInitializing(true); } else { cache = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant); int taskId = cacheMap.get().size() / (int) ParamUtil.getPerTaskConfigSize(); cache.setTaskId(taskId); if (enableRemoteSyncConfig) { String[] ct = getServerConfig(dataId, group, tenant, 3000L, false); cache.setContent(ct[0]); } } Map<String, CacheData> copy = new HashMap<String, CacheData>(this.cacheMap.get()); copy.put(key, cache); cacheMap.set(copy);
} LOGGER.info("[{}] [subscribe] {}", agent.getName(), key);
MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.get().size());
return cache; }
|
具体缓存内容
属性 |
含义 |
name |
ConfigTransportClient名称,config_rpc_client |
configFilterChainManager |
filter拦截链条,可以执行一些列拦截器 |
dataId |
dataId |
group |
group名称,默认为DEFAULT_GROUP |
tenant |
租户名称 |
listeners |
添加的Listener列表,线程安全CopyOnWriteArrayList |
content |
启动时会从本地文件读入,默认为null |
md5 |
content的md5字符串 |
小结:添加监听器逻辑如下:构建CacheData,并缓存在cacheMap中,key是由「dataId+group+tenant」组成;每个CacheData会绑定了Listener列表,也绑定了taskId,3000个不同的CacheData对应一个taskId,对应一个gRPC通道实例;设置isSyncWithServer=false表示 cache md5 data不是来自server同步,BlockingQueue中添加new Object() 供前面提到的长轮询判断使用。
配置变更执行逻辑
上文中提到一个线程一直在轮询,轮询执行executeConfigListen方法,这个方法比较关键。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
| public void executeConfigListen() {
Map<String, List<CacheData>> listenCachesMap = new HashMap<String, List<CacheData>>(16); Map<String, List<CacheData>> removeListenCachesMap = new HashMap<String, List<CacheData>>(16); long now = System.currentTimeMillis(); boolean needAllSync = now - lastAllSyncTime >= ALL_SYNC_INTERNAL; for (CacheData cache : cacheMap.get().values()) { synchronized (cache) { if (cache.isSyncWithServer()) { cache.checkListenerMd5(); if (!needAllSync) { continue; } } if (!CollectionUtils.isEmpty(cache.getListeners())) { if (!cache.isUseLocalConfigInfo()) { List<CacheData> cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId())); if (cacheDatas == null) { cacheDatas = new LinkedList<CacheData>(); listenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas); } cacheDatas.add(cache);
} } else if (CollectionUtils.isEmpty(cache.getListeners())) {
if (!cache.isUseLocalConfigInfo()) { List<CacheData> cacheDatas = removeListenCachesMap.get(String.valueOf(cache.getTaskId())); if (cacheDatas == null) { cacheDatas = new LinkedList<CacheData>(); removeListenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas); } cacheDatas.add(cache);
} } }
}
boolean hasChangedKeys = false; if (!listenCachesMap.isEmpty()) { for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) { String taskId = entry.getKey(); List<CacheData> listenCaches = entry.getValue();
ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches); configChangeListenRequest.setListen(true); try { RpcClient rpcClient = ensureRpcClient(taskId); ConfigChangeBatchListenResponse configChangeBatchListenResponse = (ConfigChangeBatchListenResponse) requestProxy( rpcClient, configChangeListenRequest); if (configChangeBatchListenResponse != null && configChangeBatchListenResponse.isSuccess()) {
Set<String> changeKeys = new HashSet<String>(); if (!CollectionUtils.isEmpty(configChangeBatchListenResponse.getChangedConfigs())) { hasChangedKeys = true; for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : configChangeBatchListenResponse.getChangedConfigs()) { String changeKey = GroupKey .getKeyTenant(changeConfig.getDataId(), changeConfig.getGroup(), changeConfig.getTenant()); changeKeys.add(changeKey); boolean isInitializing = cacheMap.get().get(changeKey).isInitializing(); refreshContentAndCheck(changeKey, !isInitializing); }
}
for (CacheData cacheData : listenCaches) { String groupKey = GroupKey .getKeyTenant(cacheData.dataId, cacheData.group, cacheData.getTenant()); if (!changeKeys.contains(groupKey)) { synchronized (cacheData) { if (!cacheData.getListeners().isEmpty()) { cacheData.setSyncWithServer(true); continue; } } }
cacheData.setInitializing(false); }
} } catch (Exception e) {
LOGGER.error("Async listen config change error ", e); try { Thread.sleep(50L); } catch (InterruptedException interruptedException) { } } } } if (!removeListenCachesMap.isEmpty()) { for (Map.Entry<String, List<CacheData>> entry : removeListenCachesMap.entrySet()) { String taskId = entry.getKey(); List<CacheData> removeListenCaches = entry.getValue(); ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(removeListenCaches); configChangeListenRequest.setListen(false); try { RpcClient rpcClient = ensureRpcClient(taskId); boolean removeSuccess = unListenConfigChange(rpcClient, configChangeListenRequest); if (removeSuccess) { for (CacheData cacheData : removeListenCaches) { synchronized (cacheData) { if (cacheData.getListeners().isEmpty()) { ClientWorker.this .removeCache(cacheData.dataId, cacheData.group, cacheData.tenant); } } } }
} catch (Exception e) { LOGGER.error("async remove listen config change error ", e); } try { Thread.sleep(50L); } catch (InterruptedException interruptedException) { } } } if (needAllSync) { lastAllSyncTime = now; } if (hasChangedKeys) { notifyListenConfig(); } }
|
注解@9 isSyncWithServer初始为false,在下文代码中校验结束后会设置为true,表示md5 cache data同步来自server。如果为true会校验Md5.
1 2 3 4 5 6 7
| void checkListenerMd5() { for (ManagerListenerWrap wrap : listeners) { if (!md5.equals(wrap.lastCallMd5)) { safeNotifyListener(dataId, group, content, type, md5, wrap); } } }
|
注解@9.1 配置内容有变更时,回调到我们示例中注册的Listener中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
| private void safeNotifyListener(final String dataId, final String group, final String content, final String type, final String md5, final ManagerListenerWrap listenerWrap) { final Listener listener = listenerWrap.listener; if (listenerWrap.inNotifying) { return; } Runnable job = new Runnable() { @Override public void run() { long start = System.currentTimeMillis(); ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader(); ClassLoader appClassLoader = listener.getClass().getClassLoader(); try { if (listener instanceof AbstractSharedListener) { AbstractSharedListener adapter = (AbstractSharedListener) listener; adapter.fillContext(dataId, group); } Thread.currentThread().setContextClassLoader(appClassLoader);
ConfigResponse cr = new ConfigResponse(); cr.setDataId(dataId); cr.setGroup(group); cr.setContent(content); configFilterChainManager.doFilter(null, cr);
String contentTmp = cr.getContent(); listenerWrap.inNotifying = true;
listener.receiveConfigInfo(contentTmp); if (listener instanceof AbstractConfigChangeListener) { Map data = ConfigChangeHandler.getInstance() .parseChangeData(listenerWrap.lastContent, content, type); ConfigChangeEvent event = new ConfigChangeEvent(data); ((AbstractConfigChangeListener) listener).receiveConfigChange(event); listenerWrap.lastContent = content; }
listenerWrap.lastCallMd5 = md5; } catch (NacosException ex) { } catch (Throwable t) { } finally { listenerWrap.inNotifying = false; Thread.currentThread().setContextClassLoader(myClassLoader); } } };
final long startNotify = System.currentTimeMillis(); try { if (null != listener.getExecutor()) { listener.getExecutor().execute(job); } else { try { INTERNAL_NOTIFIER.submit(job); } catch (RejectedExecutionException rejectedExecutionException) { job.run(); } catch (Throwable throwable) { job.run(); } } } catch (Throwable t) { } final long finishNotify = System.currentTimeMillis(); }
|
注解@9.2 回调注册Listener的receiveConfigInfo方法或者receiveConfigChange逻辑
注解@9.3 优先使用我们示例中注册提供的线程池执行job,如果没有设置使用默认线程池「INTERNAL_NOTIFIER」,默认5个线程
备注: 当CacheData从server同步后,会校验md5是否变更了,当变更时会回调到我们注册的Listener完成通知。通知任务被封装成Runnable任务,执行线程池可以自定义,默认为5个线程。
注解@10.1 每个taskId构建rpcClient,例如:taskId= config-0-c70e0314-4770-43f5-add4-f258a4083fd7;结合上下文每3000个CacheData对应一个rpcClient。
注解@10.2 向server发起configChangeListenRequest,server端由ConfigChangeBatchListenRequestHandler处理,还是比较md5
是否变更了,变更后server端返回变更的key列表。
注解@10.3 当server返回变更key列表时执行refreshContentAndCheck方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| private void refreshContentAndCheck(CacheData cacheData, boolean notify) { try { String[] ct = getServerConfig(cacheData.dataId, cacheData.group, cacheData.tenant, 3000L, notify); cacheData.setContent(ct[0]); if (null != ct[1]) { cacheData.setType(ct[1]); } if (notify) { } cacheData.checkListenerMd5(); } catch (Exception e) { } }
|
注解@10.3.1 向server发起ConfigQueryRequest,查询配置内容
注解@10.3.2 回调注册的Listener逻辑见 注解@9
注解@10.4 key没有变化的,内容由server同步,设置SyncWithServer=true,下一轮逻辑会由 注解@9 部分执行
备注: 从整个注解@10 注册Listener后,会构建与server的RPC通道rpcClient;向server发起变更查询请求configChangeListenRequest,server端通过比较缓存的md5值,返回client变更的key列表;client通过变更的key列表向server发起配置查询请求ConfigQueryRequest,获取变更内容,并回调我们注册的Listener。