// DiscoveryClient实例创建时会调用服务拉取,默认开启拉取注册信息。 // 接着调用拉取注册方法,如果拉取失败,则从备份中拉取 if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { // 拉取服务失败,则从配置中获取备份注册全类名进行反射调用获取服务信息 fetchRegistryFromBackup(); } // 在Eureka Client初始化时,会初始化刷新缓存定时任务(每30秒),刷新缓存线程CacheRefreshThread class CacheRefreshThread implements Runnable { public void run() { refreshRegistry(); } } void refreshRegistry() { try { boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries(); boolean remoteRegionsModified = false; // This makes sure that a dynamic change to remote regions to fetch is honored. String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions(); if (null != latestRemoteRegions) { String currentRemoteRegions = remoteRegionsToFetch.get(); if (!latestRemoteRegions.equals(currentRemoteRegions)) { // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync synchronized (instanceRegionChecker.getAzToRegionMapper()) { if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) { String[] remoteRegions = latestRemoteRegions.split(","); remoteRegionsRef.set(remoteRegions); instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions); remoteRegionsModified = true; } else { logger.info("Remote regions to fetch modified concurrently," + " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions); } } } else { // Just refresh mapping to reflect any DNS/Property change instanceRegionChecker.getAzToRegionMapper().refreshMapping(); } } // 拉取服务 boolean success = fetchRegistry(remoteRegionsModified); if (success) { registrySize = localRegionApps.get().size(); lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis(); } if (logger.isDebugEnabled()) { StringBuilder allAppsHashCodes = new StringBuilder(); allAppsHashCodes.append("Local region apps hashcode: "); allAppsHashCodes.append(localRegionApps.get().getAppsHashCode()); allAppsHashCodes.append(", is fetching remote regions? "); allAppsHashCodes.append(isFetchingRemoteRegionRegistries); for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) { allAppsHashCodes.append(", Remote region: "); allAppsHashCodes.append(entry.getKey()); allAppsHashCodes.append(" , apps hashcode: "); allAppsHashCodes.append(entry.getValue().getAppsHashCode()); } logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ", allAppsHashCodes.toString()); } } catch (Throwable e) { logger.error("Cannot fetch registry from server", e); } } .... // 拉取注册服务 private boolean fetchRegistry(boolean forceFullRegistryFetch) { Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); try { // 获取本地缓存注册信息 Applications applications = getApplications(); // 如果增量开关disabled或者是首次拉取,那么将会(全量拉取)拉取全部实例applications,否则只是增量拉取服务 if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta()); logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress()); logger.info("Force full registry fetch : {}", forceFullRegistryFetch); logger.info("Application is null : {}", (applications == null)); logger.info("Registered Applications size is zero : {}", (applications.getRegisteredApplications().size() == 0)); logger.info("Application version is -1: {}", (applications.getVersion() == -1)); getAndStoreFullRegistry(); } else { // 增量拉取 getAndUpdateDelta(applications); } applications.setAppsHashCode(applications.getReconcileHashCode()); // 只是简单的统计下实例数,并且打印在日志上而已 logTotalInstances(); } catch (Throwable e) { logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e); return false; } finally { if (tracer != null) { tracer.stop(); } } // Notify about cache refresh before updating the instance remote status // 更新实例远程状态之前通知缓存更新 onCacheRefreshed(); // Update remote status based on refreshed data held in the cache // 根据缓存中保存的刷新数据更新远程状态 updateInstanceRemoteStatus(); // registry was fetched successfully, so return true return true; } // 全量拉取 private void getAndStoreFullRegistry() throws Throwable { // 当前更新代数(更新计数) long currentUpdateGeneration = fetchRegistryGeneration.get(); logger.info("Getting all instance registry info from the eureka server"); Applications apps = null; // 调用到EurekaServer的ApplicationsResource-getContainers()方法全量获取注册信息 EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { apps = httpResponse.getEntity(); } logger.info("The response status is {}", httpResponse.getStatusCode()); if (apps == null) { logger.error("The application is null for some reason. Not storing this information"); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { // 更新本地缓存注册服务信息,将其拉取到的信息进行过滤和打乱, // 对注册信息进行过滤,过滤后只缓存服务状态为UP的实例 localRegionApps.set(this.filterAndShuffle(apps)); logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode()); } else { logger.warn("Not updating applications as another thread is updating it already"); } } // 将拉取到的服务进行过滤和打乱顺序,之后存放在remoteRegionVsApps private Applications filterAndShuffle(Applications apps) { if (apps != null) { if (isFetchingRemoteRegionRegistries()) { Map<String, Applications> remoteRegionVsApps = new ConcurrentHashMap<String, Applications>(); apps.shuffleAndIndexInstances(remoteRegionVsApps, clientConfig, instanceRegionChecker); for (Applications applications : remoteRegionVsApps.values()) { applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances()); } this.remoteRegionVsApps = remoteRegionVsApps; } else { apps.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances()); } } return apps; } // 调用http get请求获取服务注册列表 private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) { ClientResponse response = null; String regionsParamValue = null; try { WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath); if (regions != null && regions.length > 0) { regionsParamValue = StringUtil.join(regions); webResource = webResource.queryParam("regions", regionsParamValue); } Builder requestBuilder = webResource.getRequestBuilder(); addExtraHeaders(requestBuilder); response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class); Applications applications = null; if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) { applications = response.getEntity(Applications.class); } return anEurekaHttpResponse(response.getStatus(), Applications.class) .headers(headersOf(response)) .entity(applications) .build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}", serviceUrl, urlPath, regionsParamValue == null ? "" : "regions=" + regionsParamValue, response == null ? "N/A" : response.getStatus() ); } if (response != null) { response.close(); } } } // 查看拉取增量服务,同样的原理,只不过是向服务端请求时路径是app/delta来区分全部还是增量了。 private void getAndUpdateDelta(Applications applications) throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); Applications delta = null; EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { delta = httpResponse.getEntity(); } // 如果增量拉取不到服务,那么就进行全量拉取服务 if (delta == null) { logger.warn("The server does not allow the delta revision to be applied because it is not safe. " + "Hence got the full registry."); getAndStoreFullRegistry(); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode()); String reconcileHashCode = ""; if (fetchRegistryUpdateLock.tryLock()) { try { // 将增量拉取的服务更新到本地 updateDelta(delta); reconcileHashCode = getReconcileHashCode(applications); } finally { fetchRegistryUpdateLock.unlock(); } } else { logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta"); } // There is a diff in number of instances for some reason // 由于某些原因造成实例数量不一致,那么会再次调用远程全量拉取服务 if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) { reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall } } else { logger.warn("Not updating application delta as another thread is updating it already"); logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode()); } }
客户端拉取服务存在全量拉取和增量拉取两种方式,在增量拉取的时候如果拉取失败,还是会再用全量拉取的方式去拉取服务。
// DiscoveryClient实例创建时会调用服务拉取,默认开启拉取注册信息。 // 接着调用拉取注册方法,如果拉取失败,则从备份中拉取 if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { // 拉取服务失败,则从配置中获取备份注册全类名进行反射调用获取服务信息 fetchRegistryFromBackup(); } // 在Eureka Client初始化时,会初始化刷新缓存定时任务(每30秒),刷新缓存线程CacheRefreshThread class CacheRefreshThread implements Runnable { public void run() { refreshRegistry(); } } void refreshRegistry() { try { boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries(); boolean remoteRegionsModified = false; // This makes sure that a dynamic change to remote regions to fetch is honored. String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions(); if (null != latestRemoteRegions) { String currentRemoteRegions = remoteRegionsToFetch.get(); if (!latestRemoteRegions.equals(currentRemoteRegions)) { // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync synchronized (instanceRegionChecker.getAzToRegionMapper()) { if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) { String[] remoteRegions = latestRemoteRegions.split(","); remoteRegionsRef.set(remoteRegions); instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions); remoteRegionsModified = true; } else { logger.info("Remote regions to fetch modified concurrently," + " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions); } } } else { // Just refresh mapping to reflect any DNS/Property change instanceRegionChecker.getAzToRegionMapper().refreshMapping(); } } // 拉取服务 boolean success = fetchRegistry(remoteRegionsModified); if (success) { registrySize = localRegionApps.get().size(); lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis(); } if (logger.isDebugEnabled()) { StringBuilder allAppsHashCodes = new StringBuilder(); allAppsHashCodes.append("Local region apps hashcode: "); allAppsHashCodes.append(localRegionApps.get().getAppsHashCode()); allAppsHashCodes.append(", is fetching remote regions? "); allAppsHashCodes.append(isFetchingRemoteRegionRegistries); for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) { allAppsHashCodes.append(", Remote region: "); allAppsHashCodes.append(entry.getKey()); allAppsHashCodes.append(" , apps hashcode: "); allAppsHashCodes.append(entry.getValue().getAppsHashCode()); } logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ", allAppsHashCodes.toString()); } } catch (Throwable e) { logger.error("Cannot fetch registry from server", e); } } .... // 拉取注册服务 private boolean fetchRegistry(boolean forceFullRegistryFetch) { Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); try { // 获取本地缓存注册信息 Applications applications = getApplications(); // 如果增量开关disabled或者是首次拉取,那么将会(全量拉取)拉取全部实例applications,否则只是增量拉取服务 if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta()); logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress()); logger.info("Force full registry fetch : {}", forceFullRegistryFetch); logger.info("Application is null : {}", (applications == null)); logger.info("Registered Applications size is zero : {}", (applications.getRegisteredApplications().size() == 0)); logger.info("Application version is -1: {}", (applications.getVersion() == -1)); getAndStoreFullRegistry(); } else { // 增量拉取 getAndUpdateDelta(applications); } applications.setAppsHashCode(applications.getReconcileHashCode()); // 只是简单的统计下实例数,并且打印在日志上而已 logTotalInstances(); } catch (Throwable e) { logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e); return false; } finally { if (tracer != null) { tracer.stop(); } } // Notify about cache refresh before updating the instance remote status // 更新实例远程状态之前通知缓存更新 onCacheRefreshed(); // Update remote status based on refreshed data held in the cache // 根据缓存中保存的刷新数据更新远程状态 updateInstanceRemoteStatus(); // registry was fetched successfully, so return true return true; } // 全量拉取 private void getAndStoreFullRegistry() throws Throwable { // 当前更新代数(更新计数) long currentUpdateGeneration = fetchRegistryGeneration.get(); logger.info("Getting all instance registry info from the eureka server"); Applications apps = null; // 调用到EurekaServer的ApplicationsResource-getContainers()方法全量获取注册信息 EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { apps = httpResponse.getEntity(); } logger.info("The response status is {}", httpResponse.getStatusCode()); if (apps == null) { logger.error("The application is null for some reason. Not storing this information"); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { // 更新本地缓存注册服务信息,将其拉取到的信息进行过滤和打乱, // 对注册信息进行过滤,过滤后只缓存服务状态为UP的实例 localRegionApps.set(this.filterAndShuffle(apps)); logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode()); } else { logger.warn("Not updating applications as another thread is updating it already"); } } // 将拉取到的服务进行过滤和打乱顺序,之后存放在remoteRegionVsApps private Applications filterAndShuffle(Applications apps) { if (apps != null) { if (isFetchingRemoteRegionRegistries()) { Map<String, Applications> remoteRegionVsApps = new ConcurrentHashMap<String, Applications>(); apps.shuffleAndIndexInstances(remoteRegionVsApps, clientConfig, instanceRegionChecker); for (Applications applications : remoteRegionVsApps.values()) { applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances()); } this.remoteRegionVsApps = remoteRegionVsApps; } else { apps.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances()); } } return apps; } // 调用http get请求获取服务注册列表 private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) { ClientResponse response = null; String regionsParamValue = null; try { WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath); if (regions != null && regions.length > 0) { regionsParamValue = StringUtil.join(regions); webResource = webResource.queryParam("regions", regionsParamValue); } Builder requestBuilder = webResource.getRequestBuilder(); addExtraHeaders(requestBuilder); response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class); Applications applications = null; if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) { applications = response.getEntity(Applications.class); } return anEurekaHttpResponse(response.getStatus(), Applications.class) .headers(headersOf(response)) .entity(applications) .build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}", serviceUrl, urlPath, regionsParamValue == null ? "" : "regions=" + regionsParamValue, response == null ? "N/A" : response.getStatus() ); } if (response != null) { response.close(); } } } // 查看拉取增量服务,同样的原理,只不过是向服务端请求时路径是app/delta来区分全部还是增量了。 private void getAndUpdateDelta(Applications applications) throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); Applications delta = null; EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { delta = httpResponse.getEntity(); } // 如果增量拉取不到服务,那么就进行全量拉取服务 if (delta == null) { logger.warn("The server does not allow the delta revision to be applied because it is not safe. " + "Hence got the full registry."); getAndStoreFullRegistry(); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode()); String reconcileHashCode = ""; if (fetchRegistryUpdateLock.tryLock()) { try { // 将增量拉取的服务更新到本地 updateDelta(delta); reconcileHashCode = getReconcileHashCode(applications); } finally { fetchRegistryUpdateLock.unlock(); } } else { logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta"); } // There is a diff in number of instances for some reason // 由于某些原因造成实例数量不一致,那么会再次调用远程全量拉取服务 if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) { reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall } } else { logger.warn("Not updating application delta as another thread is updating it already"); logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode()); } }```
本文仅供学习!所有权归属原作者。侵删!文章来源: 搬运工来架构
文章评论