前面章节分析了 Eureka 服务注册、服务续约的源码,本章节将分析 Eureka 的服务下线源码实现。
当服务实例正常关闭时,它会发送一个服务下线的消息给注册中心,注册中心收到信息后,会将该服务实例状态置为下线,并把该信息传播出去。该下线请求不会自动完成,它需要调用以下内容:
DiscoveryManager.getInstance().shutdownComponent()
在开始分析前,我们进入 AbstractInstanceRegistry.internalCancel() 方法,打一个断点查看方法调用栈信息。如下图:
从上图得知,InstanceResource 类的 cancelLease() 方法接收 Eureka 客户端的服务取消请求,然后服务取消请求将一次调用如下方法:
InstanceResource.cancelLease()
InstanceRegistry.cancel()
PeerAwareInstanceRegistryImpl.cancel()
AbstractInstanceRegistry.cancel()
InstanceRegistry.internalCancel()
AbstractInstanceRegistry.internalCancel()
下面将逐个分析上面方法的源码。其中,InstanceRegistry、PeerAwareInstanceRegistryImpl 和 AbstractInstanceRegistry 类的继承层次如下图:
接收 Eureka 客户端发起的服务取消请求。源码如下:
/** * Handles cancellation of leases for this particular instance. * * @param isReplication * a header parameter containing information whether this is * replicated from other nodes. * @return response indicating whether the operation was a success or * failure. */ @DELETE public Response cancelLease( @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { try { // 调用 cancle() 方法处理服务取消请求 boolean isSuccess = registry.cancel(app.getName(), id, "true".equals(isReplication)); if (isSuccess) { logger.debug("Found (Cancel): {} - {}", app.getName(), id); return Response.ok().build(); } else { logger.info("Not Found (Cancel): {} - {}", app.getName(), id); return Response.status(Status.NOT_FOUND).build(); } } catch (Throwable e) { logger.error("Error (cancel): {} - {}", app.getName(), id, e); return Response.serverError().build(); } }
InstanceRegistry 类的 cancel() 方法通过调用 handleCancelation() 方法触发 EurekaInstanceCanceledEvent 事件。源码如下:
@Override public boolean cancel(String appName, String serverId, boolean isReplication) { // 触发 EurekaInstanceCanceledEvent 事件,你可以通过 ApplicationListener 去监听事件 // 处理一些其他业务逻辑 handleCancelation(appName, serverId, isReplication); return super.cancel(appName, serverId, isReplication); } private void handleCancelation(String appName, String id, boolean isReplication) { log("cancel " + appName + ", serverId " + id + ", isReplication " + isReplication); publishEvent(new EurekaInstanceCanceledEvent(this, appName, id, isReplication)); }
PeerAwareInstanceRegistryImpl 类提供了将当前节点注册信息同步到其他节点的功能。这里将介绍 cancel() 方法,该方法在我们成功调用父类的 cancel() 方法后,才进而将服务取消操作复制到其他 Eureka 节点,注意:包含当前节点。源码如下:
/** * (non-Javadoc) * * @see com.netflix.eureka.registry.InstanceRegistry#cancel(java.lang.String, * java.lang.String, long, boolean) */ @Override public boolean cancel(final String appName, final String id, final boolean isReplication) { // 调用父类的 cancle() 方法,继续处理服务取消请求 // 如果服务取消成功,则将操作同步到其他 Eureka 节点 if (super.cancel(appName, id, isReplication)) { // 将服务取消操作复制到其他 Eureka 节点,不包含当前节点 replicateToPeers(Action.Cancel, appName, id, null, null, isReplication); return true; } return false; } /** * Replicates all eureka actions to peer eureka nodes except for replication * traffic to this node. */ private void replicateToPeers(Action action, String appName, String id, InstanceInfo info /* optional */, InstanceStatus newStatus /* optional */, boolean isReplication) { Stopwatch tracer = action.getTimer().start(); try { if (isReplication) { numberOfReplicationsLastMin.increment(); } // If it is a replication already, do not replicate again as this will create a poison replication if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) { return; } for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) { // If the url represents this host, do not replicate to yourself. if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) { continue; } // 操作复制到其他 Eureka 节点 replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node); } } finally { tracer.stop(); } }
取消服务注册,源码如下:
/** * Cancels the registration of an instance. * * <p> * This is normally invoked by a client when it shuts down informing the * server to remove the instance from traffic. * </p> * * @param appName the application name of the application. * @param id the unique identifier of the instance. * @param isReplication true if this is a replication event from other nodes, false * otherwise. * @return true if the instance was removed from the {@link AbstractInstanceRegistry} successfully, false otherwise. */ @Override public boolean cancel(String appName, String id, boolean isReplication) { // 取消实例的注册 return internalCancel(appName, id, isReplication); }
@Override protected boolean internalCancel(String appName, String id, boolean isReplication) { // 触发 EurekaInstanceCanceledEvent 事件,你可以通过 ApplicationListener 去监听事件 handleCancelation(appName, id, isReplication); // 调用父类的 internalCancel() 方法 return super.internalCancel(appName, id, isReplication); } private void handleCancelation(String appName, String id, boolean isReplication) { log("cancel " + appName + ", serverId " + id + ", isReplication " + isReplication); publishEvent(new EurekaInstanceCanceledEvent(this, appName, id, isReplication)); }
/** * {@link #cancel(String, String, boolean)} method is overridden by {@link PeerAwareInstanceRegistry}, so each * cancel request is replicated to the peers. This is however not desired for expires which would be counted * in the remote peers as valid cancellations, so self preservation mode would not kick-in. */ protected boolean internalCancel(String appName, String id, boolean isReplication) { // private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); // private final Lock read = readWriteLock.readLock(); // 从可重入锁中获取了只读锁 read.lock(); try { CANCEL.increment(isReplication); // 所有的服务信息都添加到registry这个map中, // 格式为:ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>() Map<String, Lease<InstanceInfo>> gMap = registry.get(appName); Lease<InstanceInfo> leaseToCancel = null; if (gMap != null) { leaseToCancel = gMap.remove(id); } recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")")); InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id); if (instanceStatus != null) { logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name()); } if (leaseToCancel == null) { CANCEL_NOT_FOUND.increment(isReplication); logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id); return false; } else { // 将当前服务的剔除时间置为当前时间 evictionTimestamp = System.currentTimeMillis(); leaseToCancel.cancel(); // 获取服务信息 InstanceInfo instanceInfo = leaseToCancel.getHolder(); String vip = null; String svip = null; if (instanceInfo != null) { // 将服务信息置为已删除 instanceInfo.setActionType(ActionType.DELETED); recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel)); instanceInfo.setLastUpdatedTimestamp(); vip = instanceInfo.getVIPAddress(); svip = instanceInfo.getSecureVipAddress(); } // 清理缓存 invalidateCache(appName, vip, svip); logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication); } } finally { read.unlock(); } synchronized (lock) { if (this.expectedNumberOfClientsSendingRenews > 0) { // Since the client wants to cancel it, reduce the number of clients to send renews. this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1; updateRenewsPerMinThreshold(); } } return true; }
到这里,Eureka 源码就分析完了。仅供学习交流,如有问题请及时反馈,非常感谢。