热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

如何用.NETCore操作RabbitMQ

这篇文章主要介绍了如何用.NETCore操作RabbitMQ,对中间件感兴趣的同学,可以参考下

什么是RabbitMQ?

RabbitMQ是由erlang语言开发的一个基于AMQP(Advanced Message Queuing Protocol)协议的企业级消息队列中间件。可实现队列,订阅/发布,路由,通配符等工作模式。

为什么要使用RabbitMQ?

  • 异步处理:比如发送邮件,发送短信等不需要等待处理结果的操作
  • 应用解耦:比如下单成功后,通知仓库发货,不需要等待仓库回应,通过消息队列去通知仓库,降低应用间耦合程序,可并行开发两个功能模块
  • 流量削锋:在抢购或者其他的活动页,服务处于爆发式请求状态,如果直连数据库,数据库容易被拖垮。抢购商品也容易出现库存超卖的情况。通过队列可有效解决该问题。
  • 日志处理:在单机中,日志直接写入到文件目录中,但是在分布式应用中,日志需要有统一的处理机制,可通过消息队列统一由某个消费端做处理。
  • 消息通信:如生产端和消费端可通过队列进行异步通信

如何安装RabbitMQ?

Windows端

1.安装erlang语言运行环境
https://erlang.org/download/otp_win64_23.2.exe
下载后直接下一步即可

2.安装RabbitMQ
https://www.rabbitmq.com/install-windows.html
直接点击安装下一步即可按章

3.安装RabbitMQ的Web管理平台

RabbitMQ的管理平台是通过插件的形式使用,需要手动启用管理平台
在Windows下,RabbitMQ默认被安装到C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.14 下。
打开sbin ,在cmd或者powershell中执行

rabbitmq-plugins.bat enable rabbitmq_management

安装完成后,浏览器打开 http://localhost:15672/ 即可看到RabbitMQ的管理界面。输入默认账号密码 guest 成功登录。

Linux环境安装

1.Ubuntu:https://www.rabbitmq.com/install-debian.html

2.Centos:https://www.rabbitmq.com/install-rpm.html

RabbitMQ的基本概念

生产者

发送消息的端

消费者

获取消息并处理的端

Connection

一个终端连接。每一个Connection都可以在RabbitMQ后台看到

Channel

Channel是建立在Connection上的一个虚拟通信管道。一般情况下,往消息队列中写入多条消息,为了不每条消息都建立一个TCP连接,所以RabbitMQ的做法是多条消息可以公用一个Connection,大大提高MQ的负载能力。

Exchange

Exchange是一个虚拟交换机。每一条消息都必须要通过交换机才能能进入对应的队列,可以理解为网络设备中的交换机,是一个意思。

Queue

Queue是一个存储消息的内部对象,所有的Rabbit MQ消息都存储在Queue中。生产者所生产的消息会存储在Queue中,消费者获取的消息也是从Queue中获取。

如何在.NET Core中使用RabbitMQ?

nuget安装

dotnet add package RabbitMQ.Client

创建生产者

const string QUEUENAME = "HELLO_MQ";
//创建连接对象工厂
var factory = new ConnectionFactory()
{
    UserName = "guest",
    Password = "guest",
    HostName = "localhost",
    Port = 5672,  //RabbitMQ默认的端口
};

while (true)
{
    using var cOnn= factory.CreateConnection();
    var chanel = conn.CreateModel();

    chanel.QueueDeclare(QUEUENAME, true, false, false);
    Console.WriteLine("输入生产内容:");
    var input = Console.ReadLine();
    chanel.BasicPublish("", QUEUENAME, null, Encoding.Default.GetBytes("hello rabbitmq:" + input));
}

在循环中,输入一个值,按下enter,即可推送一条消息到队列。

也可以直接在RabbitMQ的管理后台查看

可以看到我们发送的消息已经被RabbitMQ存储在Queue中了。只等某个幸运的消费者前来消费。

创建消费者

const string QUEUENAME = "HELLO_MQ";
var factory = new ConnectionFactory()
{
    UserName = "guest",
    Password = "guest",
    HostName = "localhost",
    Port = 5672,
};

var cOnn= factory.CreateConnection();
var chanel = conn.CreateModel();
chanel.QueueDeclare(QUEUENAME, true, false, false);
EventingBasicConsumer cOnsumer= new EventingBasicConsumer(chanel);
consumer.Received += (a, e) =>
{
    Console.WriteLine($"{DateTime.Now.ToString()}接收到消息:" + Encoding.Default.GetString(e.Body.ToArray()));
    chanel.BasicAck(e.DeliveryTag, true); //收到回复后,RabbitMQ会直接在队列中删除这条消息
};
chanel.BasicConsume(QUEUENAME, false, consumer);

Console.WriteLine("启动成功");
Console.ReadLine();

启动成功后,consumer的Received方法,会收到一条来自MQ的消息,

如果处理完成后,不调用chennel的BasicAck方法,那么这条消息依然会存在,下次有消费者出现,会再次推送给消费者。

简单的RabbitMQ Hello World到这里就算完成了。接下来就是稍微高级一点的应用

RabbitMQ的工作模式

Work Queue 工作队列模式

工作队列模式的意思就是一个生产者对应多个消费者。RabbitMQ会使用轮询去给每个消费者发送消息。

publish/subscribe

发布订阅模式是属于比较用多的一种。

发布订阅,是由交换机发布消息给多个队列。多个队列再对应多个消费者。

发布订阅模式对应的交换机类型的fanout。

消费者

A

const string QUEUENAME = "HELLO_MQ_B";
const string TESTEXCHANGE = "TESTEXCHANGE";
var factory = new ConnectionFactory()
{
    UserName = "guest",
    Password = "guest",
    HostName = "localhost",
    Port = 5672,
};

var cOnn= factory.CreateConnection();
var channel = conn.CreateModel();
//定义队列
channel.QueueDeclare(QUEUENAME, true, false, false);
//定义交换机
channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Fanout, true, false);
//绑定队列到交换机
channel.QueueBind(QUEUENAME, TESTEXCHANGE, "");
var cOnsumer= new EventingBasicConsumer(channel);
consumer.Received += (a, e) =>
{
    Console.WriteLine($"{DateTime.Now.ToString()}接收到消息:" + Encoding.Default.GetString(e.Body.ToArray()));
    channel.BasicAck(e.DeliveryTag, true); //收到回复后,RabbitMQ会直接在队列中删除这条消息
};
channel.BasicConsume(QUEUENAME, false, consumer);

Console.WriteLine("启动成功");
Console.ReadLine();

B

const string QUEUENAME = "HELLO_MQ";
const string TESTEXCHANGE = "TESTEXCHANGE";
var factory = new ConnectionFactory()
{
    UserName = "guest",
    Password = "guest",
    HostName = "localhost",
    Port = 5672,
};

var cOnn= factory.CreateConnection();
var channel = conn.CreateModel();
//定义队列
channel.QueueDeclare(QUEUENAME, true, false, false);
//定义交换机
channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Fanout, true, false);
//绑定队列到交换机
channel.QueueBind(QUEUENAME, TESTEXCHANGE, "");
var cOnsumer= new EventingBasicConsumer(channel);
consumer.Received += (a, e) =>
{
    Console.WriteLine($"{DateTime.Now.ToString()}接收到消息:" + Encoding.Default.GetString(e.Body.ToArray()));
    channel.BasicAck(e.DeliveryTag, true); //收到回复后,RabbitMQ会直接在队列中删除这条消息
};
channel.BasicConsume(QUEUENAME, false, consumer);

Console.WriteLine("启动成功");
Console.ReadLine();

生产者

const string QUEUENAME = "HELLO_MQ";
const string QUEUENAME_B = "HELLO_MQ_B";
const string TESTEXCHANGE = "TESTEXCHANGE";

//创建连接对象工厂
var factory = new ConnectionFactory()
{
    UserName = "guest",
    Password = "guest",
    HostName = "localhost",
    Port = 5672,  //RabbitMQ默认的端口
};
using var cOnn= factory.CreateConnection();
while (true)
{

    var channel = conn.CreateModel();
    //定义交换机
    channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Fanout, true, false);
    Console.WriteLine("输入生产内容:");
    var input = Console.ReadLine();
    channel.BasicPublish(TESTEXCHANGE,"", null, Encoding.Default.GetBytes("hello rabbitmq:" + input));
}

在生产者运行成功后,RabbitMQ后台会出现一个交换机,点击交换机会看到交换机下绑定了两个队列

从生产者发送消息到队列,两个消费者会同时收到消息

routing模式

routing模式对应的交换机类型是direct,和发布订阅模式的区别在于:routing模式下,可以指定一个routingkey,用于区分消息

生产者

var channel = conn.CreateModel();
//定义交换机
channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Direct, true, false);
//绑定队列到交换机
Console.WriteLine("输入生产内容:");
var input = Console.ReadLine();
channel.BasicPublish(TESTEXCHANGE, "INFO", null, Encoding.Default.GetBytes("hello rabbitmq:" + input));

消费者 A

//定义队列
channel.QueueDeclare(QUEUENAME, true, false, false);
//定义交换机
channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Direct, true, false);
//绑定队列到交换机
channel.QueueBind(QUEUENAME, TESTEXCHANGE, "INFO");

消费者 B

//定义队列
channel.QueueDeclare(QUEUENAME, true, false, false);
//定义交换机
channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Direct, true, false);
//绑定队列到交换机
channel.QueueBind(QUEUENAME, TESTEXCHANGE, "ERROR");

绑定成功后,发送消息,消费者A可以收到消息,消费者B无法收到消息。

如果遇到指定routingKey生产一条消息,结果 AB消费者都收到的情况。建议在RabbitMQ后台的交换机下看一下绑定的Queue是否重复绑定了多个routingKey.

topic通配符模式

在通配符模式下,RabbitMQ使用模糊匹配来决定把消息推送给哪个生产者。通配符有两个符号来匹配routingKey

1.*匹配一个字符 如:*.qq.com 可匹配 1.qq.com

2.#匹配一个或者多个字符。 如:*.qq.com 可匹配 1.qq.com或者1111.qq.com

其他的操作基本和routing模式一样。

header模式

header模式是把routingkey放到header中.取消掉了routingKey。并使用一个字典传递 K、V的方式来匹配。
比如同时要给用户发送邮件和短信,可直接通过header的键值对来匹配绑定的值,把消息传递给发短信和邮件的生产者.

以上就是如何用.NETCore操作RabbitMQ的详细内容,更多关于.NETCore 操作RabbitMQ的资料请关注其它相关文章!


推荐阅读
  • 如何将CentOS8转换为CentOSStream
    CentOS Stream是一个持续交付的Linux发行版,它在RHEL之前处于领先地位。它将具有滚动发布,即不断进行更改。CentOS将成为一个上游版本,它将具有测试补丁和更新。 ... [详细]
  • 系统安装Debian系统的安装方式和Ubuntu系统的安装方式几乎是一样的,毕竟Ubuntu系统是基于Debian的,就如同CentOS基于Redhat ... [详细]
  • 像跟踪分布式服务调用那样跟踪Go函数调用链 | Gopher Daily (2020.12.07) ʕ◔ϖ◔ʔ
    每日一谚:“Acacheisjustamemoryleakyouhaven’tmetyet.”—Mr.RogersGo技术专栏“改善Go语⾔编程质量的50个有效实践” ... [详细]
  • Ubuntu 9.04中安装谷歌Chromium浏览器及使用体验[图文]
    nsitionalENhttp:www.w3.orgTRxhtml1DTDxhtml1-transitional.dtd ... [详细]
  • 如何查看电脑系统版本_腾讯云服务器系统版本怎么看?Windows和Centos版本怎么选?...
    腾讯云服务器系统版本怎么看?想要知道自己的腾讯云服务器系统版本是哪个,可以登录云服务器后台管理系统查看,或者使用命令行查询,如果不会操作& ... [详细]
  • 在Docker中,将主机目录挂载到容器中作为volume使用时,常常会遇到文件权限问题。这是因为容器内外的UID不同所导致的。本文介绍了解决这个问题的方法,包括使用gosu和suexec工具以及在Dockerfile中配置volume的权限。通过这些方法,可以避免在使用Docker时出现无写权限的情况。 ... [详细]
  • Webmin远程命令执行漏洞复现及防护方法
    本文介绍了Webmin远程命令执行漏洞CVE-2019-15107的漏洞详情和复现方法,同时提供了防护方法。漏洞存在于Webmin的找回密码页面中,攻击者无需权限即可注入命令并执行任意系统命令。文章还提供了相关参考链接和搭建靶场的步骤。此外,还指出了参考链接中的数据包不准确的问题,并解释了漏洞触发的条件。最后,给出了防护方法以避免受到该漏洞的攻击。 ... [详细]
  • 本文介绍了Linux系统中正则表达式的基础知识,包括正则表达式的简介、字符分类、普通字符和元字符的区别,以及在学习过程中需要注意的事项。同时提醒读者要注意正则表达式与通配符的区别,并给出了使用正则表达式时的一些建议。本文适合初学者了解Linux系统中的正则表达式,并提供了学习的参考资料。 ... [详细]
  • imx6ull开发板驱动MT7601U无线网卡的方法和步骤详解
    本文详细介绍了在imx6ull开发板上驱动MT7601U无线网卡的方法和步骤。首先介绍了开发环境和硬件平台,然后说明了MT7601U驱动已经集成在linux内核的linux-4.x.x/drivers/net/wireless/mediatek/mt7601u文件中。接着介绍了移植mt7601u驱动的过程,包括编译内核和配置设备驱动。最后,列举了关键词和相关信息供读者参考。 ... [详细]
  • CEPH LIO iSCSI Gateway及其使用参考文档
    本文介绍了CEPH LIO iSCSI Gateway以及使用该网关的参考文档,包括Ceph Block Device、CEPH ISCSI GATEWAY、USING AN ISCSI GATEWAY等。同时提供了多个参考链接,详细介绍了CEPH LIO iSCSI Gateway的配置和使用方法。 ... [详细]
  • centos安装Mysql的方法及步骤详解
    本文介绍了centos安装Mysql的两种方式:rpm方式和绿色方式安装,详细介绍了安装所需的软件包以及安装过程中的注意事项,包括检查是否安装成功的方法。通过本文,读者可以了解到在centos系统上如何正确安装Mysql。 ... [详细]
  • CentOS7.8下编译muduo库找不到Boost库报错的解决方法
    本文介绍了在CentOS7.8下编译muduo库时出现找不到Boost库报错的问题,并提供了解决方法。文章详细介绍了从Github上下载muduo和muduo-tutorial源代码的步骤,并指导如何编译muduo库。最后,作者提供了陈硕老师的Github链接和muduo库的简介。 ... [详细]
  • 在Windows10系统上使用VMware创建CentOS虚拟机的详细步骤教程
    本文详细介绍了在Windows10系统上使用VMware创建CentOS虚拟机的步骤,包括准备条件、安装VMware、下载CentOS ISO文件、创建虚拟机并进行自定义配置、设置虚拟机的ISO与网络、进行安装和配置等。通过本文的指导,读者可以轻松地创建自己的CentOS虚拟机并进行相应的配置和操作。 ... [详细]
  • {moduleinfo:{card_count:[{count_phone:1,count:1}],search_count:[{count_phone:4 ... [详细]
  • Ubuntu 用户安装 Linux Kernel 3.15 RC1
    nsitionalENhttp:www.w3.orgTRxhtml1DTDxhtml1-transitional.dtd ... [详细]
author-avatar
丹丹2502912601
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有