• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Sentinel 滑动窗口实现原理源码

武飞扬头像
Kim_smile
帮助1

我们知道,Sentinel 可以用来帮助我们实现流量控制、服务降级、服务熔断,而这些功能的实现都离不开接口被调用的实时指标数据,本文便是关于 Sentinel 是如何实现指标数据统计的。

学新通

上图中的右上角就是滑动窗口的示意图,是 StatisticSlot 的具体实现。StatisticSlot 是 Sentinel 的核心功能插槽之一,用于统计实时的调用数据。

Sentinel 是基于滑动窗口实现的实时指标数据收集统计,底层采用高性能的滑动窗口数据结构 LeapArray 来统计实时的秒级指标数据,可以很好地支撑写多于读的高并发场景。

学新通

核心数据结构

  • ArrayMetric:滑动窗口核心实现类。
  • LeapArray:滑动窗口顶层数据结构,包含一个一个的窗口数据。
  • WindowWrap:每一个滑动窗口的包装类,其内部的数据结构用 MetricBucket 表示。
  • MetricBucket:指标桶,例如通过数量、阻塞数量、异常数量、成功数量、响应时间,已通过未来配额(抢占下一个滑动窗口的数量)。
  • MetricEvent:指标类型,例如通过数量、阻塞数量、异常数量、成功数量、响应时间等。

ArrayMetric

滑动窗口的入口类为 ArrayMetric,实现了 Metric 指标收集核心接口,该接口主要定义一个滑动窗口中成功的数量、异常数量、阻塞数量,TPS、响应时间等数据。

public class ArrayMetric implements Metric {

    private final LeapArray<MetricBucket> data;

    public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
        if (enableOccupy) {
            this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
        } else {
            this.data = new BucketLeapArray(sampleCount, intervalInMs);
        }
    }
  • int intervalInMs:表示一个采集的时间间隔,即滑动窗口的总时间,例如 1 分钟。
  • int sampleCount:在一个采集间隔中抽样的个数,默认为 2,即一个采集间隔中会包含两个相等的区间,一个区间就是一个窗口。
  • boolean enableOccupy:是否允许抢占,即当前时间戳已经达到限制后,是否可以占用下一个时间窗口的容量。

LeapArray

LeapArray 用来承载滑动窗口,即成员变量 array,类型为 AtomicReferenceArray<WindowWrap<T>>,保证创建窗口的原子性(CAS)。

public abstract class LeapArray<T> {

    //每一个窗口的时间间隔,单位为毫秒
    protected int windowLengthInMs;
    //抽样个数,就一个统计时间间隔中包含的滑动窗口个数
    protected int sampleCount;
    //一个统计的时间间隔
    protected int intervalInMs;
    //滑动窗口的数组,滑动窗口类型为 WindowWrap<MetricBucket>
    protected final AtomicReferenceArray<WindowWrap<T>> array;
    private final ReentrantLock updateLock = new ReentrantLock();
    
    public LeapArray(int sampleCount, int intervalInMs) {
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.intervalInMs = intervalInMs;
        this.sampleCount = sampleCount;
        this.array = new AtomicReferenceArray<>(sampleCount);
    }
学新通

MetricBucket

Sentinel 使用 MetricBucket 统计一个窗口时间内的各项指标数据,这些指标数据包括请求总数、成功总数、异常总数、总耗时、最小耗时、最大耗时等,而一个 Bucket 可以是记录一秒内的数据,也可以是 10 毫秒内的数据,这个时间长度称为窗口时间。

public class MetricBucket {
    /**
     * 存储各事件的计数,比如异常总数、请求总数等
     */
    private final LongAdder[] counters;
    /**
     * 这段事件内的最小耗时
     */
    private volatile long minRt;
}

Bucket 记录一段时间内的各项指标数据用的是一个 LongAdder 数组,数组的每个元素分别记录一个时间窗口内的请求总数、异常数、总耗时。也就是说:MetricBucket 包含一个 LongAdder 数组,数组的每个元素代表一类 MetricEvent。
LongAdder 保证了数据修改的原子性,并且性能比 AtomicLong 表现更好。

public enum MetricEvent {
    PASS,
    BLOCK,
    EXCEPTION,
    SUCCESS,
    RT,
    OCCUPIED_PASS
}

当需要获取 Bucket 记录总的成功请求数或者异常总数、总的请求处理耗时,可根据事件类型 (MetricEvent) 从 Bucket 的 LongAdder 数组中获取对应的 LongAdder,并调用 sum 方法获取总数。

public long get(MetricEvent event) {
    return counters[event.ordinal()].sum();
}

当需要 Bucket 记录一个成功请求或者一个异常请求、处理请求的耗时,可根据事件类型(MetricEvent)从 LongAdder 数组中获取对应的 LongAdder,并调用其 add 方法。

public void add(MetricEvent event, long n) {
     counters[event.ordinal()].add(n);
}

WindowWrap

因为 Bucket 自身并不保存时间窗口信息,所以 Sentinel 给 Bucket 加了一个包装类 WindowWrap。
Bucket 用于统计各项指标数据,WindowWrap 用于记录 Bucket 的时间窗口信息(窗口的开始时间、窗口的大小),WindowWrap 数组就是一个滑动窗口。

public class WindowWrap<T> {
    /**
     * 单个窗口的时间长度(毫秒)
     */
    private final long windowLengthInMs;
    /**
     * 窗口的开始时间戳(毫秒)
     */
    private long windowStart;
    /**
     * 统计数据
     */
    private T value;
}

总的来说:

  • WindowWrap 用于包装 Bucket,随着 Bucket 一起创建。
  • WindowWrap 数组实现滑动窗口,Bucket 只负责统计各项指标数据,WindowWrap 用于记录 Bucket 的时间窗口信息。
  • 定位 Bucket 实际上是定位 WindowWrap,拿到 WindowWrap 就能拿到 Bucket。

滑动窗口实现

如果我们希望能够知道某个接口的每秒处理成功请求数(成功 QPS)、每秒处理失败请求数(失败 QPS),以及处理每个成功请求的平均耗时(avg RT),注意这里我们只需要控制 Bucket 统计一秒钟的指标数据即可,但如何才能确保 Bucket 存储的就是精确到 1 秒内的数据呢?

Sentinel 是这样实现的:定义一个 Bucket 数组,根据时间戳来定位到数组的下标。
由于只需要保存最近一分钟的数据。那么 Bucket 数组的大小就可以设置为 60,每个 Bucket 的 windowLengthInMs(窗口时间)大小就是 1 秒。内存资源是有限的,而这个数组可以循环使用,并且永远只保存最近 1 分钟的数据,这样可以避免频繁的创建 Bucket,减少内存资源的占用。

那如何定位 Bucket 呢?我们只需要将当前时间戳减去毫秒部分,得到当前的秒数,再将得到的秒数与数组长度取余数,就能得到当前时间窗口的 Bucket 在数组中的位置(索引)。

calculateTimeIdx 方法中,取余数就是实现循环利用数组。如果想要获取连续的一分钟的 Bucket 数据,就不能简单的从头开始遍历数组,而是指定一个开始时间和结束时间,从开始时间戳开始计算 Bucket 存放在数组中的下标,然后循环每次将开始时间戳加上 1 秒,直到开始时间等于结束时间。

private int calculateTimeIdx(long timeMillis) {
    long timeId = timeMillis / windowLengthInMs;
    return (int)(timeId % array.length());
}

由于循环使用的问题,当前时间戳与一分钟之前的时间戳和一分钟之后的时间戳都会映射到数组中的同一个 Bucket,因此,必须要能够判断取得的 Bucket 是否是统计当前时间窗口内的指标数据,这便要数组每个元素都存储 Bucket 时间窗口的开始时间戳。

比如当前时间戳是 1577017626812,Bucket 统计一秒的数据,将时间戳的毫秒部分全部替换为 0,就能得到 Bucket 时间窗口的开始时间戳为 1577017626000。

//计算时间窗口开始时间戳
protected long calculateWindowStart(long timeMillis) {
    return timeMillis - timeMillis % windowLengthInMs;
}

//判断时间戳是否在当前时间窗口内
public boolean isTimeInWindow(long timeMillis) {
    return windowStart <= timeMillis && timeMillis < windowStart   windowLengthInMs;
}

通过时间戳定位 Bucket

当接收到一个请求时,可根据接收到请求的时间戳计算出一个数组索引,从滑动窗口(WindowWrap 数组)中获取一个 WindowWrap,从而获取 WindowWrap 包装的 Bucket,调用 Bucket 的 add 方法记录相应的事件。

/**
 * 根据时间戳获取 bucket
 * @param timeMillis 时间戳(毫秒)
 * @return 如果时间有效,则在提供的时间戳处显示当前存储桶项;如果时间无效,则为空
 */
public WindowWrap<T> currentWindow(long timeMillis) {
    if (timeMillis < 0) {
        return null;
    }
    // 获取时间戳映射到的数组索引
    int idx = calculateTimeIdx(timeMillis);
    // 计算 bucket 时间窗口的开始时间
    long windowStart = calculateWindowStart(timeMillis);

    // 从数组中死循环查找当前的时间窗口,因为可能多个线程都在获取当前时间窗口
    while (true) {
        WindowWrap<T> old = array.get(idx);
        // 一般是项目启动时,时间未到达一个周期,数组还没有存储满,没有到复用阶段,所以数组元素可能为空
        if (old == null) {
            // 创建新的 bucket,并创建一个 bucket 包装器
            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            // cas 写入,确保线程安全,期望数组下标的元素是空的,否则就不写入,而是复用
            if (array.compareAndSet(idx, null, window)) {
                return window;
            } else {
                Thread.yield();
            }
        }
        // 如果 WindowWrap 的 windowStart 正好是当前时间戳计算出的时间窗口的开始时间,则就是我们想要的 bucket
        else if (windowStart == old.windowStart()) {
            return old;
        }
        // 复用旧的 bucket
        else if (windowStart > old.windowStart()) {
            if (updateLock.tryLock()) {
                try {
                    // 重置 bucket,并指定 bucket 的新时间窗口的开始时间
                    return resetWindowTo(old, windowStart);
                } finally {
                    updateLock.unlock();
                }
            } else {
                Thread.yield();
            }
        }
        // 计算出来的当前 bucket 时间窗口的开始时间比数组当前存储的 bucket 的时间窗口开始时间还小,
        // 直接返回一个空的 bucket 就行
        else if (windowStart < old.windowStart()) {
            return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        }
    }
}
学新通

上面代码的实现是:通过当前时间戳计算出当前时间窗口的 (new) Bucket 在数组中的索引,通过索引从数组中取得 (old) Bucket;并计算 (new) Bucket 时间窗口的开始时间,与 (old) Bucket 时间窗口的开始时间作对比。

  1. 如果旧的 bucket 不存在,那么我们在 windowStart 处创建一个新的 bucket,然后尝试通过 CAS 操作更新环形数组。只有一个线程可以成功更新,保证原子性。
  2. 如果当前 windowStart 等于旧桶的开始时间戳,表示时间在桶内,所以直接返回桶。
  3. 如果旧桶的开始时间戳落后于所提供的时间,这意味着桶已弃用,我们可以复用该桶,并将桶重置为当前的 windowStart。注意重置和清理操作很难是原子的,所以我们需要一个更新锁来保证桶更新的正确性。只有当 bucket 已弃用才会上锁,所以在大多数情况下它不会导致性能损失。
  4. 不应该通过这里,因为提供的时间已经落后了,一般是时钟回拨导致的。

参考资料
https://www.cnblogs.com/dingwpmz/p/12548792.html
https://sentinelguard.io/zh-cn/docs/basic-implementation.html

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgjkifi
系列文章
更多 icon
同类精品
更多 icon
继续加载