• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Dubbo的负载均衡算法:平滑加权轮询算法源码

武飞扬头像
半夏(•̤̀ᵕ•̤́๑)ᵒᵏᵎᵎᵎᵎ
帮助1

平滑加权轮询

轮询算法

轮询算法很简单,就是每台服务器轮流提供服务,代码如下:

private static final List<String> SERVERS;

private static final AtomicInteger OFFSET = new AtomicInteger(0);

static {
    SERVERS = Lists.newArrayList("A", "B", "C");
}

private static String doSelect() {
    if (OFFSET.get() > SERVERS.size() - 1) {
        OFFSET.set(0);
    }
    return SERVERS.get(OFFSET.getAndIncrement());
}

加权轮询

简单轮询算法和简单随机算法一样,面临的一个问题就是,有的机器性能好,有的机器性能差,如何能保证能者多劳呢?就需要给每个服务器加一个权重,让服务的调度机会能按照其权重的比例来,最简单的实现是复制法。假设有三台服务器servers = ["A", "B", "C"],其中每台服务器的权重为:weights = [6, 3, 1],则我们可以按照权重复制一个数组["A","A","A","A","A","A","B","B","B","C"],让其按照上述的代码被调度即可,但是这种算法有一个缺点是对内存的消耗较大。

我们看一种更好的实现方法,我们这里还是建一个一维的坐标轴,标记0-9十个节点,某一次的请求,如果落在[0,5)的区间之内,则选择A,如果落在[6-8)的区间内则选择B,如果落在[9,10)的区间内,则选择C。那么如何让某一次的请求能成为对应区间的一个索引数字呢,之前随机算法我们用的是随机数生成的,这里轮询算法则不能再用随机数了,我们需要为每次请求设置一个编号,这个编号应该是递增的,是全局的,也应该是线程安全的,所以这里我们用AtomicInteger来记录。随着请求的次数越来越多,这个编号必然会超过10,最终到达100,1000,如何映射到0-9的区间呢,可以通过取余的方式来对这个值进行缩小,保证他在0-9之间。取余是一个常用的技巧,在hashmap等hash表的设计中经常用到。

我们以上面的三台服务器权重为例,模拟建一个坐标轴:0-----6-----9-----10,模拟一下我们的算法

  1. 第一次调用,1 % 10 = 1,1在(0-6]的区间,则选择服务器A
  2. 第一次调用,2 % 10 = 2,2在(0-6]的区间,则选择服务器A
  3. 第六次调用,6 % 10 = 6,6在(0-6]的区间,则选择服务器A
  4. 第七次调用,7 % 10 = 7,7不在(0-6]的区间,在(6-9]的区间,则选择服务器B
  5. 第十次调用,10 % 10 = 0,0作为一个特殊的位置,选择服务器C
private static final AtomicInteger NUM = new AtomicInteger(1);

private static final List<String> SERVERS;

private static final Map<String, Integer> SERVER_WEIGHT_MAP;

static {
    SERVERS = Lists.newArrayList(A", "B", "C");
    SERVER_WEIGHT_MAP = new LinkedHashMap<>();
    SERVER_WEIGHT_MAP.put("A", 6);
    SERVER_WEIGHT_MAP.put("B", 3);
    SERVER_WEIGHT_MAP.put("C", 1);
}

private static String doSelectByWeight() {
    int length = SERVER_WEIGHT_MAP.keySet().size();

    boolean sameWeight = true;
    int totalWeight = 0;

    for (int i = 0; i < length; i  ) {
        int weight = (int) SERVER_WEIGHT_MAP.values().toArray()[i];
        totalWeight  = weight;
        if (sameWeight && totalWeight != weight * (i   1)) {
            sameWeight = false;
        }
    }

    if (!sameWeight) {
        int offset = NUM.getAndIncrement() % totalWeight;
        offset = offset == 0 ? totalWeight : offset;
        Set<Map.Entry<String, Integer>> entries = SERVER_WEIGHT_MAP.entrySet();

        for (Map.Entry<String, Integer> entry : entries) {
            Integer weight = entry.getValue();
            // 第七次调用,7 % 10  = 7,7不在(0-6]的区间,在(6-9]的区间,则选择服务器B,这种直观理解好理解,当时用代码实现有些复杂,可以用另一种思路来实现
            // ex: 计算的offset = 6,然后遍历服务器列表,首先得到服务器A,6是否小于A的权重,是则选择A,结束程序
            // 否的话,应该是比6大了,那么到底是选择B,还是选择C呢,假设offset = 7,那么我们让他减去A的权重,看得到的结果是否小于B的权重,是则选中
            // 否的话,在减去B的权重,看得到的结果是否小于C的权重,以此类推
            if (offset <= weight) {
                return entry.getKey();
            }
            offset -= weight;
        }
    }
    return SERVERS.get(NUM.getAndIncrement() % length);
}
学新通

平滑加权轮询

上述算法虽然实现了加权轮询的效果,但是依然有一个缺点就是,如果某一个服务器权重很大,那么他就需要连续的处理请求,比如上面例子中,如果连续调用10次,则依次被选中的服务器是:AAAAAABBBC。这就导致前期服务器A的压力较大,而B和C又处于闲置状态,无法分担压力,我们理想的可能是保证好10次调用,A需要被调用6次,B需要被调用3次,C需要被调用1次就行,顺序其实没必要,而且调用的顺序乱一点,可能才是我们期望的结果,比如:ABAABACABA这样。这就需要用到另一种算法,平滑加权轮询算法

平滑加权轮询算法的思路如下:

  1. 每一个server对应两个权重,weight和currentWeight,weight是固定的,currentWeight初始值为0,后续是动态调整的
  2. 新的请求访问到,调整每个server的currentWeight = currentWeight weight,
  3. currentWeight最大的server被选中
  4. 调整currentWeight = currentWeight - 总权重(只调整最大权重)
编号 currentWeight = currentWeight weight max(currentWeight) max(currentWeight) - 总权重
  [0, 0, 0]    
1 [6, 3, 1] 6 -> A [-4, 3, 1]
2 [2, 6, 2] 6 -> B [2, -4, 2]
3 [8, -1, 3] 8 -> A [-2, -1, 3]
4 [4, 2, 4] 4 -> A [-6, 2, 4]
5 [0, 5, 5] 5 -> B [0, -5, 5]
6 [6, -2, 6] 6 -> A [-4, 2, 6]
7 [2, 1, 7] 7 -> C [2, 1, -3]
8 [8, 4, -2] 8 -> A [-2, 4, 2]
9 [4, 7, -1] 7 -> B [4, -3, -1]
10 [10, 0, 0] 10-> A [0, 0, 0]

代码实现如下:

private static final Map<String, WeightedRoundRobin> WEIGHT_MAP;

    static {
        WEIGHT_MAP = new LinkedHashMap<>();
        WEIGHT_MAP.put("192.168.0.1", new WeightedRoundRobin("192.168.0.1", 6, 0));
        WEIGHT_MAP.put("192.168.0.2", new WeightedRoundRobin("192.168.0.2", 3, 0));
        WEIGHT_MAP.put("192.168.0.3", new WeightedRoundRobin("192.168.0.3", 1, 0));
    }

    @Data
    static class WeightedRoundRobin {
        private String ip;
        private int weight;
        private int current;

        public WeightedRoundRobin (String ip, int weight, int current) {
            this.ip = ip;
            this.weight = weight;
            this.current = current;
        }
    }


    private static String doSelectByWeightV2() {
        Integer totalWeight = WEIGHT_MAP.values().stream().map(WeightedRoundRobin::getWeight).reduce(0, Integer::sum);

        // 1. current_weight  = weight
        WEIGHT_MAP.values().forEach(weight -> weight.setCurrent(weight.getCurrent()   weight.getWeight()));

        // 2. select max
        WeightedRoundRobin maxCurrentWeight = WEIGHT_MAP.values().stream().
                max(Comparator.comparing(WeightedRoundRobin::getCurrent)).get();

        // 3. max(currentWeight) -= sum(weight)
        maxCurrentWeight.setCurrent(maxCurrentWeight.getCurrent() - totalWeight);

        // 返回maxCurrentWeight所对应的ip
        return maxCurrentWeight.getIp();
    }
学新通

dubbo中的代码实现如下:

public class RoundRobinLoadBalance extends AbstractLoadBalance {
    public static final String NAME = "roundrobin";

    private static final int RECYCLE_PERIOD = 60000;

    protected static class WeightedRoundRobin {
        private int weight;
        private AtomicLong current = new AtomicLong(0);
        private long lastUpdate;

        public int getWeight() {
            return weight;
        }

        public void setWeight(int weight) {
            this.weight = weight;
            current.set(0);
        }

        public long increaseCurrent() {
            return current.addAndGet(weight);
        }

        public void sel(int total) {
            current.addAndGet(-1 * total);
        }

        public long getLastUpdate() {
            return lastUpdate;
        }

        public void setLastUpdate(long lastUpdate) {
            this.lastUpdate = lastUpdate;
        }
    }

    private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();

    /**
     * get invoker addr list cached for specified invocation
     * <p>
     * <b>for unit test only</b>
     *
     * @param invokers
     * @param invocation
     * @return
     */
    protected <T> Collection<String> getInvokerAddrList(List<Invoker<T>> invokers, Invocation invocation) {
        String key = invokers.get(0).getUrl().getServiceKey()   "."   invocation.getMethodName();
        Map<String, WeightedRoundRobin> map = methodWeightMap.get(key);
        if (map != null) {
            return map.keySet();
        }
        return null;
    }

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        String key = invokers.get(0).getUrl().getServiceKey()   "."   invocation.getMethodName();
        ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.computeIfAbsent(key, k -> new ConcurrentHashMap<>());
        int totalWeight = 0;
        long maxCurrent = Long.MIN_VALUE;
        long now = System.currentTimeMillis();
        Invoker<T> selectedInvoker = null;
        WeightedRoundRobin selectedWRR = null;
        for (Invoker<T> invoker : invokers) {
            String identifyString = invoker.getUrl().toIdentityString();
            int weight = getWeight(invoker, invocation);
            // 1. 遍历所有invoker,初始化权重
            WeightedRoundRobin weightedRoundRobin = map.computeIfAbsent(identifyString, k -> {
                WeightedRoundRobin wrr = new WeightedRoundRobin();
                wrr.setWeight(weight);
                return wrr;
            });

            if (weight != weightedRoundRobin.getWeight()) {
                //weight changed
                weightedRoundRobin.setWeight(weight);
            }
            // 2. 设置current_weight  = weight
            long cur = weightedRoundRobin.increaseCurrent();
            weightedRoundRobin.setLastUpdate(now);
            // 3. 完成一次遍历之后,找到max(currentWeight)
            if (cur > maxCurrent) {
                maxCurrent = cur;
                selectedInvoker = invoker;
                selectedWRR = weightedRoundRobin;
            }
            totalWeight  = weight;
        }
        if (invokers.size() != map.size()) {
            map.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
        }
        if (selectedInvoker != null) {
            // 4. max(currentWeight) -= sum(weight)
            selectedWRR.sel(totalWeight);
            return selectedInvoker;
        }
        // should not happen here
        return invokers.get(0);
    }

}
学新通

※注:平滑加权轮询算法不是dubbo首次创建并提出的,应该是nginx最初提出的,nginx实现参考:ngx_http_upstream_init_round_robin.c,关于平滑加权轮询的数学证明参考:nginx平滑的基于权重轮询算法分析 | tenfy’ blog

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhggbfeg
系列文章
更多 icon
同类精品
更多 icon
继续加载