前面章节分析了 Eureka 客户端源码,本章将分析 Eureka 服务端源码。同 Eureka Client 启动一样,需要添加@EnableEurekaServer 注解。
在该注解源码中用 @Import(EurekaServerMarkerConfiguration.class) 表明了程序在启动时会先加载 EurekaServerMarkerConfiguration 配置类中的配置,而在该配置类中,发布了一个标记类 EurekaServerMarkerConfiguration$Marker,该标记类会用于开启后续 EurekaServer 相关配置的加载工作。
@EnableEurekaServer 源码如下:
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(EurekaServerMarkerConfiguration.class) public @interface EnableEurekaServer { }
EurekaServerMarkerConfiguration 类的源码如下:
@Configuration(proxyBeanMethods = false) public class EurekaServerMarkerConfiguration { @Bean public Marker eurekaServerMarkerBean() { return new Marker(); } class Marker { } }
EurekaServerMarkerConfiguration 类是一个 @Configuration 配置类,且类中只有一个 Marker 类(该类是一个空类,没有任何实现),并且在该类的 eurekaServerMarkerBean() Bean 方法中创建了 Marker 类的实例。
从注释中可以看到 EurekaServerMarkerConfiguration 是一个激活 EurekaServerAutoConfiguration 的开关。通过之前章节的分析,我们实际可以发现 Spring Boot 相关项目的一些设计模式了,很多的类并不是被显示的加载到容器中,而是通过配置的方式,最经典的方式就是放到 META-INF/spring.factories 文件中去加载,那么我们也来看下 spring-cloud-netflix-eureka-server-{version}.jar!META-INF/spring.factories,具体内容如下:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration
真正的配置信息在 EurekaServerAutoConfiguration 中,我们继续查看 EurekaServerAutoConfiguration 自动配置类,源码如下:
// 表示这是一个配置类 @Configuration(proxyBeanMethods = false) // 导入启动 EurekaServer 的 bean @Import(EurekaServerInitializerConfiguration.class) // 表示只有在 spring 容器里面含有 Market 这个实例的时候,才会加载当前这个Bean(EurekaServerAutoConfiguration ) // 这就是控制是否开启 EurekaServer 的关键 @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class) @EnableConfigurationProperties({ EurekaDashboardProperties.class, InstanceRegistryProperties.class }) // 加载配置文件 @PropertySource("classpath:/eureka/server.properties") public class EurekaServerAutoConfiguration implements WebMvcConfigurer { //... }
该类要实现自动配置,必须满足 @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class) 条件,该条件要求必须在 BeanFactory 中存在 EurekaServerMarkerConfiguration.Marker 实例,才能正常被执行。
让我们继续看看 EurekaServerAutoConfiguration 类的源码到底干了什么,源码如下:
@Configuration(proxyBeanMethods = false) @Import(EurekaServerInitializerConfiguration.class) @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class) @EnableConfigurationProperties({ EurekaDashboardProperties.class, InstanceRegistryProperties.class }) @PropertySource("classpath:/eureka/server.properties") public class EurekaServerAutoConfiguration implements WebMvcConfigurer { // ... // Eureka-server 的可视化 web 界面就是通过 EurekaController 提供的 @Bean @ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true) public EurekaController eurekaController() { return new EurekaController(this.applicationInfoManager); } // ... // 接收客户端的注册等请求就是通过 InstanceRegistry 来处理的,是真正处理业务的类 @Bean public PeerAwareInstanceRegistry peerAwareInstanceRegistry(ServerCodecs serverCodecs) { this.eurekaClient.getApplications(); // force initialization return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.eurekaClient, this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(), this.instanceRegistryProperties.getDefaultOpenForTrafficCount()); } // ... // 初始化 Eureka-server,会同步其他注册中心的数据到当前注册中心 @Bean public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry, EurekaServerContext serverContext) { return new EurekaServerBootstrap(this.applicationInfoManager, this.eurekaClientConfig, this.eurekaServerConfig, registry, serverContext); } // 创建过滤器,并匹配路径 /eureka/* @Bean public FilterRegistrationBean<?> jerseyFilterRegistration(javax.ws.rs.core.Application eurekaJerseyApp) { FilterRegistrationBean<Filter> bean = new FilterRegistrationBean<Filter>(); bean.setFilter(new ServletContainer(eurekaJerseyApp)); bean.setOrder(Ordered.LOWEST_PRECEDENCE); bean.setUrlPatterns(Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*")); return bean; } //... }
通过上面的源码分析可知,EurekaServer 在启动的时候,会加载很多 bean 到 Spring 容器中,每个 bean 都实现了各自的功能,其中真正处理客户端请求的类是 InstanceRegistry。源码如下:
public class InstanceRegistry extends PeerAwareInstanceRegistryImpl implements ApplicationContextAware { //... // 接收 Eureka 客户端的服务注册请求 @Override public void register(InstanceInfo info, int leaseDuration, boolean isReplication) { handleRegistration(info, leaseDuration, isReplication); super.register(info, leaseDuration, isReplication); } @Override public void register(final InstanceInfo info, final boolean isReplication) { handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication); super.register(info, isReplication); } // 接收 Eureka 客户端服务下线请求 @Override public boolean cancel(String appName, String serverId, boolean isReplication) { handleCancelation(appName, serverId, isReplication); return super.cancel(appName, serverId, isReplication); } // 接收 Eureka 客户端服务续约请求 @Override public boolean renew(final String appName, final String serverId, boolean isReplication) { log("renew " + appName + " serverId " + serverId + ", isReplication {}" + isReplication); List<Application> applications = getSortedApplications(); for (Application input : applications) { if (input.getName().equals(appName)) { InstanceInfo instance = null; for (InstanceInfo info : input.getInstances()) { if (info.getId().equals(serverId)) { instance = info; break; } } publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId, instance, isReplication)); break; } } return super.renew(appName, serverId, isReplication); } //... }
以上是在处理客户端的不同请求。但是客户端发送的是 HTTP 请求,这只是一个类,服务端应该也有一个接收 HTTP 请求的类,然后将接收到的请求封装后委托给 InstanceRegistry 来处理具体业务。这个类就是 com.netflix.eureka.resources 包下的ApplicationResource、InstanceResource。
ApplicationResource 类源码如下:
@Produces({"application/xml", "application/json"}) public class ApplicationResource { //... private final EurekaServerConfig serverConfig; private final PeerAwareInstanceRegistry registry; private final ResponseCache responseCache; //... // 获取有关特定 Application 的信息 @GET public Response getApplication(@PathParam("version") String version, @HeaderParam("Accept") final String acceptHeader, @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept) { //... } // 获取有关应用程序特定实例的信息 @Path("{id}") public InstanceResource getInstanceInfo(@PathParam("id") String id) { return new InstanceResource(this, id, serverConfig, registry); } // 为 Application 注册有关特定实例的信息 @POST @Consumes({"application/json", "application/xml"}) public Response addInstance(InstanceInfo info, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { //... registry.register(info, "true".equals(isReplication)); return Response.status(204).build(); // 204 to be backwards compatible } //... }
InstanceResource 类源码如下:
@Produces({"application/xml", "application/json"}) public class InstanceResource { //... private final PeerAwareInstanceRegistry registry; private final EurekaServerConfig serverConfig; //... // Get请求返回关于实例的InstanceInfo的信息 @GET public Response getInstanceInfo() { InstanceInfo appInfo = registry.getInstanceByAppAndId(app.getName(), id); if (appInfo != null) { logger.debug("Found: {} - {}", app.getName(), id); return Response.ok(appInfo).build(); } else { logger.debug("Not Found: {} - {}", app.getName(), id); return Response.status(Status.NOT_FOUND).build(); } } // 从客户端实例更新租约的put请求 @PUT public Response renewLease( @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication, @QueryParam("overriddenstatus") String overriddenStatus, @QueryParam("status") String status, @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) { boolean isFromReplicaNode = "true".equals(isReplication); boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode); //... return response; } // 处理 InstanceStatus 更新 @PUT @Path("status") public Response statusUpdate( @QueryParam("value") String newStatus, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication, @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) { try { //... boolean isSuccess = registry.statusUpdate(app.getName(), id, InstanceStatus.valueOf(newStatus), lastDirtyTimestamp, "true".equals(isReplication)); //... } catch (Throwable e) { logger.error("Error updating instance {} for status {}", id, newStatus); return Response.serverError().build(); } } // 删除实例的状态 @DELETE @Path("status") public Response deleteStatusUpdate( @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication, @QueryParam("value") String newStatusValue, @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) { try { //.... InstanceStatus newStatus = newStatusValue == null ? InstanceStatus.UNKNOWN : InstanceStatus.valueOf(newStatusValue); boolean isSuccess = registry.deleteStatusOverride(app.getName(), id, newStatus, lastDirtyTimestamp, "true".equals(isReplication)); //... } catch (Throwable e) { logger.error("Error removing instance's {} status override", id); return Response.serverError().build(); } } // 更新特定于用户的元数据信息 @PUT @Path("metadata") public Response updateMetadata(@Context UriInfo uriInfo) { try { InstanceInfo instanceInfo = registry.getInstanceByAppAndId(app.getName(), id); //... Map<String, String> metadataMap = instanceInfo.getMetadata(); // Metadata map is empty - create a new map if (Collections.emptyMap().getClass().equals(metadataMap.getClass())) { metadataMap = new ConcurrentHashMap<>(); InstanceInfo.Builder builder = new InstanceInfo.Builder(instanceInfo); builder.setMetadata(metadataMap); instanceInfo = builder.build(); } // Add all the user supplied entries to the map for (Entry<String, List<String>> entry : entrySet) { metadataMap.put(entry.getKey(), entry.getValue().get(0)); } //... registry.register(instanceInfo, false); return Response.ok().build(); } catch (Throwable e) { logger.error("Error updating metadata for instance {}", id, e); return Response.serverError().build(); } } // 处理此特定实例的租约取消 @DELETE public Response cancelLease( @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { try { boolean isSuccess = registry.cancel(app.getName(), id, "true".equals(isReplication)); //... } catch (Throwable e) { logger.error("Error (cancel): {} - {}", app.getName(), id, e); return Response.serverError().build(); } } //... }
到这里,Eureka 客户端和服务端是怎样启动?以及启动过程就分析完了,后续将继续分析服务注册、取消服务等。