Eureka 源码分析(五)

前面章节分析了 Eureka 客户端源码,本章将分析 Eureka 服务端源码。同 Eureka Client 启动一样,需要添加@EnableEurekaServer 注解。

前面章节分析了 Eureka 客户端源码,本章将分析 Eureka 服务端源码。同 Eureka Client 启动一样,需要添加@EnableEurekaServer 注解。

在该注解源码中用 @Import(EurekaServerMarkerConfiguration.class) 表明了程序在启动时会先加载 EurekaServerMarkerConfiguration 配置类中的配置,而在该配置类中,发布了一个标记类 EurekaServerMarkerConfiguration$Marker,该标记类会用于开启后续 EurekaServer 相关配置的加载工作。

@EnableEurekaServer 源码如下:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {

}

EurekaServerMarkerConfiguration 类的源码如下:

@Configuration(proxyBeanMethods = false)
public class EurekaServerMarkerConfiguration {

   @Bean
   public Marker eurekaServerMarkerBean() {
      return new Marker();
   }

   class Marker {

   }

}

EurekaServerMarkerConfiguration 类是一个 @Configuration 配置类,且类中只有一个 Marker 类(该类是一个空类,没有任何实现),并且在该类的 eurekaServerMarkerBean() Bean 方法中创建了 Marker 类的实例。

从注释中可以看到 EurekaServerMarkerConfiguration 是一个激活 EurekaServerAutoConfiguration 的开关。通过之前章节的分析,我们实际可以发现 Spring Boot 相关项目的一些设计模式了,很多的类并不是被显示的加载到容器中,而是通过配置的方式,最经典的方式就是放到 META-INF/spring.factories 文件中去加载,那么我们也来看下 spring-cloud-netflix-eureka-server-{version}.jar!META-INF/spring.factories,具体内容如下:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration

真正的配置信息在 EurekaServerAutoConfiguration 中,我们继续查看 EurekaServerAutoConfiguration 自动配置类,源码如下:

// 表示这是一个配置类
@Configuration(proxyBeanMethods = false)
// 导入启动 EurekaServer 的 bean
@Import(EurekaServerInitializerConfiguration.class)
// 表示只有在 spring 容器里面含有 Market 这个实例的时候,才会加载当前这个Bean(EurekaServerAutoConfiguration )
// 这就是控制是否开启 EurekaServer 的关键
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class, InstanceRegistryProperties.class })
// 加载配置文件
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
    //...
}

该类要实现自动配置,必须满足 @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class) 条件,该条件要求必须在 BeanFactory 中存在 EurekaServerMarkerConfiguration.Marker 实例,才能正常被执行。

让我们继续看看 EurekaServerAutoConfiguration 类的源码到底干了什么,源码如下:

@Configuration(proxyBeanMethods = false)
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class, InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
    // ...
    // Eureka-server 的可视化 web 界面就是通过 EurekaController 提供的
   @Bean
   @ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
   public EurekaController eurekaController() {
      return new EurekaController(this.applicationInfoManager);
   }

    // ...
    // 接收客户端的注册等请求就是通过 InstanceRegistry 来处理的,是真正处理业务的类
   @Bean
   public PeerAwareInstanceRegistry peerAwareInstanceRegistry(ServerCodecs serverCodecs) {
      this.eurekaClient.getApplications(); // force initialization
      return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.eurekaClient,
            this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
            this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
   }
   // ...
   // 初始化 Eureka-server,会同步其他注册中心的数据到当前注册中心
   @Bean
   public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
         EurekaServerContext serverContext) {
      return new EurekaServerBootstrap(this.applicationInfoManager, this.eurekaClientConfig, this.eurekaServerConfig,
            registry, serverContext);
   }

   // 创建过滤器,并匹配路径 /eureka/*
   @Bean
   public FilterRegistrationBean<?> jerseyFilterRegistration(javax.ws.rs.core.Application eurekaJerseyApp) {
      FilterRegistrationBean<Filter> bean = new FilterRegistrationBean<Filter>();
      bean.setFilter(new ServletContainer(eurekaJerseyApp));
      bean.setOrder(Ordered.LOWEST_PRECEDENCE);
      bean.setUrlPatterns(Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
      return bean;
   }
   //...
}

通过上面的源码分析可知,EurekaServer 在启动的时候,会加载很多 bean 到 Spring 容器中,每个 bean 都实现了各自的功能,其中真正处理客户端请求的类是 InstanceRegistry。源码如下:

public class InstanceRegistry extends PeerAwareInstanceRegistryImpl implements ApplicationContextAware {
    //...
    // 接收 Eureka 客户端的服务注册请求
   @Override
   public void register(InstanceInfo info, int leaseDuration, boolean isReplication) {
      handleRegistration(info, leaseDuration, isReplication);
      super.register(info, leaseDuration, isReplication);
   }

   @Override
   public void register(final InstanceInfo info, final boolean isReplication) {
      handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
      super.register(info, isReplication);
   }

   // 接收 Eureka 客户端服务下线请求
   @Override
   public boolean cancel(String appName, String serverId, boolean isReplication) {
      handleCancelation(appName, serverId, isReplication);
      return super.cancel(appName, serverId, isReplication);
   }

   // 接收 Eureka 客户端服务续约请求
   @Override
   public boolean renew(final String appName, final String serverId, boolean isReplication) {
      log("renew " + appName + " serverId " + serverId + ", isReplication {}" + isReplication);
      List<Application> applications = getSortedApplications();
      for (Application input : applications) {
         if (input.getName().equals(appName)) {
            InstanceInfo instance = null;
            for (InstanceInfo info : input.getInstances()) {
               if (info.getId().equals(serverId)) {
                  instance = info;
                  break;
               }
            }
            publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId, instance, isReplication));
            break;
         }
      }
      return super.renew(appName, serverId, isReplication);
   }
   //...
}

以上是在处理客户端的不同请求。但是客户端发送的是 HTTP 请求,这只是一个类,服务端应该也有一个接收 HTTP 请求的类,然后将接收到的请求封装后委托给 InstanceRegistry 来处理具体业务。这个类就是 com.netflix.eureka.resources 包下的ApplicationResource、InstanceResource。

ApplicationResource 类源码如下:

@Produces({"application/xml", "application/json"})
public class ApplicationResource {
    //...
    private final EurekaServerConfig serverConfig;
    private final PeerAwareInstanceRegistry registry;
    private final ResponseCache responseCache;
    //...

    // 获取有关特定 Application 的信息
    @GET
    public Response getApplication(@PathParam("version") String version,
                                   @HeaderParam("Accept") final String acceptHeader,
                                   @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept) {
        //...
    }

    // 获取有关应用程序特定实例的信息
    @Path("{id}")
    public InstanceResource getInstanceInfo(@PathParam("id") String id) {
        return new InstanceResource(this, id, serverConfig, registry);
    }

    // 为 Application 注册有关特定实例的信息
    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        //...
        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // 204 to be backwards compatible
    }

    //...
}

InstanceResource 类源码如下:

@Produces({"application/xml", "application/json"})
public class InstanceResource {
    //...
    private final PeerAwareInstanceRegistry registry;
    private final EurekaServerConfig serverConfig;
    //...
    // Get请求返回关于实例的InstanceInfo的信息
    @GET
    public Response getInstanceInfo() {
        InstanceInfo appInfo = registry.getInstanceByAppAndId(app.getName(), id);
        if (appInfo != null) {
            logger.debug("Found: {} - {}", app.getName(), id);
            return Response.ok(appInfo).build();
        } else {
            logger.debug("Not Found: {} - {}", app.getName(), id);
            return Response.status(Status.NOT_FOUND).build();
        }
    }

    // 从客户端实例更新租约的put请求
    @PUT
    public Response renewLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("overriddenstatus") String overriddenStatus,
            @QueryParam("status") String status,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        boolean isFromReplicaNode = "true".equals(isReplication);
        boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
        //...
        return response;
    }

    // 处理 InstanceStatus 更新
    @PUT
    @Path("status")
    public Response statusUpdate(
            @QueryParam("value") String newStatus,
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        try {
            //...
            boolean isSuccess = registry.statusUpdate(app.getName(), id,
                    InstanceStatus.valueOf(newStatus), lastDirtyTimestamp,
                    "true".equals(isReplication));
            //...
        } catch (Throwable e) {
            logger.error("Error updating instance {} for status {}", id,
                    newStatus);
            return Response.serverError().build();
        }
    }

    // 删除实例的状态
    @DELETE
    @Path("status")
    public Response deleteStatusUpdate(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("value") String newStatusValue,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        try {
            //....
            InstanceStatus newStatus = newStatusValue == null ? InstanceStatus.UNKNOWN : InstanceStatus.valueOf(newStatusValue);
            boolean isSuccess = registry.deleteStatusOverride(app.getName(), id,
                    newStatus, lastDirtyTimestamp, "true".equals(isReplication));
            //...
        } catch (Throwable e) {
            logger.error("Error removing instance's {} status override", id);
            return Response.serverError().build();
        }
    }

    // 更新特定于用户的元数据信息
    @PUT
    @Path("metadata")
    public Response updateMetadata(@Context UriInfo uriInfo) {
        try {
            InstanceInfo instanceInfo = registry.getInstanceByAppAndId(app.getName(), id);
            //...
            Map<String, String> metadataMap = instanceInfo.getMetadata();
            // Metadata map is empty - create a new map
            if (Collections.emptyMap().getClass().equals(metadataMap.getClass())) {
                metadataMap = new ConcurrentHashMap<>();
                InstanceInfo.Builder builder = new InstanceInfo.Builder(instanceInfo);
                builder.setMetadata(metadataMap);
                instanceInfo = builder.build();
            }
            // Add all the user supplied entries to the map
            for (Entry<String, List<String>> entry : entrySet) {
                metadataMap.put(entry.getKey(), entry.getValue().get(0));
            }
            //...
            registry.register(instanceInfo, false);
            return Response.ok().build();
        } catch (Throwable e) {
            logger.error("Error updating metadata for instance {}", id, e);
            return Response.serverError().build();
        }

    }

    // 处理此特定实例的租约取消
    @DELETE
    public Response cancelLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        try {
            boolean isSuccess = registry.cancel(app.getName(), id,
                "true".equals(isReplication));
            //...
        } catch (Throwable e) {
            logger.error("Error (cancel): {} - {}", app.getName(), id, e);
            return Response.serverError().build();
        }

    }
    //...
}

到这里,Eureka 客户端和服务端是怎样启动?以及启动过程就分析完了,后续将继续分析服务注册、取消服务等。

学习必须与实干相结合。 —— 泰戈尔
0 不喜欢
说说我的看法 -
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
公众号