Su的技术博客

  • 首页
  • 原创
  • 视频
  • Java
  • MySQL
  • DDD
  • 事故复盘
  • 架构方案
  • AI
  • Other
  • 工具
    • AI工具集
    • 工具清单
    • JSON在线格式化
    • JSON在线比较
    • SQL在线格式化
  • 打赏
  • 关于
路很长,又很短
  1. 首页
  2. Java
  3. 正文
                           

【原创】Eureka源码剖析之三:服务拉取

2020-01-21 1411点热度 0人点赞 0条评论

 

前面已经剖析了Eureka初始化、Eureks服务注册,现在继续Eureka服务拉取。Eureka服务集群中,客户端会向服务端拉取已经注册的实例,实时更新注册服务列表。

 

〓Eureka Client
Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
// DiscoveryClient实例创建时会调用服务拉取,默认开启拉取注册信息。
// 接着调用拉取注册方法,如果拉取失败,则从备份中拉取
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
// 拉取服务失败,则从配置中获取备份注册全类名进行反射调用获取服务信息
fetchRegistryFromBackup();
}
// 在Eureka Client初始化时,会初始化刷新缓存定时任务(每30秒),刷新缓存线程CacheRefreshThread
class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry();
}
}
void refreshRegistry() {
try {
boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
boolean remoteRegionsModified = false;
// This makes sure that a dynamic change to remote regions to fetch is honored.
String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
if (null != latestRemoteRegions) {
String currentRemoteRegions = remoteRegionsToFetch.get();
if (!latestRemoteRegions.equals(currentRemoteRegions)) {
// Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
synchronized (instanceRegionChecker.getAzToRegionMapper()) {
if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
String[] remoteRegions = latestRemoteRegions.split(",");
remoteRegionsRef.set(remoteRegions);
instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
remoteRegionsModified = true;
} else {
logger.info("Remote regions to fetch modified concurrently," +
" ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
}
}
} else {
// Just refresh mapping to reflect any DNS/Property change
instanceRegionChecker.getAzToRegionMapper().refreshMapping();
}
}
// 拉取服务
boolean success = fetchRegistry(remoteRegionsModified);
if (success) {
registrySize = localRegionApps.get().size();
lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
}
if (logger.isDebugEnabled()) {
StringBuilder allAppsHashCodes = new StringBuilder();
allAppsHashCodes.append("Local region apps hashcode: ");
allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
allAppsHashCodes.append(", is fetching remote regions? ");
allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
allAppsHashCodes.append(", Remote region: ");
allAppsHashCodes.append(entry.getKey());
allAppsHashCodes.append(" , apps hashcode: ");
allAppsHashCodes.append(entry.getValue().getAppsHashCode());
}
logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
allAppsHashCodes.toString());
}
} catch (Throwable e) {
logger.error("Cannot fetch registry from server", e);
}
}
....
// 拉取注册服务
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// 获取本地缓存注册信息
Applications applications = getApplications();
// 如果增量开关disabled或者是首次拉取,那么将会(全量拉取)拉取全部实例applications,否则只是增量拉取服务
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
logger.info("Registered Applications size is zero : {}",
(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", (applications.getVersion() == -1));
getAndStoreFullRegistry();
} else {
// 增量拉取
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
// 只是简单的统计下实例数,并且打印在日志上而已
logTotalInstances();
} catch (Throwable e) {
logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// Notify about cache refresh before updating the instance remote status
// 更新实例远程状态之前通知缓存更新
onCacheRefreshed();
// Update remote status based on refreshed data held in the cache
// 根据缓存中保存的刷新数据更新远程状态
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
return true;
}
// 全量拉取
private void getAndStoreFullRegistry() throws Throwable {
// 当前更新代数(更新计数)
long currentUpdateGeneration = fetchRegistryGeneration.get();
logger.info("Getting all instance registry info from the <a href="https://blog.verysu.com/aritcle/tag/eureka" title="查看相关文章:eureka" target="_blank" style="color:#555;text-decoration:underline;font-weight:bold;">eureka</a> server");
Applications apps = null;
// 调用到EurekaServer的ApplicationsResource-getContainers()方法全量获取注册信息
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
logger.info("The response status is {}", httpResponse.getStatusCode());
if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
// 更新本地缓存注册服务信息,将其拉取到的信息进行过滤和打乱,
// 对注册信息进行过滤,过滤后只缓存服务状态为UP的实例
localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
} else {
logger.warn("Not updating applications as another thread is updating it already");
}
}
// 将拉取到的服务进行过滤和打乱顺序,之后存放在remoteRegionVsApps
private Applications filterAndShuffle(Applications apps) {
if (apps != null) {
if (isFetchingRemoteRegionRegistries()) {
Map<String, Applications> remoteRegionVsApps = new ConcurrentHashMap<String, Applications>();
apps.shuffleAndIndexInstances(remoteRegionVsApps, clientConfig, instanceRegionChecker);
for (Applications applications : remoteRegionVsApps.values()) {
applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
}
this.remoteRegionVsApps = remoteRegionVsApps;
} else {
apps.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
}
}
return apps;
}
// 调用http get请求获取服务注册列表
private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
ClientResponse response = null;
String regionsParamValue = null;
try {
WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
if (regions != null && regions.length > 0) {
regionsParamValue = StringUtil.join(regions);
webResource = webResource.queryParam("regions", regionsParamValue);
}
Builder requestBuilder = webResource.getRequestBuilder();
addExtraHeaders(requestBuilder);
response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);
Applications applications = null;
if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
applications = response.getEntity(Applications.class);
}
return anEurekaHttpResponse(response.getStatus(), Applications.class)
.headers(headersOf(response))
.entity(applications)
.build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}",
serviceUrl, urlPath,
regionsParamValue == null ? "" : "regions=" + regionsParamValue,
response == null ? "N/A" : response.getStatus()
);
}
if (response != null) {
response.close();
}
}
}
// 查看拉取增量服务,同样的原理,只不过是向服务端请求时路径是app/delta来区分全部还是增量了。
private void getAndUpdateDelta(Applications applications) throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications delta = null;
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = httpResponse.getEntity();
}
// 如果增量拉取不到服务,那么就进行全量拉取服务
if (delta == null) {
logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
+ "Hence got the full registry.");
getAndStoreFullRegistry();
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
String reconcileHashCode = "";
if (fetchRegistryUpdateLock.tryLock()) {
try {
// 将增量拉取的服务更新到本地
updateDelta(delta);
reconcileHashCode = getReconcileHashCode(applications);
} finally {
fetchRegistryUpdateLock.unlock();
}
} else {
logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
}
// There is a diff in number of instances for some reason
// 由于某些原因造成实例数量不一致,那么会再次调用远程全量拉取服务
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
}
} else {
logger.warn("Not updating application delta as another thread is updating it already");
logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
}
}
// DiscoveryClient实例创建时会调用服务拉取,默认开启拉取注册信息。 // 接着调用拉取注册方法,如果拉取失败,则从备份中拉取 if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { // 拉取服务失败,则从配置中获取备份注册全类名进行反射调用获取服务信息 fetchRegistryFromBackup(); } // 在Eureka Client初始化时,会初始化刷新缓存定时任务(每30秒),刷新缓存线程CacheRefreshThread class CacheRefreshThread implements Runnable { public void run() { refreshRegistry(); } } void refreshRegistry() { try { boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries(); boolean remoteRegionsModified = false; // This makes sure that a dynamic change to remote regions to fetch is honored. String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions(); if (null != latestRemoteRegions) { String currentRemoteRegions = remoteRegionsToFetch.get(); if (!latestRemoteRegions.equals(currentRemoteRegions)) { // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync synchronized (instanceRegionChecker.getAzToRegionMapper()) { if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) { String[] remoteRegions = latestRemoteRegions.split(","); remoteRegionsRef.set(remoteRegions); instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions); remoteRegionsModified = true; } else { logger.info("Remote regions to fetch modified concurrently," + " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions); } } } else { // Just refresh mapping to reflect any DNS/Property change instanceRegionChecker.getAzToRegionMapper().refreshMapping(); } } // 拉取服务 boolean success = fetchRegistry(remoteRegionsModified); if (success) { registrySize = localRegionApps.get().size(); lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis(); } if (logger.isDebugEnabled()) { StringBuilder allAppsHashCodes = new StringBuilder(); allAppsHashCodes.append("Local region apps hashcode: "); allAppsHashCodes.append(localRegionApps.get().getAppsHashCode()); allAppsHashCodes.append(", is fetching remote regions? "); allAppsHashCodes.append(isFetchingRemoteRegionRegistries); for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) { allAppsHashCodes.append(", Remote region: "); allAppsHashCodes.append(entry.getKey()); allAppsHashCodes.append(" , apps hashcode: "); allAppsHashCodes.append(entry.getValue().getAppsHashCode()); } logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ", allAppsHashCodes.toString()); } } catch (Throwable e) { logger.error("Cannot fetch registry from server", e); } } .... // 拉取注册服务 private boolean fetchRegistry(boolean forceFullRegistryFetch) { Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); try { // 获取本地缓存注册信息 Applications applications = getApplications(); // 如果增量开关disabled或者是首次拉取,那么将会(全量拉取)拉取全部实例applications,否则只是增量拉取服务 if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta()); logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress()); logger.info("Force full registry fetch : {}", forceFullRegistryFetch); logger.info("Application is null : {}", (applications == null)); logger.info("Registered Applications size is zero : {}", (applications.getRegisteredApplications().size() == 0)); logger.info("Application version is -1: {}", (applications.getVersion() == -1)); getAndStoreFullRegistry(); } else { // 增量拉取 getAndUpdateDelta(applications); } applications.setAppsHashCode(applications.getReconcileHashCode()); // 只是简单的统计下实例数,并且打印在日志上而已 logTotalInstances(); } catch (Throwable e) { logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e); return false; } finally { if (tracer != null) { tracer.stop(); } } // Notify about cache refresh before updating the instance remote status // 更新实例远程状态之前通知缓存更新 onCacheRefreshed(); // Update remote status based on refreshed data held in the cache // 根据缓存中保存的刷新数据更新远程状态 updateInstanceRemoteStatus(); // registry was fetched successfully, so return true return true; } // 全量拉取 private void getAndStoreFullRegistry() throws Throwable { // 当前更新代数(更新计数) long currentUpdateGeneration = fetchRegistryGeneration.get(); logger.info("Getting all instance registry info from the <a href="https://blog.verysu.com/aritcle/tag/eureka" title="查看相关文章:eureka" target="_blank" style="color:#555;text-decoration:underline;font-weight:bold;">eureka</a> server"); Applications apps = null; // 调用到EurekaServer的ApplicationsResource-getContainers()方法全量获取注册信息 EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { apps = httpResponse.getEntity(); } logger.info("The response status is {}", httpResponse.getStatusCode()); if (apps == null) { logger.error("The application is null for some reason. Not storing this information"); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { // 更新本地缓存注册服务信息,将其拉取到的信息进行过滤和打乱, // 对注册信息进行过滤,过滤后只缓存服务状态为UP的实例 localRegionApps.set(this.filterAndShuffle(apps)); logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode()); } else { logger.warn("Not updating applications as another thread is updating it already"); } } // 将拉取到的服务进行过滤和打乱顺序,之后存放在remoteRegionVsApps private Applications filterAndShuffle(Applications apps) { if (apps != null) { if (isFetchingRemoteRegionRegistries()) { Map<String, Applications> remoteRegionVsApps = new ConcurrentHashMap<String, Applications>(); apps.shuffleAndIndexInstances(remoteRegionVsApps, clientConfig, instanceRegionChecker); for (Applications applications : remoteRegionVsApps.values()) { applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances()); } this.remoteRegionVsApps = remoteRegionVsApps; } else { apps.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances()); } } return apps; } // 调用http get请求获取服务注册列表 private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) { ClientResponse response = null; String regionsParamValue = null; try { WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath); if (regions != null && regions.length > 0) { regionsParamValue = StringUtil.join(regions); webResource = webResource.queryParam("regions", regionsParamValue); } Builder requestBuilder = webResource.getRequestBuilder(); addExtraHeaders(requestBuilder); response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class); Applications applications = null; if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) { applications = response.getEntity(Applications.class); } return anEurekaHttpResponse(response.getStatus(), Applications.class) .headers(headersOf(response)) .entity(applications) .build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}", serviceUrl, urlPath, regionsParamValue == null ? "" : "regions=" + regionsParamValue, response == null ? "N/A" : response.getStatus() ); } if (response != null) { response.close(); } } } // 查看拉取增量服务,同样的原理,只不过是向服务端请求时路径是app/delta来区分全部还是增量了。 private void getAndUpdateDelta(Applications applications) throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); Applications delta = null; EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { delta = httpResponse.getEntity(); } // 如果增量拉取不到服务,那么就进行全量拉取服务 if (delta == null) { logger.warn("The server does not allow the delta revision to be applied because it is not safe. " + "Hence got the full registry."); getAndStoreFullRegistry(); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode()); String reconcileHashCode = ""; if (fetchRegistryUpdateLock.tryLock()) { try { // 将增量拉取的服务更新到本地 updateDelta(delta); reconcileHashCode = getReconcileHashCode(applications); } finally { fetchRegistryUpdateLock.unlock(); } } else { logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta"); } // There is a diff in number of instances for some reason // 由于某些原因造成实例数量不一致,那么会再次调用远程全量拉取服务 if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) { reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall } } else { logger.warn("Not updating application delta as another thread is updating it already"); logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode()); } }
// DiscoveryClient实例创建时会调用服务拉取,默认开启拉取注册信息。
// 接着调用拉取注册方法,如果拉取失败,则从备份中拉取
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
        // 拉取服务失败,则从配置中获取备份注册全类名进行反射调用获取服务信息
        fetchRegistryFromBackup();
}

// 在Eureka Client初始化时,会初始化刷新缓存定时任务(每30秒),刷新缓存线程CacheRefreshThread
class CacheRefreshThread implements Runnable {
    public void run() {
        refreshRegistry();
    }
}

void refreshRegistry() {
    try {
        boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();

        boolean remoteRegionsModified = false;
        // This makes sure that a dynamic change to remote regions to fetch is honored.
        String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
        if (null != latestRemoteRegions) {
            String currentRemoteRegions = remoteRegionsToFetch.get();
            if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                    if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                        String[] remoteRegions = latestRemoteRegions.split(",");
                        remoteRegionsRef.set(remoteRegions);
                        instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                        remoteRegionsModified = true;
                    } else {
                        logger.info("Remote regions to fetch modified concurrently," +
                                " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                    }
                }
            } else {
                // Just refresh mapping to reflect any DNS/Property change
                instanceRegionChecker.getAzToRegionMapper().refreshMapping();
            }
        }

        // 拉取服务
        boolean success = fetchRegistry(remoteRegionsModified);
        if (success) {
            registrySize = localRegionApps.get().size();
            lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
        }

        if (logger.isDebugEnabled()) {
            StringBuilder allAppsHashCodes = new StringBuilder();
            allAppsHashCodes.append("Local region apps hashcode: ");
            allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
            allAppsHashCodes.append(", is fetching remote regions? ");
            allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
            for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
                allAppsHashCodes.append(", Remote region: ");
                allAppsHashCodes.append(entry.getKey());
                allAppsHashCodes.append(" , apps hashcode: ");
                allAppsHashCodes.append(entry.getValue().getAppsHashCode());
            }
            logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
                    allAppsHashCodes.toString());
        }
    } catch (Throwable e) {
        logger.error("Cannot fetch registry from server", e);
    }        
}
....

// 拉取注册服务
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

    try {
        // 获取本地缓存注册信息
        Applications applications = getApplications();
        // 如果增量开关disabled或者是首次拉取,那么将会(全量拉取)拉取全部实例applications,否则只是增量拉取服务
        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
            logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
            logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
            logger.info("Application is null : {}", (applications == null));
            logger.info("Registered Applications size is zero : {}",
                    (applications.getRegisteredApplications().size() == 0));
            logger.info("Application version is -1: {}", (applications.getVersion() == -1));
            getAndStoreFullRegistry();
        } else {
            // 增量拉取
            getAndUpdateDelta(applications);
        }
        applications.setAppsHashCode(applications.getReconcileHashCode());
        // 只是简单的统计下实例数,并且打印在日志上而已
        logTotalInstances();
    } catch (Throwable e) {
        logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }

    // Notify about cache refresh before updating the instance remote status
    // 更新实例远程状态之前通知缓存更新
    onCacheRefreshed();

    // Update remote status based on refreshed data held in the cache
    // 根据缓存中保存的刷新数据更新远程状态
    updateInstanceRemoteStatus();

    // registry was fetched successfully, so return true
    return true;
}

// 全量拉取
 private void getAndStoreFullRegistry() throws Throwable {
    // 当前更新代数(更新计数)
    long currentUpdateGeneration = fetchRegistryGeneration.get();

    logger.info("Getting all instance registry info from the eureka server");

    Applications apps = null;
     // 调用到EurekaServer的ApplicationsResource-getContainers()方法全量获取注册信息
    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        apps = httpResponse.getEntity();
    }
    logger.info("The response status is {}", httpResponse.getStatusCode());

    if (apps == null) {
        logger.error("The application is null for some reason. Not storing this information");
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        // 更新本地缓存注册服务信息,将其拉取到的信息进行过滤和打乱,
        // 对注册信息进行过滤,过滤后只缓存服务状态为UP的实例
        localRegionApps.set(this.filterAndShuffle(apps));
        logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
    } else {
        logger.warn("Not updating applications as another thread is updating it already");
    }
}

// 将拉取到的服务进行过滤和打乱顺序,之后存放在remoteRegionVsApps
private Applications filterAndShuffle(Applications apps) {
    if (apps != null) {
        if (isFetchingRemoteRegionRegistries()) {
            Map<String, Applications> remoteRegionVsApps = new ConcurrentHashMap<String, Applications>();
            apps.shuffleAndIndexInstances(remoteRegionVsApps, clientConfig, instanceRegionChecker);
            for (Applications applications : remoteRegionVsApps.values()) {
                applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
            }
            this.remoteRegionVsApps = remoteRegionVsApps;
        } else {
            apps.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
        }
    }
    return apps;
}

// 调用http get请求获取服务注册列表
private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
    ClientResponse response = null;
    String regionsParamValue = null;
    try {
        WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
        if (regions != null && regions.length > 0) {
            regionsParamValue = StringUtil.join(regions);
            webResource = webResource.queryParam("regions", regionsParamValue);
        }
        Builder requestBuilder = webResource.getRequestBuilder();
        addExtraHeaders(requestBuilder);
        response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);

        Applications applications = null;
        if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
            applications = response.getEntity(Applications.class);
        }
        return anEurekaHttpResponse(response.getStatus(), Applications.class)
                .headers(headersOf(response))
                .entity(applications)
                .build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}",
                    serviceUrl, urlPath,
                    regionsParamValue == null ? "" : "regions=" + regionsParamValue,
                    response == null ? "N/A" : response.getStatus()
            );
        }
        if (response != null) {
            response.close();
        }
    }
}

// 查看拉取增量服务,同样的原理,只不过是向服务端请求时路径是app/delta来区分全部还是增量了。
private void getAndUpdateDelta(Applications applications) throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();

    Applications delta = null;
    EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        delta = httpResponse.getEntity();
    }
    // 如果增量拉取不到服务,那么就进行全量拉取服务
    if (delta == null) {
        logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                + "Hence got the full registry.");
        getAndStoreFullRegistry();
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
        String reconcileHashCode = "";
        if (fetchRegistryUpdateLock.tryLock()) {
            try {
                // 将增量拉取的服务更新到本地
                updateDelta(delta);
                reconcileHashCode = getReconcileHashCode(applications);
            } finally {
                fetchRegistryUpdateLock.unlock();
            }
        } else {
            logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
        }
        // There is a diff in number of instances for some reason
        // 由于某些原因造成实例数量不一致,那么会再次调用远程全量拉取服务
        if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
            reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
        }
    } else {
        logger.warn("Not updating application delta as another thread is updating it already");
        logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
    }
}

 

客户端拉取服务存在全量拉取和增量拉取两种方式,在增量拉取的时候如果拉取失败,还是会再用全量拉取的方式去拉取服务。

 

〓Eureka Server
Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
// DiscoveryClient实例创建时会调用服务拉取,默认开启拉取注册信息。
// 接着调用拉取注册方法,如果拉取失败,则从备份中拉取
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
// 拉取服务失败,则从配置中获取备份注册全类名进行反射调用获取服务信息
fetchRegistryFromBackup();
}
// 在Eureka Client初始化时,会初始化刷新缓存定时任务(每30秒),刷新缓存线程CacheRefreshThread
class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry();
}
}
void refreshRegistry() {
try {
boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
boolean remoteRegionsModified = false;
// This makes sure that a dynamic change to remote regions to fetch is honored.
String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
if (null != latestRemoteRegions) {
String currentRemoteRegions = remoteRegionsToFetch.get();
if (!latestRemoteRegions.equals(currentRemoteRegions)) {
// Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
synchronized (instanceRegionChecker.getAzToRegionMapper()) {
if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
String[] remoteRegions = latestRemoteRegions.split(",");
remoteRegionsRef.set(remoteRegions);
instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
remoteRegionsModified = true;
} else {
logger.info("Remote regions to fetch modified concurrently," +
" ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
}
}
} else {
// Just refresh mapping to reflect any DNS/Property change
instanceRegionChecker.getAzToRegionMapper().refreshMapping();
}
}
// 拉取服务
boolean success = fetchRegistry(remoteRegionsModified);
if (success) {
registrySize = localRegionApps.get().size();
lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
}
if (logger.isDebugEnabled()) {
StringBuilder allAppsHashCodes = new StringBuilder();
allAppsHashCodes.append("Local region apps hashcode: ");
allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
allAppsHashCodes.append(", is fetching remote regions? ");
allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
allAppsHashCodes.append(", Remote region: ");
allAppsHashCodes.append(entry.getKey());
allAppsHashCodes.append(" , apps hashcode: ");
allAppsHashCodes.append(entry.getValue().getAppsHashCode());
}
logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
allAppsHashCodes.toString());
}
} catch (Throwable e) {
logger.error("Cannot fetch registry from server", e);
}
}
....
// 拉取注册服务
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// 获取本地缓存注册信息
Applications applications = getApplications();
// 如果增量开关disabled或者是首次拉取,那么将会(全量拉取)拉取全部实例applications,否则只是增量拉取服务
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
logger.info("Registered Applications size is zero : {}",
(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", (applications.getVersion() == -1));
getAndStoreFullRegistry();
} else {
// 增量拉取
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
// 只是简单的统计下实例数,并且打印在日志上而已
logTotalInstances();
} catch (Throwable e) {
logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// Notify about cache refresh before updating the instance remote status
// 更新实例远程状态之前通知缓存更新
onCacheRefreshed();
// Update remote status based on refreshed data held in the cache
// 根据缓存中保存的刷新数据更新远程状态
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
return true;
}
// 全量拉取
private void getAndStoreFullRegistry() throws Throwable {
// 当前更新代数(更新计数)
long currentUpdateGeneration = fetchRegistryGeneration.get();
logger.info("Getting all instance registry info from the eureka server");
Applications apps = null;
// 调用到EurekaServer的ApplicationsResource-getContainers()方法全量获取注册信息
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
logger.info("The response status is {}", httpResponse.getStatusCode());
if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
// 更新本地缓存注册服务信息,将其拉取到的信息进行过滤和打乱,
// 对注册信息进行过滤,过滤后只缓存服务状态为UP的实例
localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
} else {
logger.warn("Not updating applications as another thread is updating it already");
}
}
// 将拉取到的服务进行过滤和打乱顺序,之后存放在remoteRegionVsApps
private Applications filterAndShuffle(Applications apps) {
if (apps != null) {
if (isFetchingRemoteRegionRegistries()) {
Map<String, Applications> remoteRegionVsApps = new ConcurrentHashMap<String, Applications>();
apps.shuffleAndIndexInstances(remoteRegionVsApps, clientConfig, instanceRegionChecker);
for (Applications applications : remoteRegionVsApps.values()) {
applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
}
this.remoteRegionVsApps = remoteRegionVsApps;
} else {
apps.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
}
}
return apps;
}
// 调用http get请求获取服务注册列表
private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
ClientResponse response = null;
String regionsParamValue = null;
try {
WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
if (regions != null && regions.length > 0) {
regionsParamValue = StringUtil.join(regions);
webResource = webResource.queryParam("regions", regionsParamValue);
}
Builder requestBuilder = webResource.getRequestBuilder();
addExtraHeaders(requestBuilder);
response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);
Applications applications = null;
if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
applications = response.getEntity(Applications.class);
}
return anEurekaHttpResponse(response.getStatus(), Applications.class)
.headers(headersOf(response))
.entity(applications)
.build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}",
serviceUrl, urlPath,
regionsParamValue == null ? "" : "regions=" + regionsParamValue,
response == null ? "N/A" : response.getStatus()
);
}
if (response != null) {
response.close();
}
}
}
// 查看拉取增量服务,同样的原理,只不过是向服务端请求时路径是app/delta来区分全部还是增量了。
private void getAndUpdateDelta(Applications applications) throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications delta = null;
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = httpResponse.getEntity();
}
// 如果增量拉取不到服务,那么就进行全量拉取服务
if (delta == null) {
logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
+ "Hence got the full registry.");
getAndStoreFullRegistry();
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
String reconcileHashCode = "";
if (fetchRegistryUpdateLock.tryLock()) {
try {
// 将增量拉取的服务更新到本地
updateDelta(delta);
reconcileHashCode = getReconcileHashCode(applications);
} finally {
fetchRegistryUpdateLock.unlock();
}
} else {
logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
}
// There is a diff in number of instances for some reason
// 由于某些原因造成实例数量不一致,那么会再次调用远程全量拉取服务
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
}
} else {
logger.warn("Not updating application delta as another thread is updating it already");
logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
}
}```
// DiscoveryClient实例创建时会调用服务拉取,默认开启拉取注册信息。 // 接着调用拉取注册方法,如果拉取失败,则从备份中拉取 if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { // 拉取服务失败,则从配置中获取备份注册全类名进行反射调用获取服务信息 fetchRegistryFromBackup(); } // 在Eureka Client初始化时,会初始化刷新缓存定时任务(每30秒),刷新缓存线程CacheRefreshThread class CacheRefreshThread implements Runnable { public void run() { refreshRegistry(); } } void refreshRegistry() { try { boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries(); boolean remoteRegionsModified = false; // This makes sure that a dynamic change to remote regions to fetch is honored. String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions(); if (null != latestRemoteRegions) { String currentRemoteRegions = remoteRegionsToFetch.get(); if (!latestRemoteRegions.equals(currentRemoteRegions)) { // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync synchronized (instanceRegionChecker.getAzToRegionMapper()) { if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) { String[] remoteRegions = latestRemoteRegions.split(","); remoteRegionsRef.set(remoteRegions); instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions); remoteRegionsModified = true; } else { logger.info("Remote regions to fetch modified concurrently," + " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions); } } } else { // Just refresh mapping to reflect any DNS/Property change instanceRegionChecker.getAzToRegionMapper().refreshMapping(); } } // 拉取服务 boolean success = fetchRegistry(remoteRegionsModified); if (success) { registrySize = localRegionApps.get().size(); lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis(); } if (logger.isDebugEnabled()) { StringBuilder allAppsHashCodes = new StringBuilder(); allAppsHashCodes.append("Local region apps hashcode: "); allAppsHashCodes.append(localRegionApps.get().getAppsHashCode()); allAppsHashCodes.append(", is fetching remote regions? "); allAppsHashCodes.append(isFetchingRemoteRegionRegistries); for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) { allAppsHashCodes.append(", Remote region: "); allAppsHashCodes.append(entry.getKey()); allAppsHashCodes.append(" , apps hashcode: "); allAppsHashCodes.append(entry.getValue().getAppsHashCode()); } logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ", allAppsHashCodes.toString()); } } catch (Throwable e) { logger.error("Cannot fetch registry from server", e); } } .... // 拉取注册服务 private boolean fetchRegistry(boolean forceFullRegistryFetch) { Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); try { // 获取本地缓存注册信息 Applications applications = getApplications(); // 如果增量开关disabled或者是首次拉取,那么将会(全量拉取)拉取全部实例applications,否则只是增量拉取服务 if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta()); logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress()); logger.info("Force full registry fetch : {}", forceFullRegistryFetch); logger.info("Application is null : {}", (applications == null)); logger.info("Registered Applications size is zero : {}", (applications.getRegisteredApplications().size() == 0)); logger.info("Application version is -1: {}", (applications.getVersion() == -1)); getAndStoreFullRegistry(); } else { // 增量拉取 getAndUpdateDelta(applications); } applications.setAppsHashCode(applications.getReconcileHashCode()); // 只是简单的统计下实例数,并且打印在日志上而已 logTotalInstances(); } catch (Throwable e) { logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e); return false; } finally { if (tracer != null) { tracer.stop(); } } // Notify about cache refresh before updating the instance remote status // 更新实例远程状态之前通知缓存更新 onCacheRefreshed(); // Update remote status based on refreshed data held in the cache // 根据缓存中保存的刷新数据更新远程状态 updateInstanceRemoteStatus(); // registry was fetched successfully, so return true return true; } // 全量拉取 private void getAndStoreFullRegistry() throws Throwable { // 当前更新代数(更新计数) long currentUpdateGeneration = fetchRegistryGeneration.get(); logger.info("Getting all instance registry info from the eureka server"); Applications apps = null; // 调用到EurekaServer的ApplicationsResource-getContainers()方法全量获取注册信息 EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { apps = httpResponse.getEntity(); } logger.info("The response status is {}", httpResponse.getStatusCode()); if (apps == null) { logger.error("The application is null for some reason. Not storing this information"); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { // 更新本地缓存注册服务信息,将其拉取到的信息进行过滤和打乱, // 对注册信息进行过滤,过滤后只缓存服务状态为UP的实例 localRegionApps.set(this.filterAndShuffle(apps)); logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode()); } else { logger.warn("Not updating applications as another thread is updating it already"); } } // 将拉取到的服务进行过滤和打乱顺序,之后存放在remoteRegionVsApps private Applications filterAndShuffle(Applications apps) { if (apps != null) { if (isFetchingRemoteRegionRegistries()) { Map<String, Applications> remoteRegionVsApps = new ConcurrentHashMap<String, Applications>(); apps.shuffleAndIndexInstances(remoteRegionVsApps, clientConfig, instanceRegionChecker); for (Applications applications : remoteRegionVsApps.values()) { applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances()); } this.remoteRegionVsApps = remoteRegionVsApps; } else { apps.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances()); } } return apps; } // 调用http get请求获取服务注册列表 private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) { ClientResponse response = null; String regionsParamValue = null; try { WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath); if (regions != null && regions.length > 0) { regionsParamValue = StringUtil.join(regions); webResource = webResource.queryParam("regions", regionsParamValue); } Builder requestBuilder = webResource.getRequestBuilder(); addExtraHeaders(requestBuilder); response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class); Applications applications = null; if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) { applications = response.getEntity(Applications.class); } return anEurekaHttpResponse(response.getStatus(), Applications.class) .headers(headersOf(response)) .entity(applications) .build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}", serviceUrl, urlPath, regionsParamValue == null ? "" : "regions=" + regionsParamValue, response == null ? "N/A" : response.getStatus() ); } if (response != null) { response.close(); } } } // 查看拉取增量服务,同样的原理,只不过是向服务端请求时路径是app/delta来区分全部还是增量了。 private void getAndUpdateDelta(Applications applications) throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); Applications delta = null; EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { delta = httpResponse.getEntity(); } // 如果增量拉取不到服务,那么就进行全量拉取服务 if (delta == null) { logger.warn("The server does not allow the delta revision to be applied because it is not safe. " + "Hence got the full registry."); getAndStoreFullRegistry(); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode()); String reconcileHashCode = ""; if (fetchRegistryUpdateLock.tryLock()) { try { // 将增量拉取的服务更新到本地 updateDelta(delta); reconcileHashCode = getReconcileHashCode(applications); } finally { fetchRegistryUpdateLock.unlock(); } } else { logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta"); } // There is a diff in number of instances for some reason // 由于某些原因造成实例数量不一致,那么会再次调用远程全量拉取服务 if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) { reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall } } else { logger.warn("Not updating application delta as another thread is updating it already"); logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode()); } }```
// DiscoveryClient实例创建时会调用服务拉取,默认开启拉取注册信息。
// 接着调用拉取注册方法,如果拉取失败,则从备份中拉取
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
        // 拉取服务失败,则从配置中获取备份注册全类名进行反射调用获取服务信息
        fetchRegistryFromBackup();
}

// 在Eureka Client初始化时,会初始化刷新缓存定时任务(每30秒),刷新缓存线程CacheRefreshThread
class CacheRefreshThread implements Runnable {
    public void run() {
        refreshRegistry();
    }
}

void refreshRegistry() {
    try {
        boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();

        boolean remoteRegionsModified = false;
        // This makes sure that a dynamic change to remote regions to fetch is honored.
        String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
        if (null != latestRemoteRegions) {
            String currentRemoteRegions = remoteRegionsToFetch.get();
            if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                    if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                        String[] remoteRegions = latestRemoteRegions.split(",");
                        remoteRegionsRef.set(remoteRegions);
                        instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                        remoteRegionsModified = true;
                    } else {
                        logger.info("Remote regions to fetch modified concurrently," +
                                " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                    }
                }
            } else {
                // Just refresh mapping to reflect any DNS/Property change
                instanceRegionChecker.getAzToRegionMapper().refreshMapping();
            }
        }

        // 拉取服务
        boolean success = fetchRegistry(remoteRegionsModified);
        if (success) {
            registrySize = localRegionApps.get().size();
            lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
        }

        if (logger.isDebugEnabled()) {
            StringBuilder allAppsHashCodes = new StringBuilder();
            allAppsHashCodes.append("Local region apps hashcode: ");
            allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
            allAppsHashCodes.append(", is fetching remote regions? ");
            allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
            for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
                allAppsHashCodes.append(", Remote region: ");
                allAppsHashCodes.append(entry.getKey());
                allAppsHashCodes.append(" , apps hashcode: ");
                allAppsHashCodes.append(entry.getValue().getAppsHashCode());
            }
            logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
                    allAppsHashCodes.toString());
        }
    } catch (Throwable e) {
        logger.error("Cannot fetch registry from server", e);
    }        
}
....

// 拉取注册服务
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

    try {
        // 获取本地缓存注册信息
        Applications applications = getApplications();
        // 如果增量开关disabled或者是首次拉取,那么将会(全量拉取)拉取全部实例applications,否则只是增量拉取服务
        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
            logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
            logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
            logger.info("Application is null : {}", (applications == null));
            logger.info("Registered Applications size is zero : {}",
                    (applications.getRegisteredApplications().size() == 0));
            logger.info("Application version is -1: {}", (applications.getVersion() == -1));
            getAndStoreFullRegistry();
        } else {
            // 增量拉取
            getAndUpdateDelta(applications);
        }
        applications.setAppsHashCode(applications.getReconcileHashCode());
        // 只是简单的统计下实例数,并且打印在日志上而已
        logTotalInstances();
    } catch (Throwable e) {
        logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }

    // Notify about cache refresh before updating the instance remote status
    // 更新实例远程状态之前通知缓存更新
    onCacheRefreshed();

    // Update remote status based on refreshed data held in the cache
    // 根据缓存中保存的刷新数据更新远程状态
    updateInstanceRemoteStatus();

    // registry was fetched successfully, so return true
    return true;
}

// 全量拉取
 private void getAndStoreFullRegistry() throws Throwable {
    // 当前更新代数(更新计数)
    long currentUpdateGeneration = fetchRegistryGeneration.get();

    logger.info("Getting all instance registry info from the eureka server");

    Applications apps = null;
     // 调用到EurekaServer的ApplicationsResource-getContainers()方法全量获取注册信息
    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        apps = httpResponse.getEntity();
    }
    logger.info("The response status is {}", httpResponse.getStatusCode());

    if (apps == null) {
        logger.error("The application is null for some reason. Not storing this information");
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        // 更新本地缓存注册服务信息,将其拉取到的信息进行过滤和打乱,
        // 对注册信息进行过滤,过滤后只缓存服务状态为UP的实例
        localRegionApps.set(this.filterAndShuffle(apps));
        logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
    } else {
        logger.warn("Not updating applications as another thread is updating it already");
    }
}

// 将拉取到的服务进行过滤和打乱顺序,之后存放在remoteRegionVsApps
private Applications filterAndShuffle(Applications apps) {
    if (apps != null) {
        if (isFetchingRemoteRegionRegistries()) {
            Map<String, Applications> remoteRegionVsApps = new ConcurrentHashMap<String, Applications>();
            apps.shuffleAndIndexInstances(remoteRegionVsApps, clientConfig, instanceRegionChecker);
            for (Applications applications : remoteRegionVsApps.values()) {
                applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
            }
            this.remoteRegionVsApps = remoteRegionVsApps;
        } else {
            apps.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
        }
    }
    return apps;
}

// 调用http get请求获取服务注册列表
private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
    ClientResponse response = null;
    String regionsParamValue = null;
    try {
        WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
        if (regions != null && regions.length > 0) {
            regionsParamValue = StringUtil.join(regions);
            webResource = webResource.queryParam("regions", regionsParamValue);
        }
        Builder requestBuilder = webResource.getRequestBuilder();
        addExtraHeaders(requestBuilder);
        response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);

        Applications applications = null;
        if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
            applications = response.getEntity(Applications.class);
        }
        return anEurekaHttpResponse(response.getStatus(), Applications.class)
                .headers(headersOf(response))
                .entity(applications)
                .build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}",
                    serviceUrl, urlPath,
                    regionsParamValue == null ? "" : "regions=" + regionsParamValue,
                    response == null ? "N/A" : response.getStatus()
            );
        }
        if (response != null) {
            response.close();
        }
    }
}

// 查看拉取增量服务,同样的原理,只不过是向服务端请求时路径是app/delta来区分全部还是增量了。
private void getAndUpdateDelta(Applications applications) throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();

    Applications delta = null;
    EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        delta = httpResponse.getEntity();
    }
    // 如果增量拉取不到服务,那么就进行全量拉取服务
    if (delta == null) {
        logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                + "Hence got the full registry.");
        getAndStoreFullRegistry();
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
        String reconcileHashCode = "";
        if (fetchRegistryUpdateLock.tryLock()) {
            try {
                // 将增量拉取的服务更新到本地
                updateDelta(delta);
                reconcileHashCode = getReconcileHashCode(applications);
            } finally {
                fetchRegistryUpdateLock.unlock();
            }
        } else {
            logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
        }
        // There is a diff in number of instances for some reason
        // 由于某些原因造成实例数量不一致,那么会再次调用远程全量拉取服务
        if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
            reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
        }
    } else {
        logger.warn("Not updating application delta as another thread is updating it already");
        logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
    }
}```

 

总结

1)Eureka client拉取服务有两种方式:全量和增量,增量拉取失败则会再请求全量拉取。客户端可能会有两种方式去拉取服务,一个是在客户端启动时,如果配置有开启就会马上拉取服务,另外一个是定时更新缓存服务,定时会向服务端拉取服务。
2)Eureka server接收到请求时,会从自己的缓存中获取服务然后返回给客户端。
 


 

本文仅供学习!所有权归属原作者。侵删!文章来源: 搬运工来架构

更多文章:

  1. Eureka源码剖析之一:初始化-启动
  2. Eureka源码之二:服务注册
  3. Eureka源码剖析之四:服务续约
  4. Eureka源码剖析之五:服务下线
  5. 【进阶玩法】策略+责任链+组合实现合同签章
  6. RocketMQ消息回溯实践与解析
  7. RocketMQ 很慢?引出了一个未解之谜
  8. mysql-connect-java驱动从5.x升级到8.x的CST时区问题
  9. log4j2同步日志引发的性能问题
  10. 殷浩详解DDD 第四讲:领域层设计规范
标签: 原创 开源框架 源码 eureka 注册中心
最后更新:2023-02-25

Cocodroid

专注Java后端,分享技术。

打赏 点赞
< 上一篇
下一篇 >

文章评论

razz evil exclaim smile redface biggrin eek confused idea lol mad twisted rolleyes wink cool arrow neutral cry mrgreen drooling persevering
取消回复

广告
最新 热点 推荐
最新 热点 推荐
视频笔记:微服务架构P4 设计模式:每服务数据库、API 网关和事件驱动架构 干货 | 论Elasticsearch数据建模的重要性 马蜂窝消息总线——面向业务的消息服务设计 基于 MySQL Binlog 实现可配置的异构数据同步 视频笔记:Google发布Agent2Agent协议 视频笔记:什么是微服务,为什么是微服务? 视频笔记:什么是AI 智能体? 视频笔记:什么是Flink?
Elasticsearch 使用误区之六——富文本内容写入前不清洗基于 MySQL Binlog 实现可配置的异构数据同步马蜂窝消息总线——面向业务的消息服务设计视频笔记:微服务架构P4 设计模式:每服务数据库、API 网关和事件驱动架构干货 | 论Elasticsearch数据建模的重要性你可以不用RxJava,但必须得领悟它的思想!如何秒级实现接口间“幂等”补偿:一款轻量级仿幂等数据校正处理辅助工具视频笔记:什么是Flink?
IntelliJ IDEA 2022.2 正式发布,功能真心强大! Elasticsearch基础但非常有用的功能之一:别名 4款亲测好用的开发画图工具 大型系统架构重构10步法 系统设计 | 企业应用数据交换 架构师日记-从技术角度揭露电商大促备战的奥秘 【进阶玩法】策略+责任链+组合实现合同签章 事务异常:Transaction rolled back because it has been marked as rollback-only

CRUD (1) Event Sourcing (1) graphql (1) id (1) NoSQL (1) quarkus (1) rest (1) RocketMQ (2) Spring Boot (1) zk (1) zookeeper (1) 上下文 (1) 事务消息 (1) 二级缓存 (1) 值对象 (1) 关系数据库 (1) 分布式缓存 (1) 原子性 (1) 唯一ID (1) 商品 (1) 多对多 (1) 子域 (1) 字符集 (1) 客户端心跳 (1) 幂等 (2) 干货 (1) 并发 (1) 应用场景 (1) 应用架构图 (1) 康威定律 (2) 异步复制 (1) 微服务架构 (3) 总体方案 (1) 技术方案 (2) 技术架构 (2) 技术架构图 (1) 技能 (1) 持续集成 (1) 支撑域 (1) 故障恢复 (1) 数据架构图 (1) 方案选型 (1) 日记 (1) 服务发现 (1) 服务治理 (1) 服务注册 (2) 机房 (1) 核心域 (1) 泄漏 (1) 洋葱架构 (1) 消息队列 (5) 源码剖析 (1) 灰度发布 (1) 熔断 (1) 生态 (1) 画图工具 (1) 研发团队 (1) 线程 (2) 组织架构 (1) 缓存架构 (1) 编码 (1) 视频 (19) 读写分离 (1) 贵州 (1) 软件设计 (1) 迁移 (1) 通用域 (1) 集群化 (1) 雪花算法 (1) 顺序消息 (1)

推荐链接🔗
  • AI工具集
  • 工具箱🛠️

站点已运行 1470 天

COPYRIGHT © 2014-2025 verysu.com . ALL RIGHTS RESERVED.

Theme Kratos Made By Seaton Jiang

粤ICP备15033072号-2

x

通知