java语言中Nacos注册中心:Client端【更新本地服务】
前言
java语言中Nacos注册中心:Client端【更新本地服务】
0. 环境
- nacos版本:1.4.1
- Spring Cloud : 2020.0.2
- Spring Boot :2.4.4
- Spring Cloud alibaba: 2.2.5.RELEASE
1. 更新本地服务
在spring.factories
: 在这个NacosDiscoveryClientConfiguration
注入了NacosWatch
:
@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
@ConditionalOnBlockingDiscoveryEnabled
@ConditionalOnNacosDiscoveryEnabled
@AutoConfigureBefore({ SimpleDiscoveryClientAutoConfiguration.class,
CommonsClientAutoConfiguration.class })
@AutoConfigureAfter(NacosDiscoveryAutoConfiguration.class)
public class NacosDiscoveryClientConfiguration {
@Bean
public DiscoveryClient nacosDiscoveryClient(
NacosServiceDiscovery nacosServiceDiscovery) {
return new NacosDiscoveryClient(nacosServiceDiscovery);
}
@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(value = "spring.cloud.nacos.discovery.watch.enabled",
matchIfMissing = true)
public NacosWatch nacosWatch(NacosServiceManager nacosServiceManager,
NacosDiscoveryProperties nacosDiscoveryProperties,
ObjectProvider<ThreadPoolTaskScheduler> taskExecutorObjectProvider) {
return new NacosWatch(nacosServiceManager, nacosDiscoveryProperties,
taskExecutorObjectProvider);
}
}
NacosWatch
实现了SmartLifecycle
,在环境准备好了之后,会调用start方法
:
public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycle {
@Override
public void start() {
if (this.running.compareAndSet(false, true)) {
EventListener eventListener = listenerMap.computeIfAbsent(buildKey(),
event -> new EventListener() {
@Override
public void onEvent(Event event) {
if (event instanceof NamingEvent) {
List<Instance> instances = ((NamingEvent) event)
.getInstances();
Optional<Instance> instanceOptional = selectCurrentInstance(
instances);
instanceOptional.ifPresent(currentInstance -> {
resetIfNeeded(currentInstance);
});
}
}
});
NamingService namingService = nacosServiceManager
.getNamingService(properties.getNacosProperties());
try {
// todo 订阅
namingService.subscribe(properties.getService(), properties.getGroup(),
Arrays.asList(properties.getClusterName()), eventListener);
}
catch (Exception e) {
log.error("namingService subscribe failed, properties:{}", properties, e);
}
this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
this::nacosServicesWatch, this.properties.getWatchDelay());
}
}
}
com.alibaba.nacos.client.naming.NacosNamingService#subscribe()
@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
throws NacosException {
hostReactor.subscribe(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","),
listener);
}
com.alibaba.nacos.client.naming.core.HostReactor#subscribe
public void subscribe(String serviceName, String clusters, EventListener eventListener) {
notifier.registerListener(serviceName, clusters, eventListener);
getServiceInfo(serviceName, clusters);
}
核心方法getServiceInfo
:
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
NAMING_LOGGER.debug("failover-mode: " failoverReactor.isFailoverSwitch());
// key的格式为:groupId@@微服务名称@@clusters名称
String key = ServiceInfo.getKey(serviceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
// todo 从当前Client本地注册表中获取当前服务
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
// 若本地注册表中没有该服务,则创建一个
if (null == serviceObj) {
// 创建一个空的服务(没有任何提供者实例instance的ServiceInfo)
serviceObj = new ServiceInfo(serviceName, clusters);
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
// updatingMap 是一个临时缓存,主要使用这个map的key
// map的key不能重复的特性
// 只要这个服务名称在这个map中,说明这个服务正在更新中
updatingMap.put(serviceName, new Object());
// todo 更新本地注册表ServiceName的服务
updateServiceNow(serviceName, clusters);
// 更新完毕,从updatingMap中删除
updatingMap.remove(serviceName);
// 若当前注册表中已经有这个服务,那么查看一下临时map下
// 是否存在该服务,若存在,说明当前服务正在更新中,所以本次操作先等待一段时间,默认5s
} else if (updatingMap.containsKey(serviceName)) {
if (UPDATE_HOLD_INTERVAL > 0) {
// hold a moment waiting for update finish
synchronized (serviceObj) {
try {
serviceObj.wait(UPDATE_HOLD_INTERVAL);
} catch (InterruptedException e) {
NAMING_LOGGER
.error("[getServiceInfo] serviceName:" serviceName ", clusters:" clusters, e);
}
}
}
}
// todo 启动一个定时任务,定时更新本地注册表中的当前服务
scheduleUpdateIfAbsent(serviceName, clusters);
return serviceInfoMap.get(serviceObj.getKey());
}
主要有2个步骤,第一是更新本地服务,第二是开启定时任务。
1.1 更新本地服务
核心方法:updateServiceNow(serviceName, clusters);
:
private void updateServiceNow(String serviceName, String clusters) {
try {
// todo
updateService(serviceName, clusters);
} catch (NacosException e) {
NAMING_LOGGER.error("[NA] failed to update serviceName: " serviceName, e);
}
}
public void updateService(String serviceName, String clusters) throws NacosException {
// 本地注册表中获取当前服务
ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
try {
// todo 提交get请求,获取服务ServiceInfo
// 需要注意,返回的是json串
String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
if (StringUtils.isNotEmpty(result)) {
// todo 将来自Server的ServiceInfo更新到本地
processServiceJson(result);
}
} finally {
if (oldService != null) {
synchronized (oldService) {
oldService.notifyAll();
}
}
}
}
-
获取Service对应的所有主机信息:
serverProxy.queryList
:public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException { final Map<String, String> params = new HashMap<String, String>(8); params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, serviceName); params.put("clusters", clusters); params.put("udpPort", String.valueOf(udpPort)); params.put("clientIP", NetUtils.localIP()); params.put("healthyOnly", String.valueOf(healthyOnly)); return reqApi(UtilAndComs.nacosUrlBase "/instance/list", params, HttpMethod.GET); }
获取某一服务所有的主机实例:GET请求 url:/nacos/v1/ns/instance/list
-
将来自Server的ServiceInfo更新到本地
processServiceJson(result)
:// todo 来自Server的数据是最新 数据 public ServiceInfo processServiceJson(String json) { // 转成ServiceInfo类 ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class); // 从本地注册表中获取对应服务 ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey()); if (pushEmptyProtection && !serviceInfo.validate()) { //empty or error push, just ignore return oldService; } boolean changed = false; // 当前注册表中存在该服务,想办法将来自server端的数据更新到本地注册表中 if (oldService != null) { // 为了安全起见,这种情况几乎是不存在的 if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) { NAMING_LOGGER.warn("out of date data received, old-t: " oldService.getLastRefTime() ", new-t: " serviceInfo.getLastRefTime()); } // 来自server的serviceInfo替换到注册表中的当前服务 serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); // 遍历本地注册表中当前服务所有instance实例 Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size()); for (Instance host : oldService.getHosts()) { // 将当前遍历instance主机的ip:port作为key,instance为value // 写入到一个新的map中 oldHostMap.put(host.toInetAddr(), host); } // 遍历来自server的当前服务 所有instance实例 Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size()); for (Instance host : serviceInfo.getHosts()) { // 将当前遍历instance主机的ip:port作为key,instance为value // 写入到一个新的map中 newHostMap.put(host.toInetAddr(), host); } // 该set集合中存放的是,两个map(oldHostMap与newHostMap)中都有的ip:port, // 但它们的instance不相同,此时会将来自于server的instance写入到这个set Set<Instance> modHosts = new HashSet<Instance>(); // 只有newHostMap中存在的instance,即在server端新增的instance Set<Instance> newHosts = new HashSet<Instance>(); // 只有oldHostMap中存在的instance,即在server端被删除的instance Set<Instance> remvHosts = new HashSet<Instance>(); List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>( newHostMap.entrySet()); // 遍历来自于server的主机 for (Map.Entry<String, Instance> entry : newServiceHosts) { Instance host = entry.getValue(); // ip:port String key = entry.getKey(); // 在注册表中存在该ip:port,但这两个instance又不同,则将这个instance写入到modHosts if (oldHostMap.containsKey(key) && !StringUtils .equals(host.toString(), oldHostMap.get(key).toString())) { modHosts.add(host); continue; } // 若注册表中不存在该ip:port,说明这个主机是新增的,则将其写入到newHosts if (!oldHostMap.containsKey(key)) { newHosts.add(host); } } // 遍历来自于本地注册表的主机 for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) { Instance host = entry.getValue(); String key = entry.getKey(); if (newHostMap.containsKey(key)) { continue; } // 注册表中存在,但来自于server的serviceInfo中不存在, // 说明这个instance被干掉了,将其写入到remvHosts if (!newHostMap.containsKey(key)) { remvHosts.add(host); } } if (newHosts.size() > 0) { changed = true; NAMING_LOGGER.info("new ips(" newHosts.size() ") service: " serviceInfo.getKey() " -> " JacksonUtils.toJson(newHosts)); } if (remvHosts.size() > 0) { changed = true; NAMING_LOGGER.info("removed ips(" remvHosts.size() ") service: " serviceInfo.getKey() " -> " JacksonUtils.toJson(remvHosts)); } if (modHosts.size() > 0) { changed = true; // todo 变更心跳信息BeatInfo updateBeatInfo(modHosts); NAMING_LOGGER.info("modified ips(" modHosts.size() ") service: " serviceInfo.getKey() " -> " JacksonUtils.toJson(modHosts)); } serviceInfo.setJsonFromServer(json); // 只要发生了变更,就将这个发生变更的serviceInfo记录到一个缓存队列 if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) { NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts())); DiskCache.write(serviceInfo, cacheDir); } // 本地注册表中没有这个服务,直接将来自Server的serviceInfo写入到本地注册表 } else { changed = true; NAMING_LOGGER.info("init new ips(" serviceInfo.ipCount() ") service: " serviceInfo.getKey() " -> " JacksonUtils.toJson(serviceInfo.getHosts())); // 将来自于server的serviceInfo写入到注册表 serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); // 将这个发生变更的serviceInfo记录到一个缓存队列 NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts())); serviceInfo.setJsonFromServer(json); DiskCache.write(serviceInfo, cacheDir); } MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size()); if (changed) { NAMING_LOGGER.info("current ips:(" serviceInfo.ipCount() ") service: " serviceInfo.getKey() " -> " JacksonUtils.toJson(serviceInfo.getHosts())); } return serviceInfo; }
主要是更新本地的服务为最新的消息,实例Instance发生变更发送
InstancesChangeEvent
事件。
1.2 开启定时任务
// todo 启动一个定时任务,定时更新本地注册表中的当前服务
scheduleUpdateIfAbsent(serviceName, clusters);
public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
// futureMap是一个缓存map,其key为 groupId@@微服务名称@@clusters
// value是一个定时异步操作对象
// 这种结构称之为:双重检测锁,DCL,Double Check Lock
// 该结构是为了避免在并发情况下,多线程重复写入数据
// 该结构的特征:
// 1)有两个不为null的判断
// 2)有共享集合
// 3)有synchronized代码块
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
return;
}
synchronized (futureMap) {
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
return;
}
// 创建一个定时异步操作对象,并启动这个定时任务
ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
// 将这个定时异步操作对象写入到缓存map
futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
}
}
我们看一下UpdateTask
的run方法
:
public class UpdateTask implements Runnable {
long lastRefTime = Long.MAX_VALUE;
private final String clusters;
private final String serviceName;
/**
* the fail situation. 1:can't connect to server 2:serviceInfo's hosts is empty
*/
private int failCount = 0;
public UpdateTask(String serviceName, String clusters) {
this.serviceName = serviceName;
this.clusters = clusters;
}
private void incFailCount() {
int limit = 6;
if (failCount == limit) {
return;
}
failCount ;
}
private void resetFailCount() {
failCount = 0;
}
@Override
public void run() {
long delayTime = DEFAULT_DELAY;
try {
// 从本地注册表中获取当前服务
ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
// 若本地注册表中不存在该服务,则从server获取到后,更新到本地注册表
if (serviceObj == null) {
// 从server获取当前服务,并更新到本地注册表
updateService(serviceName, clusters);
return;
}
// 处理本地注册表中存在当前服务的情况
// 1)serviceObj.getLastRefTime() 获取到的是当前服务最后被访问的时间,这个时间
// 是来自于本地注册表的,其记录的是所有提供这个服务的instance中最后一个instance
// 被访问的时间
// 2)缓存lastRefTime 记录的是当前instance最后被访问的时间
// 若1)时间 小于 2)时间,说明当前注册表应该更新的
if (serviceObj.getLastRefTime() <= lastRefTime) {
updateService(serviceName, clusters);
serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
} else {
// if serviceName already updated by push, we should not override it
// since the push data may be different from pull through force push
refreshOnly(serviceName, clusters);
}
// 将来自于注册表的这个最后访问时间更新到当前client的缓存
lastRefTime = serviceObj.getLastRefTime();
if (!notifier.isSubscribed(serviceName, clusters) && !futureMap
.containsKey(ServiceInfo.getKey(serviceName, clusters))) {
// abort the update task
NAMING_LOGGER.info("update task is stopped, service:" serviceName ", clusters:" clusters);
return;
}
if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
incFailCount();
return;
}
delayTime = serviceObj.getCacheMillis();
resetFailCount();
} catch (Throwable e) {
incFailCount();
NAMING_LOGGER.warn("[NA] failed to update serviceName: " serviceName, e);
} finally {
// 开启下一次的定时任务
executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
}
}
}
}
核心还是updateService(serviceName, clusters)
,前面我们已经分析了。
2. 方法调用图
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanehie
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
怎样阻止微信小程序自动打开
PHP中文网 06-13 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01