前面章节分析了 Eureka 服务注册的源码,本章将接着分析 Eureka 服务续约的源码实现。
在服务注册完成后,Eureka 客户会每隔 30 秒发送一次心跳来续约。通过续约来告知 Eureka 服务端该 Eureka 客户仍然存在,没有出现问题。正常情况下,如果 Eureka 服务端在 90 秒没有收到 Eureka 客户的续约,它会将实例从其注册表中删除。
注意:建议不要轻易更改续约间隔。
先找到 AbstractInstanceRegistry 类,在 renew() 方法中打一个断点,然后分别开启 Eureka 服务端和客户端,等待片刻自动进入调试模式,调用栈信息如下图:
从上图得知,客户端请求由 InstanceResource 的 renewLease() 方法来接收,然后将请求交给 InstanceRegistry 的 renew() 方法,InstanceRegistry 又将调用 PeerAwareInstanceRegistryImpl 类的 renew() 方法,PeerAwareInstanceRegistryImpl 将继续调用 AbstractInstanceRegistry 类的 renew() 方法。
InstanceRegistry、PeerAwareInstanceRegistryImpl 和 AbstractInstanceRegistry 类的关系如下图:
下面将从源码的角度逐一分析各个类中的 renew() 方法实现。
该类将提供一个 PUT 类型的 HTTP 接口供 Eureka 客户端调用,通过该接口实现服务续约。该方法实现源码如下:
@PUT public Response renewLease( @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication, @QueryParam("overriddenstatus") String overriddenStatus, @QueryParam("status") String status, @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) { boolean isFromReplicaNode = "true".equals(isReplication); // 服务续约(registry 为 InstanceRegistry 类型) boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode); // Not found in the registry, immediately ask for a register if (!isSuccess) { logger.warn("Not Found (Renew): {} - {}", app.getName(), id); return Response.status(Status.NOT_FOUND).build(); } // Check if we need to sync based on dirty time stamp, the client // instance might have changed some value Response response; if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) { response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode); // Store the overridden status since the validation found out the node that replicates wins if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode() && (overriddenStatus != null) && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus)) && isFromReplicaNode) { registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus)); } } else { response = Response.ok().build(); } logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus()); return response; }
该类实现了 InstanceRegistry 接口。InstanceRegistry 接口继承了 LookupService 、LeaseManager 接口,提供应用实例的注册与发现服务。另外,它结合实际业务场景,定义了更加丰富的接口方法。renew() 方法源码如下:
@Override public boolean renew(final String appName, final String serverId, boolean isReplication) { log("renew " + appName + " serverId " + serverId + ", isReplication {}" + isReplication); // 从注册表中按 Application.getName() 的排序,获取所有 Applications 的列表。 List<Application> applications = getSortedApplications(); for (Application input : applications) { if (input.getName().equals(appName)) { InstanceInfo instance = null; for (InstanceInfo info : input.getInstances()) { if (info.getId().equals(serverId)) { instance = info; break; } } // 触发一个 EurekaInstanceRenewedEvent 事件,我们可以通过 ApplicationListener 监听该事件 publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId, instance, isReplication)); break; } } return super.renew(appName, serverId, isReplication); }
InstanceRegistry 类在服务注册、续约、下线等操作完成后,会调用 PeerAwareInstanceRegistryImpl 的相关逻辑。而 PeerAwareInstanceRegistryImpl 中主要是添加了一个广播的功能,拥有了将服务实例的注册、续约、下线等操作同步到其它 Eureka Server 的能力。我们这里分析一下 renew() 方法,源码如下:
public boolean renew(final String appName, final String id, final boolean isReplication) { // 调用父类 renew() 方法 if (super.renew(appName, id, isReplication)) { // 广播到其他节点 replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication); return true; } return false; } /** * Replicates all eureka actions to peer eureka nodes except for replication * traffic to this node. * */ private void replicateToPeers(Action action, String appName, String id, InstanceInfo info /* optional */, InstanceStatus newStatus /* optional */, boolean isReplication) { Stopwatch tracer = action.getTimer().start(); try { if (isReplication) { numberOfReplicationsLastMin.increment(); } // If it is a replication already, do not replicate again as this will create a poison replication if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) { return; } for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) { // If the url represents this host, do not replicate to yourself. if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) { continue; } // 复制到其他节点 replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node); } } finally { tracer.stop(); } }
该类用于处理来自 Eureka 客户端的所有注册表请求。renew() 方法源码如下:
/** * Marks the given instance of the given app name as renewed, and also marks whether it originated from * replication. * * @see com.netflix.eureka.lease.LeaseManager#renew(java.lang.String, java.lang.String, boolean) */ public boolean renew(String appName, String id, boolean isReplication) { RENEW.increment(isReplication); // 所有的服务信息都添加到 registry 这个 map 中, // 格式为:ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>() Map<String, Lease<InstanceInfo>> gMap = registry.get(appName); Lease<InstanceInfo> leaseToRenew = null; if (gMap != null) { // 主要是为了获取当前服务的一些过期信息 leaseToRenew = gMap.get(id); } if (leaseToRenew == null) { RENEW_NOT_FOUND.increment(isReplication); logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id); return false; } else { InstanceInfo instanceInfo = leaseToRenew.getHolder(); if (instanceInfo != null) { // touchASGCache(instanceInfo.getASGName()); InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus( instanceInfo, leaseToRenew, isReplication); if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) { logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}" + "; re-register required", instanceInfo.getId()); RENEW_NOT_FOUND.increment(isReplication); return false; } if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) { logger.info( "The instance status {} is different from overridden instance status {} for instance {}. " + "Hence setting the status to overridden status", instanceInfo.getStatus().name(), overriddenInstanceStatus.name(), instanceInfo.getId()); instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus); } } renewsLastMin.increment(); // 主要操作在这里,将最新更新时间重置,剔除任务检查的也就是这个最新更新时间 leaseToRenew.renew(); return true; } } // Lease.java /** * Renew the lease, use renewal duration if it was specified by the * associated {@link T} during registration, otherwise default duration is * {@link #DEFAULT_DURATION_IN_SECS}. */ public void renew() { // 这里去更新时间 lastUpdateTimestamp = System.currentTimeMillis() + duration; }
服务续约的源码分析完了,下章节将接着分析服务下线的源码实现。