Su的技术博客

  • 首页
  • Java
  • MySQL
  • DDD
  • 事故复盘
  • 架构方案
  • Other
  • 工具
  • 打赏
  • 关于
  1. 首页
  2. Java
  3. 正文
                           

【原创】Eureka源码剖析之五:服务下线

2020-01-26 26点热度 0人点赞 0条评论

 

现在研究下Eureka服务下线的源码。由服务续约的源码我们知道,如果客户端在90秒内没有继续跟服务端进行心跳的话,服务端会进行下线客户端并且更改状态将其剔除,并且也会在集群中告知(同步)其它节点。

 

〓Eureka Client
/**
 * 注销服务,调用client的cancel服务,往里面看也就是调用了服务端的http delete 请求进行服务下线
 */
void unregister() {
    // It can be null if shouldRegisterWithEureka == false
    if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
        try {
            logger.info("Unregistering ...");
            EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
            logger.info(PREFIX + appPathIdentifier + " - deregister  status: " + httpResponse.getStatusCode());
        } catch (Exception e) {
            logger.error(PREFIX + appPathIdentifier + " - de-registration failed" + e.getMessage(), e);
        }
    }
}

@Override
public EurekaHttpResponse<Void> cancel(String appName, String id) {
    String urlPath = "apps/" + appName + '/' + id;
    ClientResponse response = null;
    try {
        Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
        addExtraHeaders(resourceBuilder);
        response = resourceBuilder.delete(ClientResponse.class);
        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP DELETE {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}

 

接下来再看是哪里调用注销服务:
@PreDestroy
 @Override
 public synchronized void shutdown() {
     if (isShutdown.compareAndSet(false, true)) {
         logger.info("Shutting down DiscoveryClient ...");
         // 注销服务状态的监听器
         if (statusChangeListener != null && applicationInfoManager != null) {
             applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
         }
         // 取消定时器任务,关闭线程池等
         cancelScheduledTasks();

         // 服务实例以及被注册,那么设置实例状态为DOWN,并且进行注销操作
         if (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka()) {
             applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
             unregister();
         }
         // 关闭eurekaTransport
         if (eurekaTransport != null) {
             eurekaTransport.shutdown();
         }
         // 关闭监控
         heartbeatStalenessMonitor.shutdown();
         registryStalenessMonitor.shutdown();

         logger.info("Completed shut down of DiscoveryClient");
     }
 }

 // DiscoveryManager调用了DiscveryClient的shutdown方法,这里就是服务下线的入口
 public void shutdownComponent() {
     if (discoveryClient != null) {
         try {
             discoveryClient.shutdown();
             discoveryClient = null;
         } catch (Throwable th) {
             logger.error("Error in shutting down client", th);
         }
     }
 }
〓Eureka Server
// 服务端提供的对外服务下线接口
@DELETE
public Response cancelLease(
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    boolean isSuccess = registry.cancel(app.getName(), id,
            "true".equals(isReplication));

    if (isSuccess) {
        logger.debug("Found (Cancel): " + app.getName() + " - " + id);
        return Response.ok().build();
    } else {
        logger.info("Not Found (Cancel): " + app.getName() + " - " + id);
        return Response.status(Status.NOT_FOUND).build();
    }
}

// 服务端下线入口
@Override
public boolean cancel(final String appName, final String id,
                      final boolean isReplication) {
    // 调用父类cancel方法,将实例信息添加到最近下线队列、最近变更队列,并且使本地缓存失效操作
    if (super.cancel(appName, id, isReplication)) {
        // 服务端集群之间进行Cancel同步操作
        replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
        synchronized (lock) {
            if (this.expectedNumberOfRenewsPerMin > 0) {
                // Since the client wants to cancel it, reduce the threshold (1 for 30 seconds, 2 for a minute)
                this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin - 2;
                this.numberOfRenewsPerMinThreshold =
                        (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
            }
        }
        return true;
    }
    return false;
}

// 最终调用这个方法进行实例信息移除
protected boolean internalCancel(String appName, String id, boolean isReplication) {
    try {
        // 读锁
        read.lock();
        // 取消(下线)计数
        CANCEL.increment(isReplication);
        // 获取续约实例
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToCancel = null;
        if (gMap != null) {
            leaseToCancel = gMap.remove(id);
        }
        // 添加到近期取消队列
        recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
        // 移除实例状态
        InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
        if (instanceStatus != null) {
            logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
        }
        if (leaseToCancel == null) {
            // 续约实例不存在,返回false
            CANCEL_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
            return false;
        } else {
            // 续约实例执行取消操作:设置剔除时间
            leaseToCancel.cancel();
            InstanceInfo instanceInfo = leaseToCancel.getHolder();
            String vip = null;
            String svip = null;
            if (instanceInfo != null) {
                // 设置实例行为为delete
                instanceInfo.setActionType(ActionType.DELETED);
                // 添加到近期变化队列
                recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                instanceInfo.setLastUpdatedTimestamp();
                vip = instanceInfo.getVIPAddress();
                svip = instanceInfo.getSecureVipAddress();
            }
            // 使响应缓存失效
            invalidateCache(appName, vip, svip);
            logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
            return true;
        }
    } finally {
        // 释放读锁
        read.unlock();
    }
}

 

总结

Eureka client执行DiscoveryManager调用了DiscveryClient的shutdown方法进行服务的下线操作,然后服务端接收到http delete请求之后进行服务的相关下线操作,并且同步到集群中的其它节点。
Eureka server则会将实例信息进行剔除处理,并且添加到近期变化队列和近期取消队列里。
 


 

本文仅供学习!所有权归属原作者。侵删!文章来源: 搬运工来架构

标签: 原创 开源框架 源码 eureka 注册中心
最后更新:2023-02-25

Cocodroid

专注Java后端,分享技术。

打赏 点赞
< 上一篇
下一篇 >

文章评论

razz evil exclaim smile redface biggrin eek confused idea lol mad twisted rolleyes wink cool arrow neutral cry mrgreen drooling persevering
取消回复

最新 热点 推荐
最新 热点 推荐
殷浩详解DDD 第四讲:领域层设计规范 既生@Resource,何生@Autowired? Go整洁架构实践 接口优化的常见方案实战总结 QQ音乐高可用架构体系 构建一个布隆过滤器 —— Building a Bloom filter
殷浩详解DDD 第四讲:领域层设计规范Redis为什么这么快?构建一个布隆过滤器 —— Building a Bloom filterQQ音乐高可用架构体系接口优化的常见方案实战总结Go整洁架构实践
百度工程师浅谈分布式日志 笔记 | 面试官问我:TCP与UDP的区别 腾讯的这道面试题,我懵了... —— Redis的hashtable是如何扩容的 ElasticSearch之各大版本演进,发布8.0.0 Alpha 2版本 如何设计一款高性能分布式锁,实现数据的安全访问? 猪八戒网DevOps之Java组件安全检测

@Autowired (1) @Resource (1) API网关 (1) ddd (6) DP (1) ElasticSearch (1) eureka (7) go (1) HTTP (1) IDEA (1) iOS (1) Java (8) JSR (1) QQ音乐 (1) repository (1) Spring (1) SQL优化 (1) 代理 (1) 依赖注入 (1) 同城双活 (1) 垃圾回收 (1) 定时任务 (1) 容灾 (1) 布隆过滤器 (1) 异地双活 (1) 接口优化 (1) 故障转移 (1) 数据库 (2) 整洁架构 (1) 文件网关 (1) 方案 (2) 服务续约 (1) 注册中心 (7) 流水账 (1) 流量 (1) 第五 (1) 线上案例 (1) 线上问题 (2) 缓存 (1) 缓存击穿 (1) 编译 (3) 网络 (3) 聊聊 (1) 订单 (1) 设计规范 (1) 详解 (1) 连接池 (1) 限流 (1) 领域驱动设计 (4) 高可用 (1)

COPYRIGHT © 2014-2023 verysu.com . ALL RIGHTS RESERVED.

Theme Kratos Made By Seaton Jiang

粤ICP备15033072号-2