前面章节分析了 Eureka 服务注册的源码,本章将接着分析 Eureka 服务续约的源码实现。
在服务注册完成后,Eureka 客户会每隔 30 秒发送一次心跳来续约。通过续约来告知 Eureka 服务端该 Eureka 客户仍然存在,没有出现问题。正常情况下,如果 Eureka 服务端在 90 秒没有收到 Eureka 客户的续约,它会将实例从其注册表中删除。
注意:建议不要轻易更改续约间隔。
先找到 AbstractInstanceRegistry 类,在 renew() 方法中打一个断点,然后分别开启 Eureka 服务端和客户端,等待片刻自动进入调试模式,调用栈信息如下图:

从上图得知,客户端请求由 InstanceResource 的 renewLease() 方法来接收,然后将请求交给 InstanceRegistry 的 renew() 方法,InstanceRegistry 又将调用 PeerAwareInstanceRegistryImpl 类的 renew() 方法,PeerAwareInstanceRegistryImpl 将继续调用 AbstractInstanceRegistry 类的 renew() 方法。
InstanceRegistry、PeerAwareInstanceRegistryImpl 和 AbstractInstanceRegistry 类的关系如下图:

下面将从源码的角度逐一分析各个类中的 renew() 方法实现。
该类将提供一个 PUT 类型的 HTTP 接口供 Eureka 客户端调用,通过该接口实现服务续约。该方法实现源码如下:
@PUT
public Response renewLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("overriddenstatus") String overriddenStatus,
@QueryParam("status") String status,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
boolean isFromReplicaNode = "true".equals(isReplication);
// 服务续约(registry 为 InstanceRegistry 类型)
boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
// Not found in the registry, immediately ask for a register
if (!isSuccess) {
logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
return Response.status(Status.NOT_FOUND).build();
}
// Check if we need to sync based on dirty time stamp, the client
// instance might have changed some value
Response response;
if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
// Store the overridden status since the validation found out the node that replicates wins
if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
&& (overriddenStatus != null)
&& !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
&& isFromReplicaNode) {
registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
}
} else {
response = Response.ok().build();
}
logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
return response;
}该类实现了 InstanceRegistry 接口。InstanceRegistry 接口继承了 LookupService 、LeaseManager 接口,提供应用实例的注册与发现服务。另外,它结合实际业务场景,定义了更加丰富的接口方法。renew() 方法源码如下:
@Override
public boolean renew(final String appName, final String serverId, boolean isReplication) {
log("renew " + appName + " serverId " + serverId + ", isReplication {}" + isReplication);
// 从注册表中按 Application.getName() 的排序,获取所有 Applications 的列表。
List<Application> applications = getSortedApplications();
for (Application input : applications) {
if (input.getName().equals(appName)) {
InstanceInfo instance = null;
for (InstanceInfo info : input.getInstances()) {
if (info.getId().equals(serverId)) {
instance = info;
break;
}
}
// 触发一个 EurekaInstanceRenewedEvent 事件,我们可以通过 ApplicationListener 监听该事件
publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId, instance, isReplication));
break;
}
}
return super.renew(appName, serverId, isReplication);
}InstanceRegistry 类在服务注册、续约、下线等操作完成后,会调用 PeerAwareInstanceRegistryImpl 的相关逻辑。而 PeerAwareInstanceRegistryImpl 中主要是添加了一个广播的功能,拥有了将服务实例的注册、续约、下线等操作同步到其它 Eureka Server 的能力。我们这里分析一下 renew() 方法,源码如下:
public boolean renew(final String appName, final String id, final boolean isReplication) {
// 调用父类 renew() 方法
if (super.renew(appName, id, isReplication)) {
// 广播到其他节点
replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
return true;
}
return false;
}
/**
* Replicates all eureka actions to peer eureka nodes except for replication
* traffic to this node.
*
*/
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
if (isReplication) {
numberOfReplicationsLastMin.increment();
}
// If it is a replication already, do not replicate again as this will create a poison replication
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
// 复制到其他节点
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}该类用于处理来自 Eureka 客户端的所有注册表请求。renew() 方法源码如下:
/**
* Marks the given instance of the given app name as renewed, and also marks whether it originated from
* replication.
*
* @see com.netflix.eureka.lease.LeaseManager#renew(java.lang.String, java.lang.String, boolean)
*/
public boolean renew(String appName, String id, boolean isReplication) {
RENEW.increment(isReplication);
// 所有的服务信息都添加到 registry 这个 map 中,
// 格式为:ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>()
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
// 主要是为了获取当前服务的一些过期信息
leaseToRenew = gMap.get(id);
}
if (leaseToRenew == null) {
RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
return false;
} else {
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
// touchASGCache(instanceInfo.getASGName());
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
+ "; re-register required", instanceInfo.getId());
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
logger.info(
"The instance status {} is different from overridden instance status {} for instance {}. "
+ "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
overriddenInstanceStatus.name(),
instanceInfo.getId());
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
}
}
renewsLastMin.increment();
// 主要操作在这里,将最新更新时间重置,剔除任务检查的也就是这个最新更新时间
leaseToRenew.renew();
return true;
}
}
// Lease.java
/**
* Renew the lease, use renewal duration if it was specified by the
* associated {@link T} during registration, otherwise default duration is
* {@link #DEFAULT_DURATION_IN_SECS}.
*/
public void renew() {
// 这里去更新时间
lastUpdateTimestamp = System.currentTimeMillis() + duration;
}服务续约的源码分析完了,下章节将接着分析服务下线的源码实现。