热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

MassTransit知多少|基于StateMachine实现Saga编排式分布式事务

原标题:MassTransit知多少|基于StateMachine实现Saga编排式分布式事务什么是状态机状态机作为一种程序开发范例

原标题:MassTransit 知多少 | 基于StateMachine实现Saga编排式分布式事务

什么是状态机

状态机作为一种程序开发范例,在实际的应用开发中有很多的应用场景,其中.NET 中的async/await 的核心底层实现就是基于状态机机制。状态机分为两种:有限状态机和无限状态机,本文介绍的就是有限状态机,有限状态机在任何时候都可以准确地处于有限状态中的一种,其可以根据一些输入从一个状态转换到另一个状态。一个有限状态机是由其状态列表、初始状态和触发每个转换的输入来定义的。如下图展示的就是一个闸机的状态机示意图:

从上图可以看出,状态机主要有以下核心概念:


  1. State:状态,闸机有已开启(opened)和已关闭(closed)状态。

  2. Transition:转移,即闸机从一个状态转移到另一个状态的过程。

  3. Transition Condition:转移条件,也可理解为事件,即闸机在某一状态下只有触发了某个转移条件,才会执行状态转移。比如,闸机处于已关闭状态时,只有接收到开启事件才会执行转移动作,进而转移到开启状态。

  4. Action:动作,即完成状态转移要执行的动作。比如要从关闭状态转移到开启状态,则需要执行开闸动作。

在.NET中,dotnet-state-machine/statelessMassTransit都提供了开箱即用的状态机实现。本文将重点介绍MassTransit中的状态机在Saga 模式中的应用。

MassTransit StateMachine

在MassTransit 中MassTransitStateMachine就是状态机的具体抽象,可以用其编排一系列事件来实现状态的流转,也可以用来实现Saga模式的分布式事务。并支持与EF Core和Dapper集成将状态持久化到关系型数据库,也支持将状态持久化到MongoDB、Redis等数据库。是以简单的下单流程:创建订单->扣减库存->支付订单举例而言,其示意图如下所示。

基于状态机实现编排式Saga事务

那具体如何使用MassTransitStateMachine来应用编排式Saga 模式呢,接下来就来创建解决方案来实现以上下单流程示例。依次创建以下项目,除共享类库项目外,均安装MassTransitMassTransit.RabbitMQNuGet包。





























项目项目名项目类型
订单服务MassTransit.SmDemo.OrderServiceASP.NET Core Web API
库存服务MassTransit.SmDemo.InventoryServiceWorker Service
支付服务MassTransit.SmDemo.PaymentServiceWorker Service
共享类库MassTransit.SmDemo.SharedClass Library

三个服务都添加扩展类MassTransitServiceExtensions,并在Program.cs类中调用services.AddMassTransitWithRabbitMq();注册服务。

using System.Reflection;
using MassTransit.CourierDemo.Shared.Models;
namespace MassTransit.CourierDemo.InventoryService;
public static class MassTransitServiceExtensions
{
public static IServiceCollection AddMassTransitWithRabbitMq(this IServiceCollection services)
{
return services.AddMassTransit(x =>
{
x.SetKebabCaseEndpointNameFormatter();
// By default, sagas are in-memory, but should be changed to a durable
// saga repository.
x.SetInMemorySagaRepositoryProvider();
var entryAssembly = Assembly.GetEntryAssembly();
x.AddConsumers(entryAssembly);
x.AddSagaStateMachines(entryAssembly);
x.AddSagas(entryAssembly);
x.AddActivities(entryAssembly);
x.UsingRabbitMq((context, busConfig) =>
{
busConfig.Host(
host: "localhost",
port: 5672,
virtualHost: "masstransit",
configure: hostCOnfig=>
{
hostConfig.Username("guest");
hostConfig.Password("guest");
});
busConfig.ConfigureEndpoints(context);
});
});
}
}

订单服务

订单服务作为下单流程中的核心服务,主要职责包含接收创建订单请求和订单状态机的实现。先来定义OrderController如下:

namespace MassTransit.SmDemo.OrderService.Controllers;
[ApiController]
[Route("[controller]")]
public class OrderController : ControllerBase
{
private readonly IBus _bus;
public OrderController(IBus bus)
{
_bus = bus;
}
[HttpPost]
public async Task CreateOrder(CreateOrderDto createOrderDto)
{
await _bus.Publish(new
{
createOrderDto.CustomerId,
createOrderDto.ShoppingCartItems
});
return Ok();
}
}

紧接着,订阅ICreateOrderCommand,执行订单创建逻辑,订单创建完毕后会发布ICreateOrderSucceed事件。

public class CreateOrderConsumer : IConsumer
{
private readonly ILogger _logger;
public CreateOrderConsumer(ILogger logger)
{
_logger = logger;
}
public async Task Consume(ConsumeContext context)
{
var shoppingItems =
context.Message.ShoppingCartItems.Select(item => new ShoppingCartItem(item.SkuId, item.Price, item.Qty));
var order = new Order(context.Message.CustomerId).NewOrder(shoppingItems.ToArray());
await OrderRepository.Insert(order);

_logger.LogInformation($"Order {owww.yii666.comrder.OrderId} created successfully");
await context.Publish(new
{
order.OrderId,
order.OrderItems
});
}
}

最后来实现订单状态机,主要包含以下几步:


  1. 定义状态机状态: 一个状态机从启动到结束可能会经历各种异常,包括程序异常或物理故障,为确保状态机能从异常中恢复,因此必须保存状态机的状态。本例中,定义OrderState以保存状态机实例状态数据:

using MassTransit.SmDemo.OrderService.Domains;
namespace MassTransit.SmDemo.OrderService;
public class OrderState : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; }
public Guid OrderId { get; set; }
public decimal Amount { get; set; }
public List OrderItems { get; set; }
}


  1. 定义状态机:直接继承自MassTransitStateMachine并同时指定状态实例即可:

namespace MassTransit.SmDemo.OrderService;
public class OrderStateMachine : MassTransitStateMachine
{
}


  1. 注册状态机:这里指定内存持久化方式来持久化状态,也可指定诸如MongoDb、MySQL等数据库进行状态持久化:

return services.AddMassTransit(x =>
{
//...
x.AddSagaStateMachine()
.InMemoryRepository();
}


  1. 定义状态列表:即状态机涉及到的系列状态,并通过State类型定义,本例中为:

    1. 已创建:public State Created { get; private set; }

    2. 库存已扣减:public State InventoryDeducted { get; private set; }

    3. 已支付:public State Paid { get; private set; }

    4. 已取消:public State Canceled { get; private set; }



  2. 定义转移条件:即推动状态流转的事件,通过Event类型定义,本例涉及有:

    1. 订单成功创建事件:public Event OrderCreated {get; private set;}

    2. 库存扣减成功事件:public Event DeduceInventorySucceed {get; private set;}

    3. 库存扣减失败事件:public Event DeduceInventoryFailed {get; private set;}

    4. 订单支付成功事件:public Event PayOrderSucceed {get; private set;}

    5. 订单支付失败事件:public Event PayOrderFailed {get; private set;}

    6. 库存已返还事件:public Event ReturnInventorySucceed { get; private set; }

    7. 订单取消事件:public Event OrderCanceled { get; private set; }



  3. 定义关联关系:由于每个事件都是孤立的,但相关联的事件终会作用到某个具体的状态机实例上,如何关联事件以推动状态机的转移呢?配置关联Id。以下就是将事件消息中的传递的OrderId作为关联ID。

    1. Event(() => OrderCreated, x => x.CorrelateById(m => m.Message.OrderId));

    2. Event(() => DeduceInventorySucceed, x => x.CorrelateById(m => m.Message.OrderId));

    3. Event(() => DeduceInventoryFailed, x => x.CorrelateById(m => m.Message.OrderId));

    4. Event(() => PayOrderSucceed, x => x.CorrelateById(m => m.Message.OrderId));



  4. 定义状态转移:即状态在什么条件下做怎样的动作完成状态的转移,本例中涉及的正向状态转移有:

(1) 初始状态->已创建:触发条件为OrderCreated事件,同时要发送IDeduceInventoryCommand推动库存服务执行库存扣减。

Initially(
When(OrderCreated)
.Then(cOntext=>
{
context.Saga.OrderId = context.Message.OrderId;
context.Saga.OrderItems = context.Message.OrderItems;
context.Saga.Amount = context.Message.OrderItems.Sum(x => x.Price * x.Qty);
})
.PublishAsync(cOntext=> context.Init(new
{
context.Saga.OrderId,
DeduceInventoryItems =
context.Saga.OrderItems.Select(x => new DeduceInventoryItem(x.SkuId, x.Qty)).ToList()
}))
.TransitionTo(Created));

(2) 已创建-> 库存已扣减:触发条件为DeduceInventorySucceed事件,同时要发送IPayOrderCommand推动支付服务执行订单支付。

During(Created,
When(DeduceInventorySucceed)
.Then(cOntext=>
{
context.Publish(new
{
context.Saga.OrderId,
context.Saga.Amount
});
}).TransitionTo(InventoryDeducted),
When(DeduceInventoryFailed).Then(cOntext=>
{
context.Publish(new
{
context.Saga.OrderId
});
})
);

(3) 库存已扣减->已支付:触发条件为PayOrderSucceed事件,转移到已支付后,流程结束。

During(InventoryDeducted,
When(PayOrderFailed).Then(cOntext=>
{
context.Publish(new
{
context.Message.OrderId,
ReturnInventoryItems =
context.Saga.OrderItems.Select(x => new ReturnInventoryItem(x.SkuId, x.Qty)).ToList()
});
}),
When(PayOrderSucceed).TransitionTo(Paid).Then(cOntext=> context.SetCompleted()));

最终完整版的OrderStateMachine如下所示:

using MassTransit.SmDemo.OrderService.Events;
using MassTransit.SmDemo.Shared.Contracts;
namespace MassTransit.SmDemo.OrderService;
public class OrderStateMachine : MassTransitStateMachine
{
public State Created { get; private set; }
public State InventoryDeducted { get; private set; }
public State Paid { get; private set; }
public State Canceled { get; private set; }
public Event OrderCreated { get; private set; }
public Event DeduceInventorySucceed { get; private set; }
public Event DeduceInventoryFailed { get; private set; }
public Event OrderCanceled { get; private set; }
public Event PayOrderSucceed { get; private set; }
public Event PayOrderFailed { get; private set; }
public Event ReturnInventorySucceed { get; private set; }
public Event OrderStateRequested { get; private set; }

public OrderStateMachine()
{
Event(() => OrderCreated, x => x.CorrelateById(m => m.Message.OrderId));
Event(() => DeduceInventorySucceed, x => x.CorrelateById(m => m.Message.OrderId));
Event(() => DeduceInventoryFailed, x => x.CorrelateById(m => m.Message.OrderId));
Event(() => ReturnInventorySucceed, x => x.CorrelateById(m => m.Message.OrderId));
Event(() => PayOrderSucceed, x => x.CorrelateById(m => m.Message.OrderId));
Event(() => PayOrderFailed, x => x.CorrelateById(m => m.Message.OrderId));
Event(() => OrderCanceled, x => x.CorrelateById(m => m.Message.OrderId));
Event(() => OrderStateRequested, x =>
{
x.CorrelateById(m => m.Message.OrderId);
x.OnM文章来源地址19328.htmlissingInstance(m =>
{
return m.ExecuteAsync(x => x.RespondAsync(new { x.Message.OrderId }));
});
});
InstanceState(x => x.CurrentState);
Initially(
When(OrderCreated)
.Then(cOntext=>
{
context.Saga.OrderId = context.Message.OrderId;
context.Saga.OrderItems = context.Message.OrderItems;
var amount = context.Message.OrderItems.Sum(x => x.Price * x.Qty);
context.Saga.Amount = amount;
})
.PublishAsync(cOntext=> context.Init(new
{
context.Saga.OrderId,
DeduceInventoryItems =
context.Saga.OrderItems.Select(x => new DeduceInventoryItem(x.SkuId, x.Qty)).ToList()
}))
.TransitionTo(Created));
During(Created,
When(DeduceInventorySucceed)
.Then(cOntext=>
{
context.Publish(new
{
contex文章来源地址19328.htmlt.Saga.OrderId,
context.Saga.Amount
});
}).TransitionTo(InventoryDeducted),
When(DeduceInventoryFailed).Then(cOntext=>
{
context.Publish(new
{
context.Saga.OrderId
www.yii666.com });
})
);
During(InventoryDeducted,
When(PayOrderFailed).Then(cOntext=>
{
context.Publish(new
{
context.Message.OrderId,
ReturnInventoryItems =
context.Saga.OrderItems.Select(x => new ReturnInventoryItem(x.SkuId, x.Qty)).ToList()
});
}),
When(PayOrderSucceed).TransitionTo(Paid).Then(cOntext=> context.SetCompleted()),
When(ReturnInventorySucceed)
.ThenAsync(cOntext=> context.Publish(new
{
context.Saga.OrderId
})).TransitionTo(Created));
DuringAny(When(OrderCanceled).TransitionTo(Canceled).ThenAsync(async cOntext=>
{
await Task.Delay(TimeSpan.FromSeconds(10));
await context.SetCompleted();
}));
DuringAny(
When(OrderStateRequested)
.RespondAsync(x => x.Init(new
{
x.Saga.OrderId,
State = x.Saga.CurrentState
}))
);
}
}

库存服务

库存服务在整个下单流程的职责主要是库存的扣减和返还,其仅需要订阅IDeduceInventoryCommandIReturnInventoryCommand两个命令并实现即可。代码如下所示:

using MassTransit.SmDemo.InventoryService.Repositories;
using MassTransit.SmDemo.Shared.Contracts;
namespace MassTransit.SmDemo.InventoryService.Consumers;
public class DeduceInventoryConsumer : IConsumer
{
private readonly ILogger _logger;
public DeduceInventoryConsumer(ILogger logger)
{
_logger = logger;
}
public async Task Consume(ConsumeContext context)
{
if (!CheckStock(context.Message.DeduceInventoryItems))
{
_logger.LogWarning($"Insufficient stock for order [{context.Message.OrderId}]!");
await context.Publish(
new { context.Message.OrderId, Reason = "insufficient stock" });
}
else
{
_logger.LogInformation($"Inventory has been deducted for order [{context.Message.OrderId}]!");
DeduceStocks(context.Message.DeduceInventoryItems);
await context.Publish(new { context.Message.OrderId });
}
}
private bool CheckStock(List deduceItems)
{
foreach (var stockItem in deduceItems)
{
if (InventoryRepository.GetStock(stockItem.SkuId) }
return true;
}
private void DeduceStocks(List deduceItems)
{
foreach (var stockItem in deduceItems)
文章来源站点https://www.yii666.com/ {
InventoryRepository.TryDeduceStock(stockItem.SkuId, stockItem.Qty);
}
}
}

namespace MassTransit.SmDemo.InventoryService.Consumers;
public class ReturnInventoryConsumer : IConsumer
{
private readonly ILogger _logger;
public ReturnInventoryConsumer(ILogger logger)
{
_logger = logger;
}
public async Task Consume(ConsumeContext context)
{
foreach (var returnInventoryItem in context.Message.ReturnInventoryItems)
{
InventoryRepository.ReturnStock(returnInventoryItem.SkuId, returnInventoryItem.Qty);
}
_logger.LogInformation($"Inventory has been returned for order [{context.Message.OrderId}]!");
await context.Publish(new { context.Message.OrderId });
}
}

支付服务

对于下单流程的支付用例来说,要么成功要么失败,因此仅需要订阅IPayOrderCommand命令即可,具体PayOrderConsumer实现如下:

using MassTransit.SmDemo.Shared.Contracts;
namespace MassTransit.SmDemo.PaymentService.Consumers;
public class PayOrderConsumer : IConsumer
{
private readonly ILogger _logger;
public PayOrderConsumer(ILogger logger)
{
_logger = logger;
}
public async Task Consume(ConsumeContext context)
{
await Task.Delay(TimeSpan.FromSeconds(10));
if (context.Message.Amount % 2 == 0)
{_logger.LogInformation($"Order [{context.Message.OrderId}] paid successfully!");
await context.Publish(new { context.Message.OrderId });
}
else
{
_logger.LogWarning($"Order [{context.Message.OrderId}] payment failed!");
await context.Publish(new
{
context.Message.OrderId,
Reason = "Insufficient account balance"
});
}
}
}

运行结果

启动三个项目,并在Swagger中发起订单创建请求,如下图所示:

由于订单总额为奇数,因此支付会失败,最终控制台输出如下图所示:

打开RabbitMQ后台,可以看见MassTransit按照约定创建了以下队列用于服务间的消息传递:

其中order-state队列绑定到类型为fanout的同名order-stateExchange,其绑定关系如下图所示,该Exchange负责从其他同名事件的Exchange转发事件。

总结

通过以上示例的讲解,相信了解到MassTransit StateMachine的强大之处。StateMachine充当着事务编排器的角色,通过集中定义状态、转移条件和状态转移的执行顺序,实现高内聚的事务流转控制,也确保了其他伴生服务仅需关注自己的业务逻辑,而无需关心事务的流转,真正实现了关注点分离。

来源于:MassTransit 知多少 | 基于StateMachine实现Saga编排式分布式事务


推荐阅读
  • Java序列化对象传给PHP的方法及原理解析
    本文介绍了Java序列化对象传给PHP的方法及原理,包括Java对象传递的方式、序列化的方式、PHP中的序列化用法介绍、Java是否能反序列化PHP的数据、Java序列化的原理以及解决Java序列化中的问题。同时还解释了序列化的概念和作用,以及代码执行序列化所需要的权限。最后指出,序列化会将对象实例的所有字段都进行序列化,使得数据能够被表示为实例的序列化数据,但只有能够解释该格式的代码才能够确定数据的内容。 ... [详细]
  • 本文介绍了Windows操作系统的版本及其特点,包括Windows 7系统的6个版本:Starter、Home Basic、Home Premium、Professional、Enterprise、Ultimate。Windows操作系统是微软公司研发的一套操作系统,具有人机操作性优异、支持的应用软件较多、对硬件支持良好等优点。Windows 7 Starter是功能最少的版本,缺乏Aero特效功能,没有64位支持,最初设计不能同时运行三个以上应用程序。 ... [详细]
  • 如何查询zone下的表的信息
    本文介绍了如何通过TcaplusDB知识库查询zone下的表的信息。包括请求地址、GET请求参数说明、返回参数说明等内容。通过curl方法发起请求,并提供了请求示例。 ... [详细]
  • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
  • 本文介绍了使用Java实现大数乘法的分治算法,包括输入数据的处理、普通大数乘法的结果和Karatsuba大数乘法的结果。通过改变long类型可以适应不同范围的大数乘法计算。 ... [详细]
  • 如何使用Java获取服务器硬件信息和磁盘负载率
    本文介绍了使用Java编程语言获取服务器硬件信息和磁盘负载率的方法。首先在远程服务器上搭建一个支持服务端语言的HTTP服务,并获取服务器的磁盘信息,并将结果输出。然后在本地使用JS编写一个AJAX脚本,远程请求服务端的程序,得到结果并展示给用户。其中还介绍了如何提取硬盘序列号的方法。 ... [详细]
  • 如何用UE4制作2D游戏文档——计算篇
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了如何用UE4制作2D游戏文档——计算篇相关的知识,希望对你有一定的参考价值。 ... [详细]
  • C# 7.0 新特性:基于Tuple的“多”返回值方法
    本文介绍了C# 7.0中基于Tuple的“多”返回值方法的使用。通过对C# 6.0及更早版本的做法进行回顾,提出了问题:如何使一个方法可返回多个返回值。然后详细介绍了C# 7.0中使用Tuple的写法,并给出了示例代码。最后,总结了该新特性的优点。 ... [详细]
  • 从零学Java(10)之方法详解,喷打野你真的没我6!
    本文介绍了从零学Java系列中的第10篇文章,详解了Java中的方法。同时讨论了打野过程中喷打野的影响,以及金色打野刀对经济的增加和线上队友经济的影响。指出喷打野会导致线上经济的消减和影响队伍的团结。 ... [详细]
  • 本文介绍了在Windows环境下如何配置php+apache环境,包括下载php7和apache2.4、安装vc2015运行时环境、启动php7和apache2.4等步骤。希望对需要搭建php7环境的读者有一定的参考价值。摘要长度为169字。 ... [详细]
  • Oracle优化新常态的五大禁止及其性能隐患
    本文介绍了Oracle优化新常态中的五大禁止措施,包括禁止外键、禁止视图、禁止触发器、禁止存储过程和禁止JOB,并分析了这些禁止措施可能带来的性能隐患。文章还讨论了这些禁止措施在C/S架构和B/S架构中的不同应用情况,并提出了解决方案。 ... [详细]
  • 数字账号安全与数据资产问题的研究及解决方案
    本文研究了数字账号安全与数据资产问题,并提出了解决方案。近期,大量QQ账号被盗事件引起了广泛关注。欺诈者对数字账号的价值认识超过了账号主人,因此他们不断攻击和盗用账号。然而,平台和账号主人对账号安全问题的态度不正确,只有用户自身意识到问题的严重性并采取行动,才能推动平台优先解决这些问题。本文旨在提醒用户关注账号安全,并呼吁平台承担起更多的责任。令牌云团队对此进行了长期深入的研究,并提出了相应的解决方案。 ... [详细]
  • Imtryingtofigureoutawaytogeneratetorrentfilesfromabucket,usingtheAWSSDKforGo.我正 ... [详细]
  • Spring学习(4):Spring管理对象之间的关联关系
    本文是关于Spring学习的第四篇文章,讲述了Spring框架中管理对象之间的关联关系。文章介绍了MessageService类和MessagePrinter类的实现,并解释了它们之间的关联关系。通过学习本文,读者可以了解Spring框架中对象之间的关联关系的概念和实现方式。 ... [详细]
  • 基于Axis、XFire、CXF的webservice客户端调用示例
    本文介绍了如何使用Axis、XFire、CXF等工具来实现webservice客户端的调用,以及提供了使用Java代码进行调用的示例。示例代码中设置了服务接口类、地址,并调用了sayHello方法。 ... [详细]
author-avatar
紫藤老君的八卦炉
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有