Java的Eureka系列:Server端【处理服务注册请求】
前端
Java的Eureka系列:Server端【处理服务注册请求】
本文主要解析下Eureka Server 处理服务注册流程源码,Eureka Server处理服务注册个人认为是2部分,一是将服务实例信息注册到本机的注册表
中(其实就是个map数据结构),二是同步给其他Eureka Server 节点
。
1. Eureka Server 注册表详解
Eureka Server服务启动的时候,会创建一个注册表对象,也就是PeerAwareInstanceRegistryImpl
,在Spring Cloud 封装的Eureka Server中是InstanceRegistry
(继承的PeerAwareInstanceRegistryImpl
)。它里面有个核心的数据结构。
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
= new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
这个 registry map
就是存放我们服务实例的地方,它的key 是String 类型
,也就是我们的appName
, value是个Map<String, Lease<InstanceInfo>>
,里面这个map也是个ConcurrentHashMap 。里面这个map的key是个id: instanceId,value为Lease维护的是服务的租约信息。我们来看下Lease 里面都有啥:
// 只有服务端维护的实例租约信息类
public class Lease<T> {
enum Action {
Register, Cancel, Renew
};
public static final int DEFAULT_DURATION_IN_SECS = 90;
// 注册的实例信息
private T holder;
// 实例剔除时间戳
private long evictionTimestamp;
// 实例注册时间
private long registrationTimestamp;
// 实例启动时间,这个是服务注册的时候传输过来的
private long serviceUpTimestamp;
// Make it volatile so that the expiration task would see this quicker
// 实例最近一次的续约时间戳
private volatile long lastUpdateTimestamp;
// 租期,默认是90s,如果客户端有自己的租期,就用客户端自己带过来的
private long duration;
}
这里挑几个重点的介绍
-
holder
: 是你注册的实例信息,host,ip,port等等一些信息 -
lastUpdateTimestamp
:最后一次续约的信息。所谓的续约就是心跳,比如说你一个服务注册到eureka server 上面,你会默认每隔30s时间来续约一次,告诉一下eureka server我这个服务实例还活着,如果你长时间没有向eureka server 进行服务续约,eureka server就会从注册表中将这个服务实例信息剔除。 -
serviceUpTimestamp
:服务启动时间,这个是服务注册的时候传输过来的。 -
duration
:可以认为是租期,默认是90s。如果你客户端有自己的租期,就用客户端带过来的。
2. 服务注册源码
2.1入口
Eureka Server使用的web框架是jersey(这玩意定位就是restful 框架,跟springmvc 差不多,不需要太多的关注)接收服务注册com.netflix.eureka.resources.ApplicationResource
的addInstance
方法。
服务注册:POST请求,path为:“apps/" appName
private final PeerAwareInstanceRegistry registry;
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
// 此处省略一堆代码
registry.register(info, "true".equals(isReplication));
return Response.status(204).build(); // 204 to be backwards compatible
}
可以看到调用了注册表PeerAwareInstanceRegistry
的register方法
。
2.2 注册表注册
PeerAwareInstanceRegistry#register
:
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
// 默认90s 持续时间 也就是90s的过期时间
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
// 如果客户端存在过期时间的话,就用客户端的
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
// todo 1.调用父类AbstractInstanceRegistry 的注册方法进行注册,更新本地注册表
super.register(info, leaseDuration, isReplication);
// todo 2.集群间节点同步
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
-
先是看看客户端有没有带过来过期时间,如果没有带过来就是用Eureka Server 默认的过期时间90s,表示90s后没有续约的话服务实例信息可能会被从注册表中剔除(
为啥是可能会? 这个与服务剔除机制有关,它会有一定的延时
) -
接着就是调用
父类的register方法
,进行服务注册
,其实就是更新本地服务注册表
。 -
最后调用
replicateToPeers
方法将服务信息同步给集群的其他节点。
2.2.1 register方法
先看一下 super.register(info, leaseDuration, isReplication);
这行代码,也就是调用父类(AbstractInstanceRegistry)的register方法
进行服务注册本地注册表的变更。AbstractInstanceRegistry#register
:
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
// 获取读锁
read.lock();
try {
/** ---------------------------第一步--------------------------------------------------*/
// 先从注册表中 获取对应appName的 map集合
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
// 不存在就创建 并且塞入注册表map中
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
//存在就不会put ,并将存在的那个返回来,不存在的话进行put,返回null
// 试想该地使用putIfAbsent 的作用: double check, 虽然使用了ConcurrentHashMap,
// 保证了元素增删改查没问题,但是还是不能保证 gNewMap 只存在一个。
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
// 获取该实例id 对应的租约信息
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
// 如果是存在
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
// InstanceInfo instead of the server local copy.
// 网络抖动
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater"
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
// 不存在的话
} else {
// The lease does not exist and hence it is a new registration
synchronized (lock) {
// 这个是与自我保护机制有关的
if (this.expectedNumberOfClientsSendingRenews > 0) {
// Since the client wants to register it, increase the number of clients sending renews
// 每有一个新的客户端注册进来,就会 1,表示未来要发送心跳的客户端 1
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews 1;
// todo 更新下 自己能容忍的最少心跳数量
updateRenewsPerMinThreshold();
}
}
logger.debug("No previous lease information found; it is new registration");
}
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
// 服务启动时间ServiceUp
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
// 塞到注册表中
gMap.put(registrant.getId(), lease);
//放到registed 队列中
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() "(" registrant.getId() ")"));
// This is where the initial state transfer of overridden status happens
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
"overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
// 从本地缓存中获取该实例状态
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
// 存在的话,设置进去
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
// 实例状态是up的话,设置下服务启动时间
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED);
/** ---------第2 步 ------------*/
// 塞入最近改变队列中
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
/** --------- 第3步 ------------*/
// 本地缓存失效
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
这个注册方法很长,我按照重要程度大概分为3步。需要注意的是整个方法被 读锁
包起来了。
- 更新注册表信息。
- 将信息塞入最近变更队列中, 更新最后更新时间
- 本地缓存失效(本地缓存很重要,服务发现的时候先会去本地的一级二级缓存中获取,没有的话采取注册表中找)
更新注册表实例信息
// 先从注册表中 获取对应appName的 map集合
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
// 不存在就创建 并且塞入注册表map中
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
//存在就不会put ,并将存在的那个返回来,不存在的话进行put,返回null
// 试想该地使用putIfAbsent 的作用: double check, 虽然使用了ConcurrentHashMap,
// 保证了元素增删改查没问题,但是还是不能保证 gNewMap 只存在一个。
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
先是从注册表(也就是第一节介绍的那个注册表map )根据appName
获取对应 实例列表map
。
这里这段代码就是先根据appName 获取下 所有实例map,如果不存在的话,就创建。
// 获取该实例id 对应的租约信息
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// 如果是存在
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater"
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
// 如不存在的话
} else {
// The lease does not exist and hence it is a new registration
synchronized (lock) {
// 这个是与自我保护机制有关的
if (this.expectedNumberOfClientsSendingRenews > 0) {
// 每有一个新的客户端注册进来,就会 1,表示未来要发送心跳的客户端 1
// Since the client wants to register it, increase the number of clients sending renews
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews 1;
// 更新下 自己能容忍的最少心跳数量
updateRenewsPerMinThreshold();
}
}
logger.debug("No previous lease information found; it is new registration");
}
接着就是根据实例id获取下实例租约信息,如果之前就在注册表中存在的话,看看要不要使用之前的instance信息,前提是没有变更过,通过变更时间来判断的。
如果是第一次注册(或者是之前注册过,然后服务下线了)的话,这里面肯定是没有的,就会将expectedNumberOfClientsSendingRenews
自加1 ,expectedNumberOfClientsSendingRenews
这个变量表示的是需要续约的客户端数量。最后调用updateRenewsPerMinThreshold
方法,更新一下下次接收续约(心跳)最少的阈值(这个是与 服务剔除、自我保护机制有关的,保护机制后面我们开一篇文章重点说明)。
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
// 服务启动时间ServiceUp
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
// 塞到注册表中
gMap.put(registrant.getId(), lease);
重新生成租约信息,如果是注册表中已经存在这个实例,就将原来的实例启动时间 设置到新的租约信息中,然后存储到注册表中。
放入最近改变队列中
// 塞入最近改变队列中
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
// 更新最后更新时间
registrant.setLastUpdatedTimestamp();
这个放入最近改变队列中,为了后续使用,就放3分钟,到期后就从队列中剔除了。
更新最后更新时间,这个也是非常重要的,后续要根据这个时间进行服务剔除。
失效本地缓存
// 本地缓存失效
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
这块我们知道就可以了,这个本地缓存服务发现的时候会用到,会先从本地的多级缓存中获取这些实例信息。这里实例信息改变了,就需要清除下本地的缓存,保证一致性。
2.2.2 集群间同步replicateToPeers()
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
// isReplication:是否是集群节点间同步复制的请求
if (isReplication) {
// 如果是集群节点间同步复制的请求,最近一分钟处理同步复制请求数 1
numberOfReplicationsLastMin.increment();
}
// If it is a replication already, do not replicate again as this will create a poison replication
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
// 如果本地配置的集群节点为空,则不进行同步复制
// 如果是集群节点间同步复制的请求,则不进行同步复制(设想一下,如果集群节点间不断同步复制,那么会形成一个循环永不结束)
return;
}
// peerEurekaNodes:存放集群节点
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
// 如果 url 表示的是当前主机,就不要复制给自己了
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
// todo 同步复制给集群节点处理
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}
同步复制给集群节点处理replicateInstanceActionsToPeers
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry;
CurrentRequestVersion.set(Version.V2);
// 此时为 action = Action.Register
switch (action) {
case Cancel:
// 下架
node.cancel(appName, id);
break;
case Heartbeat:
// 心跳续租
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
// 注册
node.register(info);
break;
case StatusUpdate:
// 更改状态
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
// 删除状态
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
} finally {
CurrentRequestVersion.remove();
}
}
node.register()
// PeerEurekaNode.class
public void register(final InstanceInfo info) throws Exception {
long expiryTime = System.currentTimeMillis() getLeaseRenewalOf(info);
// batchingDispatcher:批量处理执行器,把任务放入队列中,后台有专门的线程对队列进行处理
batchingDispatcher.process(
taskId("register", info),
new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
public EurekaHttpResponse<Void> execute() {
// 发起注册请求
return replicationClient.register(info);
}
},
expiryTime
);
}
replicationClient.register()
// AbstractJerseyEurekaHttpClient.class
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" info.getAppName();
ClientResponse response = null;
try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
// 添加 HEADER_REPLICATION = true 标记,表明这是一个集群节点间的同步复制请求
addExtraHeaders(resourceBuilder);
// 发起请求
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, info);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
// JerseyReplicationClient.class
@Override
protected void addExtraHeaders(Builder webResource) {
webResource.header(PeerEurekaNode.HEADER_REPLICATION, "true");
}
集群节点间同步复制处理下架、心跳续租、删除状态、更新状态都和注册类似。后面不在说明。
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanehjc
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
怎样阻止微信小程序自动打开
PHP中文网 06-13 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01