热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

enode框架2.0stepbystep之整体架构介绍

前言今天是个开心的日子,又是周末,可以轻轻松松的写写文章了。去年,我写了enode1.0版本,那时我也写了一个分析系列。经过了大半年的时间,我对第一个版本做了很多架构上的改进,

前言


今天是个开心的日子,又是周末,可以轻轻松松的写写文章了。去年,我写了enode 1.0版本,那时我也写了一个分析系列。经过了大半年的时间,我对第一个版本做了很多架构上的改进,最重要的就是让enode实现了分布式,通过新增一个分布式消息队列equeue来实现。之所以要设计一个分布式的消息队列是因为在enode
1.0版本中,某个特定的消息队列只能被某个特定的消费者消费。这样就会导致一个问题,就是如果这个消费者挂了,那这个消费者对应的消息队列就不能自动被其他消费者消费了。这个问题会直接导致系统不可用。而enode
2.0中,就不会有这个问题了,因为消息队列被设计为独立的,被消费者所共享的;一个消息队列可以被多个消费者集群消费或广播消费,如果一个消费者挂了,那其他的消费者会自动顶上。这里具体的细节,我会在后面详细介绍。


enode框架简介



  1. 框架名称:enode

  2. 框架特色:DDD+CQRS + EDA + Event Sourcing + In Memory

  3. 设计目标:让程序员只关注业务代码、高性能、分布式、可水平扩展

  4. 开源地址:https://github.com/tangxuehua/enode

  5. 基于enode实现的一个银行转账的例子的地址:https://github.com/tangxuehua/banktransfersample

  6. nuget包Id:enode

  7. 一个独立的分布式消息队列equeue,可以为enode提供command,domain event的发布和订阅:https://github.com/tangxuehua/equeue

enode架构图


bubuko.com,布布扣


熟悉CQRS架构的人看到这图应该就再熟悉不过了,enode实现的是一个CQRS架构。基本的概念就不多介绍了,如果大家对上图中的一些概念还不太清楚,可以看一下我的博客里的其他相关文章,我应该都有写到。下面主要介绍一下enode
2.0在实现CQRS架构时的一些不一样的地方(由于篇幅的限制,先说三点吧):


command handler一次只处理一个command


就是你不能在command
handler中一次修改多个聚合根,我觉得这应该是enode对开发人员的最大约束,可能也是最让开发人员觉得不爽的地方。但我觉得这个不是约束,而是对数据强一致性和最终一致性的一个正确认识。 >在我学过ddd+cqrs+event
sourcing这三个东西之后,我认识到,聚合内必须确保强一致性,聚合间最终一致性。
传统三层开发,我们通过unit of
work模式(简称uow,比如nhibernate的session, entity
framework的dbcontext)可以轻易实现多个对象修改的强一致性事务;确实在传统三层模式开发中,这种利用uow的方式来实现跨聚合的强一致性事务的方式很实用,开发起来很方便,开发人员可以不必担心会出现数据不一致的问题了,因为所有修改总是在一个事务内保存。


但enode的设计目标不是为了支持传统三层开发,而是面向ddd+cqrs+eda+event
sourcing架构的框架。曾经我也想让command
handler支持修改多个聚合根,但这样做必须要面临一个很棘手的问题:command在发送到command
queue时,无法根据聚合根ID来路由了。因为一个command会修改多个聚合根,也就是说一个command不会和一个聚合根一一对应了。这意味着同一个聚合根没办法总是被路由到同一个command
queue里,这样就导致相同ID的聚合根可能会在两台服务器被同时修改,这就会导致整个系统可能会频繁的产生并发更新冲突。很多command就会不断的重试,整个系统的性能就会下降。而enode设计之初就是为了高性能,所以这点让我觉得很难接收。


相反,如果一个command总是只会创建或修改一个聚合根,那我们的command就能根据聚合根ID来路由到特定的消息队列,同一个聚合根ID总是会被路由到同一个queue,而一个queue的消费者服务器(command
handler所在的服务器)同一时刻总是只有一个,那我们就能保证一个聚合根的修改不会有并发问题。当然光这样还不够,在这个command消费者服务器里,enode框架会用内存级别的queue对同一个聚合根的所有command再次进行排队(如果需要排队的话),之所以要这样是因为有时对一个聚合根的并发修改command可能1s内发送了很多过来,所以command
handler肯定来不及在1s内全部处理掉这些command,所以需要在内存里再次排队(天猫双十一的时候,应用服务器内部也会有类似的对同一个聚合根设计一个相应的内存queue来避免对同一个聚合根的修改的并发冲突的问题)。通过这样的设计,我们可以做到绝大部分情况下,不会再有并发冲突的问题,也就是command不会再出现重试的情况。 >这样最后的效果就是:不同ID的聚合根的处理可以并行,同一个ID的聚合根的处理是串行,通过两级排队实现。前面说到,这样只能做到绝大部分情况下不会有并发冲突,那么什么时候还是会有并发冲突呢?就是在新增command消费者服务器的时候,比如我们发现最近系统繁忙,我们希望增加command消费者服务器来加快command的处理,那在新增服务器后,原来修改某个聚合根的command可能会被路由到新的服务器,但是这个聚合根的有些command可能还在原来的服务器上还没执行完,此时就会出现同一个聚合根在两台服务器上被同时修改的可能了;那这个怎么解决呢?我现在的想法是框架层面不必解决了,我们只需要在系统最空的时候(比如凌晨4点)的时候,增加服务器即可,因为那个时候消息队列里的消息是最少的,也就是不太可能会产生因为增加command
handler服务器而导致并发冲突的问题,这样我们就可以最大限度的避免可能带来的并发冲突。


让domain生活在in memory中


相比一般的CQRS架构,enode每次在处理一个command,在获取聚合根时,不是从eventstore获取,而是从缓存获取。从上面的架构图可以看出,enode架构中有一个domain
memory
cache,目前用redis实现。这样做的好处是,将所有的聚合根都缓存在redis缓存中,这样就能提高聚合根的读取时间;有一个问题需要考虑,redis缓存服务器宕机了怎么办?宕机后缓存数据就没了,那如何恢复这些缓存数据呢?这也是我选择redis的一个主要理由,因为redis支持持久化,我们可以利用redis的aof或快照方式的持久化功能,来持久化缓存数据。从而可以在redis挂了后能最快的速度恢复缓存,重启redis服务器即可。那重启之前以及重启的过程中,因为无法从redis获取聚合根了,那只能从eventstore通过event
sourcing的方式去获取,那样的话性能肯定会比较差,那怎么办呢?答案是通过定时为聚合根创建快照,这也是采用event
sourcing架构的一个好处。我们可以定时对某些聚合跟创建快照(注意,我觉得只需要考虑那些对性能要求很高的模块所涉及到的聚合根创建快照即可),那怎么创建呢?可以开一个独立的进程,监听domain
event,对需要创建快照的domain event做出判断,根据某种快照创建策略进行判断,如果认为需要创建快照,则从event
store拿出该聚合根的相关事件,通过event
sourcing还原得到某个版本的聚合根,这样就得到了某个聚合根的某个版本的快照了。然后持久化起来即可。然后,enode支持在从event
store获取聚合根前,先检查是否有快照,如果有快照,则会先加载快照,再把快照之后的domain event从event
store获取,再把这些快照之后的domain
event一个个apply到当前聚合根,从而得到最新状态的聚合根。这个过程比获取该聚合根的所有领域事件在一个个通过event
sourcing还原得到聚合根要快的多;尤其是在一个聚合根的domain
event比较多的情况下就更有意义。因此,通过缓存的引入,我们可以提高command handler的处理速度。


event store的设计


关于重复的command的幂等处理和聚合根可能存在的并发冲突的判断


另外一点很重要的是,因为我们的command是会发送到分布式消息队列,然后队列中的command消息会被取出来执行;大家知道,我们很难保证一个消息不会被重复执行,也就是说,一个command可能会重复执行。因此,我们的应用要支持对command的密等处理。而对于使用enode框架的应用,因为整个command
side的数据持久化就是持久化domain event,程序员不必关心domain
event的持久化过程。所以enode很有必要能内置支持对command的重复处理的判断。那么如何做呢?我觉得最靠谱的做法是,在持久化domain
event的时候就能绝对靠谱的检测出来某个command是否被重复执行了。那很自然就想到将被持久化的domain
event和产生他的对应command关联起来。所以我设计了如下的结构,用来表示一个command在操作聚合根后所产生的领域事件的信息。



///

The commandId which generate this event stream.
///

public string CommitId { get; private set; }
/// The aggregate root id.
///

public string AggregateRootId { get; private set; }
/// The aggregate root type code.
///

public int AggregateRootTypeCode { get; private set; }
/// The version of the event stream.
///

public int Version { get; private set; }
/// The occurred time of the event stream.
///

public DateTime Timestamp { get; private set; }
/// The domain events of the event stream.
///

public IEnumerable Events { get; private set; }


  • CommitId:就是当前的CommandId;

  • AggregateRootId:当前被操作的聚合根的全局唯一ID;

  • AggregateRootTypeCode:表示聚合根的类型的一个code,通过该code我们可以知道当前记录是哪个类型的聚合根的;

  • Version:一个版本号,表示聚合根产生领域事件后的新版本号,是产生事件前的版本号+1;也就是说,聚合根的版本是每次被修改一次,那Version就加1;

  • Timestamp:一个时间戳,用于记录产生domain event时的时间;

  • Events:表示当前command操作聚合根后所产生的领域事件,一次操作可以产生多个领域事件;

对于上面的结构体,我们可以实现两个重要的功能:1)为AggregateRootId和Version这两个字段建立唯一索引,这样我们就能实现判断某个聚合根是否被并发修改,因为如果有并发修改导致并发冲突,那保存到eventstore时,它们的Version肯定是相同的;2)为AggregateRootId和CommitId两个字段建立唯一索引,这样我们就能判断某个command是否被重复执行,因为一个command被实例化出来后,它所要修改的聚合根ID就不可能再修改了,所以如果该command被重复执行,那最后产生的领域事件(上面这个结构体)最后被持久化到eventstore时就会违反这个唯一索引,从而框架就能知道是否有command被重复执行了;


另外,上面这个结构体被保存到eventstore时,是以一条记录的方式被保存,Events集合会被序列化为一段二进制;所以,假如我们用关系型数据库来保存,那就是只有一条insert语句即可,这样就实现了一个聚合根的一次修改的事务持久化。然后因为上两个索引的存在,我们就能在保存时判断是否有并发冲突或command是否被重复执行。


关于domain event大数据量的考虑


在设计event store时,我考虑了很多。最后认为event
store要解决的最大的两个问题是持久化性能和可水平扩展性。首先,因为每次command
handler在处理完一个聚合根后,都会把产生的领域事件持久化到event
store,没持久化完成则不能认为该command已处理完,所以持久化的性能对处理command的吞吐量至关重要。另外一点就是可水平扩展性,因为event
store里保存的都是domain event,而enode又是为了实现高性能为目标的,所以event
store里的数据肯定会非常多,比如1s中要持久化1K个domain
event,那一天就会有8600W条记录要记录,一天就真么多,那1年就更多了,所以用单点存储所有的domain
event显示不靠谱了。所以我们的event
store必须要支持水平扩展。比如我们可以设计100个分区,那每个分区一天只需要保存86W条记录,一年也只需要保存3亿多条记录即可。之前我很追求单个存储节点的高性能,所以曾经想过要用leveldb,stsdb,甚至redis这种高性能的基于key,value的nosql存储。但后来发现这种nosql存储虽然性能很高,但因为只是key,value的存储结构,所以没办法支持二级索引,这样就没办法实现上面第一点中提到的command的幂等处理和聚合根并发冲突的检测。另一个重要的原因是,event
store中的数据我们有时候是要被查询的。比如现在某个command遇到的并发冲突,那框架需要自动重试,但是重试之前需要先更新redis缓存,就是把eventstore里的最新的聚合根更新到redis缓存里,这样command在重试时才能拿到最新版本的聚合根,这样重试才能成功。那如何从eventstore里拿最新的聚合根呢?只能根据聚合根ID从eventstore里查询。而聚合根ID又不是key,value
nosql的key,自然就没办法实现这个需求了;所以,我觉得合理的办法应该是用关系型数据库来实现eventstore。有人说关系型数据库的性能不行。我觉得只要关系型数据库支持水平扩展,也就是将domain
event sharding(分片)到不同的分库分表中,那平均到每个库里的domain
event的数量就不大了;这样整个eventstore的持久化性能就可以随着分库的数量的增加而线性增加;比如我现在单个db insert
domain event的性能是1K tps(mysql配合ssd硬盘完全无压力,呵呵),那10个库的tps就能达到1W
tps了。因为我们分库会根据聚合根ID的hash code来平均散列,这样能确保每个库中的聚合根的domain
event数量是基本一样的;从而就能实现整个event
store的持久化性能随着分库的增加而线性增加。所以,有了分库的优势,大数据量和性能都不是问题了。且因为关系型数据库支持二级索引和唯一索引,那查询domain
event也不是问题了。


enode物理部署结构图


bubuko.com,布布扣


上图是enode在实际项目中我目前认为的一个物理部署结构图。


首先客户端浏览器通过网络最后访问到我们的web服务器集群,当然web服务器前面肯定还有网关和负载均衡器,我这里为了突出重点就不画出来了。然后每个web服务器接受到httprequest后会生成command,然后通过enode框架发送到分布式消息队列服务器(message
queue server),目前由我开发的equeue实现。然后消息队列服务器上的消息会被推送到command process
servers,command process server就是执行command handler、完成domain logic,持久化domain
event,以及publish domain event的服务器。command process server处理完之后,domain
event会由enode框架自动发送到message queue server,然后会被event process server处理,event
process server就是订阅domain event,然后根据domain event更新query db。对于查询,web
server可以直接通过sql查询query db即可。


各种服务器的集群:



  • web server:无状态,可以任意增加服务器;

  • command process
    server:就是处理业务逻辑的服务器,也是无状态,可以任意增加服务器;但服务器的数目最好和command所对应的topic下的queue的数量保持一致,这点后续在写分布式消息队列equeue的文章时在详细谈吧;

  • redis server:就是缓存聚合根的服务器,属于缓存服务器;可以按需要存储的容量来规划需要开多少台redis
    server;目前我觉得最好的redis动态扩容方法就是pre sharding;

  • event db server:就是存储domain
    event的服务器,按照上的分析,我们采用的是关系型数据库,比如用mysql;mysql的分库分表技术已经很成熟,后续文章我们再详细讨论如何分库以及如何做数据迁移;

  • event process server:就是订阅domain event,根据domain event更新query
    db的服务器;可以根据需要来部署多少台,和command process server类似;这里有一点必须要先提一下,就是在更新query
    db时,因为每次更新都是针对某个domain event来更新query db的,而domain
    event只表示一个聚合根的修改,所以每次我们更新query
    db时,也只更新该聚合根所在范围的表;我们千万不要去更新超过该聚合根范围的表,否则就会产生并发冲突,导致event
    handler执行失败;这样就会是的cqrs的query db同步数据变的很慢。对于query side,如果我们觉得直接从query
    db查询数据太慢,可以考虑设计查询缓存,也就是不走query db来查询数据,而是走缓存。这种缓存就和我们平时的缓存设计类似了;利用domain
    event,我们先天就有优势可以让缓存非常及时的更新,呵呵。因为一旦有domain event过来,我们就能快速更新我们的query
    side缓存,而query db就可以异步更新即可。这样就可以解决query side同步更新数据慢的问题。

  • message queue
    server:就是消息队列服务器,目前equeue还不支持集群,只支持单机;这个以后有时间会考虑实现master-slave模式,类似于淘宝的rocketmq一样。

好了,就写这些吧,后续的再后续文章中补上,呵呵。

enode框架 2.0 step by step之整体架构介绍,布布扣,bubuko.com


推荐阅读
  • 在project.properties添加#Projecttarget.targetandroid-19android.library.reference.1..Sliding ... [详细]
  • 知识图谱——机器大脑中的知识库
    本文介绍了知识图谱在机器大脑中的应用,以及搜索引擎在知识图谱方面的发展。以谷歌知识图谱为例,说明了知识图谱的智能化特点。通过搜索引擎用户可以获取更加智能化的答案,如搜索关键词"Marie Curie",会得到居里夫人的详细信息以及与之相关的历史人物。知识图谱的出现引起了搜索引擎行业的变革,不仅美国的微软必应,中国的百度、搜狗等搜索引擎公司也纷纷推出了自己的知识图谱。 ... [详细]
  • 生成式对抗网络模型综述摘要生成式对抗网络模型(GAN)是基于深度学习的一种强大的生成模型,可以应用于计算机视觉、自然语言处理、半监督学习等重要领域。生成式对抗网络 ... [详细]
  • 基于layUI的图片上传前预览功能的2种实现方式
    本文介绍了基于layUI的图片上传前预览功能的两种实现方式:一种是使用blob+FileReader,另一种是使用layUI自带的参数。通过选择文件后点击文件名,在页面中间弹窗内预览图片。其中,layUI自带的参数实现了图片预览功能。该功能依赖于layUI的上传模块,并使用了blob和FileReader来读取本地文件并获取图像的base64编码。点击文件名时会执行See()函数。摘要长度为169字。 ... [详细]
  • 本文讨论了如何优化解决hdu 1003 java题目的动态规划方法,通过分析加法规则和最大和的性质,提出了一种优化的思路。具体方法是,当从1加到n为负时,即sum(1,n)sum(n,s),可以继续加法计算。同时,还考虑了两种特殊情况:都是负数的情况和有0的情况。最后,通过使用Scanner类来获取输入数据。 ... [详细]
  • 本文介绍了C#中数据集DataSet对象的使用及相关方法详解,包括DataSet对象的概述、与数据关系对象的互联、Rows集合和Columns集合的组成,以及DataSet对象常用的方法之一——Merge方法的使用。通过本文的阅读,读者可以了解到DataSet对象在C#中的重要性和使用方法。 ... [详细]
  • 本文介绍了OC学习笔记中的@property和@synthesize,包括属性的定义和合成的使用方法。通过示例代码详细讲解了@property和@synthesize的作用和用法。 ... [详细]
  • 在说Hibernate映射前,我们先来了解下对象关系映射ORM。ORM的实现思想就是将关系数据库中表的数据映射成对象,以对象的形式展现。这样开发人员就可以把对数据库的操作转化为对 ... [详细]
  • 本文详细介绍了Linux中进程控制块PCBtask_struct结构体的结构和作用,包括进程状态、进程号、待处理信号、进程地址空间、调度标志、锁深度、基本时间片、调度策略以及内存管理信息等方面的内容。阅读本文可以更加深入地了解Linux进程管理的原理和机制。 ... [详细]
  • 1,关于死锁的理解死锁,我们可以简单的理解为是两个线程同时使用同一资源,两个线程又得不到相应的资源而造成永无相互等待的情况。 2,模拟死锁背景介绍:我们创建一个朋友 ... [详细]
  • Tomcat/Jetty为何选择扩展线程池而不是使用JDK原生线程池?
    本文探讨了Tomcat和Jetty选择扩展线程池而不是使用JDK原生线程池的原因。通过比较IO密集型任务和CPU密集型任务的特点,解释了为何Tomcat和Jetty需要扩展线程池来提高并发度和任务处理速度。同时,介绍了JDK原生线程池的工作流程。 ... [详细]
  • CentOS 7部署KVM虚拟化环境之一架构介绍
    本文介绍了CentOS 7部署KVM虚拟化环境的架构,详细解释了虚拟化技术的概念和原理,包括全虚拟化和半虚拟化。同时介绍了虚拟机的概念和虚拟化软件的作用。 ... [详细]
  • 本文介绍了一种解析GRE报文长度的方法,通过分析GRE报文头中的标志位来计算报文长度。具体实现步骤包括获取GRE报文头指针、提取标志位、计算报文长度等。该方法可以帮助用户准确地获取GRE报文的长度信息。 ... [详细]
  • GetWindowLong函数
    今天在看一个代码里头写了GetWindowLong(hwnd,0),我当时就有点费解,靠,上网搜索函数原型说明,死活找不到第 ... [详细]
  • 本文讨论了在Windows 8上安装gvim中插件时出现的错误加载问题。作者将EasyMotion插件放在了正确的位置,但加载时却出现了错误。作者提供了下载链接和之前放置插件的位置,并列出了出现的错误信息。 ... [详细]
author-avatar
紫云轻梦lyq
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有