前面章节介绍了 DiscoveryClient 类中关于服务注册、服务续约、服务下线等操作实际上就是通过发送 HTTP 请求到注册中心来实现的。那么这些操作的执行时机是什么时候呢?是什么时候服务注册操作会被调用?下线操作是如何被触发的?
本章将通过分析 DiscoveryClient 类的构造方法来了解服务注册、服务续约、下线操作是怎样触发的。DiscoveryClient 构造方法源码如下:
@Inject DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) { // 参数配置... if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) { // 客户端配置为既不注册也不查询数据 // ... // 不需要设置一个网络任务,直接返回 return; // no need to setup up an network tasks and we are done } try { // 创建线程池 // heartbeat 和 cacheRefresh 的默认大小都是 2 - 1 // default size of 2 - 1 each for heartbeat and cacheRefresh scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d") .setDaemon(true) .build()); // 心跳线程池(维持当前应用续约的线程池,用来持续发送心跳请求) heartbeatExecutor = new ThreadPoolExecutor( 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d") .setDaemon(true) .build() ); // use direct handoff cacheRefreshExecutor = new ThreadPoolExecutor( 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d") .setDaemon(true) .build() ); // use direct handoff // ... } // call and execute the pre registration handler before all background tasks (inc registration) is started // 在启动所有后台任务(inc注册)之前,调用并执行预注册处理程序 if (this.preRegistrationHandler != null) { this.preRegistrationHandler.beforeRegistration(); } // 核心方法调用 ,启动定时任务 // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch initScheduledTasks(); try { Monitors.registerObject(this); } catch (Throwable e) { logger.warn("Cannot register timers", e); } //... }
继续查看 initScheduledTasks() 方法,该方法将初始化所有定时任务。源码如下:
/** * Initializes all scheduled tasks. */ private void initScheduledTasks() { if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); // 执行刷新任务的定时器 cacheRefreshTask = new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, // 线程任务在这里,主要就是为了执行这个Thread的run() new CacheRefreshThread() ); scheduler.schedule( cacheRefreshTask, registryFetchIntervalSeconds, TimeUnit.SECONDS); } // 客户端默认注册到 eureka,shouldRegisterWithEureka() 默认为 true if (clientConfig.shouldRegisterWithEureka()) { int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs); // Heartbeat timer // 执行心跳任务的定时器 heartbeatTask = new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ); scheduler.schedule( heartbeatTask, renewalIntervalInSecs, TimeUnit.SECONDS); // InstanceInfo replicator // 获取当前注册服务的基本信息 instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { logger.info("Saw local status change event {}", statusChangeEvent); instanceInfoReplicator.onDemandUpdate(); } }; if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } // 将当前服务注册到注册中心 instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } }
该线程用来执行刷新任务,源码如下:
/** * The task that fetches the registry information at specified intervals. */ class CacheRefreshThread implements Runnable { public void run() { refreshRegistry(); // 注册任务刷新 } } @VisibleForTesting void refreshRegistry() { try { //... // 获取注册信息,客户端定时去刷新服务信息 boolean success = fetchRegistry(remoteRegionsModified); if (success) { registrySize = localRegionApps.get().size(); lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis(); } //... } catch (Throwable e) { logger.error("Cannot fetch registry from server", e); } }
fetchRegistry(remoteRegionsModified) 方法源码如下:
private boolean fetchRegistry(boolean forceFullRegistryFetch) { Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); try { // If the delta is disabled or if it is the first time, get all // applications // 如果禁用增量,或者这是第一次,请获取所有应用程序 Applications applications = getApplications(); if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta()); logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress()); logger.info("Force full registry fetch : {}", forceFullRegistryFetch); logger.info("Application is null : {}", (applications == null)); logger.info("Registered Applications size is zero : {}", (applications.getRegisteredApplications().size() == 0)); logger.info("Application version is -1: {}", (applications.getVersion() == -1)); // 从eureka服务器获取完整的注册表信息,并将其存储在本地 getAndStoreFullRegistry(); } else { // 从eureka服务器获取增量注册表信息,并在本地进行更新 getAndUpdateDelta(applications); } applications.setAppsHashCode(applications.getReconcileHashCode()); logTotalInstances(); } catch (Throwable e) { logger.info(PREFIX + "{} - was unable to refresh its cache! This periodic background refresh will be retried in {} seconds. status = {} stacktrace = {}", appPathIdentifier, clientConfig.getRegistryFetchIntervalSeconds(), e.getMessage(), ExceptionUtils.getStackTrace(e)); return false; } finally { if (tracer != null) { tracer.stop(); } } // Notify about cache refresh before updating the instance remote status // 在更新实例远程状态之前通知有关缓存刷新的信息 onCacheRefreshed(); // Update remote status based on refreshed data held in the cache // 根据缓存中保存的刷新数据更新远程状态 updateInstanceRemoteStatus(); // registry was fetched successfully, so return true // 注册表已成功获取,因此返回true return true; }
getAndStoreFullRegistry() 方法将从 eureka 服务器获取完整的注册表信息,并将其存储在本地。代码如下:
private void getAndStoreFullRegistry() throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); logger.info("Getting all instance registry info from the eureka server"); Applications apps = null; EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { apps = httpResponse.getEntity(); } logger.info("The response status is {}", httpResponse.getStatusCode()); if (apps == null) { logger.error("The application is null for some reason. Not storing this information"); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { localRegionApps.set(this.filterAndShuffle(apps)); logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode()); } else { logger.warn("Not updating applications as another thread is updating it already"); } }
getAndUpdateDelta() 方法将从 eureka 服务器获取增量注册表信息,并在本地进行更新。源码如下:
private void getAndUpdateDelta(Applications applications) throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); Applications delta = null; EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { delta = httpResponse.getEntity(); } if (delta == null) { logger.warn("The server does not allow the delta revision to be applied because it is not safe. " + "Hence got the full registry."); getAndStoreFullRegistry(); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode()); String reconcileHashCode = ""; if (fetchRegistryUpdateLock.tryLock()) { try { updateDelta(delta); reconcileHashCode = getReconcileHashCode(applications); } finally { fetchRegistryUpdateLock.unlock(); } } else { logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta"); } // There is a diff in number of instances for some reason if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) { reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall } } else { logger.warn("Not updating application delta as another thread is updating it already"); logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode()); } }
上面分析了刷新任务的源码,将在下章节介绍心跳任务……