• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Java的Eureka系列:Server端【定时的清除过期了的Client】

武飞扬头像
juejin
帮助81

前言

Java的Eureka系列:Server端【定时的清除过期了的Client】

0. 环境

  • eureka版本:1.10.11
  • Spring Cloud : 2020.0.2
  • Spring Boot :2.4.4

1. 方法入口

@Configuration
// Step1:引入 EurekaServerInitializerConfiguration 配置类
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
		InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
    // ......
}


@Configuration
public class EurekaServerInitializerConfiguration 
        implements ServletContextAware, SmartLifecycle, Ordered {
    // ......
    // EurekaServerInitializerConfiguration 配置类实现了 SmartLifecycle 接口
    // IOC 容器初始化即将结束时,回调生命周期 start() 方法
    @Override
    public void start() {
        new Thread(() -> {
	    try {
	        // TODO: is this class even needed now?
                // Step2:Eureka Server 上下文初始化
		eurekaServerBootstrap.contextInitialized(
                           EurekaServerInitializerConfiguration.this.servletContext);
		// ....
	    }
	    catch (Exception ex) {
	        // Help!
	        log.error("Could not initialize Eureka servlet context", ex);
	    }
        }).start();
    }
    // ......
}

public class EurekaServerBootstrap {
    // ......
    public void contextInitialized(ServletContext context) {
        try {
	    initEurekaEnvironment();
            // todo 初始化 Eureka Server 上下文
	    initEurekaServerContext();

            context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
	}
        catch (Throwable e) {
	    // ......
	}
    }
    
    protected void initEurekaServerContext() throws Exception {
        // ......
        // todo 打开交通
        this.registry.openForTraffic(this.applicationInfoManager, registryCount);
        // ......
    }

    // ......
}


// InstanceRegistry.class
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // todo 调用父类打开交通方法
    // count:服务端启动时同步集群节点注册表的实例数,不能为0,如果为0默认赋值1
    super.openForTraffic(applicationInfoManager,
	    count == 0 ? this.defaultOpenForTrafficCount : count);
}

2. super.openForTraffic()

/**
 *    打开交通,Server端定时清理过期的Client
 * @param applicationInfoManager
 * @param count 服务端启动时同步集群节点注册表的实例数,不能为0,如果为0默认赋值1
 */
 // PeerAwareInstanceRegistryImpl.class
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
    // 预期收到心跳续租的实例数赋值
    this.expectedNumberOfClientsSendingRenews = count;
    // todo 更新预期每分钟收到心跳续租请求数
    updateRenewsPerMinThreshold();
    logger.info("Got {} instances from neighboring DS node", count);
    logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
    // 记录服务端启动时间
    this.startupTime = System.currentTimeMillis();
    if (count > 0) {
        // 设置 peerInstancesTransferEmptyOnStartup = false
        // 表示服务端启动时同步集群节点注册表的实例数不为空,判断是否允许客户端拉取注册表时提到过
        this.peerInstancesTransferEmptyOnStartup = false;
    }
    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        primeAwsReplicas(applicationInfoManager);
    }
    logger.info("Changing status to UP");
    // 设置服务端实例状态为 UP
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    // todo 调用父类方法
    super.postInit();
}

2.1 updateRenewsPerMinThreshold()

// AbstractInstanceRegistry.class
protected void updateRenewsPerMinThreshold() {
    // 客户端数量 * (60 / 心跳间隔)* 自我保护开启的阈值因子)
    // = (客户端数量 * 每个客户端每分钟发送心跳的数量 * 阈值因子)
    // = (所有客户端每分钟发送的心跳数量 * 阈值因子)
    // = 当前Server开启自我保护机制的每分钟最小心跳数量
    this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
            * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
            * serverConfig.getRenewalPercentThreshold());
}

2.2 super.postInit()

// AbstractInstanceRegistry.class
protected void postInit() {
    // todo 统计最近一分钟处理的心跳续租数的定时任务
    renewsLastMin.start();
    if (evictionTaskRef.get() != null) {
        // 取消定期清理过期实例任务
        evictionTaskRef.get().cancel();
    }
    // 启动新的定期清理过期实例任务
    // 固定时间重复执行,默认一分钟后开始,每分钟执行一次
    evictionTaskRef.set(new EvictionTask());
    evictionTimer.schedule(evictionTaskRef.get(),
            serverConfig.getEvictionIntervalTimerInMs(),
            serverConfig.getEvictionIntervalTimerInMs());
}

3. 定期清理过期实例任务

class EvictionTask extends TimerTask {

    // 最近一次执行清理任务时间
    private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);

    @Override
    public void run() {
        try {
            //  todo 获取补偿时间
            // 因时间偏斜或GC,导致任务实际执行时间超过指定的间隔时间(默认一分钟)
            long compensationTimeMs = getCompensationTimeMs();
            logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
            //  todo 处理过期实例
            evict(compensationTimeMs);
        } catch (Throwable e) {
            logger.error("Could not run the evict task", e);
        }
    }
    ...
}

3.1 计算补偿时间

// AbstractInstanceRegistry.EvictionTask.class
long getCompensationTimeMs() {
    // 获取当前时间
    long currNanos = getCurrentTimeNano();
    // 获取最近一次执行任务时间
    // 并赋值 lastExecutionNanosRef 为当前时间,给下一次执行任务时使用
    long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);
    if (lastNanos == 0l) {
        return 0l;
    }

    // 计算最近一次任务的实际执行时间 elapsedMs = 当前任务执行时间 - 最近一次任务执行时间
    long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
    // 计算最近一个任务执行的超时时间 compensationTime = 最近一次任务的实际执行时间 - 设定的任务执行间隔时间
    long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs();
    // 如果超时时间大于0,则作为补偿时间返回
    // 如果超时时间小于等于0,则表示没有超时,返回0
    return compensationTime <= 0l ? 0l : compensationTime;
}

long getCurrentTimeNano() {  // for testing
    // 返回当前时间(纳秒)
    return System.nanoTime();
}

3.2 处理过期实例

// AbstractInstanceRegistry.class
public void evict(long additionalLeaseMs) {
    logger.debug("Running the evict task");
    // todo 自我保护
    if (!isLeaseExpirationEnabled()) {
        // 如果服务端允许自我保护且 最近一分钟处理心跳续租请求数 小于等于 预期每分钟收到心跳续租请求数
        // 则开启自我保护机制,不再清理过期实例
        // 配置文件可以配置关闭自我保护机制
        logger.debug("DS: lease expiration is currently disabled.");
        return;
    }

    // We collect first all expired items, to evict them in random order. For large eviction sets,
    // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
    // the impact should be evenly distributed across all applications.
    // 新建一个过期实例租约信息列表
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap != null) {
            for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                Lease<InstanceInfo> lease = leaseEntry.getValue();
                // 遍历服务端注册表,判断每个实例是否过期
                // 如果过期,则将相应实例租约信息添加到 expiredLeases
                // todo isExpired() 判断实例过期方法
                if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                    expiredLeases.add(lease);
                }
            }
        }
    }

    // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
    // triggering self-preservation. Without that we would wipe out full registry.
    // 获取服务端注册表的实例数
    int registrySize = (int) getLocalRegistrySize();
    // 计算注册实例数阈值,默认 registrySizeThreshold = 注册实例数 * 0.85
    int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
    // 计算清理实例限制数,evictionLimit = 注册实例数 - 注册实例数阈值
    int evictionLimit = registrySize - registrySizeThreshold;

    // 清理实例限制数 和 过期实例数 取最小值作为实际需要清理的实例数
    // Eureka 这样设计是为了保证可用性和分区容错性,避免一次性清理大量过期实例
    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict > 0) {
        logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < toEvict; i  ) {
            // Pick a random item (Knuth shuffle algorithm)
            // 从过期实例中随机选择下架
            // Knuth 洗牌算法
            int next = i   random.nextInt(expiredLeases.size() - i);
            Collections.swap(expiredLeases, i, next);
            Lease<InstanceInfo> lease = expiredLeases.get(i);

            String appName = lease.getHolder().getAppName();
            String id = lease.getHolder().getId();
            EXPIRED.increment();
            logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
            // todo 下架实例,并且标识非同步复制集群节点
            internalCancel(appName, id, false);
        }
    }
}

3.2.1 判断过期方法

// Lease.class
public boolean isExpired(long additionalLeaseMs) {
    // evictionTimestamp:实例下架时间,当客户端下架时记录
    // lastUpdateTimestamp:续租到期时间,当客户端注册或心跳续租时记录
    // duration:续租时间,如果客户端注册时未指定,默认90s
    // additionalLeaseMs:补偿时间
    // 如果 实例下架时间大于0 或 (当前时间 大于 续租到期时间   续租时间   补偿时间),则表示已过期
    return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp   duration   additionalLeaseMs));
}

4. 下架实例

internalCancel()这个方法在Java的Eureka系列:Server端【处理服务续约和服务下架请求】第2小节有详细说明。

5. 自我保护renewsLastMin

renewsLastMin 是服务端统计最后一分钟处理的心跳续租数的定时任务:

  • 服务端启动时开启任务,关闭时停止任务
  • 服务端处理客户端实例心跳续租时 1
  • 服务端处理客户端实例下架时-1
  • 服务端的定期清理过期实例任务中被用来判断是否开启自我保护

AbstractInstanceRegistry 类中初始化:

// AbstractInstanceRegistry.class
protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {
    // ......
    this.renewsLastMin = new MeasuredRate(1000 * 60 * 1);
    // ......
}

// 除了这里使用这个计数任务工具类
// 还有 PeerAwareInstanceRegistryImpl.numberOfReplicationsLastMin 字段也使用该工具类
// numberOfReplicationsLastMin:服务端统计最后一分钟同步复制给集群节点的操作数
public class MeasuredRate {
  
    // 最近一分钟(上一分钟)的计数
    // getCount() 返回该计数
    private final AtomicLong lastBucket = new AtomicLong(0);
  
    // 当前一分钟正在统计的计数
    private final AtomicLong currentBucket = new AtomicLong(0);
  
    // ......

    public synchronized void start() {
        if (!isActive) {
            // 开启任务
            // 固定时间重复执行,一分钟后开始,没分钟执行一次
            timer.schedule(new TimerTask() {

                @Override
                public void run() {
                    try {
                        // Zero out the current bucket.
                        // 每当到了任务刚刚开始的时候
                        // 记录上一分钟的计数
                        // 清零当前一分钟正在统计的计数
                        lastBucket.set(currentBucket.getAndSet(0));
                    } catch (Throwable e) {
                        logger.error("Cannot reset the Measured Rate", e);
                    }
                }
            }, sampleInterval, sampleInterval);

            isActive = true;
        }
    }
    ... 
}

6. 总结

  • 定期清理过期实例任务:服务端启动时开启,固定时间重复执行,默认一分钟后开始,每分钟执行一次

  • 清理过期实例流程:

    • 首先,服务端判断是否开启自动保护,如果开启则不清理过期实例,如果不开启则继续处理
    • 然后,从本地注册表中取出所有过期实例
    • 接着,计算出清理实例限制数,从清理实例限制数和过期实例取出最小值(避免一次性清理大量过期实例,保证可用性和分区容错性)
    • 最后,上面取出的最小值最为实例下架数,随机下架过期实例

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanehjh
系列文章
更多 icon
同类精品
更多 icon
继续加载