Su的技术博客

  • 首页
  • 原创
  • 视频
  • Java
  • MySQL
  • DDD
  • 事故复盘
  • 架构方案
  • AI
  • Other
  • 工具
    • AI工具集
    • 工具清单
    • JSON在线格式化
    • JSON在线比较
    • SQL在线格式化
  • 打赏
  • 关于
路很长,又很短
  1. 首页
  2. Java
  3. 正文
                           

【原创】Eureka源码剖析之四:服务续约

2020-01-23 1383点热度 0人点赞 0条评论

 

这里主要看下Eureka关于服务续约的源码,其实大致跟服务注册流程类似,不过更多细节和逻辑还是很大不同的。

 

〓Eureka Client
// 在DiscoveryClient里有renew方法,大概知道其是服务续约的入口。renew使用http的方式发送心跳给服务端,如果服务端返回404,说明是某些原因造成服务是没有注册成功,那么就会再次调用register注册接口进行注册。
@Singleton
public class DiscoveryClient implements EurekaClient {

    ...
    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == 404) {
                REREGISTER_COUNTER.increment();
                logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == 200;
        } catch (Throwable e) {
            logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
            return false;
        }
    }
    ...
}

// 这里使用Jersey put的方式发送心跳sendHeartBeat
public class JerseyReplicationClient extends AbstractJerseyEurekaHttpClient implements HttpReplicationClient {
    ...
    /**
     * Compared to regular heartbeat, in the replication channel the server may return a more up to date
     * instance copy.
     */
    @Override
    public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
        String urlPath = "apps/" + appName + '/' + id;
        ClientResponse response = null;
        try {
            WebResource webResource = jerseyClient.getClient().resource(serviceUrl)
                    .path(urlPath)
                    .queryParam("status", info.getStatus().toString())
                    .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
            if (overriddenStatus != null) {
                webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
            }
            Builder requestBuilder = webResource.getRequestBuilder();
            addExtraHeaders(requestBuilder);
            response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).put(ClientResponse.class);
            InstanceInfo infoFromPeer = null;
            if (response.getStatus() == Status.CONFLICT.getStatusCode() && response.hasEntity()) {
                infoFromPeer = response.getEntity(InstanceInfo.class);
            }
            return anEurekaHttpResponse(response.getStatus(), infoFromPeer).type(MediaType.APPLICATION_JSON_TYPE).build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("[heartbeat] Jersey HTTP PUT {}; statusCode={}", urlPath, response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    }
    ...
}

 

接下来再看哪里调用了它:
 /**
 * The heartbeat task that renews the lease in the given intervals.
 * 初始化定时任务时,定时调度心跳线程进行续约服务的调用
 */
private class HeartbeatThread implements Runnable {

    public void run() {
        if (renew()) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}

 // Heartbeat timer
scheduler.schedule(
        new TimedSupervisorTask(
                "heartbeat",
                scheduler,
                heartbeatExecutor,
                renewalIntervalInSecs,
                TimeUnit.SECONDS,
                expBackOffBound,
                new HeartbeatThread()
        ),
        renewalIntervalInSecs, TimeUnit.SECONDS);

 // default size of 2 - 1 each for heartbeat and cacheRefresh
scheduler = Executors.newScheduledThreadPool(2,
    new ThreadFactoryBuilder()
            .setNameFormat("DiscoveryClient-%d")
            .setDaemon(true)
            .build());

 

跟踪了上面的源码我们知道:
服务续约是用定时任务做心跳HeartbeatThread来实现,scheduler定时器使用了两个线程,1个是给刷新缓存服务,1个是给服务续约。
LEASE_RENEWAL_INTERVAL_SECONDS = 30,续约服务是定时30秒进行服务续约。
接下来,再看下服务端源码:
〓Eureka Server
@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
    ...
    public boolean renew(final String appName, final String id, final boolean isReplication) {
        if (super.renew(appName, id, isReplication)) {
            replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
            return true;
        }
        return false;
    }
    ...
}

// 从这里看出服务端接收客户端的参数一致,也说明这里是接收客户端put请求的续约服务。服务端校验了dirty时间戳,并且将服务端的状态进行覆盖重写。
@Produces({"application/xml", "application/json"})
public class InstanceResource {
    ...
    @PUT
    public Response renewLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("overriddenstatus") String overriddenStatus,
            @QueryParam("status") String status,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        boolean isFromReplicaNode = "true".equals(isReplication);
        boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

        // Not found in the registry, immediately ask for a register
        if (!isSuccess) {
            logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
            return Response.status(Status.NOT_FOUND).build();
        }
        // Check if we need to sync based on dirty time stamp, the client
        // instance might have changed some value
        Response response = null;
        if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
            response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
            // Store the overridden status since the validation found out the node that replicates wins
            if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                    && (overriddenStatus != null)
                    && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                    && isFromReplicaNode) {
                registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
            }
        } else {
            response = Response.ok().build();
        }
        logger.debug("Found (Renew): {} - {}; reply status={}" + app.getName(), id, response.getStatus());
        return response;
    }
    ...
}

 

总结

1)Eureka client启动定时任务作为心跳服务即续约服务,每30秒发送心跳http put请求给服务端;
2)Eureka server接收到客户端的请求会校验dirty时间戳(其实这个应该是为了避免时间钟回拨等问题造成时间不一致),之后会将服务端的状态进行覆盖重写进而更新。而服务端在启动的时候同时也有剔除任务,在客户端没有定时续约超过LEASE_EXPIRATION_DURATION_SECONDS = 90秒,则会进行自动剔除下线。
 


 

本文仅供学习!所有权归属原作者。侵删!文章来源: 搬运工来架构

更多文章:

  1. Eureka源码剖析之三:服务拉取
  2. Eureka源码剖析之一:初始化-启动
  3. Eureka源码之二:服务注册
  4. Eureka源码剖析之五:服务下线
  5. 迄今为止最完整的DDD实践
  6. RocketMQ消息回溯实践与解析
  7. 殷浩详解DDD 第四讲:领域层设计规范
  8. 殷浩详解DDD系列 第一讲 - Domain Primitive
  9. JVM 内存大对象监控和优化实践
  10. 全链路压测之影子库及ShardingSphere实现影子库源码剖析
标签: 原创 开源框架 源码 eureka 服务续约 注册中心
最后更新:2023-02-25

Cocodroid

专注Java后端,分享技术。

打赏 点赞
< 上一篇
下一篇 >

文章评论

razz evil exclaim smile redface biggrin eek confused idea lol mad twisted rolleyes wink cool arrow neutral cry mrgreen drooling persevering
取消回复

广告
最新 热点 推荐
最新 热点 推荐
Anthropic Code with Claude 开发者大会:开启 AI Agent 新时代 视频笔记-微服务架构P4:必懂5种设计模式 视频笔记:微服务架构P4 设计模式:每服务数据库、API 网关和事件驱动架构 干货 | 论Elasticsearch数据建模的重要性 马蜂窝消息总线——面向业务的消息服务设计 基于 MySQL Binlog 实现可配置的异构数据同步 视频笔记:Google发布Agent2Agent协议 视频笔记:什么是微服务,为什么是微服务?
基于 MySQL Binlog 实现可配置的异构数据同步马蜂窝消息总线——面向业务的消息服务设计视频笔记:微服务架构P4 设计模式:每服务数据库、API 网关和事件驱动架构干货 | 论Elasticsearch数据建模的重要性视频笔记-微服务架构P4:必懂5种设计模式Anthropic Code with Claude 开发者大会:开启 AI Agent 新时代
Cache——对于缓存你应该知道的都在这张图里 高效开发与设计:提效Spring应用的运行效率和生产力 JVM 内存分析工具 MAT 的深度讲解与实践——高阶篇 《解构领域驱动设计》读书笔记 ParNew+CMS 实践案例 : HiveMetastore FullGC诊断优化 RocketMQ 很慢?引出了一个未解之谜 Redis为什么这么快? Eureka源码剖析之七:架构&面试题【总结】

CRUD (1) Event Sourcing (1) graphql (1) id (1) NoSQL (1) quarkus (1) rest (1) RocketMQ (2) Spring Boot (1) zk (1) zookeeper (1) 上下文 (1) 事务消息 (1) 二级缓存 (1) 值对象 (1) 关系数据库 (1) 分布式缓存 (1) 原子性 (1) 唯一ID (1) 商品 (1) 多对多 (1) 子域 (1) 字符集 (1) 客户端心跳 (1) 幂等 (2) 干货 (1) 并发 (1) 应用场景 (1) 应用架构图 (1) 康威定律 (2) 异步复制 (1) 微服务架构 (3) 总体方案 (1) 技术方案 (2) 技术架构 (2) 技术架构图 (1) 技能 (1) 持续集成 (1) 支撑域 (1) 故障恢复 (1) 数据架构图 (1) 方案选型 (1) 日记 (1) 服务发现 (1) 服务治理 (1) 服务注册 (2) 机房 (1) 核心域 (1) 泄漏 (1) 洋葱架构 (1) 消息队列 (5) 源码剖析 (1) 灰度发布 (1) 熔断 (1) 生态 (1) 画图工具 (1) 研发团队 (1) 线程 (2) 组织架构 (1) 缓存架构 (1) 编码 (1) 视频 (20) 读写分离 (1) 贵州 (1) 软件设计 (1) 迁移 (1) 通用域 (1) 集群化 (1) 雪花算法 (1) 顺序消息 (1)

推荐链接🔗
  • AI工具集
  • 工具箱🛠️

COPYRIGHT © 2014-2025 verysu.com . ALL RIGHTS RESERVED.

Theme Kratos Made By Seaton Jiang

粤ICP备15033072号-2