前面章节介绍了 DiscoveryClient 类中关于服务注册、服务续约、服务下线等操作实际上就是通过发送 HTTP 请求到注册中心来实现的。那么这些操作的执行时机是什么时候呢?是什么时候服务注册操作会被调用?下线操作是如何被触发的?
本章将通过分析 DiscoveryClient 类的构造方法来了解服务注册、服务续约、下线操作是怎样触发的。DiscoveryClient 构造方法源码如下:
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
// 参数配置...
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
// 客户端配置为既不注册也不查询数据
// ...
// 不需要设置一个网络任务,直接返回
return; // no need to setup up an network tasks and we are done
}
try {
// 创建线程池
// heartbeat 和 cacheRefresh 的默认大小都是 2 - 1
// default size of 2 - 1 each for heartbeat and cacheRefresh
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
// 心跳线程池(维持当前应用续约的线程池,用来持续发送心跳请求)
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
// ...
}
// call and execute the pre registration handler before all background tasks (inc registration) is started
// 在启动所有后台任务(inc注册)之前,调用并执行预注册处理程序
if (this.preRegistrationHandler != null) {
this.preRegistrationHandler.beforeRegistration();
}
// 核心方法调用 ,启动定时任务
// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
initScheduledTasks();
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register timers", e);
}
//...
}继续查看 initScheduledTasks() 方法,该方法将初始化所有定时任务。源码如下:
/**
* Initializes all scheduled tasks.
*/
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
// 执行刷新任务的定时器
cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
// 线程任务在这里,主要就是为了执行这个Thread的run()
new CacheRefreshThread()
);
scheduler.schedule(
cacheRefreshTask,
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
// 客户端默认注册到 eureka,shouldRegisterWithEureka() 默认为 true
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer
// 执行心跳任务的定时器
heartbeatTask = new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
);
scheduler.schedule(
heartbeatTask,
renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator
// 获取当前注册服务的基本信息
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
logger.info("Saw local status change event {}", statusChangeEvent);
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
// 将当前服务注册到注册中心
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}该线程用来执行刷新任务,源码如下:
/**
* The task that fetches the registry information at specified intervals.
*/
class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry(); // 注册任务刷新
}
}
@VisibleForTesting
void refreshRegistry() {
try {
//...
// 获取注册信息,客户端定时去刷新服务信息
boolean success = fetchRegistry(remoteRegionsModified);
if (success) {
registrySize = localRegionApps.get().size();
lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
}
//...
} catch (Throwable e) {
logger.error("Cannot fetch registry from server", e);
}
}fetchRegistry(remoteRegionsModified) 方法源码如下:
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// If the delta is disabled or if it is the first time, get all
// applications
// 如果禁用增量,或者这是第一次,请获取所有应用程序
Applications applications = getApplications();
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
logger.info("Registered Applications size is zero : {}",
(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", (applications.getVersion() == -1));
// 从eureka服务器获取完整的注册表信息,并将其存储在本地
getAndStoreFullRegistry();
} else {
// 从eureka服务器获取增量注册表信息,并在本地进行更新
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
logger.info(PREFIX + "{} - was unable to refresh its cache! This periodic background refresh will be retried in {} seconds. status = {} stacktrace = {}",
appPathIdentifier, clientConfig.getRegistryFetchIntervalSeconds(), e.getMessage(), ExceptionUtils.getStackTrace(e));
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// Notify about cache refresh before updating the instance remote status
// 在更新实例远程状态之前通知有关缓存刷新的信息
onCacheRefreshed();
// Update remote status based on refreshed data held in the cache
// 根据缓存中保存的刷新数据更新远程状态
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
// 注册表已成功获取,因此返回true
return true;
}getAndStoreFullRegistry() 方法将从 eureka 服务器获取完整的注册表信息,并将其存储在本地。代码如下:
private void getAndStoreFullRegistry() throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
logger.info("Getting all instance registry info from the eureka server");
Applications apps = null;
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
logger.info("The response status is {}", httpResponse.getStatusCode());
if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
} else {
logger.warn("Not updating applications as another thread is updating it already");
}
}getAndUpdateDelta() 方法将从 eureka 服务器获取增量注册表信息,并在本地进行更新。源码如下:
private void getAndUpdateDelta(Applications applications) throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications delta = null;
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = httpResponse.getEntity();
}
if (delta == null) {
logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
+ "Hence got the full registry.");
getAndStoreFullRegistry();
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
String reconcileHashCode = "";
if (fetchRegistryUpdateLock.tryLock()) {
try {
updateDelta(delta);
reconcileHashCode = getReconcileHashCode(applications);
} finally {
fetchRegistryUpdateLock.unlock();
}
} else {
logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
}
// There is a diff in number of instances for some reason
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
}
} else {
logger.warn("Not updating application delta as another thread is updating it already");
logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
}
}上面分析了刷新任务的源码,将在下章节介绍心跳任务……