响应时间加权策略根据服务实例的响应时间来分配请求权重。响应时间越长,权重越低,被选择到的概率就越低。响应时间越短,权重越高,被选择到的概率就越高。
该策略的核心思想是通过测量每个服务实例的响应时间,为响应时间较短的实例分配更高的权重,从而在后续的请求分配中更倾向于选择响应速度快的实例。
响应时间测量:在运行过程中,ResponseTimeWeightedRule 会持续监测每个服务实例的响应时间。当客户端向服务实例发送请求时,开始计时,当收到响应后,记录响应时间。
权重计算:根据记录的响应时间,计算每个服务实例的权重。通常,响应时间越短,权重越高。例如,可以使用反比例函数来计算权重,响应时间为 t 的实例权重可以设置为 1/t 或者其他类似的计算方式。
请求分配:在选择服务实例时,根据权重进行随机选择。权重高的实例被选中的概率更大,这样可以使得更多的请求被分配到响应速度快的实例上。
动态调整:随着服务实例的响应时间变化,权重会动态调整。如果一个实例的响应时间突然变长,它的权重会降低,后续请求分配到该实例的概率也会减少。反之,如果一个实例的响应时间变短,它的权重会增加,被选中的概率也会提高。
自适应:能够自动适应服务实例的性能变化,无需手动干预。在服务实例的性能波动较大的情况下,仍然可以有效地分配请求,提高系统的整体性能和响应速度。
在 application.properties 中,使用如下配置:
# 已过期,不推荐使用了 user.ribbon.NFLoadBalancerRuleClassName=com.netflix.loadbalancer.ResponseTimeWeightedRule # 推荐使用 user.ribbon.NFLoadBalancerRuleClassName=com.netflix.loadbalancer.WeightedResponseTimeRule
将在名为 user 的客户端上开启响应时间加权策略。
Ribbon 响应时间加权策略通过 com.netflix.loadbalancer.ResponseTimeWeightedRule 和 com.netflix.loadbalancer.WeightedResponseTimeRule 类实现,前者已经废弃,推荐使用后者,下面将对两个类进行分析。
分析源码前,先看看它是如何计算权重的,如下图:
上图中,“总响应时间”指所有服务器平均响应时间的总和。权重计算公式“总响应时间 - 当前服务器的平均响应时间”,由此可以看出,响应时间越短,权重越高。
通过权重选择服务器的步骤如下:
(1)获取最大权重值。
(2)生成一个随机数,该随机数位于 0 ~ 最大权重值之间,不包含最大权重值。
(3)迭代所有服务器,逐一将服务器权重和随机数进行比较,如果当前服务器权重大于等于随机值,则选择该服务器。
接下来仔细看源码。
代码如下:
package com.netflix.loadbalancer; import com.netflix.client.config.IClientConfig; import com.netflix.client.config.IClientConfigKey; import edu.umd.cs.findbugs.annotations.SuppressWarnings; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Random; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** @deprecated */ public class ResponseTimeWeightedRule extends RoundRobinRule { public static final IClientConfigKey<Integer> WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY; public static final int DEFAULT_TIMER_INTERVAL = 30000; private int serverWeightTaskTimerInterval = 30000; private static final Logger logger; private volatile List<Double> accumulatedWeights = new ArrayList(); private final Random random = new Random(); protected Timer serverWeightTimer = null; protected AtomicBoolean serverWeightAssignmentInProgress = new AtomicBoolean(false); String name = "unknown"; public ResponseTimeWeightedRule() { } public ResponseTimeWeightedRule(ILoadBalancer lb) { super(lb); } public void setLoadBalancer(ILoadBalancer lb) { super.setLoadBalancer(lb); if (lb instanceof BaseLoadBalancer) { this.name = ((BaseLoadBalancer)lb).getName(); } this.initialize(lb); } // 初始化 void initialize(ILoadBalancer lb) { // 如果存在定时任务,则先取消 if (this.serverWeightTimer != null) { this.serverWeightTimer.cancel(); } // 创建定时任务 this.serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-" + this.name, true); this.serverWeightTimer.schedule(new DynamicServerWeightTask(), 0L, (long)this.serverWeightTaskTimerInterval); // 在定时任务出发前先计算一次权重 ServerWeight sw = new ServerWeight(); sw.maintainWeights(); // 定义一个钩子,用于取消定时任务 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { public void run() { ResponseTimeWeightedRule.logger.info("Stopping NFLoadBalancer-serverWeightTimer-{}", ResponseTimeWeightedRule.this.name); ResponseTimeWeightedRule.this.serverWeightTimer.cancel(); } })); } public void shutdown() { if (this.serverWeightTimer != null) { logger.info("Stopping NFLoadBalancer-serverWeightTimer-{}", this.name); this.serverWeightTimer.cancel(); } } @SuppressWarnings({"RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE"}) public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { return null; } else { Server server = null; while(server == null) { // 获取所有权重,权重和服务器列表的位置一一对应的 List<Double> currentWeights = this.accumulatedWeights; // 如果线程已中断,直接返回 if (Thread.interrupted()) { return null; } // 获取所有服务器 List<Server> allList = lb.getAllServers(); int serverCount = allList.size(); if (serverCount == 0) { // 服务器为空,直接返回 return null; } int serverIndex = 0; // ??? 没看懂 // 列表最后一个元素如何使最大权重值,List 是无须列表,有没有对其进行明显的排序操作 double maxTotalWeight = currentWeights.size() == 0 ? 0.0 : (Double)currentWeights.get(currentWeights.size() - 1); if (maxTotalWeight < 0.001) { server = super.choose(this.getLoadBalancer(), key); } else { // 获取一个在 double randomWeight = this.random.nextDouble() * maxTotalWeight; // 遍历权重列表,确定服务器下标 int n = 0; // 选中的服务器下标 for(Iterator var13 = currentWeights.iterator(); var13.hasNext(); ++n) { Double d = (Double)var13.next(); if (d >= randomWeight) { serverIndex = n; break; } } // 获取服务器信息 server = (Server)allList.get(serverIndex); } if (server == null) { // 如果没有选中,则让出 CPU Thread.yield(); } else { if (server.isAlive()) { // 选中了服务器,且存活,则直接返回 return server; } server = null; } } return server; } } void setWeights(List<Double> weights) { this.accumulatedWeights = weights; } public void initWithNiwsConfig(IClientConfig clientConfig) { super.initWithNiwsConfig(clientConfig); this.serverWeightTaskTimerInterval = (Integer)clientConfig.get(WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY, 30000); } static { WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY = WeightedResponseTimeRule.WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY; logger = LoggerFactory.getLogger(ResponseTimeWeightedRule.class); } // 服务器权重,计算每个服务器的权重 class ServerWeight { ServerWeight() { } public void maintainWeights() { // 获取负载均衡器,维护了所有的服务器信息 ILoadBalancer lb = ResponseTimeWeightedRule.this.getLoadBalancer(); if (lb != null) { // 设置服务权重计算开始标志,避免多线程同时计算 if (ResponseTimeWeightedRule.this.serverWeightAssignmentInProgress.compareAndSet(false, true)) { try { ResponseTimeWeightedRule.logger.info("Weight adjusting job started"); AbstractLoadBalancer nlb = (AbstractLoadBalancer)lb; // 获取服务器状态对象集 LoadBalancerStats stats = nlb.getLoadBalancerStats(); if (stats != null) { // 临时保存所有服务器响应平均响应时间的总时间 double totalResponseTime = 0.0; // 计算总时间 ServerStats ss; for(Iterator var6 = nlb.getAllServers().iterator(); var6.hasNext(); totalResponseTime += ss.getResponseTimeAvg()) { Server server = (Server)var6.next(); ss = stats.getSingleServerStat(server); } // 计算单个服务器权重 Double weightSoFar = 0.0; // 计算出来的所有服务器权重 List<Double> finalWeights = new ArrayList(); // 迭代所有服务器 Iterator var20 = nlb.getAllServers().iterator(); while(var20.hasNext()) { Server serverx = (Server)var20.next(); ServerStats ssx = stats.getSingleServerStat(serverx); // 总时间减去当前服务器平均响应时间,的到权重值 // 如果服务器平均响应时间越小,得到的 weight 值就越大 // 见 choose 方法中的使用 double weight = totalResponseTime - ssx.getResponseTimeAvg(); weightSoFar = weightSoFar + weight; finalWeights.add(weightSoFar); } ResponseTimeWeightedRule.this.setWeights(finalWeights); return; } } catch (Exception var16) { ResponseTimeWeightedRule.logger.error("Error calculating server weights", var16); return; } finally { // 服务器权重计算完后,将标志恢复 ResponseTimeWeightedRule.this.serverWeightAssignmentInProgress.set(false); } } } } } // 权重更新定时任务,计算每个服务器的权重 class DynamicServerWeightTask extends TimerTask { DynamicServerWeightTask() { } public void run() { ServerWeight serverWeight = ResponseTimeWeightedRule.this.new ServerWeight(); try { serverWeight.maintainWeights(); } catch (Exception var3) { ResponseTimeWeightedRule.logger.error("Error running DynamicServerWeightTask for {}", ResponseTimeWeightedRule.this.name, var3); } } } }
代码如下:
// // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) // package com.netflix.loadbalancer; import com.netflix.client.config.IClientConfig; import com.netflix.client.config.IClientConfigKey; import edu.umd.cs.findbugs.annotations.SuppressWarnings; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Random; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class WeightedResponseTimeRule extends RoundRobinRule { // 成员变量也没有变化 // 和 ResponseTimeWeightedRule 一致 public WeightedResponseTimeRule() {} public WeightedResponseTimeRule(ILoadBalancer lb) {} public void setLoadBalancer(ILoadBalancer lb) {} // 和 ResponseTimeWeightedRule 一致 void initialize(ILoadBalancer lb) {} // 和 ResponseTimeWeightedRule 一致 public void shutdown() {} // 新增,返回权重列表的一个不可修改副本列表 List<Double> getAccumulatedWeights() { return Collections.unmodifiableList(this.accumulatedWeights); } @SuppressWarnings({"RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE"}) public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { return null; } else { Server server = null; while(server == null) { List<Double> currentWeights = this.accumulatedWeights; if (Thread.interrupted()) { return null; } List<Server> allList = lb.getAllServers(); int serverCount = allList.size(); if (serverCount == 0) { return null; } int serverIndex = 0; double maxTotalWeight = currentWeights.size() == 0 ? 0.0 : (Double)currentWeights.get(currentWeights.size() - 1); // 这里条件变了 if (!(maxTotalWeight < 0.001) && serverCount == currentWeights.size()) { // 计算权重的方式一致 double randomWeight = this.random.nextDouble() * maxTotalWeight; int n = 0; for(Iterator var13 = currentWeights.iterator(); var13.hasNext(); ++n) { Double d = (Double)var13.next(); if (d >= randomWeight) { serverIndex = n; break; } } server = (Server)allList.get(serverIndex); } else { server = super.choose(this.getLoadBalancer(), key); if (server == null) { return server; } } if (server == null) { Thread.yield(); } else { if (server.isAlive()) { return server; } server = null; } } return server; } } // 和 ResponseTimeWeightedRule 一致 void setWeights(List<Double> weights) {} public void initWithNiwsConfig(IClientConfig clientConfig) {} class ServerWeight {} class DynamicServerWeightTask extends TimerTask {} }
通过上面分析,对比 ResponseTimeWeightedRule 和 WeightedResponseTimeRule 的代码,几乎一样,没有太多的变化。
提高性能:通过优先选择响应速度快的服务实例,可以减少请求的平均响应时间,提高系统的性能。
负载均衡优化:相比其他简单的负载均衡策略,如轮询或随机选择,ResponseTimeWeightedRule能够更好地根据服务实例的实际性能进行请求分配,实现更优化的负载均衡。
初始阶段不稳定:在系统刚开始运行或者新的服务实例加入时,由于没有足够的响应时间数据,权重计算可能不准确,导致请求分配不够优化。
对异常值敏感:如果某个服务实例出现偶尔的异常高响应时间,可能会对其权重产生较大影响,导致在一段时间内被分配的请求较少。
对响应时间要求较高的系统:如实时交易系统、在线游戏等,需要快速响应的场景下,响应时间加权策略可以确保请求被分配到性能较好的服务实例上,提高用户体验。
服务实例性能差异较大的环境:当不同的服务实例在硬件配置、网络环境等方面存在较大差异时,该策略可以根据实际性能进行请求分配,充分发挥性能较好的实例的作用。