// DiscoveryClient类是一个单例,实现了EurekaClient接口 @Singleton public class DiscoveryClient implements EurekaClient { // DiscoveryClient类的构造函数 @Inject // 构造器注入 DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider) { if (args != null) { this.healthCheckHandlerProvider = args.healthCheckHandlerProvider; this.healthCheckCallbackProvider = args.healthCheckCallbackProvider; this.eventListeners.addAll(args.getEventListeners()); this.preRegistrationHandler = args.preRegistrationHandler; } else { this.healthCheckCallbackProvider = null; this.healthCheckHandlerProvider = null; this.preRegistrationHandler = null; } this.applicationInfoManager = applicationInfoManager; InstanceInfo myInfo = applicationInfoManager.getInfo(); clientConfig = config; staticClientConfig = clientConfig; transportConfig = config.getTransportConfig(); instanceInfo = myInfo; if (myInfo != null) { appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId(); } else { logger.warn("Setting instanceInfo to a passed in null value"); } this.backupRegistryProvider = backupRegistryProvider; this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo); localRegionApps.set(new Applications()); // 拉取服务计数器:单调地增加生成计数器,以确保陈旧的线程不会将注册表重置为旧版本。 fetchRegistryGeneration = new AtomicLong(0); remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions()); remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? 
null : remoteRegionsToFetch.get().split(",")); // 过时注册统计 if (config.shouldFetchRegistry()) { this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } // 过时心跳统计 if (config.shouldRegisterWithEureka()) { this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } logger.info("Initializing Eureka in region {}", clientConfig.getRegion()); // 既不需要注册到Eureka也不拉取注册服务 if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) { logger.info("Client configured to neither register nor query for data."); scheduler = null; heartbeatExecutor = null; cacheRefreshExecutor = null; eurekaTransport = null; instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion()); // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size()); return; // no need to setup up an network tasks and we are done } try { // 定义了2个线程大小的定时线程池:一个是刷新缓存CacheFreshThread,一个是心跳线程HeartbeatThread scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d") .setDaemon(true) .build()); // 心跳线程池 heartbeatExecutor = new ThreadPoolExecutor( 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() 
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d") .setDaemon(true) .build() ); // use direct handoff // 刷新缓存线程池 cacheRefreshExecutor = new ThreadPoolExecutor( 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d") .setDaemon(true) .build() ); // use direct handoff // 实例化EurekaTransport eurekaTransport = new EurekaTransport(); scheduleServerEndpointTask(eurekaTransport, args); AzToRegionMapper azToRegionMapper; if (clientConfig.shouldUseDnsForFetchingServiceUrls()) { azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig); } else { azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig); } if (null != remoteRegionsToFetch.get()) { azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(",")); } instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion()); } catch (Throwable e) { throw new RuntimeException("Failed to initialize DiscoveryClient!", e); } // 在启动时需要拉取注册服务列表,增量拉取之后如果失败就会从备份里面再次拉取 if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { fetchRegistryFromBackup(); } // call and execute the pre registration handler before all background tasks (inc registration) is started if (this.preRegistrationHandler != null) { this.preRegistrationHandler.beforeRegistration(); } // 初始化定时任务 initScheduledTasks(); try { // 监控DiscoverClient Monitors.registerObject(this); } catch (Throwable e) { logger.warn("Cannot register timers", e); } // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, 
this.getApplications().size()); } /** * Initializes all scheduled tasks. */ private void initScheduledTasks() { // 需要拉取注册服务,则定时拉取刷新缓存CacheRefreshThread,可以看#Eureka服务拉取 一篇 if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer // 默认30秒,定时拉取服务 int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); scheduler.schedule( new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ), registryFetchIntervalSeconds, TimeUnit.SECONDS); } // 需要注册到Eureka,则定时心跳请求服务端保持客户端存活,即服务续约。可以看#Eureka服务续约 一篇 if (clientConfig.shouldRegisterWithEureka()) { // 默认30秒,定时进行服务续约 int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs); // Heartbeat timer scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS); // InstanceInfo replicator 当前实例节点复制器实例化 instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, // 实例信息复制间隔,默认30秒 clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize // 状态变化监听器 statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", 
statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } }; if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } // 实例节点复制器启动 instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } } } class InstanceInfoReplicator implements Runnable { // 启动,延迟40秒启动 public void start(int initialDelayMs) { // CAS保证启动一次 if (started.compareAndSet(false, true)) { instanceInfo.setIsDirty(); // for initial register // 定时任务定时调用实例信息复制器(线程),逻辑看run方法 Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } } public void run() { try { // 刷新实例信息 discoveryClient.refreshInstanceInfo(); // 获取实例信息的脏时间戳,如果存在则进行服务注册,服务注册可以看 #Eureka服务注册 一篇 Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { //Client进行注册操作 discoveryClient.register(); instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { // 定时每30秒进行刷新或注册请求 Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } } }
①刷新缓存定时服务,即定时拉取服务列表,默认每30秒进行定时拉取服务列表;同时②开启心跳线程定时服务,即定时向服务端进行服务续约,默认每30秒进行定时续约。③启动实例信息复制器进行刷新服务实例信息或服务注册请求。
〓EurekaServer端启动
EurekaBootStrap是server端启动入口,PeerAwareInstanceRegistryImpl是真正的核心类,我们看下其类图:
// 继承Servlet上下文监听器,说明Eureka Server是基于Servlet public class EurekaBootStrap implements ServletContextListener { ... @Override public void contextInitialized(ServletContextEvent event) { try { // 初始化环境 initEurekaEnvironment(); // 初始化Euraka Server Context initEurekaServerContext(); ServletContext sc = event.getServletContext(); sc.setAttribute(EurekaServerContext.class.getName(), serverContext); } catch (Throwable e) { logger.error("Cannot bootstrap eureka server :", e); throw new RuntimeException("Cannot bootstrap eureka server :", e); } } /** * init hook for server context. Override for custom logic. */ protected void initEurekaServerContext() throws Exception { // Eureka Server读取配置 EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig(); // For backward compatibility JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH); XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH); logger.info("Initializing the eureka client..."); logger.info(eurekaServerConfig.getJsonCodecName()); // Server默认加载编码JSON、XML ServerCodecs serverCodecs = new DefaultServerCodecs(eurekaServerConfig); ApplicationInfoManager applicationInfoManager = null; // EurekaClient初始化 if (eurekaClient == null) { EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext()) ? 
new CloudInstanceConfig() : new MyDataCenterInstanceConfig(); applicationInfoManager = new ApplicationInfoManager( instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get()); EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig(); eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig); } else { applicationInfoManager = eurekaClient.getApplicationInfoManager(); } // 如果是使用AWS平台,这里不涉及 PeerAwareInstanceRegistry registry; if (isAws(applicationInfoManager.getInfo())) { registry = new AwsInstanceRegistry( eurekaServerConfig, eurekaClient.getEurekaClientConfig(), serverCodecs, eurekaClient ); awsBinder = new AwsBinderDelegate(eurekaServerConfig, eurekaClient.getEurekaClientConfig(), registry, applicationInfoManager); awsBinder.start(); } else { // 初始化集群实例,父类AbstractInstanceRegistry registry = new PeerAwareInstanceRegistryImpl( eurekaServerConfig, eurekaClient.getEurekaClientConfig(), serverCodecs, eurekaClient ); } // 初始化集群节点实例 PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes( registry, eurekaServerConfig, eurekaClient.getEurekaClientConfig(), serverCodecs, applicationInfoManager ); // 初始化EurekaSeverContext实例 serverContext = new DefaultEurekaServerContext( eurekaServerConfig, serverCodecs, registry, peerEurekaNodes, applicationInfoManager ); // 使用非注入的方式Holder保持EurekaServerContext实例 EurekaServerContextHolder.initialize(serverContext); // EurekaServerContext初始化 serverContext.initialize(); logger.info("Initialized server context"); // 复制注册列表到集群上的其它节点 int registryCount = registry.syncUp(); registry.openForTraffic(applicationInfoManager, registryCount); // Register all monitoring statistics. 
EurekaMonitors.registerAllStats(); } // 初始化获得集群节点 protected PeerEurekaNodes getPeerEurekaNodes(PeerAwareInstanceRegistry registry, EurekaServerConfig eurekaServerConfig, EurekaClientConfig eurekaClientConfig, ServerCodecs serverCodecs, ApplicationInfoManager applicationInfoManager) { PeerEurekaNodes peerEurekaNodes = new PeerEurekaNodes( registry, eurekaServerConfig, eurekaClientConfig, serverCodecs, applicationInfoManager ); return peerEurekaNodes; } ... } @Singleton public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry { /** * 从eureka节点peer封装注册信息,进行节点信息的注册同步,如果操作失败则会进入重试 * Populates the registry information from a peer eureka node. This * operation fails over to other nodes until the list is exhausted if the * communication fails. */ @Override public int syncUp() { // Copy entire entry from neighboring DS node int count = 0; // 默认最大5次进行同步注册 for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) { if (i > 0) { try { // 重试等待30秒 Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs()); } catch (InterruptedException e) { logger.warn("Interrupted during registry transfer.."); break; } } Applications apps = eurekaClient.getApplications(); for (Application app : apps.getRegisteredApplications()) { for (InstanceInfo instance : app.getInstances()) { try { if (isRegisterable(instance)) { // 节点注册 register(instance, instance.getLeaseInfo().getDurationInSecs(), true); count++; } } catch (Throwable t) { logger.error("During DS init copy", t); } } } } return count; } @Override public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) { // Renewals happen every 30 seconds and for a minute it should be a factor of 2. 
// 每分钟服务续约的数量:每个节点服务续约每30秒一次,那么多个节点需要count*2次 this.expectedNumberOfRenewsPerMin = count * 2; // 服务续约最小百分阈值默认为0.85。即最小阈值为最小预约数乘以0.85 this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold()); logger.info("Got " + count + " instances from neighboring DS node"); logger.info("Renew threshold is: " + numberOfRenewsPerMinThreshold); this.startupTime = System.currentTimeMillis(); if (count > 0) { this.peerInstancesTransferEmptyOnStartup = false; } DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName(); boolean isAws = Name.Amazon == selfName; if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) { logger.info("Priming AWS connections for all replicas.."); primeAwsReplicas(applicationInfoManager); } logger.info("Changing status to UP"); applicationInfoManager.setInstanceStatus(InstanceStatus.UP); // 调用父类的定时启动剔除任务定时启动剔除任务,一旦达到剔除条件则会调用服务下线接口,可以看 #Eureka服务下线 一篇 super.postInit(); } } // PeerAwareInstanceRegistryImpl父类 public abstract class AbstractInstanceRegistry implements InstanceRegistry { ... /** * Create a new, empty instance registry. 
*/ protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) { this.serverConfig = serverConfig; this.clientConfig = clientConfig; this.serverCodecs = serverCodecs; // 近期下线队列recentCanceledQueue this.recentCanceledQueue = new CircularQueue<Pair<Long, String>>(1000); // 近期注册队列recentRegisteredQueue this.recentRegisteredQueue = new CircularQueue<Pair<Long, String>>(1000); // 近期1分钟的续约计量统计任务 this.renewsLastMin = new MeasuredRate(1000 * 60 * 1); // 定时任务:定期清除近期变更队列 this.deltaRetentionTimer.schedule(getDeltaRetentionTask(), serverConfig.getDeltaRetentionTimerIntervalInMs(), serverConfig.getDeltaRetentionTimerIntervalInMs()); } private TimerTask getDeltaRetentionTask() { return new TimerTask() { @Override public void run() { Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator(); while (it.hasNext()) { if (it.next().getLastUpdateTime() < System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) { it.remove(); } else { break; } } } }; } ... // 定时启动剔除任务 protected void postInit() { renewsLastMin.start(); if (evictionTaskRef.get() != null) { evictionTaskRef.get().cancel(); } evictionTaskRef.set(new EvictionTask()); evictionTimer.schedule(evictionTaskRef.get(), serverConfig.getEvictionIntervalTimerInMs(), serverConfig.getEvictionIntervalTimerInMs()); } /** * Registers a new instance with a given duration. 
* 给定一个租期时间注册一个新的实例 * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean) */ public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { try { read.lock(); // 通过appname获取实例注册数据 Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName()); // 注册计数器+1 REGISTER.increment(isReplication); // 如果实例对应数据不存在,则进行初始化 if (gMap == null) { final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>(); gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); if (gMap == null) { gMap = gNewMap; } } // 通过实例id获得实例租约信息,如果租约信息存在,那么会比较LastDirtyTimestamp,如果租约信息大于传进来的实例的LastDirtyTimestamp,那么则直接将使用缓存中的注册实例 Lease<InstanceInfo> existingLease = gMap.get(registrant.getId()); // Retain the last dirty timestamp without overwriting it, if there is already a lease if (existingLease != null && (existingLease.getHolder() != null)) { Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted // InstanceInfo instead of the server local copy. 
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" + " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant"); registrant = existingLease.getHolder(); } } else { // 租约信息不存在则作为一个新的注册 synchronized (lock) { if (this.expectedNumberOfRenewsPerMin > 0) { // Since the client wants to cancel it, reduce the threshold // (1 // for 30 seconds, 2 for a minute) this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2; this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold()); } } logger.debug("No previous lease information found; it is new registration"); } // 根据注册者和租约时间,实例化租约实例化信息 Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration); if (existingLease != null) { lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); } gMap.put(registrant.getId(), lease); // 将注册者添加到最新注册队列 synchronized (recentRegisteredQueue) { recentRegisteredQueue.add(new Pair<Long, String>( System.currentTimeMillis(), registrant.getAppName() + "(" + registrant.getId() + ")")); } // This is where the initial state transfer of overridden status happens // 注册者重写状态不为UNKNOWN,并且重写状态Map不包含实例,则将其put到Map中 if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) { logger.debug("Found overridden status {} for instance {}. 
Checking to see if needs to be add to the " + "overrides", registrant.getOverriddenStatus(), registrant.getId()); if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) { logger.info("Not found overridden id {} and hence adding it", registrant.getId()); overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus()); } } InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId()); if (overriddenStatusFromMap != null) { logger.info("Storing overridden status {} from map", overriddenStatusFromMap); registrant.setOverriddenStatus(overriddenStatusFromMap); } // Set the status based on the overridden status rules InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication); registrant.setStatusWithoutDirty(overriddenInstanceStatus); // If the lease is registered with UP status, set lease service up timestamp // 如果租约被注册后为UP状态,那么标记服务启动时间戳,而且只是首次才会进行设置 if (InstanceStatus.UP.equals(registrant.getStatus())) { lease.serviceUp(); } registrant.setActionType(ActionType.ADDED); // 将租约存放到近期变更队列 recentlyChangedQueue.add(new RecentlyChangedItem(lease)); registrant.setLastUpdatedTimestamp(); // 使对应实例缓存失效 invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); logger.info("Registered instance {}/{} with status {} (replication={})", registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication); } finally { read.unlock(); } } } @Singleton public class DefaultEurekaServerContext implements EurekaServerContext { ... @PostConstruct @Override public void initialize() throws Exception { logger.info("Initializing ..."); peerEurekaNodes.start(); registry.init(peerEurekaNodes); logger.info("Initialized"); } ... } // 集群节点:定时更新集群节点 @Singleton public class PeerEurekaNodes { ... 
public void start() { taskExecutor = Executors.newSingleThreadScheduledExecutor( new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "Eureka-PeerNodesUpdater"); thread.setDaemon(true); return thread; } } ); try { // 预先更新集群节点 updatePeerEurekaNodes(resolvePeerUrls()); Runnable peersUpdateTask = new Runnable() { @Override public void run() { try { updatePeerEurekaNodes(resolvePeerUrls()); } catch (Throwable e) { logger.error("Cannot update the replica Nodes", e); } } }; taskExecutor.scheduleWithFixedDelay( peersUpdateTask, serverConfig.getPeerEurekaNodesUpdateIntervalMs(), serverConfig.getPeerEurekaNodesUpdateIntervalMs(), TimeUnit.MILLISECONDS ); } catch (Exception e) { throw new IllegalStateException(e); } for (PeerEurekaNode node : peerEurekaNodes) { logger.info("Replica node URL: " + node.getServiceUrl()); } } /** * Given new set of replica URLs, destroy {@link PeerEurekaNode}s no longer available, and * create new ones. 新增集群备份节点,移除不再可用的节点,并且创建新的node节点 * * @param newPeerUrls peer node URLs; this collection should have local node's URL filtered out */ protected void updatePeerEurekaNodes(List<String> newPeerUrls) { // 参数newPeerUrls为空则不处理 if (newPeerUrls.isEmpty()) { logger.warn("The replica size seems to be empty. 
Check the route 53 DNS Registry"); return; } // 即将关闭的节点集合 Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls); // 因为新增的节点可能在即将关闭的节点集合内,所以先从中移除。 toShutdown.removeAll(newPeerUrls); // 即将新增的节点集合 Set<String> toAdd = new HashSet<>(newPeerUrls); // 从新增节点集合中移除本地节点集合 toAdd.removeAll(peerEurekaNodeUrls); // 如果即将关闭的节点集合为空并且即将新增的节点也为空,则说明没有变化,不需要处理,立即返回。 if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change return; } // 移除不再可能的节点,当前包含全部已有节点 List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes); // 即将关闭的节点集合不为空,则将当前节点集合中移除对应节点 if (!toShutdown.isEmpty()) { logger.info("Removing no longer available peer nodes {}", toShutdown); int i = 0; while (i < newNodeList.size()) { PeerEurekaNode eurekaNode = newNodeList.get(i); if (toShutdown.contains(eurekaNode.getServiceUrl())) { // 移除节点,节点进行关闭shutdown操作 newNodeList.remove(i); eurekaNode.shutDown(); } else { i++; } } } // 新增节点 if (!toAdd.isEmpty()) { logger.info("Adding new peer nodes {}", toAdd); for (String peerUrl : toAdd) { newNodeList.add(createPeerEurekaNode(peerUrl)); } } // 重新赋值 this.peerEurekaNodes = newNodeList; this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls); } // 实时获取排除自身节点所剩下集群的节点地址 protected List<String> resolvePeerUrls() { InstanceInfo myInfo = applicationInfoManager.getInfo(); String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo); List<String> replicaUrls = EndpointUtils .getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo)); int idx = 0; while (idx < replicaUrls.size()) { if (isThisMyUrl(replicaUrls.get(idx))) { replicaUrls.remove(idx); } else { idx++; } } return replicaUrls; } ... }
Eureka Server端的启动入口类是实现了ServletContextListener接口的EurekaBootStrap:首先会初始化eureka环境和上下文;初始化上下文时会进行EurekaServer配置的初始化、JSON和XML编码转换器的注册,以及集群注册表实例和集群节点实例的初始化等。
〓Eureka client
启动核心类DiscoveryClient,启动时会开启三个定时任务:
①刷新缓存定时服务,即定时拉取服务列表,默认每30秒进行定时拉取服务列表;同时②开启心跳线程定时服务,即定时向服务端进行服务续约,默认每30秒进行定时续约。③启动实例信息复制器进行刷新服务实例信息或服务注册请求。
〓Eureka Server
启动入口类是EurekaBootStrap,核心类是PeerAwareInstanceRegistryImpl,
初始化:首先会初始化eureka环境和初始化上下文,初始化上下文时会进行EurekaServer配置的初始化、JSON、XML编码器转化注册、初始化集群注册实例、和集群节点实例等。
更新节点信息定时任务、定时更新续约数据:调用EurekaServerContext上下文进行初始化:启动默认每10分钟定时更新集群节点数据、响应缓存initializedResponseCache初始化、启动默认每15分钟定时任务更新服务续约最小期望数量和最小续约阈值。
集群同步:集群节点的同步注册:支持失败最大5次重试,进行集群节点间的相互注册。
初始化服务续约最小期望数量和最小续约阈值。
本地缓存数据:服务端注册表维护了近期下线环形队列recentCanceledQueue、近期注册环形队列recentRegisteredQueue、近期变更队列recentlyChangedQueue。
剔除任务:启动定时任务EvictionTask(默认每1分钟执行一次),达到剔除条件时对过期实例执行下线;另外,构造注册表时还会启动一个定时任务(getDeltaRetentionTask),定期从近期变更队列recentlyChangedQueue中清除超过保留时间的过期项(内存记录节点数据)。
这里只是针对Eureka启动初始化做了简要剖析,更多详细的篇章请看后面的分析。
本文仅供学习!所有权归属原作者。侵删!文章来源: 搬运工来架构
文章评论