之前写过一篇《乘风破浪,遇见云原生(Cloud Native)之Docker Desktop for Windows 运行MYSQL多实例并实现主从(Master-Slave)部署》,实现了MYSQL主从多实例部署,基于它我们来写一写怎么在Entity Framework Core的配合下实现读写分离,我们通过MediatR来实现CQRS架构设计。
某车企开展引荐活动送积分,需要提供一个服务对引荐信息进行管理,通过API接口提供引荐信息的管理能力。
简单架构示意图
MediatR
EntityFrameworkCore
Swashbuckle
MySqlConnector
Newtonsoft.Json
https://github.com/TaylorShi/HelloEfCoreMasterSlave
dotnet new sln -o HelloEfCoreMasterSlave
这里我们将采用面向领域驱动设计(DDD
)的模式,先将解决方案中项目完成分组:
0.Shared
共享项目,定义业务无关的基础代码和接口定义1.Infrastructure
基础层,定义仓储、Context2.Domain
领域层,定义领域模式和领域事件3.Application
应用层,定义命令和处理程序,协调调度任务4.Api
应用入口,定义API终结点、验证5.Test
应用测试,定义API终结点、验证这里面放一些公共的代码,比如全局的已知异常定义IKnowException
和实现类KnowException
。
领域抽象项目,这里定义包括:
IAggregateRoot
聚合根接口IEntity
、IEntity
实体接口Entity
、Entity
实体抽象类IDomainEvent
领域事件接口,继承自MediatR.INotification
IDomainEventHandler
领域事件处理程序,继承自INotificationHandler
ValueObject
值对象依赖包
dotnet add package MediatR.Extensions.Microsoft.DependencyInjection --version 8.0.0
MediatR.Extensions.Microsoft.DependencyInjection其内部包括
基础层核心项目,这里分组包括
Behaviors
行为管理Contexts
上下文管理Extensions
扩展管理Repositorys
仓储管理Transactions
事务管理行为管理组包括
TransactionBehavior
事务行为管理类,继承自IPipelineBehavior
,用于命令执行前后添加事务策略。上下文管理组包括
EFContext
Entity Framework Core上下文扩展管理组包括
GenericTypeExtensions
通用类型扩展DomainEventExtension
领域事务扩展QueryableExtensions
LINQ查询扩展仓储管理组包括
IRepository
实体接口,继承自实体抽象类Entity
和聚合根接口IAggregateRoot
Repository
实体抽象类,继承自实体接口IRepository
、实体抽象类Entity
和聚合根接口IAggregateRoot
事务管理组包括
ITransaction
事务管理接口IUnitOfWork
工作单元接口依赖包
dotnet add package Microsoft.EntityFrameworkCore.Relational --version 3.1.0
Microsoft.EntityFrameworkCore.Relational其内部包括
依赖项目
Framework.Domain.Abstractions
基础层项目,这里分组包括
Contexts
上下文管理EntityConfigurations
实体配置Repositories
实体仓储上下文管理组包括
ReferralContextTransactionBehavior
领域事务行为管理类ReferralMasterContext
业务MasterContext,继承自Entity Framework Core上下文EFContext
,代表MYSQL主实例的ContextReferralSlaveContext
业务SlaveContext,继承自Entity Framework Core上下文DbContext
,代表MYSQL从实例的Context实体配置组包括
ReferralCodeEntityTypeConfiguration
业务领域模型和实体类型配置类,继承自Entity Framework Core实体配置接口IEntityTypeConfiguration
实体仓储组包括
IReferralCodeRepository
业务领域仓储接口,继承自实体接口IRepository
ReferralCodeRepository
业务领域仓储类,继承自实体抽象类Repository
和业务领域仓储接口IReferralCodeRepository
依赖项目
Framework.Infrastructure.Core
Referral.Domain
领域层项目,这里分组包括
Events
领域事件Aggregates
领域模型领域模型组包括
ReferralCode
业务领域模型,继承自实体抽象类Entity
和聚合根接口IAggregateRoot
依赖项目
Framework.Domain.Abstractions
应用层项目,这里分组包括
Commands
命令和处理DomainEventHandlers
领域事件处理IntegrationEvents
集成事件定义Queries
查询和处理Extensions
服务扩展命令和处理组包括
CreateReferralCommand
创建引荐命令定义,继承自MediatR命令请求接口IRequest
CreateReferralCommandHandler
创建引荐命令处理,继承自MediatR命令处理接口IRequestHandler
DeleteReferralCommand
删除引荐命令定义,继承自MediatR命令请求接口IRequest
DeleteReferralCommandHandler
删除引荐命令处理,继承自MediatR命令处理接口IRequestHandler
ModifyReferralCommand
修改引荐命令定义,继承自MediatR命令请求接口IRequest
ModifyReferralCommandHandler
修改引荐命令处理,继承自MediatR命令处理接口IRequestHandler
查询和处理组包括
QueryReferralCommand
查询引荐命令定义,继承自MediatR命令请求接口IRequest
QueryReferralCommandHandler
查询引荐命令处理,继承自MediatR命令处理接口IRequestHandler
服务扩展组包括
CommandHandlerExtensions
命令处理扩展EFContextExtensions
EF上下文扩展IntegrationEventsExtensions
集成事件扩展RepositoryExtensions
仓储服务扩展依赖包
dotnet add package Pomelo.EntityFrameworkCore.MySql --version 3.1.0
Pomelo.EntityFrameworkCore.MySql其内部包括
Microsoft.EntityFrameworkCore.Relational
Microsoft.EntityFrameworkCore
MySqlConnector
Pomelo.JsonObject
Newtonsoft.Json
依赖项目
Referral.Infrastructure
应用入口项目,这里分组包括
Controllers
API终结点Extensions
扩展API终结点组包括
ReferralController
业务服务终结点扩展组包括
ApplicationUseExtensions
应用启用扩展RoutingEndpointExtensions
路由和终结点扩展依赖项目
Referral.Application
依赖包
dotnet add package Microsoft.AspNetCore.Mvc.Versioning.ApiExplorer --version 5.0.0
Microsoft.AspNetCore.Mvc.Versioning.ApiExplorer其内部包括
Microsoft.AspNetCore.Mvc.Versioning
dotnet add package Swashbuckle.AspNetCore --version 6.4.0
Swashbuckle.AspNetCore其内部包括
Microsoft.Extensions.ApiDescription.Server
Swashbuckle.AspNetCore.Swagger
Swashbuckle.AspNetCore.SwaggerGen
Swashbuckle.AspNetCore.SwaggerUI
Microsoft.OpenApi
在Referral.Infrastructure
中,我们构建了两个业务Context,每一个Context会对应一个MYSQL的ConnectionString
。
我们首先需要将Master和Slave两个节点的连接字符串在appsettings.json
中配置出来。
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information"
}
},
"AllowedHosts": "*",
"MYSQL-Master": "server=localhost;port=16000;user=root;password=xxxxxxxxxxxxxxx;database=xxxx;charset=utf8mb4;COnnectionReset=false;",
"MYSQL-Slave": "server=localhost;port=17001;user=root;password=xxxxxxxxxxxxxxx;database=xxxx;charset=utf8mb4;COnnectionReset=false;"
}
注意,配置节点名称分别是MYSQL-Master
和MYSQL-Slave
,它们的端口是不一样的。
接下来,在Startup.cs
的ConfigureServices
中添加MYSQL集群上下文服务AddMySqlClusterContext
public void ConfigureServices(IServiceCollection services)
{
// 添加MYSQL集群上下文服务
services.AddMySqlClusterContext(Configuration.GetValue
}
位于EF上下文扩展EFContextExtensions
中的AddMySqlClusterContext
定义
///
/// EF上下文扩展
///
public static class EFContextExtensions
{
///
/// 添加MYSQL集群上下文服务
///
///
///
///
///
public static IServiceCollection AddMySqlClusterContext(this IServiceCollection services, string masterConnectionString, string slaveConnectionString)
{
// 添加引荐MasterContext
services.AddDbContext
{
optionsAction.UseMySql(masterConnectionString);
});
// 添加引荐SlaveContext
services.AddDbContext
{
optionsAction.UseMySql(slaveConnectionString);
});
return services;
}
}
这个我们就分开注册了两个不同的MYSQL实例,其中一个Master用于写,另外一个Slave用于读。
在Referral.Api
,我们定义了一个业务终结点ReferralController
///
/// 引荐服务
///
[ApiVersion("1.0")]
[Route("api/v{version:ApiVersion}/[controller]/[action]")]
[ApiController]
public class ReferralController : ControllerBase
{
readonly IMediator _mediator;
///
/// 构造函数
///
///
public ReferralController(IMediator mediator)
{
_mediator = mediator;
}
///
/// 创建引荐
///
///
///
[HttpPost]
public async Task
{
// 发送创建引荐的命令
return Ok(await _mediator.Send(cmd, HttpContext.RequestAborted));
}
///
/// 修改引荐
///
///
///
[HttpPost]
public async Task
{
// 发送修改引荐的命令
return Ok(await _mediator.Send(cmd, HttpContext.RequestAborted));
}
///
/// 删除引荐
///
///
///
[HttpPost]
public async Task
{
// 发送修改引荐的命令
return Ok(await _mediator.Send(cmd, HttpContext.RequestAborted));
}
///
/// 查询引荐
///
///
///
[HttpGet]
public async Task
{
// 发送查询引荐的命令
return Ok(await _mediator.Send(cmd, HttpContext.RequestAborted));
}
}
这里全部通过MediatR
将来自前端的请求通过命令的方式发送出去,等待命令被处理之后,再将结果返回给调用者,实现CQRS模式。
这个案例中,我们仅设计了一个引荐代码的领域模型ReferralCode
///
/// 引荐代码领域模型
///
public class ReferralCode : Entity
{
///
/// 引荐名称
///
public string Name { get; private set; }
///
/// 引荐代码
///
public string Code { get; private set; }
///
/// 构造函数
///
///
///
public ReferralCode(string name, string code)
{
Name = name;
Code = code;
}
///
/// 修改
///
///
///
public void Modify(string name, string code)
{
Name = name;
Code = code;
}
}
基于封闭原则,所有的Set
都被设置为Private
,创建通过构造函数来进行,修改通过独立的Modify
进行。
在Referral.Infrastructure
中关于领域模型和实体的映射关系,我们是这样的设计的
///
/// 引荐代码领域模型和实体类型配置类
///
internal class ReferralCodeEntityTypeConfiguration : IEntityTypeConfiguration
{
public void Configure(EntityTypeBuilder
{
builder.HasKey(p => p.Id);
builder.ToTable("referralcode");
builder.Property(p => p.Name).HasMaxLength(120);
builder.Property(p => p.Code).HasMaxLength(200);
}
}
创建引荐命令定义
///
/// 创建引荐命令定义
///
public class CreateReferralCommand : IRequest
{
///
/// 引荐名称
///
public string Name { get; set; }
///
/// 引荐代码
///
public string Code { get; set; }
}
创建引荐命令处理
///
/// 创建引荐命令处理
///
internal class CreateReferralCommandHandler : IRequestHandler
{
///
/// 引荐代码仓储
///
readonly IReferralCodeRepository _referralCodeRepository;
///
/// 构造函数
///
///
public CreateReferralCommandHandler(IReferralCodeRepository referralCodeRepository)
{
_referralCodeRepository = referralCodeRepository;
}
///
/// 处理程序
///
///
///
///
public async Task
{
var referralCode = new ReferralCode(request.Name, request.Code);
await _referralCodeRepository.AddAsync(referralCode, cancellationToken);
return true;
}
}
这里从容器中获取业务仓储实例_referralCodeRepository
,通过它的AddAsync
方法实现添加动作。
删除引荐命令定义
///
/// 删除引荐命令定义
///
public class DeleteReferralCommand : IRequest
{
///
/// 引荐ID
///
public int Id { get; set; }
}
删除引荐命令处理
///
/// 删除引荐命令处理
///
internal class DeleteReferralCommandHandler : IRequestHandler
{
///
/// 引荐代码仓储
///
readonly IReferralCodeRepository _referralCodeRepository;
///
/// 构造函数
///
///
public DeleteReferralCommandHandler(IReferralCodeRepository referralCodeRepository)
{
_referralCodeRepository = referralCodeRepository;
}
///
/// 处理程序
///
///
///
///
public async Task
{
return await _referralCodeRepository.DeleteAsync(request.Id);
}
}
这里从容器中获取业务仓储实例_referralCodeRepository
,通过它的DeleteAsync
方法实现删除动作。
修改引荐命令定义
///
/// 修改引荐命令定义
///
public class ModifyReferralCommand : IRequest
{
///
/// 引荐ID
///
public int Id { get; set; }
///
/// 引荐名称
///
public string Name { get; set; }
///
/// 引荐代码
///
public string Code { get; set; }
}
修改引荐命令处理
///
/// 修改引荐命令处理
///
internal class ModifyReferralCommandHandler : IRequestHandler
{
///
/// 引荐代码仓储
///
readonly IReferralCodeRepository _referralCodeRepository;
///
/// 构造函数
///
///
public ModifyReferralCommandHandler(IReferralCodeRepository referralCodeRepository)
{
_referralCodeRepository = referralCodeRepository;
}
///
/// 处理程序
///
///
///
///
public async Task
{
var referralCode = await _referralCodeRepository.GetAsync(request.Id, cancellationToken);
if (referralCode != null)
{
referralCode.Modify(request.Name, request.Code);
await _referralCodeRepository.UpdateAsync(referralCode);
return true;
}
return false;
}
}
这里从容器中获取业务仓储实例_referralCodeRepository
,先通过GetAsync
查询要修改的数据是否存在,如果存在那么通过UpdateAsync
更新它。
查询引荐命令定义
///
/// 查询引荐命令定义
///
public class QueryReferralCommand : IRequest>
{
///
/// 引荐ID
///
public int Id { get; set; }
///
/// 引荐名称
///
public string Name { get; set; }
///
/// 引荐代码
///
public string Code { get; set; }
}
查询引荐命令处理
///
/// 查询引荐命令处理
///
public class QueryReferralCommandHandler : IRequestHandler
{
readonly ReferralSlaveContext _referralSlaveContext;
///
/// 构造函数
///
///
public QueryReferralCommandHandler(ReferralSlaveContext referralSlaveContext)
{
_referralSlaveCOntext= referralSlaveContext;
}
///
/// 处理程序
///
///
///
///
public async Task> Handle(QueryReferralCommand request, CancellationToken cancellationToken)
{
IQueryable
if (request.Id > 0)
{
query = query.Where(x => x.Id == request.Id);
}
if (!string.IsNullOrEmpty(request.Name))
{
query = query.Where(x => x.Name == request.Name);
}
if (!string.IsNullOrEmpty(request.Code))
{
query = query.Where(x => x.Code == request.Code);
}
return await query.ToListAsync();
}
}
这里从容器中获取SlaveContext实例_referralSlaveContext
,通过判断查询入参的条件来拼接IQueryable
,最后通过ToListAsync
获取筛选结果。
但是上面这种写法有点啰嗦,我们引入一个LINQ查询扩展QueryableExtensions
以便优化它。
///
/// LINQ查询扩展
///
public static class QueryableExtensions
{
///
/// 分页查询
///
///
///
///
///
///
///
public static IQueryable
{
if (query == null)
{
throw new ArgumentNullException("query");
}
return query.Skip(skipCount).Take(maxResultCount);
}
///
/// 根据If条件筛选
///
///
///
///
///
///
public static IQueryable
{
return condition
? query.Where(predicate)
: query;
}
///
/// 根据If条件筛选
///
///
///
///
///
///
public static IQueryable
{
return condition
? query.Where(predicate)
: query;
}
}
最终我们可以将前面的查询优化为如下的写法
///
/// 处理程序
///
///
///
///
public async Task> Handle(QueryReferralCommand request, CancellationToken cancellationToken)
{
IQueryable
.WhereIf(request.Id > 0, x => x.Id == request.Id)
.WhereIf(!string.IsNullOrEmpty(request.Name), x => x.Name == request.Name)
.WhereIf(!string.IsNullOrEmpty(request.Code), x => x.Code == request.Code);
return await query.ToListAsync();
}
通过MediatR
实现命令和查询分离的同时,其实我们还做了一个自动事务的设计,它有个类似中间件的逻辑,我们在Referral.Application
中定义了一个命令处理扩展CommandHandlerExtensions
,我们看下它的定义
///
/// 命令处理扩展
///
public static class CommandHandlerExtensions
{
///
/// 添加命令处理服务
///
///
///
public static IServiceCollection AddCommandHandlers(this IServiceCollection services)
{
services.AddTransient(typeof(IPipelineBehavior<,>), typeof(ReferralContextTransactionBehavior<,>));
return services.AddMediatR(typeof(ReferralCode).Assembly, typeof(CreateReferralCommand).Assembly);
}
}
这里将ReferralContextTransactionBehavior<,>
注册为IPipelineBehavior<,>
的实现。它本质是事务行为管理类TransactionBehavior
的实现。
它的处理核心逻辑是
///
/// 处理程序
///
///
///
///
///
public async Task
{
var respOnse= default(TResponse);
var typeName = request.GetGenericTypeName();
try
{
// 如果当前开启了事务,那么就继续后面的动作
if (_dbContext.HasActiveTransaction)
{
return await next();
}
var strategy = _dbContext.Database.CreateExecutionStrategy();
await strategy.ExecuteAsync(async () =>
{
Guid transactionId;
using (var transaction = await _dbContext.BeginTransactionAsync())
using (_logger.BeginScope("TransactionContext:{TransactionId}", transaction.TransactionId))
{
_logger.LogInformation("----- 开始事务 {TransactionId} ({@Command})", transaction.TransactionId, typeName, request);
respOnse= await next();
_logger.LogInformation("----- 提交事务 {TransactionId} {CommandName}", transaction.TransactionId, typeName);
await _dbContext.CommitTransactionAsync(transaction);
transactiOnId= transaction.TransactionId;
}
});
return response;
}
catch (Exception ex)
{
_logger.LogError(ex, "处理事务出错 {CommandName} ({@Command})", typeName, request);
throw;
}
}
它的逻辑是在执行命令的处理程序之前先判断上下文是否开启事务,如果没有开启事务就创建一个事务,再来执行命令处理的逻辑,处理完毕之后,再来提交这个事务。
IQueryable
,揭开表达式树的神秘面纱