Eureka 源码分析(八)

前面章节分析了 Eureka 服务注册、服务续约的源码,本章节将分析 Eureka 的服务下线源码实现。

前面章节分析了 Eureka 服务注册、服务续约的源码,本章节将分析 Eureka 的服务下线源码实现。

当服务实例正常关闭时,它会发送一个服务下线的消息给注册中心,注册中心收到信息后,会将该服务实例状态置为下线,并把该信息传播出去。该下线请求不会自动完成,它需要调用以下内容:

DiscoveryManager.getInstance().shutdownComponent()

在开始分析前,我们进入 AbstractInstanceRegistry.internalCancel() 方法,打一个断点查看方法调用栈信息。如下图:

Eureka 源码分析(八)

从上图得知,InstanceResource 类的 cancelLease() 方法接收 Eureka 客户端的服务取消请求,然后服务取消请求将一次调用如下方法:

  • InstanceResource.cancelLease()

  • InstanceRegistry.cancel()

  • PeerAwareInstanceRegistryImpl.cancel()

  • AbstractInstanceRegistry.cancel()

  • InstanceRegistry.internalCancel()

  • AbstractInstanceRegistry.internalCancel()

下面将逐个分析上面方法的源码。其中,InstanceRegistry、PeerAwareInstanceRegistryImpl 和 AbstractInstanceRegistry 类的继承层次如下图:

Eureka 源码分析(八)

InstanceResource.cancelLease()

接收 Eureka 客户端发起的服务取消请求。源码如下:

/**
 * Handles cancellation of leases for this particular instance.
 *
 * @param isReplication
 *            a header parameter containing information whether this is
 *            replicated from other nodes.
 * @return response indicating whether the operation was a success or
 *         failure.
 */
@DELETE
public Response cancelLease(
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    try {
        // 调用 cancle() 方法处理服务取消请求
        boolean isSuccess = registry.cancel(app.getName(), id,
            "true".equals(isReplication));

        if (isSuccess) {
            logger.debug("Found (Cancel): {} - {}", app.getName(), id);
            return Response.ok().build();
        } else {
            logger.info("Not Found (Cancel): {} - {}", app.getName(), id);
            return Response.status(Status.NOT_FOUND).build();
        }
    } catch (Throwable e) {
        logger.error("Error (cancel): {} - {}", app.getName(), id, e);
        return Response.serverError().build();
    }

}

InstanceRegistry.cancel()

InstanceRegistry 类的 cancel() 方法通过调用 handleCancelation() 方法触发 EurekaInstanceCanceledEvent 事件。源码如下:

@Override
public boolean cancel(String appName, String serverId, boolean isReplication) {
    // 触发 EurekaInstanceCanceledEvent 事件,你可以通过 ApplicationListener 去监听事件
    // 处理一些其他业务逻辑
    handleCancelation(appName, serverId, isReplication);
    return super.cancel(appName, serverId, isReplication);
}

private void handleCancelation(String appName, String id, boolean isReplication) {
    log("cancel " + appName + ", serverId " + id + ", isReplication " + isReplication);
    publishEvent(new EurekaInstanceCanceledEvent(this, appName, id, isReplication));
}

PeerAwareInstanceRegistryImpl.cancel()

PeerAwareInstanceRegistryImpl 类提供了将当前节点注册信息同步到其他节点的功能。这里将介绍 cancel() 方法,该方法在我们成功调用父类的 cancel() 方法后,才进而将服务取消操作复制到其他 Eureka 节点,注意:包含当前节点。源码如下:

/**
 * (non-Javadoc)
 *
 * @see com.netflix.eureka.registry.InstanceRegistry#cancel(java.lang.String,
 * java.lang.String, long, boolean)
 */
@Override
public boolean cancel(final String appName, final String id,
                      final boolean isReplication) {
    // 调用父类的 cancle() 方法,继续处理服务取消请求
    // 如果服务取消成功,则将操作同步到其他 Eureka 节点
    if (super.cancel(appName, id, isReplication)) {
        // 将服务取消操作复制到其他 Eureka 节点,不包含当前节点
        replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
        return true;
    }
    return false;
}

/**
 * Replicates all eureka actions to peer eureka nodes except for replication
 * traffic to this node.
 */
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }

        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            // 操作复制到其他 Eureka 节点
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}

AbstractInstanceRegistry.cancel()

取消服务注册,源码如下:

/**
 * Cancels the registration of an instance.
 *
 * <p>
 * This is normally invoked by a client when it shuts down informing the
 * server to remove the instance from traffic.
 * </p>
 *
 * @param appName the application name of the application.
 * @param id the unique identifier of the instance.
 * @param isReplication true if this is a replication event from other nodes, false
 *                      otherwise.
 * @return true if the instance was removed from the {@link AbstractInstanceRegistry} successfully, false otherwise.
 */
@Override
public boolean cancel(String appName, String id, boolean isReplication) {
    // 取消实例的注册
    return internalCancel(appName, id, isReplication);
}

InstanceRegistry.internalCancel()

@Override
protected boolean internalCancel(String appName, String id, boolean isReplication) {
    // 触发 EurekaInstanceCanceledEvent 事件,你可以通过 ApplicationListener 去监听事件
    handleCancelation(appName, id, isReplication);
    // 调用父类的 internalCancel() 方法
    return super.internalCancel(appName, id, isReplication);
}

private void handleCancelation(String appName, String id, boolean isReplication) {
    log("cancel " + appName + ", serverId " + id + ", isReplication " + isReplication);
    publishEvent(new EurekaInstanceCanceledEvent(this, appName, id, isReplication));
}

AbstractInstanceRegistry.internalCancel()

/**
 * {@link #cancel(String, String, boolean)} method is overridden by {@link PeerAwareInstanceRegistry}, so each
 * cancel request is replicated to the peers. This is however not desired for expires which would be counted
 * in the remote peers as valid cancellations, so self preservation mode would not kick-in.
 */
protected boolean internalCancel(String appName, String id, boolean isReplication) {
    // private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    // private final Lock read = readWriteLock.readLock();
    // 从可重入锁中获取了只读锁
    read.lock();
    try {
        CANCEL.increment(isReplication);
        // 所有的服务信息都添加到registry这个map中,
        // 格式为:ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>()
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToCancel = null;
        if (gMap != null) {
            leaseToCancel = gMap.remove(id);
        }
        recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
        InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
        if (instanceStatus != null) {
            logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
        }
        if (leaseToCancel == null) {
            CANCEL_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
            return false;
        } else {
            // 将当前服务的剔除时间置为当前时间 evictionTimestamp = System.currentTimeMillis();
            leaseToCancel.cancel();
            // 获取服务信息
            InstanceInfo instanceInfo = leaseToCancel.getHolder();
            String vip = null;
            String svip = null;
            if (instanceInfo != null) {
                // 将服务信息置为已删除
                instanceInfo.setActionType(ActionType.DELETED);
                recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                instanceInfo.setLastUpdatedTimestamp();
                vip = instanceInfo.getVIPAddress();
                svip = instanceInfo.getSecureVipAddress();
            }
            // 清理缓存
            invalidateCache(appName, vip, svip);
            logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
        }
    } finally {
        read.unlock();
    }

    synchronized (lock) {
        if (this.expectedNumberOfClientsSendingRenews > 0) {
            // Since the client wants to cancel it, reduce the number of clients to send renews.
            this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
            updateRenewsPerMinThreshold();
        }
    }

    return true;
}

到这里,Eureka 源码就分析完了。仅供学习交流,如有问题请及时反馈,非常感谢。

最灵繁的人也看不见自己的背脊。——非洲
0 不喜欢
说说我的看法 -
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
公众号