Netflix Ribbon 响应时间加权策略 ResponseTimeWeightedRule

响应时间加权策略根据服务实例的响应时间来分配请求权重。响应时间越长,权重越低,被选择到的概率就越低。响应时间越短,权重越高,被选择到的概率就越高。

该策略的核心思想是通过测量每个服务实例的响应时间,为响应时间较短的实例分配更高的权重,从而在后续的请求分配中更倾向于选择响应速度快的实例。

  • 响应时间测量:在运行过程中,ResponseTimeWeightedRule 会持续监测每个服务实例的响应时间。当客户端向服务实例发送请求时,开始计时,当收到响应后,记录响应时间。

  • 权重计算:根据记录的响应时间,计算每个服务实例的权重。通常,响应时间越短,权重越高。例如,可以使用反比例函数来计算权重,响应时间为 t 的实例权重可以设置为 1/t 或者其他类似的计算方式。

  • 请求分配:在选择服务实例时,根据权重进行随机选择。权重高的实例被选中的概率更大,这样可以使得更多的请求被分配到响应速度快的实例上。

响应时间加权策略特点

  • 动态调整:随着服务实例的响应时间变化,权重会动态调整。如果一个实例的响应时间突然变长,它的权重会降低,后续请求分配到该实例的概率也会减少。反之,如果一个实例的响应时间变短,它的权重会增加,被选中的概率也会提高。

  • 自适应:能够自动适应服务实例的性能变化,无需手动干预。在服务实例的性能波动较大的情况下,仍然可以有效地分配请求,提高系统的整体性能和响应速度。

配置方式

在 application.properties 中,使用如下配置:

# 已过期,不推荐使用了
user.ribbon.NFLoadBalancerRuleClassName=com.netflix.loadbalancer.ResponseTimeWeightedRule

# 推荐使用
user.ribbon.NFLoadBalancerRuleClassName=com.netflix.loadbalancer.WeightedResponseTimeRule

将在名为 user 的客户端上开启响应时间加权策略。

源码分析

Ribbon 响应时间加权策略通过 com.netflix.loadbalancer.ResponseTimeWeightedRule com.netflix.loadbalancer.WeightedResponseTimeRule 类实现,前者已经废弃,推荐使用后者,下面将对两个类进行分析。

分析源码前,先看看它是如何计算权重的,如下图:

c7a0eaedb91b572180891a441d8f2cff_1729217591165-5a63653c-caf7-4e2c-95a0-1e0af29bb7a8.png

上图中,“总响应时间”指所有服务器平均响应时间的总和。权重计算公式“总响应时间 - 当前服务器的平均响应时间”,由此可以看出,响应时间越短,权重越高。

通过权重选择服务器的步骤如下:

(1)获取最大权重值。

(2)生成一个随机数,该随机数位于 0 ~ 最大权重值之间,不包含最大权重值。

(3)迭代所有服务器,逐一将服务器权重和随机数进行比较,如果当前服务器权重大于等于随机值,则选择该服务器。

接下来仔细看源码。

ResponseTimeWeightedRule.java 源码

529449636ae803cb9f6581c9aae04813_1729137087695-50b99a23-0753-489a-bc4d-9e27b0b49769.png

代码如下:

package com.netflix.loadbalancer;

import com.netflix.client.config.IClientConfig;
import com.netflix.client.config.IClientConfigKey;
import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** @deprecated */
public class ResponseTimeWeightedRule extends RoundRobinRule {
    public static final IClientConfigKey<Integer> WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY;
    public static final int DEFAULT_TIMER_INTERVAL = 30000;
    private int serverWeightTaskTimerInterval = 30000;
    private static final Logger logger;
    private volatile List<Double> accumulatedWeights = new ArrayList();
    private final Random random = new Random();
    protected Timer serverWeightTimer = null;
    protected AtomicBoolean serverWeightAssignmentInProgress = new AtomicBoolean(false);
    String name = "unknown";

    public ResponseTimeWeightedRule() {
    }

    public ResponseTimeWeightedRule(ILoadBalancer lb) {
        super(lb);
    }

    public void setLoadBalancer(ILoadBalancer lb) {
        super.setLoadBalancer(lb);
        if (lb instanceof BaseLoadBalancer) {
            this.name = ((BaseLoadBalancer)lb).getName();
        }

        this.initialize(lb);
    }

    // 初始化
    void initialize(ILoadBalancer lb) {
        // 如果存在定时任务,则先取消
        if (this.serverWeightTimer != null) {
            this.serverWeightTimer.cancel();
        }

        // 创建定时任务
        this.serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-" + this.name, true);
        this.serverWeightTimer.schedule(new DynamicServerWeightTask(), 0L, (long)this.serverWeightTaskTimerInterval);

        // 在定时任务出发前先计算一次权重
        ServerWeight sw = new ServerWeight();
        sw.maintainWeights();

        // 定义一个钩子,用于取消定时任务
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                ResponseTimeWeightedRule.logger.info("Stopping NFLoadBalancer-serverWeightTimer-{}", ResponseTimeWeightedRule.this.name);
                ResponseTimeWeightedRule.this.serverWeightTimer.cancel();
            }
        }));
    }

    public void shutdown() {
        if (this.serverWeightTimer != null) {
            logger.info("Stopping NFLoadBalancer-serverWeightTimer-{}", this.name);
            this.serverWeightTimer.cancel();
        }

    }

    @SuppressWarnings({"RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE"})
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        } else {
            Server server = null;
            while(server == null) {
                // 获取所有权重,权重和服务器列表的位置一一对应的
                List<Double> currentWeights = this.accumulatedWeights;
                // 如果线程已中断,直接返回
                if (Thread.interrupted()) {
                    return null;
                }

                // 获取所有服务器
                List<Server> allList = lb.getAllServers();
                int serverCount = allList.size();
                if (serverCount == 0) {
                    // 服务器为空,直接返回
                    return null;
                }

                int serverIndex = 0;
                // ??? 没看懂
                // 列表最后一个元素如何使最大权重值,List 是无须列表,有没有对其进行明显的排序操作
                double maxTotalWeight = currentWeights.size() == 0 ? 0.0 : (Double)currentWeights.get(currentWeights.size() - 1);
                if (maxTotalWeight < 0.001) {
                    server = super.choose(this.getLoadBalancer(), key);
                } else {
                    // 获取一个在
                    double randomWeight = this.random.nextDouble() * maxTotalWeight;
                    // 遍历权重列表,确定服务器下标
                    int n = 0; // 选中的服务器下标
                    for(Iterator var13 = currentWeights.iterator(); var13.hasNext(); ++n) {
                        Double d = (Double)var13.next();
                        if (d >= randomWeight) {
                            serverIndex = n;
                            break;
                        }
                    }

                    // 获取服务器信息
                    server = (Server)allList.get(serverIndex);
                }

                if (server == null) {
                    // 如果没有选中,则让出 CPU
                    Thread.yield();
                } else {
                    if (server.isAlive()) {
                        // 选中了服务器,且存活,则直接返回
                        return server;
                    }
                    server = null;
                }
            }

            return server;
        }
    }

    void setWeights(List<Double> weights) {
        this.accumulatedWeights = weights;
    }

    public void initWithNiwsConfig(IClientConfig clientConfig) {
        super.initWithNiwsConfig(clientConfig);
        this.serverWeightTaskTimerInterval = (Integer)clientConfig.get(WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY, 30000);
    }

    static {
        WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY = WeightedResponseTimeRule.WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY;
        logger = LoggerFactory.getLogger(ResponseTimeWeightedRule.class);
    }

    // 服务器权重,计算每个服务器的权重
    class ServerWeight {
        ServerWeight() {
        }

        public void maintainWeights() {
            // 获取负载均衡器,维护了所有的服务器信息
            ILoadBalancer lb = ResponseTimeWeightedRule.this.getLoadBalancer();
            if (lb != null) {
                // 设置服务权重计算开始标志,避免多线程同时计算
                if (ResponseTimeWeightedRule.this.serverWeightAssignmentInProgress.compareAndSet(false, true)) {
                    try {
                        ResponseTimeWeightedRule.logger.info("Weight adjusting job started");
                        AbstractLoadBalancer nlb = (AbstractLoadBalancer)lb;
                        // 获取服务器状态对象集
                        LoadBalancerStats stats = nlb.getLoadBalancerStats();
                        if (stats != null) {
                            // 临时保存所有服务器响应平均响应时间的总时间
                            double totalResponseTime = 0.0;
                            // 计算总时间
                            ServerStats ss;
                            for(Iterator var6 = nlb.getAllServers().iterator(); var6.hasNext(); totalResponseTime += ss.getResponseTimeAvg()) {
                                Server server = (Server)var6.next();
                                ss = stats.getSingleServerStat(server);
                            }

                            // 计算单个服务器权重
                            Double weightSoFar = 0.0;
                            // 计算出来的所有服务器权重
                            List<Double> finalWeights = new ArrayList();
                            // 迭代所有服务器
                            Iterator var20 = nlb.getAllServers().iterator();
                            while(var20.hasNext()) {
                                Server serverx = (Server)var20.next();
                                ServerStats ssx = stats.getSingleServerStat(serverx);
                                // 总时间减去当前服务器平均响应时间,的到权重值
                                // 如果服务器平均响应时间越小,得到的 weight 值就越大
                                // 见 choose 方法中的使用
                                double weight = totalResponseTime - ssx.getResponseTimeAvg();
                                weightSoFar = weightSoFar + weight;
                                finalWeights.add(weightSoFar);
                            }

                            ResponseTimeWeightedRule.this.setWeights(finalWeights);
                            return;
                        }
                    } catch (Exception var16) {
                        ResponseTimeWeightedRule.logger.error("Error calculating server weights", var16);
                        return;
                    } finally {
                        // 服务器权重计算完后,将标志恢复
                        ResponseTimeWeightedRule.this.serverWeightAssignmentInProgress.set(false);
                    }

                }
            }
        }
    }

    // 权重更新定时任务,计算每个服务器的权重
    class DynamicServerWeightTask extends TimerTask {
        DynamicServerWeightTask() {
        }

        public void run() {
            ServerWeight serverWeight = ResponseTimeWeightedRule.this.new ServerWeight();
            try {
                serverWeight.maintainWeights();
            } catch (Exception var3) {
                ResponseTimeWeightedRule.logger.error("Error running DynamicServerWeightTask for {}", ResponseTimeWeightedRule.this.name, var3);
            }
        }
    }
}

WeightedResponseTimeRule.java 源码

8b3199bd1dcb428b5718d2882c4aeb4b_1729215453060-acf079c1-cf95-4a8b-8715-56834b4b8930.png

代码如下:

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package com.netflix.loadbalancer;

import com.netflix.client.config.IClientConfig;
import com.netflix.client.config.IClientConfigKey;
import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WeightedResponseTimeRule extends RoundRobinRule {
    // 成员变量也没有变化
    // 和 ResponseTimeWeightedRule 一致
    public WeightedResponseTimeRule() {}
    public WeightedResponseTimeRule(ILoadBalancer lb) {}
    public void setLoadBalancer(ILoadBalancer lb) {}

    // 和 ResponseTimeWeightedRule 一致
    void initialize(ILoadBalancer lb) {}

    // 和 ResponseTimeWeightedRule 一致
    public void shutdown() {}

    // 新增,返回权重列表的一个不可修改副本列表
    List<Double> getAccumulatedWeights() {
        return Collections.unmodifiableList(this.accumulatedWeights);
    }

    @SuppressWarnings({"RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE"})
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        } else {
            Server server = null;

            while(server == null) {
                List<Double> currentWeights = this.accumulatedWeights;
                if (Thread.interrupted()) {
                    return null;
                }

                List<Server> allList = lb.getAllServers();
                int serverCount = allList.size();
                if (serverCount == 0) {
                    return null;
                }

                int serverIndex = 0;
                double maxTotalWeight = currentWeights.size() == 0 ? 0.0 : (Double)currentWeights.get(currentWeights.size() - 1);
                // 这里条件变了
                if (!(maxTotalWeight < 0.001) && serverCount == currentWeights.size()) {
                    // 计算权重的方式一致
                    double randomWeight = this.random.nextDouble() * maxTotalWeight;
                    int n = 0;

                    for(Iterator var13 = currentWeights.iterator(); var13.hasNext(); ++n) {
                        Double d = (Double)var13.next();
                        if (d >= randomWeight) {
                            serverIndex = n;
                            break;
                        }
                    }

                    server = (Server)allList.get(serverIndex);
                } else {
                    server = super.choose(this.getLoadBalancer(), key);
                    if (server == null) {
                        return server;
                    }
                }

                if (server == null) {
                    Thread.yield();
                } else {
                    if (server.isAlive()) {
                        return server;
                    }

                    server = null;
                }
            }

            return server;
        }
    }

    // 和 ResponseTimeWeightedRule 一致
    void setWeights(List<Double> weights) {}
    public void initWithNiwsConfig(IClientConfig clientConfig) {}
    class ServerWeight {}
    class DynamicServerWeightTask extends TimerTask {}
}

通过上面分析,对比 ResponseTimeWeightedRule 和 WeightedResponseTimeRule 的代码,几乎一样,没有太多的变化。

响应时间加权策略优缺点

优点

  • 提高性能:通过优先选择响应速度快的服务实例,可以减少请求的平均响应时间,提高系统的性能。

  • 负载均衡优化:相比其他简单的负载均衡策略,如轮询或随机选择,ResponseTimeWeightedRule能够更好地根据服务实例的实际性能进行请求分配,实现更优化的负载均衡。

缺点

  • 初始阶段不稳定:在系统刚开始运行或者新的服务实例加入时,由于没有足够的响应时间数据,权重计算可能不准确,导致请求分配不够优化。

  • 对异常值敏感:如果某个服务实例出现偶尔的异常高响应时间,可能会对其权重产生较大影响,导致在一段时间内被分配的请求较少。

适用场景

  • 对响应时间要求较高的系统:如实时交易系统、在线游戏等,需要快速响应的场景下,响应时间加权策略可以确保请求被分配到性能较好的服务实例上,提高用户体验。

  • 服务实例性能差异较大的环境:当不同的服务实例在硬件配置、网络环境等方面存在较大差异时,该策略可以根据实际性能进行请求分配,充分发挥性能较好的实例的作用。

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
公众号