热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

netcore下RabbitMQ队列、死信队列、延时队列及小应用

原标题:netcore下RabbitMQ队列、死信队列、延时队列及小应用关于安装rabbitmq这里一笔掠过了。下面进入正题:1.新建aspnetcor

原标题:netcore下RabbitMQ队列、死信队列、延时队列及小应用

关于安装rabbitmq这里一笔掠过了。

下面进入正题:

1.新建aspnetcorewebapi空项目,NormalQueue,删除controllers文件夹已经无关的文件,这里为了偷懒不用console控制台:

public class Program
{
public static void Main(string[] args)
{
var builder = WebApplication.CreateBuilder(args);
// Add services to the container.

builder.Services.AddControllers();
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
builder.Services.AddHostedService
();
builder.Services.AddHostedService
();
builder.Services.AddHostedService
();
var app = builder.Build();
/文章来源站点https://www.yii666.com// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
app.UseSwagger();
app.UseSwaggerUI();
}
app.MapGet(
"/normal/{message}", ([FromRoute] string message) =>
{
ConnectionFactory factory
= new ConnectionFactory();
factory.HostName
= "localhost";
factory.Port
= 5672;
using (IConnection cOnnection= factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
var queueName = "rbTest202301";
channel.QueueDeclare(queueName,
true, false, false, null);
{
string sendMessage = string.Format("Message_{0}", message);
byte[] buffer = Encoding.UTF8.GetBytes(sendMessage);
IBasicProperties basicProperties
= channel.CreateBasicProperties();
basicProperties.DeliveryMode
= 2; //持久化 1=非持久化
channel.BasicPublish("", queueName, basicProperties, buffer);
Console.WriteLine(
"消息发送成功:" + sendMessage);
}
}
}
});
app.MapGet(
"/deadletterexchange/{message}",([FromRoute] string message) =>{
DeadLetterExchange.Send(message);
});
app.MapGet(
"/delayexchange/{message}", ([FromRoute] string message) => {
DelayExchange.SendMessage(message);
});
app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();
app.Run();
}
}

大概的介绍一下program文件:

这里有三个mini控制器,从这里发送对应的消息到rabbitmq

"/normal/{message}" 普通队列,

"/deadletterexchange/{message}" 死信队列

"/deadletterexchange/{message}" 延时队列

      builder.Services.AddHostedService();
builder.Services.AddHostedService
();
builder.Services.AddHostedService
();

这里就是消费的服务,注册成HostedService。

ConsumerService代码如下:

public class ConsumerService : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
Console.WriteLine(
"normal Rabbitmq消费端开始工作!");
while (!stoppingToken.IsCancellationRequested)
{
ConnectionFactory factory
= new ConnectionFactory();
factory.HostName
= "localhost";
factory.Port
= 5672;

IConnection connection
= factory.CreateConnection();
{
IModel channel
= connection.CreateModel();
{
var queueName = "rbTest202301";
channel.QueueDeclare(queueName,
true, false, false, null);
//输入1,那如果接收一个消息,但是没有应答,则客户端不会收到下一个消息
channel.BasicQos(0, 1, false);
//在队列上定义一个消费者
var cOnsumer= new EventingBasicConsumer(channel);
channel.BasicConsume(queueName,
false, consumer);
consumer.Received
+= (ch, ea) =>
{
byte[] bytes = ea.Body.ToArray();
string str = Encoding.UTF8.GetString(bytes);
Console.WriteLine(
"队列消息:" + str.ToString());
//回复确认
channel.BasicAck(ea.DeliveryTag, false);
};
}
}
await Task.Delay(5000);
}
}
}

DeadLetterExchangeConsuerService代码如下:

public class DeadLetterExchangeConsuerService : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
Console.WriteLine(
"RabbitMQ消费端死信队列开始工作");
while (!stoppingToken.IsCancellationRequested)
{
DeadLetterExchange.Consumer();
await Task.Delay(5000);
}
}
}


public class DeadLetterExchange
{
public static string dlxExchange = "dlx.exchange";
public static string dlxQueueName = "dlx.queue";
static string exchange = "direct-exchange";
static string queueName = "queue_Testdlx";
static string dlxExchangeKey = "x-dead-letter-exchange";
static string dlxQueueKey = "x-dead-letter-rounting-key";
public static void Send(string message)
{
using (var cOnnection= new ConnectionFactory() { HostName = "localhost", Port = 5672 }.CreateConnection())
{
using(var channel = connection.CreateModel())
{

channel.ExchangeDeclare(exchange, ExchangeType.Direct,
true, false); //创建交换机
channel.QueueDeclare(queueName, true, false, false,new Dictionary<string, object>
{
{ dlxExchangeKey,dlxExchange },
{dlxQueueKey,dlxQueueName }
});
// 创建队列
channel.QueueBind(queueName, exchange, queueName);
var properties = channel.CreateBasicProperties();
properties.Persistent
= true;//持久化
channel.BasicPublish(exchange,queueName,properties,Encoding.UTF8.GetBytes(message));
Console.WriteLine($
"向队列:{queueName}发送消息:{message}");
}
}
}

public static void Consumer()
{
var cOnnection= new ConnectionFactory() { HostName = "localhost", Port = 5672 }.CreateConnection();
var channel = connection.CreateModel();
channel.ExchangeDeclare(dlxExchange, ExchangeType.Direct,
true, false); //创建sixin交换机
channel.QueueDeclare(dlxQueueName, true, false, false); // 创建sixin队列
channel.QueueBind(dlxQueueName, dlxExchange, dlxQueueName); //绑定sixin队列sixin交换机

channel.ExchangeDeclare(exchange, ExchangeType.Direct,
true, false); //创建交换机
channel.QueueDeclare(queueName, true, false, false, new Dictionary<string, object>
{
{ dlxExchangeKey,dlxExchange },
{dlxQueueKey,dlxQueueName }
});
// 创建队列
channel.QueueBind(queueName, exchange, queueName);
var cOnsumer= new EventingBasicConsumer(channel);
channel.BasicQos(
0, 1, false);
consumer.Received
+= (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($
"队列{queueName}消费消息:{message},不做ack确认");
channel.BasicNack(ea.DeliveryTag,
false, requeue: false);
};
channel.BasicConsume(queueName, autoAck:
false, consumer);
}
}

DelayExchangeConsumerService代码如下:

public class DelayExchangeConsumerService : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
Console.WriteLine(
"RabbitMQ消费端延迟队列开始工作");
while (!stoppingToken.IsCancellationRequested)
{

DelayExchange.Consumer();
await Task.Delay(5000);
}
}
}


public class DelayExchange
{
public static void SendMessage(string message)
{
//死信交换机
string dlxexChange = "dlx.exchange";
//死信队列
string dlxQueueName = "dlx.queue";
//消息交换机
string exchange = "direct-exchange";
//消息队列
string queueName = "delay_queue";
using (var cOnnection= new ConnectionFactory() { HostName = "localhost", Port = 5672 }.CreateConnection())
{
using (var channel = connection.CreateModel())
{
////创建死信交换机
//channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);
////创建死信队列
//channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);
////死信队列绑定死信交换机
//channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);
// 创建消息交换机
channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false);
//创建消息队列,并指定死信队列,和设置这个队列的消息过期时间为10s
channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments:
new Dictionary<string, object> {
{
"x-dead-letter-exchange",dlxexChange}, //设置当前队列的DLX(死信交换机)
{ "x-dead-letter-routing-key",dlxQueueName}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列
{ "x-message-ttl",10000} //设置队列的消息过期时间
});
//消息队列绑定消息交换机
channel.QueueBind(queueName, exchange, routingKey: queueName);
var properties = channel.CreateBasicProperties();
properties.Persistent
= true;
//properties.Expiration = "5000";发布消息,延时5s
//发布消息
channel.BasicPublish(exchange: exchange,
routingKey: queueName,
basicProperties: properties,
body: Encoding.UTF8.GetBytes(message));
Console.WriteLine($
"{DateTime.Now},向队列:{queueName}发送消息:{message}");
}
}
}
public static void Consumer()
{
//死信交换机
string dlxexChange = "dlx.exchange";
//死信队列
string dlxQueueName = "dlx.queue";
var cOnnection= new ConnectionFactory() { HostName = "localhost", Port = 5672 }.CreateConnection();
{
//创建信道
var channel = connection.CreateModel();
{
//创建死信交换机
channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);
//创建死信队列
channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);
//死信队列绑定死信交换机
channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);
var consumwww.yii666.comer = new EventingBasicConsumer(channel);
channel.BasicQos(prefetchSize:
0, prefetchCount: 1, global: true);
consumer.Received
+= (model, ea) =>
{
//处理业务
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($
"{DateTime.Now},队列{dlxQueueName}消费消息:{message}");
channel.BasicAck(ea.DeliveryTag,
false);
};
channel.BasicConsume(dlxQueueName, autoAck:
false, consumer);
}
}
}
}

延时队列实际应用场景可能比较复杂,比如每条消息的过期时间不一样,收到的消息的顺序有可能会乱掉。这些不做深究,自行百度。

关于死信队列常见应用场景之一下单,支付,支付超时的各种场景,下面通过一个简单的例子模拟一下

同样的新建一个空的webapi项目DeadLetterQueue,

program代码如下:

public class Program
{
public static void Main(string[] args)
{
var builder = WebApplication.CreateBuilder(args);
// Add services to the container.

builder.Services.AddControllers();
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
builder.Services.AddHostedService
();
builder.Services.AddHostedService
();
var app = builder.Build();
// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
app.UseSwagger();
app.UseSwaggerUI();
}
app.MapGet(
"/normal/{message}", ([FromRoute] string message) =>
{
ConnectionFactory factory
= new ConnectionFactory();
factory.HostName
= "localhost";
factory.Port
= 5672;
using (IConnection cOnnection= factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
var queueName = "rbTest2023010";

//channel.ExchangeDeclare("exchange.dlx", ExchangeType.Direct, true);
//channel.QueueDeclare("queue.dlx", true, false, false, null);
channel.ExchangeDeclare("exchange.normal", ExchangeType.Fanout, true);
channel.QueueDeclare(queueName,
true, false, false,
new Dictionary<string, object>
{
{
"x-message-ttl" ,10000},
{
"x-dead-letter-exchange","exchange.dlx" },
{
"x-dead-letter-routing-key","routingkey" }
}
);

channel.QueueBind(queueName,
"exchange.normal", "");
{
string sendMessage = string.Format("Message_{0}", message);
byte[] buffer = Encoding.UTF8.GetBytes(sendMessage);
IBasicProperties basicProperties
= channel.CreateBasicProperties();
basicProperties.DeliveryMode
= 2; //持久化 1=非持久化
channel.BasicPublish("exchange.normal", queueName, basicProperties, buffer);
Console.WriteLine($
"{DateTime.Now}消息发送成功:{sendMessage}" );
}
}
}
});
app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();
app.Run();
}
}

下单后消费代码ConsumerService如下

public class ConsumerService : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
Console.WriteLine(
"normal Rabbitmq消费端开始工作!");
while (!stoppingToken.IsCancellationRequested)
{
ConnectionFactory factory
= new ConnectionFactory();
factory.HostName
= "localhost";
factory.Port
= 5672;
IConnection connection
= factory.CreateConnection();
{
IModel channel
= connection.CreateModel();
{
var queueName = "rbTest2023010";
channel.ExchangeDeclare(
"exchange.normal", ExchangeType.Fanout, true);
channel.QueueDeclare(queueName,
true, false, false, new Dictionary<string, object>
{
{
"x-message-ttl" ,10000},
{
"x-dead-letter-exchange","exchange.dlx" },
{
"x-dead-letter-routing-key","routingkey" }
});
channel.QueueBind(queueName,
"exchange.normal", "");
//输入1,那如果接收一个消息,但是没有应答,则客户端不会收到下一个消息
channel.BasicQos(0, 1, false);
//在队列上定义一个消费者
var cOnsumer= new EventingBasicConsumer(channel);
channel.BasicConsume(queueName,
false, consumer);
consumer.Received
+= (ch, ea) =>
{
byte[] bytes = ea.Body.ToArray();
string str = Encoding.UTF8.GetString(bytes);
Console.WriteLine($
"{DateTime.Now}来自死信队列获取的消息: {str.ToString()}");
//回复确认
if (str.Contains("跳过")) //假设超时不处理,留给后面deadconsumerservice处理
{
Console.WriteLine($
"{DateTime.Now}来自死信队列获取的消息: {str.ToString()},该消息被拒绝");
channel.BasicNack(ea.DeliveryTag,
false,false);
}
else //正常消息处理
{
Console.WriteLine($
"{DateTime.Now}来自死信队列获取的消息: {str.ToString()},该消息被接受");
channel.BasicAck(ea.DeliveryTag,
false);
}
};
}
}
await Task.文章来源地址19049.htmlDelay(5000);
}
}
}

通过模拟发送的消息加入跳过两个字会拒收这条消息,这样就会跳到设置的exchange.dlx交换机队列去,如果没有跳过那么这条消息就正常处理掉,消费确认。

超时不处理后我们通过新的消费服务DeadConsumerService来处理这异常的消费,比如回复库存,订单状态改为取消等等

public class DeadConsumerService:BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
Console.WriteLine(
"normal Rabbitmq消费端开始工作!");
while (!stoppingToken.IsCancellationRequested)
{
ConnectionFactory factory
= new ConnectionFactory();
factory.HostName
= "localhost";
factory.Port
= 5672;

IConnection connection
= factory.CreateConnection();
{
IModel channel
= connection.CreateModel();
{
var queueName = "queue.dlx";
channel.ExchangeDeclare(
"exchange.dlx", ExchangeType.Direct, true);
channel.QueueDeclare(
"queue.dlx", true, false, false, null);

channel.QueueDeclare(queueName,
true, false, false, null);
channel.QueueBind(queueName,
"exchange.dlx", "");
//输入1,那如果接收一个消息,但是没有应答,则客户端不会收到下一个消息
channel.BasicQos(0, 1, false);
//在队列上定义一个消费者
var cOnsumer= new EventingBasicConsumer(channel);
channel.BasicConsume(
"queue.dlx", false, consumer);
consumer.Received
+= (ch, ea) =>
{
byte[] bytes = ea.Body.ToArray();
string str = Encoding.UTF8.GetString(bytes);
Console.WriteLine($
"{DateTime.Now}超时未处理的消息: {str.ToString()}");
//回复确认
{
channel.BasicAck(ea.DeliveryTag,
false);
}
};
文章来源地址19049.html}
}
await Task.Delay(5000);
www.yii666.com }
}
}

运行结果:

关于rabbitmq的死信队列和延时队列的介绍什么的这里不去贴baidu了,应用demo就这么多了,代码这里exercisebook/RabbitMQ.Test at main liuzhixin405/exercisebook (github.com) 。小面分享一个完整一点的例子。

exercisebook/cat.seckill/cat.seckill at main liuzhixin405/exercisebook (github.com)

感觉自己还是不合适写这些玩意儿,没有那么细心和耐心,有这时间真不如写写demo。

来源于:netcore下RabbitMQ队列、死信队列、延时队列及小应用


推荐阅读
  • Nginx使用AWStats日志分析的步骤及注意事项
    本文介绍了在Centos7操作系统上使用Nginx和AWStats进行日志分析的步骤和注意事项。通过AWStats可以统计网站的访问量、IP地址、操作系统、浏览器等信息,并提供精确到每月、每日、每小时的数据。在部署AWStats之前需要确认服务器上已经安装了Perl环境,并进行DNS解析。 ... [详细]
  • 本文介绍了Windows操作系统的版本及其特点,包括Windows 7系统的6个版本:Starter、Home Basic、Home Premium、Professional、Enterprise、Ultimate。Windows操作系统是微软公司研发的一套操作系统,具有人机操作性优异、支持的应用软件较多、对硬件支持良好等优点。Windows 7 Starter是功能最少的版本,缺乏Aero特效功能,没有64位支持,最初设计不能同时运行三个以上应用程序。 ... [详细]
  • Imtryingtofigureoutawaytogeneratetorrentfilesfromabucket,usingtheAWSSDKforGo.我正 ... [详细]
  • Linux服务器密码过期策略、登录次数限制、私钥登录等配置方法
    本文介绍了在Linux服务器上进行密码过期策略、登录次数限制、私钥登录等配置的方法。通过修改配置文件中的参数,可以设置密码的有效期、最小间隔时间、最小长度,并在密码过期前进行提示。同时还介绍了如何进行公钥登录和修改默认账户用户名的操作。详细步骤和注意事项可参考本文内容。 ... [详细]
  • 本文介绍了如何使用php限制数据库插入的条数并显示每次插入数据库之间的数据数目,以及避免重复提交的方法。同时还介绍了如何限制某一个数据库用户的并发连接数,以及设置数据库的连接数和连接超时时间的方法。最后提供了一些关于浏览器在线用户数和数据库连接数量比例的参考值。 ... [详细]
  • Metasploit攻击渗透实践
    本文介绍了Metasploit攻击渗透实践的内容和要求,包括主动攻击、针对浏览器和客户端的攻击,以及成功应用辅助模块的实践过程。其中涉及使用Hydra在不知道密码的情况下攻击metsploit2靶机获取密码,以及攻击浏览器中的tomcat服务的具体步骤。同时还讲解了爆破密码的方法和设置攻击目标主机的相关参数。 ... [详细]
  • Android Studio Bumblebee | 2021.1.1(大黄蜂版本使用介绍)
    本文介绍了Android Studio Bumblebee | 2021.1.1(大黄蜂版本)的使用方法和相关知识,包括Gradle的介绍、设备管理器的配置、无线调试、新版本问题等内容。同时还提供了更新版本的下载地址和启动页面截图。 ... [详细]
  • 本文详细介绍了MysqlDump和mysqldump进行全库备份的相关知识,包括备份命令的使用方法、my.cnf配置文件的设置、binlog日志的位置指定、增量恢复的方式以及适用于innodb引擎和myisam引擎的备份方法。对于需要进行数据库备份的用户来说,本文提供了一些有价值的参考内容。 ... [详细]
  • 使用Ubuntu中的Python获取浏览器历史记录原文: ... [详细]
  • 本文介绍了Hyperledger Fabric外部链码构建与运行的相关知识,包括在Hyperledger Fabric 2.0版本之前链码构建和运行的困难性,外部构建模式的实现原理以及外部构建和运行API的使用方法。通过本文的介绍,读者可以了解到如何利用外部构建和运行的方式来实现链码的构建和运行,并且不再受限于特定的语言和部署环境。 ... [详细]
  • 本文介绍了Web学习历程记录中关于Tomcat的基本概念和配置。首先解释了Web静态Web资源和动态Web资源的概念,以及C/S架构和B/S架构的区别。然后介绍了常见的Web服务器,包括Weblogic、WebSphere和Tomcat。接着详细讲解了Tomcat的虚拟主机、web应用和虚拟路径映射的概念和配置过程。最后简要介绍了http协议的作用。本文内容详实,适合初学者了解Tomcat的基础知识。 ... [详细]
  • imx6ull开发板驱动MT7601U无线网卡的方法和步骤详解
    本文详细介绍了在imx6ull开发板上驱动MT7601U无线网卡的方法和步骤。首先介绍了开发环境和硬件平台,然后说明了MT7601U驱动已经集成在linux内核的linux-4.x.x/drivers/net/wireless/mediatek/mt7601u文件中。接着介绍了移植mt7601u驱动的过程,包括编译内核和配置设备驱动。最后,列举了关键词和相关信息供读者参考。 ... [详细]
  • 本文介绍了如何使用C#制作Java+Mysql+Tomcat环境安装程序,实现一键式安装。通过将JDK、Mysql、Tomcat三者制作成一个安装包,解决了客户在安装软件时的复杂配置和繁琐问题,便于管理软件版本和系统集成。具体步骤包括配置JDK环境变量和安装Mysql服务,其中使用了MySQL Server 5.5社区版和my.ini文件。安装方法为通过命令行将目录转到mysql的bin目录下,执行mysqld --install MySQL5命令。 ... [详细]
  • web.py开发web 第八章 Formalchemy 服务端验证方法
    本文介绍了在web.py开发中使用Formalchemy进行服务端表单数据验证的方法。以User表单为例,详细说明了对各字段的验证要求,包括必填、长度限制、唯一性等。同时介绍了如何自定义验证方法来实现验证唯一性和两个密码是否相等的功能。该文提供了相关代码示例。 ... [详细]
  • 本文介绍了深入浅出Linux设备驱动编程的重要性,以及两种加载和删除Linux内核模块的方法。通过一个内核模块的例子,展示了模块的编译和加载过程,并讨论了模块对内核大小的控制。深入理解Linux设备驱动编程对于开发者来说非常重要。 ... [详细]
author-avatar
ouyan1985_998
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有