• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Java的Eureka系列:Server端【处理服务注册请求】

武飞扬头像
juejin
帮助122

前端

Java的Eureka系列:Server端【处理服务注册请求】

本文主要解析下Eureka Server 处理服务注册流程源码,Eureka Server处理服务注册个人认为是2部分,一是将服务实例信息注册到本机的注册表中(其实就是个map数据结构),二是同步给其他Eureka Server 节点

1. Eureka Server 注册表详解

Eureka Server服务启动的时候,会创建一个注册表对象,也就是PeerAwareInstanceRegistryImpl ,在Spring Cloud 封装的Eureka Server中是InstanceRegistry (继承的PeerAwareInstanceRegistryImpl)。它里面有个核心的数据结构。

private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
        = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();

这个 registry map就是存放我们服务实例的地方,它的key 是String 类型,也就是我们的appNamevalue是个Map<String, Lease<InstanceInfo>>,里面这个map也是个ConcurrentHashMap 。里面这个map的key是个id: instanceId,value为Lease维护的是服务的租约信息。我们来看下Lease 里面都有啥:

// 只有服务端维护的实例租约信息类
public class Lease<T> {

    enum Action {
        Register, Cancel, Renew
    };

    public static final int DEFAULT_DURATION_IN_SECS = 90;

    // 注册的实例信息
    private T holder;
    // 实例剔除时间戳
    private long evictionTimestamp;
    // 实例注册时间
    private long registrationTimestamp;
    // 实例启动时间,这个是服务注册的时候传输过来的
    private long serviceUpTimestamp;
    // Make it volatile so that the expiration task would see this quicker
    // 实例最近一次的续约时间戳
    private volatile long lastUpdateTimestamp;
    // 租期,默认是90s,如果客户端有自己的租期,就用客户端自己带过来的
    private long duration;
}

这里挑几个重点的介绍

  • holder : 是你注册的实例信息,host,ip,port等等一些信息

  • lastUpdateTimestamp :最后一次续约的信息。所谓的续约就是心跳,比如说你一个服务注册到eureka server 上面,你会默认每隔30s时间来续约一次,告诉一下eureka server我这个服务实例还活着,如果你长时间没有向eureka server 进行服务续约,eureka server就会从注册表中将这个服务实例信息剔除。

  • serviceUpTimestamp :服务启动时间,这个是服务注册的时候传输过来的。

  • duration :可以认为是租期,默认是90s。如果你客户端有自己的租期,就用客户端带过来的。

2. 服务注册源码

2.1入口

Eureka Server使用的web框架是jersey(这玩意定位就是restful 框架,跟springmvc 差不多,不需要太多的关注)接收服务注册com.netflix.eureka.resources.ApplicationResourceaddInstance 方法。

服务注册:POST请求,path为:“apps/" appName

private final PeerAwareInstanceRegistry registry;
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
	// 此处省略一堆代码
    registry.register(info, "true".equals(isReplication));
    return Response.status(204).build();  // 204 to be backwards compatible
}

可以看到调用了注册表PeerAwareInstanceRegistryregister方法

2.2 注册表注册

PeerAwareInstanceRegistry#register:

@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    // 默认90s 持续时间 也就是90s的过期时间
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    // 如果客户端存在过期时间的话,就用客户端的
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    // todo 1.调用父类AbstractInstanceRegistry 的注册方法进行注册,更新本地注册表
    super.register(info, leaseDuration, isReplication);
    // todo 2.集群间节点同步
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
  • 先是看看客户端有没有带过来过期时间,如果没有带过来就是用Eureka Server 默认的过期时间90s,表示90s后没有续约的话服务实例信息可能会被从注册表中剔除(为啥是可能会? 这个与服务剔除机制有关,它会有一定的延时

  • 接着就是调用父类的register方法进行服务注册,其实就是更新本地服务注册表

  • 最后调用replicateToPeers 方法将服务信息同步给集群的其他节点。

2.2.1 register方法

先看一下 super.register(info, leaseDuration, isReplication); 这行代码,也就是调用父类(AbstractInstanceRegistry)的register方法进行服务注册本地注册表的变更。AbstractInstanceRegistry#register:

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    // 获取读锁
    read.lock();
    try {
        /** ---------------------------第一步--------------------------------------------------*/
        // 先从注册表中 获取对应appName的 map集合
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);
        // 不存在就创建 并且塞入注册表map中
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            //存在就不会put ,并将存在的那个返回来,不存在的话进行put,返回null
            // 试想该地使用putIfAbsent 的作用: double check, 虽然使用了ConcurrentHashMap,
            // 保证了元素增删改查没问题,但是还是不能保证 gNewMap 只存在一个。
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        // 获取该实例id 对应的租约信息
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        // Retain the last dirty timestamp without overwriting it, if there is already a lease
        // 如果是存在
        if (existingLease != null && (existingLease.getHolder() != null)) {
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

            // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
            // InstanceInfo instead of the server local copy.
            // 网络抖动
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater"  
                        " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                registrant = existingLease.getHolder();
            }
        // 不存在的话
        } else {
            // The lease does not exist and hence it is a new registration
            synchronized (lock) {
                // 这个是与自我保护机制有关的
                if (this.expectedNumberOfClientsSendingRenews > 0) {
                    // Since the client wants to register it, increase the number of clients sending renews
                    // 每有一个新的客户端注册进来,就会 1,表示未来要发送心跳的客户端 1
                    this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews   1;
                    // todo 更新下 自己能容忍的最少心跳数量
                    updateRenewsPerMinThreshold();
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }

        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            // 服务启动时间ServiceUp
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        // 塞到注册表中
        gMap.put(registrant.getId(), lease);
        //放到registed 队列中
        recentRegisteredQueue.add(new Pair<Long, String>(
                System.currentTimeMillis(),
                registrant.getAppName()   "("   registrant.getId()   ")"));
        // This is where the initial state transfer of overridden status happens
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                              "overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        // 从本地缓存中获取该实例状态
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
            // 存在的话,设置进去
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }

        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);

        // If the lease is registered with UP status, set lease service up timestamp
        // 实例状态是up的话,设置下服务启动时间
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
        registrant.setActionType(ActionType.ADDED);
        /** ---------第2 步 ------------*/
        // 塞入最近改变队列中
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        registrant.setLastUpdatedTimestamp();
        /** --------- 第3步 ------------*/
        // 本地缓存失效
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        read.unlock();
    }
}

这个注册方法很长,我按照重要程度大概分为3步。需要注意的是整个方法被 读锁 包起来了。

  • 更新注册表信息。
  • 将信息塞入最近变更队列中, 更新最后更新时间
  • 本地缓存失效(本地缓存很重要,服务发现的时候先会去本地的一级二级缓存中获取,没有的话采取注册表中找)

更新注册表实例信息

// 先从注册表中 获取对应appName的 map集合
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
// 不存在就创建 并且塞入注册表map中
if (gMap == null) {
    final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
    //存在就不会put ,并将存在的那个返回来,不存在的话进行put,返回null
    // 试想该地使用putIfAbsent 的作用: double check, 虽然使用了ConcurrentHashMap,
    // 保证了元素增删改查没问题,但是还是不能保证 gNewMap 只存在一个。
    gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
    if (gMap == null) {
        gMap = gNewMap;
    }
}

先是从注册表(也就是第一节介绍的那个注册表map )根据appName 获取对应 实例列表map

这里这段代码就是先根据appName 获取下 所有实例map,如果不存在的话,就创建。

// 获取该实例id 对应的租约信息
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// 如果是存在
if (existingLease != null && (existingLease.getHolder() != null)) {
    Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
    Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
    if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
        logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater"  
                " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
        logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
        registrant = existingLease.getHolder();
    }
    // 如不存在的话
} else {
    // The lease does not exist and hence it is a new registration
    synchronized (lock) {
        // 这个是与自我保护机制有关的
        if (this.expectedNumberOfClientsSendingRenews > 0) {
            // 每有一个新的客户端注册进来,就会 1,表示未来要发送心跳的客户端 1
            // Since the client wants to register it, increase the number of clients sending renews
            this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews   1;
            // 更新下 自己能容忍的最少心跳数量
            updateRenewsPerMinThreshold();
        }
    }
    logger.debug("No previous lease information found; it is new registration");
}

接着就是根据实例id获取下实例租约信息,如果之前就在注册表中存在的话,看看要不要使用之前的instance信息,前提是没有变更过,通过变更时间来判断的。

如果是第一次注册(或者是之前注册过,然后服务下线了)的话,这里面肯定是没有的,就会将expectedNumberOfClientsSendingRenews 自加1 ,expectedNumberOfClientsSendingRenews 这个变量表示的是需要续约的客户端数量。最后调用updateRenewsPerMinThreshold 方法,更新一下下次接收续约(心跳)最少的阈值(这个是与 服务剔除、自我保护机制有关的,保护机制后面我们开一篇文章重点说明)。

Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
    // 服务启动时间ServiceUp
    lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
// 塞到注册表中
gMap.put(registrant.getId(), lease);

重新生成租约信息,如果是注册表中已经存在这个实例,就将原来的实例启动时间 设置到新的租约信息中,然后存储到注册表中。

放入最近改变队列中

// 塞入最近改变队列中
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
// 更新最后更新时间
registrant.setLastUpdatedTimestamp();

这个放入最近改变队列中,为了后续使用,就放3分钟,到期后就从队列中剔除了。

更新最后更新时间,这个也是非常重要的,后续要根据这个时间进行服务剔除。

失效本地缓存

 // 本地缓存失效
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());

这块我们知道就可以了,这个本地缓存服务发现的时候会用到,会先从本地的多级缓存中获取这些实例信息。这里实例信息改变了,就需要清除下本地的缓存,保证一致性。

2.2.2 集群间同步replicateToPeers()

private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        // isReplication:是否是集群节点间同步复制的请求
        if (isReplication) {
            // 如果是集群节点间同步复制的请求,最近一分钟处理同步复制请求数 1
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            // 如果本地配置的集群节点为空,则不进行同步复制
            // 如果是集群节点间同步复制的请求,则不进行同步复制(设想一下,如果集群节点间不断同步复制,那么会形成一个循环永不结束)
            return;
        }

        // peerEurekaNodes:存放集群节点
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            // 如果 url 表示的是当前主机,就不要复制给自己了
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            // todo 同步复制给集群节点处理
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}

同步复制给集群节点处理replicateInstanceActionsToPeers

private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry;
        CurrentRequestVersion.set(Version.V2);
        // 此时为 action = Action.Register
        switch (action) {
            case Cancel:
                // 下架
                node.cancel(appName, id);
                break;
            case Heartbeat:
                // 心跳续租
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                // 注册
                node.register(info);
                break;
            case StatusUpdate:
                // 更改状态
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                // 删除状态
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    } finally {
        CurrentRequestVersion.remove();
    }
}

node.register()

// PeerEurekaNode.class
public void register(final InstanceInfo info) throws Exception {
    long expiryTime = System.currentTimeMillis()   getLeaseRenewalOf(info);
    // batchingDispatcher:批量处理执行器,把任务放入队列中,后台有专门的线程对队列进行处理
    batchingDispatcher.process(
            taskId("register", info),
            new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                public EurekaHttpResponse<Void> execute() {
                    // 发起注册请求
                    return replicationClient.register(info);
                }
            },
            expiryTime
    );
}

replicationClient.register()

// AbstractJerseyEurekaHttpClient.class
public EurekaHttpResponse<Void> register(InstanceInfo info) {
    String urlPath = "apps/"   info.getAppName();
    ClientResponse response = null;
    try {
        Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
        // 添加 HEADER_REPLICATION = true 标记,表明这是一个集群节点间的同步复制请求
        addExtraHeaders(resourceBuilder);
        // 发起请求
        response = resourceBuilder
                .header("Accept-Encoding", "gzip")
                .type(MediaType.APPLICATION_JSON_TYPE)
                .accept(MediaType.APPLICATION_JSON)
                .post(ClientResponse.class, info);
        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                    response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}


// JerseyReplicationClient.class
@Override
protected void addExtraHeaders(Builder webResource) {
    webResource.header(PeerEurekaNode.HEADER_REPLICATION, "true");
}

集群节点间同步复制处理下架、心跳续租、删除状态、更新状态都和注册类似。后面不在说明。

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanehjc
系列文章
更多 icon
同类精品
更多 icon
继续加载