Eureka 服务端源码分析:服务下线实现

前面章节分析了 Eureka 服务注册、服务续约的源码,本章节将分析 Eureka 的服务下线源码实现。

当服务实例正常关闭时,它会发送一个服务下线的消息给注册中心,注册中心收到信息后,会将该服务实例状态置为下线,并把该信息传播出去。该下线请求不会自动完成,它需要调用以下内容:

DiscoveryManager.getInstance().shutdownComponent()

在开始分析前,我们进入 AbstractInstanceRegistry.internalCancel() 方法,打一个断点查看方法调用栈信息。如下图:

Eureka 服务端源码分析:服务下线实现

从上图得知,InstanceResource 类的 cancelLease() 方法接收 Eureka 客户端的服务取消请求,然后服务取消请求将一次调用如下方法:

  • InstanceResource.cancelLease()

  • InstanceRegistry.cancel()

  • PeerAwareInstanceRegistryImpl.cancel()

  • AbstractInstanceRegistry.cancel()

  • InstanceRegistry.internalCancel()

  • AbstractInstanceRegistry.internalCancel()

下面将逐个分析上面方法的源码。其中,InstanceRegistry、PeerAwareInstanceRegistryImpl 和 AbstractInstanceRegistry 类的继承层次如下图:

Eureka 服务端源码分析:服务下线实现

InstanceResource.cancelLease()

接收 Eureka 客户端发起的服务取消请求。源码如下:

/**
* Handles cancellation of leases for this particular instance.
*
* @param isReplication
*            a header parameter containing information whether this is
*            replicated from other nodes.
* @return response indicating whether the operation was a success or
*         failure.
*/
@DELETE
public Response cancelLease(
       @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
   try {
       // 调用 cancle() 方法处理服务取消请求
       boolean isSuccess = registry.cancel(app.getName(), id,
           "true".equals(isReplication));

       if (isSuccess) {
           logger.debug("Found (Cancel): {} - {}", app.getName(), id);
           return Response.ok().build();
       } else {
           logger.info("Not Found (Cancel): {} - {}", app.getName(), id);
           return Response.status(Status.NOT_FOUND).build();
       }
   } catch (Throwable e) {
       logger.error("Error (cancel): {} - {}", app.getName(), id, e);
       return Response.serverError().build();
   }

}

InstanceRegistry.cancel()

InstanceRegistry 类的 cancel() 方法通过调用 handleCancelation() 方法触发 EurekaInstanceCanceledEvent 事件。源码如下:

@Override
public boolean cancel(String appName, String serverId, boolean isReplication) {
   // 触发 EurekaInstanceCanceledEvent 事件,你可以通过 ApplicationListener 去监听事件
   // 处理一些其他业务逻辑
   handleCancelation(appName, serverId, isReplication);
   return super.cancel(appName, serverId, isReplication);
}

private void handleCancelation(String appName, String id, boolean isReplication) {
   log("cancel " + appName + ", serverId " + id + ", isReplication " + isReplication);
   publishEvent(new EurekaInstanceCanceledEvent(this, appName, id, isReplication));
}

PeerAwareInstanceRegistryImpl.cancel()

PeerAwareInstanceRegistryImpl 类提供了将当前节点注册信息同步到其他节点的功能。这里将介绍 cancel() 方法,该方法在我们成功调用父类的 cancel() 方法后,才进而将服务取消操作复制到其他 Eureka 节点,注意:包含当前节点。源码如下:

/**
* (non-Javadoc)
*
* @see com.netflix.eureka.registry.InstanceRegistry#cancel(java.lang.String,
* java.lang.String, long, boolean)
*/
@Override
public boolean cancel(final String appName, final String id,
                     final boolean isReplication) {
   // 调用父类的 cancle() 方法,继续处理服务取消请求
   // 如果服务取消成功,则将操作同步到其他 Eureka 节点
   if (super.cancel(appName, id, isReplication)) {
       // 将服务取消操作复制到其他 Eureka 节点,不包含当前节点
       replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
       return true;
   }
   return false;
}

/**
* Replicates all eureka actions to peer eureka nodes except for replication
* traffic to this node.
*/
private void replicateToPeers(Action action, String appName, String id,
                             InstanceInfo info /* optional */,
                             InstanceStatus newStatus /* optional */, boolean isReplication) {
   Stopwatch tracer = action.getTimer().start();
   try {
       if (isReplication) {
           numberOfReplicationsLastMin.increment();
       }
       // If it is a replication already, do not replicate again as this will create a poison replication
       if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
           return;
       }

       for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
           // If the url represents this host, do not replicate to yourself.
           if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
               continue;
           }
           // 操作复制到其他 Eureka 节点
           replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
       }
   } finally {
       tracer.stop();
   }
}

AbstractInstanceRegistry.cancel()

取消服务注册,源码如下:

/**
* Cancels the registration of an instance.
*
* <p>
* This is normally invoked by a client when it shuts down informing the
* server to remove the instance from traffic.
* </p>
*
* @param appName the application name of the application.
* @param id the unique identifier of the instance.
* @param isReplication true if this is a replication event from other nodes, false
*                      otherwise.
* @return true if the instance was removed from the {@link AbstractInstanceRegistry} successfully, false otherwise.
*/
@Override
public boolean cancel(String appName, String id, boolean isReplication) {
   // 取消实例的注册
   return internalCancel(appName, id, isReplication);
}

InstanceRegistry.internalCancel()

@Override
protected boolean internalCancel(String appName, String id, boolean isReplication) {
   // 触发 EurekaInstanceCanceledEvent 事件,你可以通过 ApplicationListener 去监听事件
   handleCancelation(appName, id, isReplication);
   // 调用父类的 internalCancel() 方法
   return super.internalCancel(appName, id, isReplication);
}

private void handleCancelation(String appName, String id, boolean isReplication) {
   log("cancel " + appName + ", serverId " + id + ", isReplication " + isReplication);
   publishEvent(new EurekaInstanceCanceledEvent(this, appName, id, isReplication));
}

AbstractInstanceRegistry.internalCancel()

/**
* {@link #cancel(String, String, boolean)} method is overridden by {@link PeerAwareInstanceRegistry}, so each
* cancel request is replicated to the peers. This is however not desired for expires which would be counted
* in the remote peers as valid cancellations, so self preservation mode would not kick-in.
*/
protected boolean internalCancel(String appName, String id, boolean isReplication) {
   // private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
   // private final Lock read = readWriteLock.readLock();
   // 从可重入锁中获取了只读锁
   read.lock();
   try {
       CANCEL.increment(isReplication);
       // 所有的服务信息都添加到registry这个map中,
       // 格式为:ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>()
       Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
       Lease<InstanceInfo> leaseToCancel = null;
       if (gMap != null) {
           leaseToCancel = gMap.remove(id);
       }
       recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
       InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
       if (instanceStatus != null) {
           logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
       }
       if (leaseToCancel == null) {
           CANCEL_NOT_FOUND.increment(isReplication);
           logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
           return false;
       } else {
           // 将当前服务的剔除时间置为当前时间 evictionTimestamp = System.currentTimeMillis();
           leaseToCancel.cancel();
           // 获取服务信息
           InstanceInfo instanceInfo = leaseToCancel.getHolder();
           String vip = null;
           String svip = null;
           if (instanceInfo != null) {
               // 将服务信息置为已删除
               instanceInfo.setActionType(ActionType.DELETED);
               recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
               instanceInfo.setLastUpdatedTimestamp();
               vip = instanceInfo.getVIPAddress();
               svip = instanceInfo.getSecureVipAddress();
           }
           // 清理缓存
           invalidateCache(appName, vip, svip);
           logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
       }
   } finally {
       read.unlock();
   }

   synchronized (lock) {
       if (this.expectedNumberOfClientsSendingRenews > 0) {
           // Since the client wants to cancel it, reduce the number of clients to send renews.
           this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
           updateRenewsPerMinThreshold();
       }
   }

   return true;
}

到这里,Eureka 源码就分析完了。仅供学习交流,如有问题请及时反馈,非常感谢。

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
公众号