热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

SpringCloudEureka源码分析之三级缓存的设计原理及源码分析

Spring Cloud Eureka源码分析之三级缓存的设计原理及源码分析

Eureka Server 为了提供响应效率,提供了两层的缓存结构,将 Eureka Client 所需要的注册信息,直接存储在缓存结构中,实现原理如下图所示。

Spring Cloud Eureka源码分析之三级缓存的设计原理及源码分析

第一层缓存:readOnlyCacheMap,本质上是 ConcurrentHashMap,依赖定时从 readWriteCacheMap 同步数据,默认时间为 30 秒。

readOnlyCacheMap : 是一个 CurrentHashMap 只读缓存,这个主要是为了供客户端获取注册信息时使用,其缓存更新,依赖于定时器的更新,通过和 readWriteCacheMap 的值做对比,如果数据不一致,则以 readWriteCacheMap 的数据为准。

第二层缓存:readWriteCacheMap,本质上是 Guava 缓存。

readWriteCacheMap:readWriteCacheMap 的数据主要同步于存储层。当获取缓存时判断缓存中是否没有数据,如果不存在此数据,则通过 CacheLoader 的 load 方法去加载,加载成功之后将数据放入缓存,同时返回数据。

readWriteCacheMap 缓存过期时间,默认为 180 秒,当服务下线、过期、注册、状态变更,都会来清除此缓存中的数据。

Eureka Client 获取全量或者增量的数据时,会先从一级缓存中获取;如果一级缓存中不存在,再从二级缓存中获取;如果二级缓存也不存在,这时候先将存储层的数据同步到缓存中,再从缓存中获取。

通过 Eureka Server 的二层缓存机制,可以非常有效地提升 Eureka Server 的响应时间,通过数据存储层和缓存层的数据切割,根据使用场景来提供不同的数据支持。

多级缓存的意义

这里为什么要设计多级缓存呢?原因很简单,就是当存在大规模的服务注册和更新时,如果只是修改一个ConcurrentHashMap数据,那么势必因为锁的存在导致竞争,影响性能。

而Eureka又是AP模型,只需要满足最终可用就行。所以它在这里用到多级缓存来实现读写分离。注册方法写的时候直接写内存注册表,写完表之后主动失效读写缓存。

获取注册信息接口先从只读缓存取,只读缓存没有再去读写缓存取,读写缓存没有再去内存注册表里取(不只是取,此处较复杂)。并且,读写缓存会更新回写只读缓存

  • responseCacheUpdateIntervalMs : readOnlyCacheMap 缓存更新的定时器时间间隔,默认为30秒
  • responseCacheAutoExpirationInSeconds : readWriteCacheMap 缓存过期时间,默认为 180 秒 。

缓存初始化

readWriteCacheMap使用的是LoadingCache对象,它是guava中提供的用来实现内存缓存的一个api。创建方式如下

LoadingCache cache = CacheBuilder.newBuilder()
    //缓存池大小,在缓存项接近该大小时, Guava开始回收旧的缓存项
    .maximumSize(10000)
    //设置时间对象没有被读/写访问则对象从内存中删除(在另外的线程里面不定期维护)
    .expireAfterAccess(10, TimeUnit.MINUTES)
    //移除监听器,缓存项被移除时会触发
    .removalListener(new RemovalListener () {
        @Override
        public void onRemoval(RemovalNotification rn) {
            //执行逻辑操作
        }
    })
    .recordStats()//开启Guava Cache的统计功能
    .build(new CacheLoader() {
        @Override
        public Object load(String key) {
            //从 SQL或者NoSql 获取对象
        }
    });//CacheLoader类 实现自动加载

其中,CacheLoader是用来实现缓存自动加载的功能,当触发readWriteCacheMap.get(key)方法时,就会回调CacheLoader.load方法,根据key去服务注册信息中去查找实例数据进行缓存

ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
    this.serverCOnfig= serverConfig;
    this.serverCodecs = serverCodecs;
    this.shouldUseReadOnlyResponseCache= serverConfig.shouldUseReadOnlyResponseCache();
    this.registry = registry;

    long respOnseCacheUpdateIntervalMs= serverConfig.getResponseCacheUpdateIntervalMs();
    this.readWriteCacheMap =
        CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
        .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
        .removalListener(new RemovalListener() {
            @Override
            public void onRemoval(RemovalNotification notification) {
                Key removedKey = notification.getKey();
                if (removedKey.hasRegions()) {
                    Key clOneWithNoRegions= removedKey.cloneWithoutRegions();
                    regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                }
            }
        })
        .build(new CacheLoader() {
            @Override
            public Value load(Key key) throws Exception {
                if (key.hasRegions()) {
                    Key clOneWithNoRegions= key.cloneWithoutRegions();
                    regionSpecificKeys.put(cloneWithNoRegions, key);
                }
                Value value = generatePayload(key);  //注意这里
                return value;
            }
        });

而缓存的加载,是基于generatePayload方法完成的,代码如下。

private Value generatePayload(Key key) {
    Stopwatch tracer = null;
    try {
        String payload;
        switch (key.getEntityType()) {
            case Application:
                boolean isRemoteRegiOnRequested= key.hasRegions();

                if (ALL_APPS.equals(key.getName())) {
                    if (isRemoteRegionRequested) {
                        tracer = serializeAllAppsWithRemoteRegionTimer.start();
                        payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
                    } else {
                        tracer = serializeAllAppsTimer.start();
                        payload = getPayLoad(key, registry.getApplications());
                    }
                } else if (ALL_APPS_DELTA.equals(key.getName())) {
                    if (isRemoteRegionRequested) {
                        tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
                        versionDeltaWithRegions.incrementAndGet();
                        versionDeltaWithRegionsLegacy.incrementAndGet();
                        payload = getPayLoad(key,
                                             registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
                    } else {
                        tracer = serializeDeltaAppsTimer.start();
                        versionDelta.incrementAndGet();
                        versionDeltaLegacy.incrementAndGet();
                        payload = getPayLoad(key, registry.getApplicationDeltas());
                    }
                } else {
                    tracer = serializeOneApptimer.start();
                    payload = getPayLoad(key, registry.getApplication(key.getName()));
                }
                break;
            case VIP:
            case SVIP:
                tracer = serializeViptimer.start();
                payload = getPayLoad(key, getApplicationsForVip(key, registry));
                break;
            default:
                logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());
                payload = "";
                break;
        }
        return new Value(payload);
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }
}

此方法接受一个 Key 类型的参数,返回一个 Value 类型。 其中 Key 中重要的字段有:

  • KeyType ,表示payload文本格式,有 JSON 和 XML 两种值。
  • EntityType ,表示缓存的类型,有 Application , VIP , SVIP 三种值。
  • entityName ,表示缓存的名称,可能是单个应用名,也可能是 ALL_APPSALL_APPS_DELTA

Value 则有一个 String 类型的payload和一个 byte 数组,表示gzip压缩后的字节。

缓存同步

ResponseCacheImpl这个类的构造实现中,初始化了一个定时任务,这个定时任务每个

ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
    //省略...
    if (shouldUseReadOnlyResponseCache) {
        timer.schedule(getCacheUpdateTask(),
                       new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                                + responseCacheUpdateIntervalMs),
                       responseCacheUpdateIntervalMs);
    }
}

默认每30s从readWriteCacheMap更新有差异的数据同步到readOnlyCacheMap中

private TimerTask getCacheUpdateTask() {
    return new TimerTask() {
        @Override
        public void run() {
            logger.debug("Updating the client cache from response cache");
            for (Key key : readOnlyCacheMap.keySet()) { //遍历只读集合
                if (logger.isDebugEnabled()) {
                    logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
                                 key.getEntityType(), key.getName(), key.getVersion(), key.getType());
                }
                try {
                    CurrentRequestVersion.set(key.getVersion());
                    Value cacheValue = readWriteCacheMap.get(key);
                    Value currentCacheValue = readOnlyCacheMap.get(key);
                    if (cacheValue != currentCacheValue) { //判断差异信息,如果有差异,则更新
                        readOnlyCacheMap.put(key, cacheValue);
                    }
                } catch (Throwable th) {
                    logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
                } finally {
                    CurrentRequestVersion.remove();
                }
            }
        }
    };
}

缓存失效

在AbstractInstanceRegistry.register这个方法中,当完成服务信息保存后,会调用invalidateCache失效缓存

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    //....
     invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
    //....
}

最终调用ResponseCacheImpl.invalidate方法,完成缓存的失效机制

public void invalidate(Key... keys) {
    for (Key key : keys) {
        logger.debug("Invalidating the response cache key : {} {} {} {}, {}",
                     key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());

        readWriteCacheMap.invalidate(key);
        Collection keysWithRegiOns= regionSpecificKeys.get(key);
        if (null != keysWithRegions && !keysWithRegions.isEmpty()) {
            for (Key keysWithRegion : keysWithRegions) {
                logger.debug("Invalidating the response cache key : {} {} {} {} {}",
                             key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
                readWriteCacheMap.invalidate(keysWithRegion);
            }
        }
    }
}

版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Mic带你学架构
如果本篇文章对您有帮助,还请帮忙点个关注和赞,您的坚持是我不断创作的动力。欢迎关注「跟着Mic学架构」公众号公众号获取更多技术干货!

Spring Cloud Eureka源码分析之三级缓存的设计原理及源码分析


推荐阅读
  • 利用Visual Basic开发SAP接口程序初探的方法与原理
    本文介绍了利用Visual Basic开发SAP接口程序的方法与原理,以及SAP R/3系统的特点和二次开发平台ABAP的使用。通过程序接口自动读取SAP R/3的数据表或视图,在外部进行处理和利用水晶报表等工具生成符合中国人习惯的报表样式。具体介绍了RFC调用的原理和模型,并强调本文主要不讨论SAP R/3函数的开发,而是针对使用SAP的公司的非ABAP开发人员提供了初步的接口程序开发指导。 ... [详细]
  • Java在运行已编译完成的类时,是通过java虚拟机来装载和执行的,java虚拟机通过操作系统命令JAVA_HOMEbinjava–option来启 ... [详细]
  • 本文整理了Java面试中常见的问题及相关概念的解析,包括HashMap中为什么重写equals还要重写hashcode、map的分类和常见情况、final关键字的用法、Synchronized和lock的区别、volatile的介绍、Syncronized锁的作用、构造函数和构造函数重载的概念、方法覆盖和方法重载的区别、反射获取和设置对象私有字段的值的方法、通过反射创建对象的方式以及内部类的详解。 ... [详细]
  • 开发笔记:加密&json&StringIO模块&BytesIO模块
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了加密&json&StringIO模块&BytesIO模块相关的知识,希望对你有一定的参考价值。一、加密加密 ... [详细]
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • 计算机存储系统的层次结构及其优势
    本文介绍了计算机存储系统的层次结构,包括高速缓存、主存储器和辅助存储器三个层次。通过分层存储数据可以提高程序的执行效率。计算机存储系统的层次结构将各种不同存储容量、存取速度和价格的存储器有机组合成整体,形成可寻址存储空间比主存储器空间大得多的存储整体。由于辅助存储器容量大、价格低,使得整体存储系统的平均价格降低。同时,高速缓存的存取速度可以和CPU的工作速度相匹配,进一步提高程序执行效率。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • 怎么在PHP项目中实现一个HTTP断点续传功能发布时间:2021-01-1916:26:06来源:亿速云阅读:96作者:Le ... [详细]
  • Java中包装类的设计原因以及操作方法
    本文主要介绍了Java中设计包装类的原因以及操作方法。在Java中,除了对象类型,还有八大基本类型,为了将基本类型转换成对象,Java引入了包装类。文章通过介绍包装类的定义和实现,解答了为什么需要包装类的问题,并提供了简单易用的操作方法。通过本文的学习,读者可以更好地理解和应用Java中的包装类。 ... [详细]
  • 模板引擎StringTemplate的使用方法和特点
    本文介绍了模板引擎StringTemplate的使用方法和特点,包括强制Model和View的分离、Lazy-Evaluation、Recursive enable等。同时,还介绍了StringTemplate语法中的属性和普通字符的使用方法,并提供了向模板填充属性的示例代码。 ... [详细]
  • 本文介绍了一个适用于PHP应用快速接入TRX和TRC20数字资产的开发包,该开发包支持使用自有Tron区块链节点的应用场景,也支持基于Tron官方公共API服务的轻量级部署场景。提供的功能包括生成地址、验证地址、查询余额、交易转账、查询最新区块和查询交易信息等。详细信息可参考tron-php的Github地址:https://github.com/Fenguoz/tron-php。 ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • 图像因存在错误而无法显示 ... [详细]
  • 本文讨论了在VMWARE5.1的虚拟服务器Windows Server 2008R2上安装oracle 10g客户端时出现的问题,并提供了解决方法。错误日志显示了异常访问违例,通过分析日志中的问题帧,找到了解决问题的线索。文章详细介绍了解决方法,帮助读者顺利安装oracle 10g客户端。 ... [详细]
  • 本文介绍了使用Spark实现低配版高斯朴素贝叶斯模型的原因和原理。随着数据量的增大,单机上运行高斯朴素贝叶斯模型会变得很慢,因此考虑使用Spark来加速运行。然而,Spark的MLlib并没有实现高斯朴素贝叶斯模型,因此需要自己动手实现。文章还介绍了朴素贝叶斯的原理和公式,并对具有多个特征和类别的模型进行了讨论。最后,作者总结了实现低配版高斯朴素贝叶斯模型的步骤。 ... [详细]
author-avatar
荣媛厉4
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有