• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Java的Eureka系列:Server端【处理服务续约和服务下架请求】

武飞扬头像
juejin
帮助134

前端

Java的Eureka系列:Server端【处理服务续约和服务下架请求】

0. 环境

  • eureka版本:1.10.11
  • Spring Cloud : 2020.0.2
  • Spring Boot :2.4.4

1. 服务续约

服务续约请求:PUT请求, path为:apps/{appName}/{instanceId}

1.1 InstanceResource

服务端处理客户端心跳续租请求,在 InstanceResource 类的 renewLease() 方法

// InstanceResource.class
@PUT
public Response renewLease(
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
        @QueryParam("overriddenstatus") String overriddenStatus,
        @QueryParam("status") String status,
        @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    // isReplication:是否是集群节点间的同步复制请求,如果是客户端服务实例发送续租请求为 null ,如果是集群节点同步复制为 true
    // overriddenStatus:覆盖状态
    // status:真实的状态
    // lastDirtyTimestamp:客户端保存的最新修改时间戳(脏)


    boolean isFromReplicaNode = "true".equals(isReplication);
    // todo 调用心跳续租方法
    boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

    // Not found in the registry, immediately ask for a register
    if (!isSuccess) {
        logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
        // 本地注册表中没有相应实例信息,返回404
        // 客户端在发起续租心跳请求后收到服务端返回404,会立即再进行注册
        return Response.status(Status.NOT_FOUND).build();
    }
    // Check if we need to sync based on dirty time stamp, the client
    // instance might have changed some value

    // 校验,如果我们需要根据客服端的最新修改时间戳(脏)同步,客户端实例可能需要更改数据
    Response response;
    if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
        // 客户端请求中的最新修改时间戳(脏)不为空 且 本地配置的 shouldSyncWhenTimestampDiffers = true 时
        // shouldSyncWhenTimestampDiffers:检查最新修改时间戳(脏)不同时是否同步实例信息
        //  todo 检查本地的和客户端保存的最新修改时间戳(脏),根据具体情况返回相应的请求结果
        response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
        // Store the overridden status since the validation found out the node that replicates wins
        if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                && (overriddenStatus != null)
                && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                && isFromReplicaNode) {
            // 检查后,如果满足下列三个条件:
            //     1. 是集群节点同步复制到本地
            //     2. 本地注册表中相应实例的最新修改时间戳(脏)小于同步复制过来的
            //     3. 同步复制心跳续租请求中的 overriddenStatus 不为 null 也不为 UNKNOWN
            //  todo 更新本地相应的实例信息(覆盖状态)
            registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
        }
    } else {
        // 返回成功200
        response = Response.ok().build();
    }
    logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
    return response;
}

1.2 处理心跳续租

com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#renew:

public boolean renew(final String appName, final String id, final boolean isReplication) {

    // todo 调用父类AbstractInstanceRegistry 的renew 方法,变更本地注册表服务实例租约信息
    if (super.renew(appName, id, isReplication)) {
        // todo 服务续约请求同步给集群的其他节点
        replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
        return true;
    }
    return false;
}

1.2.1 super.renew()

// AbstractInstanceRegistry.class
public boolean renew(String appName, String id, boolean isReplication) {
    RENEW.increment(isReplication);
    // 获取服务租约信息
    Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
    Lease<InstanceInfo> leaseToRenew = null;
    if (gMap != null) {
        // 获取实例租约信息
        leaseToRenew = gMap.get(id);
    }
    if (leaseToRenew == null) {
        RENEW_NOT_FOUND.increment(isReplication);
        logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
      // 如果本地注册表中找不到实例租约信息,则返回 false  
      return false;
    } else {
        // 获取实例信息
        InstanceInfo instanceInfo = leaseToRenew.getHolder();
        if (instanceInfo != null) {
            // touchASGCache(instanceInfo.getASGName());
            // 根据规则,计算出 overriddenInstanceStatus
            InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                    instanceInfo, leaseToRenew, isReplication);
            if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
              	// 计算后覆盖状态如果为 UNKNOWN ,返回 false
                // 覆盖状态为 UNKNOWN 的情况:
                //     1. overiddenStatusMap 中相应实例的 overiddenStatus 为 UNKNOWN
                //     2. 本地注册表中实例的覆盖状态为 UNKNOWN
                // 出现的这种状况的原因:
                //     1. 客户端发起过删除状态请求(此时 overiddenStatusMap 中取出来为 null,overiddenStatus 为 UNKNOWN )
                //     2. 客户端发起过修改状态请求(通过 actuator 设置 overiddenStatusMap 中相应的 overiddenStatus 为 UNKNOWN )
                // 刚注册的情况 overiddenStatusMap 中取出来为 null,只有通过外部修改状态才会有值
                logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                          "; re-register required", instanceInfo.getId());
                RENEW_NOT_FOUND.increment(isReplication);
                return false;
            }
            if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                logger.info(
                        "The instance status {} is different from overridden instance status {} for instance {}. "
                                  "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                                instanceInfo.getOverriddenStatus().name(),
                                instanceInfo.getId());
                // 实例信息的实例状态和覆盖状态不一致时,将实例状态的值设置为覆盖状态的值,并且不记录本地实例信息中的最新修改时间戳(脏)
                instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);

            }
        }
        // 核心,最近一分钟处理的心跳续租数 1
        renewsLastMin.increment();
        // 核心,更新续租到期时间
        leaseToRenew.renew();
        return true;
    }
}

// Lease.class
public void renew() {
    // 更新续租到期时间,默认为当前时间 90s
    lastUpdateTimestamp = System.currentTimeMillis()   duration;
}

1.2.3 replicateToPeers()

// PeerAwareInstanceRegistryImpl.class
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    // ......
    try {
        // ......
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // ......
            // 同步复制心跳续租给集群节点
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}

// PeerAwareInstanceRegistryImpl.class
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        // 此时 action = Action.Heartbeat
        switch (action) {
            // ......
            case Heartbeat:
                // 心跳
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                // todo 同步复制心跳给当前集群节点 node
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            // ......
    } catch (Throwable t) {
        // ......
    }
}

1.2.4 node.heartbeat()

// PeerEurekaNode.class
public void heartbeat(final String appName, final String id,
                      final InstanceInfo info, final InstanceStatus overriddenStatus,
                      boolean primeConnection) throws Throwable {
    if (primeConnection) {
        // We do not care about the result for priming request.
        // PeerEurekaNode 类下的 heartbeat()方法被调用有两处:
        //     1. 服务端处理客户端续租请求时,同步复制给集群节点, primeConnection = false
        //     2. Aws 调用, primeConnection = true
        replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
        return;
    }
    // 同步复制任务,实现了发送请求的处理方法和请求失败后的处理方法
    ReplicationTask replicationTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) {
        @Override
        public EurekaHttpResponse<InstanceInfo> execute() throws Throwable {
            // 发起续租请求
            return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
        }

        @Override
        public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
            // 简单打印相关日志
            super.handleFailure(statusCode, responseEntity);
            if (statusCode == 404) {
                logger.warn("{}: missing entry.", getTaskName());
                if (info != null) {
                    logger.warn("{}: cannot find instance id {} and hence replicating the instance with status {}",
                            getTaskName(), info.getId(), info.getStatus());
                    // 请求返回404,则立即重新发起注册请求同步复制到集群节点
                    // 返回404是因为 本地注册表中相应实例的最新修改时间戳(脏) 大于 集群节点的最新修改时间戳(脏)
                    // 表明本地服务端的实例信息比集群节点的新
                    register(info);
                }
            } else if (config.shouldSyncWhenTimestampDiffers()) {
                // “实例最新修改时间戳(脏)不同时是否同步实例信息”配置开启时 
                InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity;
                if (peerInstanceInfo != null) {
                    // 当 本地注册表中相应实例的最新修改时间戳(脏) 小于 集群节点的最新修改时间戳(脏)
                    // 表明本地服务端的实例信息比集群节点的旧
                    // todo 根据请求返回响应体,同步集群节点的实例信息到本地注册表
                    syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);
                }
            }
        }
    };
    long expiryTime = System.currentTimeMillis()   getLeaseRenewalOf(info);
    // batchingDispatcher 执行器,把任务放入队列中,后台有专门的线程对队列进行处理
    batchingDispatcher.process(taskId("heartbeat", info), replicationTask, expiryTime);
}

1.2.5 syncInstancesIfTimestampDiffers()

// PeerEurekaNode.class
private void syncInstancesIfTimestampDiffers(String appName, String id, InstanceInfo info, InstanceInfo infoFromPeer) {
    try {
        if (infoFromPeer != null) {
            logger.warn("Peer wants us to take the instance information from it, since the timestamp differs,"
                      "Id : {} My Timestamp : {}, Peer's timestamp: {}", id, info.getLastDirtyTimestamp(), infoFromPeer.getLastDirtyTimestamp());

            if (infoFromPeer.getOverriddenStatus() != null && !InstanceStatus.UNKNOWN.equals(infoFromPeer.getOverriddenStatus())) {
                logger.warn("Overridden Status info -id {}, mine {}, peer's {}", id, info.getOverriddenStatus(), infoFromPeer.getOverriddenStatus());
                // todo 更新本地相应的实例信息(覆盖状态)
                registry.storeOverriddenStatusIfRequired(appName, id, infoFromPeer.getOverriddenStatus());
            }
            // 注册实例信息到本地注册表
            registry.register(infoFromPeer, true);
        }
    } catch (Throwable e) {
        logger.warn("Exception when trying to set information from peer :", e);
    }
}

1.2.6 registry.storeOverriddenStatusIfRequired()

// AbstractInstanceRegistry.class
public void storeOverriddenStatusIfRequired(String appName, String id, InstanceStatus overriddenStatus) {
    InstanceStatus instanceStatus = overriddenInstanceStatusMap.get(id);
    // 如果本地 overriddenInstanceStatusMap 中相应实例的 overriddenStatus 为 null 或者和集群节点同步复制请求中的 overriddenStatus 不一致
    // 则更新本地的 overriddenInstanceStatusMap 和 instanceInfo 中的 overriddenStatus
    if ((instanceStatus == null) || (!overriddenStatus.equals(instanceStatus))) {
        // We might not have the overridden status if the server got
        // restarted -this will help us maintain the overridden state
        // from the replica
        logger.info("Adding overridden status for instance id {} and the value is {}",
                id, overriddenStatus.name());
        overriddenInstanceStatusMap.put(id, overriddenStatus);
        InstanceInfo instanceInfo = this.getInstanceByAppAndId(appName, id, false);
        instanceInfo.setOverriddenStatus(overriddenStatus);
        logger.info("Set the overridden status for instance (appname:{}, id:{}} and the value is {} ",
                appName, id, overriddenStatus.name());
    }
}

1.3 关键配置 syncWhenTimestampDiffers

假设,当客户端服务实例发起续租请求到服务端 A 时,服务端 A 处理完续租成功后,需要同步复制给集群的服务端 B 节点,这个时候可能存在服务端 A 和服务端 B 两边注册表中的同一实例的最新修改时间戳不一致,会有下列处理情况:

  • 如果服务端 A 的实例最新修改时间戳(脏)大于服务端 B 的的实例最新修改时间戳(脏),则服务端 B 返回404给服务端 A 。服务端 A 根据返回404立即再发起同步注册请求给 服务端 B 。
  • 如果服务端 A 的实例最新修改时间戳(脏)小于服务端 B 的的实例最新修改时间戳(脏),则服务端 B 返回409和服务端 B 中的实例信息给服务端 A 。服务端 A 根据返回409和响应体,同步服务端 B 的实例信息到本地注册表。

上述前提是需要在配置文件中,下列配置需要开启(默认开启):

eureka.server.sync-when-timestamp-differs=true

代码中对应的核心方法为validateDirtyTimestamp():

// InstanceResource.class
private Response validateDirtyTimestamp(Long lastDirtyTimestamp,
                                        boolean isReplication) {
    // 获取本地相应实例信息
    InstanceInfo appInfo = registry.getInstanceByAppAndId(app.getName(), id, false);
    if (appInfo != null) {
        if ((lastDirtyTimestamp != null) && (!lastDirtyTimestamp.equals(appInfo.getLastDirtyTimestamp()))) {
            // 客户端实例续租请求中的最新修改时间戳(脏) 和 本地注册表中实例的最新修改时间戳(脏) 不相等时
            Object[] args = {id, appInfo.getLastDirtyTimestamp(), lastDirtyTimestamp, isReplication};

            if (lastDirtyTimestamp > appInfo.getLastDirtyTimestamp()) {
                logger.debug(
                        "Time to sync, since the last dirty timestamp differs -"
                                  " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",
                        args);
                // 如果 客户端实例续租请求中的最新修改时间戳(脏) 大于 本地注册表中相应实例的最新修改时间戳(脏)
                // 返回404,让客户端立即发起注册给当前服务端更新相应实例信息
                return Response.status(Status.NOT_FOUND).build();
            } else if (appInfo.getLastDirtyTimestamp() > lastDirtyTimestamp) {
                // In the case of replication, send the current instance info in the registry for the
                // replicating node to sync itself with this one.
                // 如果 客户端实例续租请求中的最新修改时间戳(脏) 小于 本地注册表中相应实例的最新修改时间戳(脏)
                // 客户端因角色不同处理可能有下列情况:
                //     1. 如果是集群节点,同步复制给当前服务端,则当前服务端返回409且响应体中包含本地注册表的相应实例信息,让集群节点更新相应信息
                //     2. 如果是服务实例,为自己发起心跳续租请求,则当前服务端返回成功200
                if (isReplication) {
                    logger.debug(
                            "Time to sync, since the last dirty timestamp differs -"
                                      " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",
                            args);
                    return Response.status(Status.CONFLICT).entity(appInfo).build();
                } else {
                    return Response.ok().build();
                }
            }
        }

    }
    // 返回成功200
    return Response.ok().build();
}

1.4 总结

  • 首先,服务端接收到客户端发起的心跳续租请求后,会在本地注册表中获取相应实例信息,最近一分钟处理心跳续租请求数 1,更新续租过期时间
  • 然后,同步复制给集群节点,根据集群节点返回的结果,如果不是成功进行相应处理:
    • 集群节点返回404,表明本地注册表中相应实例信息比集群节点的新,立即再发起同步注册请求给集群节点
    • 集群节点返回409,表明本地注册表中相应实例信息比集群节点的旧,根据请求返回响应体中的实例信息,更新本地相关实例信息(覆盖状态)和注册实例信息到本地注册表
  • 最后,处理返回结果,也有不同的方式:
    • 请求中的实例最新修改时间戳(脏) 大于 本地相应实例信息的最新修改时间戳(脏) ,则返回404让客户端立即再发起注册请求

      • 如果是集群节点发起的请求,请求中的 overriddenStatus 不为 null 也不为 UNKNOWN,还需要更新本地相应的实例信息(覆盖状态)
    • 请求中的实例最新修改时间戳(脏) 小于 本地相应实例信息的最新修改时间戳(脏) ,根据发起请求的角色处理方式也不同:

      • 服务实例发起的请求,则返回200
      • 集群节点发起的请求,则返回409且响应体中包含本地相应实例信息,让集群节点更新相应信息
    • 请求中的实例最新修改时间戳(脏) 等于 本地相应实例信息的最新修改时间戳(脏) ,返回成功200

2. 服务下架

处理客户端下架请求:DELETE请求, path为:path:"apps/" appName '/' id;

服务端处理处理客户端下架请求在 InstanceResource 类的 cancelLease() 方法

// isReplication:是否是集群节点间的同步复制
@DELETE
public Response cancelLease(
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    try {
        // todo 处理客户端下架
        boolean isSuccess = registry.cancel(app.getName(), id,
            "true".equals(isReplication));

        if (isSuccess) {
            logger.debug("Found (Cancel): {} - {}", app.getName(), id);
            // 处理成功返回200
            return Response.ok().build();
        } else {
            logger.info("Not Found (Cancel): {} - {}", app.getName(), id);
            // 处理失败返回404
            return Response.status(Status.NOT_FOUND).build();
        }
    } catch (Throwable e) {
        logger.error("Error (cancel): {} - {}", app.getName(), id, e);
        // 处理异常返回500
        return Response.serverError().build();
    }

}

2.1 registry.cancel()

@Override
public boolean cancel(final String appName, final String id,
                      final boolean isReplication) {
    // todo 调用父类AbstractInstanceRegistry 的cancel 方法进行本地注册表变更,服务下线就是删除对应的服务实例信息
    if (super.cancel(appName, id, isReplication)) {
        // todo 将服务主动下线请求同步给Eureka Server 集群的其他节点。
        replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);

        return true;
    }
    return false;
}

2.1.1 replicateToPeers()

具体已在Eureka源码7-Server端(处理服务注册请求)中讲过。

2.2 super.cancel()

// AbstractInstanceRegistry.class
public boolean cancel(String appName, String id, boolean isReplication) {
    return internalCancel(appName, id, isReplication);
}

protected boolean internalCancel(String appName, String id, boolean isReplication) {
    // 开启读锁
    read.lock();
    try {
        CANCEL.increment(isReplication);
        // 获取appName 对应的所有实例集合
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToCancel = null;

        if (gMap != null) {
            // 根据实例id 移除对应的实例租约信息
            leaseToCancel = gMap.remove(id);
        }
        // 将变更信息 扔到最近删除队列中
        recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName   "("   id   ")"));
        // 从overriddenInstanceStatusMap 中删除
        InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
        if (instanceStatus != null) {
            logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
        }
        if (leaseToCancel == null) {
            CANCEL_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
            // 如果获取不到实例租约信息,则返回 false
            return false;
        } else {
            // 变更实例剔除时间
            leaseToCancel.cancel();
            InstanceInfo instanceInfo = leaseToCancel.getHolder();
            String vip = null;
            String svip = null;
            if (instanceInfo != null) {
                // 设置行为类型为删除
                instanceInfo.setActionType(ActionType.DELETED);
                // todo 放入最近变更队列中
                recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                // 设置本地相应实例信息的最新修改时间戳(非脏时间戳)
                instanceInfo.setLastUpdatedTimestamp();
                // 获取实例的虚拟互联网协议地址,如果未指定则默认为主机名
                vip = instanceInfo.getVIPAddress();
                // 获取实例的安全虚拟互联网协议地址,如果未指定则默认为主机名
                svip = instanceInfo.getSecureVipAddress();
            }
            // 删除实例对应的缓存数据
            invalidateCache(appName, vip, svip);
            logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
        }
    } finally {
        read.unlock();
    }
    // 更新 需要发送心跳的客户端数量
    synchronized (lock) {
        if (this.expectedNumberOfClientsSendingRenews > 0) {
            // Since the client wants to cancel it, reduce the number of clients to send renews.
            this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
            updateRenewsPerMinThreshold();
        }
    }
    // 关闭读锁
    return true;
}

2.3 总结

  • 当服务端收到客户端下架请求时,会先从本地获取相应服务实例的租约信息并删除

  • 然后把实例下架时间和服务实例名的映射信息添加到最近下架队列,删除 overriddenInstanceStatusMap 中相应覆盖状态

  • 接着在实例租约信息中记录下架时间,转换成实例变更信息添加到最新变更队列,设置相应实例信息的最新修改时间戳(非脏时间戳)

  • 最后,让相应缓存失效,同步复制给集群节点,本地预期收到心跳续租实例数-1,更新预期每分钟收到心跳续租的请求数

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanehje
系列文章
更多 icon
同类精品
更多 icon
继续加载