热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

.net中rabbitmq如何使用

小编给大家分享一下.net中rabbitmq如何使用,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,

小编给大家分享一下.net中rabbitmq如何使用,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

    什么是rabbitMQ

            RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue 高级消息队列协议 )的开源实现,
            能够实现异步消息处理      

            RabbitMQ是一个消息代理:它接受和转发消息。
            你可以把它想象成一个邮局:当你把你想要发布的邮件放在邮箱中时,你可以确定邮差先生最终将邮件发送给你的收件人。在这个比喻中,RabbitMQ是邮政信箱,邮局和邮递员。
            RabbitMQ和邮局的主要区别在于它不处理纸张,而是接受,存储和转发二进制数据块      

            优点:异步消息处理
                  业务解耦(下订单操作:扣减库存、生成订单、发红包、发短信),
                            将下单操作主流程:扣减库存、生成订单
                            然后通过MQ消息队列完成通知,发红包、发短信
                  错峰流控 (通知量 消息量 订单量大的情况实现MQ消息队列机制,淡季情况下访问量会少)     

                  灵活的路由(Flexible Routing)
                    在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。       

            RabbitMQ网站端口号:15672
            程序里面实现的端口为:5672

    Rabbitmq的关键术语

      1、绑定器(Binding):根据路由规则绑定Queue和Exchange。

      2、路由键(Routing Key):Exchange根据关键字进行消息投递。

      3、交换机(Exchange):指定消息按照路由规则进入指定队列

      4、消息队列(Queue):消息的存储载体

      5、生产者(Producer):消息发布者。

      6、消费者(Consumer):消息接收者。

    Rabbitmq的运作

      从下图可以看出,发布者(Publisher)是把消息先发送到交换器(Exchange),再从交换器发送到指定队列(Queue),而先前已经声明交换器与队列绑定关系,最后消费者(Customer)通过订阅或者主动取指定队列消息进行消费。

    .net中rabbitmq如何使用

      那么刚刚提到的订阅和主动取可以理解成,推(被动),拉(主动)。

      推,只要队列增加一条消息,就会通知空闲的消费者进行消费。(我不找你,就等你找我,观察者模式)

      拉,不会通知消费者,而是由消费者主动轮循或者定时去取队列消息。(我需要才去找你)

      使用场景我举个例子,假如有两套系统 订单系统和发货系统,从订单系统发起发货消息指令,为了及时发货,发货系统需要订阅队列,只要有指令就处理。

      可是程序偶尔会出异常,例如网络或者DB超时了,把消息丢到失败队列,这个时候需要重发机制。但是我又不想while(IsPostSuccess == True),因为只要出异常了,会在某个时间段内都会有异常,这样的重试是没意义的。

      这个时候不需要及时的去处理消息,有个JOB定时或者每隔几分钟(失败次数*间隔分钟)去取失败队列消息,进行重发。

    Publish(发布)的封装

      步骤:初始化链接->声明交换器->声明队列->换机器与队列绑定->发布消息。注意的是,我将Model存到了ConcurrentDictionary里面,因为声明与绑定是非常耗时的,其次,往重复的队列发送消息是不需要重新初始化的。

    /// 
              /// 交换器声明
              /// 
              /// 
              /// 交换器
              /// 交换器类型:
              /// 1、Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全
              /// 匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的
              /// 消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog
             /// 2、Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都
             /// 会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout
             /// 交换机转发消息是最快的。
             /// 3、Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多
             /// 个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*”
             /// 只会匹配到“audit.irs”。
             /// 持久化
             /// 自动删除
             /// 参数
             private static void ExchangeDeclare(IModel iModel, string exchange, string type = ExchangeType.Direct,
                 bool durable = true,
                 bool autoDelete = false, IDictionary arguments = null)
             {
                 exchange = exchange.IsNullOrWhiteSpace() ? "" : exchange.Trim();
                 iModel.ExchangeDeclare(exchange, type, durable, autoDelete, arguments);
             }
     
             /// 
             /// 队列声明
             /// 
             /// 
             /// 队列
             /// 持久化
             /// 排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,
             /// 并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可
             /// 以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连
             /// 接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者
             /// 客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。
             /// 自动删除
             /// 参数
             private static void QueueDeclare(IModel channel, string queue, bool durable = true, bool exclusive = false,
                bool autoDelete = false, IDictionary arguments = null)
             {
                 queue = queue.IsNullOrWhiteSpace() ? "UndefinedQueueName" : queue.Trim();
                 channel.QueueDeclare(queue, durable, exclusive, autoDelete, arguments);
            }
     
            /// 
            /// 获取Model
            /// 
           /// 交换机名称
            /// 队列名称
             /// 
             /// 是否持久化
            /// 
             private static IModel GetModel(string exchange, string queue, string routingKey, bool isProperties = false)
            {
              return ModelDic.GetOrAdd(queue, key =>
                {
                     var model = _conn.CreateModel();
                    ExchangeDeclare(model, exchange, ExchangeType.Fanout, isProperties);
                     QueueDeclare(model, queue, isProperties);
                    model.QueueBind(queue, exchange, routingKey);
                    ModelDic[queue] = model;
                     return model;
                });
            }
     
           /// 
           /// 发布消息
             /// 
           /// 路由键
             /// 队列信息
            /// 交换机名称
            /// 队列名
           /// 是否持久化
            /// 
             public void Publish(string exchange, string queue, string routingKey, string body, bool isProperties = false)
             {
               var channel = GetModel(exchange, queue, routingKey, isProperties);
    
                 try
                 {
                   channel.BasicPublish(exchange, routingKey, null, body.SerializeUtf8());
                 }
                catch (Exception ex)
                {
                   throw ex.GetInnestException();
               }
           }

      下次是本机测试的发布速度截图:

    .net中rabbitmq如何使用

      4.2W/S属于稳定速度,把反序列化(ToJson)会稍微快一些。

    Subscribe(订阅)的封装

      发布的时候是申明了交换器和队列并绑定,然而订阅的时候只需要声明队列就可。从下面代码能看到,捕获到异常的时候,会把消息送到自定义的“死信队列”里,由另外的JOB进行定时重发,因此,finally是应答成功的。

    /// 
            /// 获取Model
            /// 
            /// 队列名称
            /// 
            /// 
            private static IModel GetModel(string queue, bool isProperties = false)
            {
                return ModelDic.GetOrAdd(queue, value =>
                 {
                     var model = _conn.CreateModel();
                     QueueDeclare(model, queue, isProperties);
    
                     //每次消费的消息数
                     model.BasicQos(0, 1, false);
    
                     ModelDic[queue] = model;
    
                     return model;
                 });
            }    
    
            /// 
            /// 接收消息
            /// 
            /// 
            /// 队列名称
            /// 
            /// 消费处理
            /// 
            public void Subscribe(string queue, bool isProperties, Action handler, bool isDeadLetter) where T : class
            {
                //队列声明
                var channel = GetModel(queue, isProperties);
    
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var msgStr = body.DeserializeUtf8();
                    var msg = msgStr.FromJson();
                    try
                    {
                        handler(msg);
                    }
                    catch (Exception ex)
                    {
                        ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq");
                        if (!isDeadLetter)
                            PublishToDead(queue, msgStr, ex);
                    }
                    finally
                    {
                        channel.BasicAck(ea.DeliveryTag, false);
                    }
                };
                channel.BasicConsume(queue, false, consumer);
            }

      下次是本机测试的发布速度截图:

    .net中rabbitmq如何使用

      快的时候有1.9K/S,慢的时候也有1.7K/S

    Pull(拉)的封装

      直接上代码:

     /// 
            /// 获取消息
            /// 
            /// 
            /// 
            /// 
            /// 
            /// 消费处理
            private void Poll(string exchange, string queue, string routingKey, Action handler) where T : class
            {
                var channel = GetModel(exchange, queue, routingKey);
    
                var result = channel.BasicGet(queue, false);
                if (result.IsNull())
                    return;
    
                var msg = result.Body.DeserializeUtf8().FromJson();
                try
                {
                    handler(msg);
                }
                catch (Exception ex)
                {
                    ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq");
                }
                finally
                {
                    channel.BasicAck(result.DeliveryTag, false);
                }
            }

    .net中rabbitmq如何使用

      快的时候有1.8K/s,稳定是1.5K/S

    Rpc(远程调用)的封装

      首先说明下,RabbitMq只是提供了这个RPC的功能,但是并不是真正的RPC,为什么这么说:

      1、传统Rpc隐藏了调用细节,像调用本地方法一样传参、抛出异常

      2、RabbitMq的Rpc是基于消息的,消费者消费后,通过新队列返回响应结果。

     /// 
            /// RPC客户端
            /// 
            /// 
            /// 
            /// 
            /// 
            /// 
            /// 
            public string RpcClient(string exchange, string queue, string routingKey, string body, bool isProperties = false)
            {
                var channel = GetModel(exchange, queue, routingKey, isProperties);
    
                var consumer = new QueueingBasicConsumer(channel);
                channel.BasicConsume(queue, true, consumer);
    
                try
                {
                    var correlationId = Guid.NewGuid().ToString();
                    var basicProperties = channel.CreateBasicProperties();
                    basicProperties.ReplyTo = queue;
                    basicProperties.CorrelationId = correlationId;
    
                    channel.BasicPublish(exchange, routingKey, basicProperties, body.SerializeUtf8());
    
                    var sw = Stopwatch.StartNew();
                    while (true)
                    {
                        var ea = consumer.Queue.Dequeue();
                        if (ea.BasicProperties.CorrelationId == correlationId)
                        {
                            return ea.Body.DeserializeUtf8();
                        }
    
                        if (sw.ElapsedMilliseconds > 30000)
                            throw new Exception("等待响应超时");
                    }
                }
                catch (Exception ex)
                {
                    throw ex.GetInnestException();
                }
            }    
    
            /// 
            /// RPC服务端
            /// 
            /// 
            /// 
            /// 
            /// 
            /// 
            /// 
            public void RpcService(string exchange, string queue, bool isProperties, Func handler, bool isDeadLetter)
            {
                //队列声明
                var channel = GetModel(queue, isProperties);
    
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var msgStr = body.DeserializeUtf8();
                    var msg = msgStr.FromJson();
    
                    var props = ea.BasicProperties;
                    var replyProps = channel.CreateBasicProperties();
                    replyProps.CorrelationId = props.CorrelationId;
    
                    try
                    {
                        msg = handler(msg);
                    }
                    catch (Exception ex)
                    {
                        ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq");
                    }
                    finally
                    {
                        channel.BasicPublish(exchange, props.ReplyTo, replyProps, msg.ToJson().SerializeUtf8());
                        channel.BasicAck(ea.DeliveryTag, false);
                    }
                };
                channel.BasicConsume(queue, false, consumer);
            }

      可以用,但不建议去用。可以考虑其他的RPC框架。grpc、thrift等。

    以上是“.net中rabbitmq如何使用”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注编程笔记行业资讯频道!


    推荐阅读
    • 讨伐Java多线程与高并发——MQ篇
      本文是学习Java多线程与高并发知识时做的笔记。这部分内容比较多,按照内容分为5个部分:多线程基础篇JUC篇同步容器和并发容器篇线程池篇MQ篇本篇 ... [详细]
    • rabbitmq杂谈
      rabbitmq中的consumerTag和deliveryTag分别是干啥的,有什么用?同一个会话,consumerTag是固定的可以做此会话的名字,deliveryTag每次接 ... [详细]
    • java线程池的实现原理源码分析
      这篇文章主要介绍“java线程池的实现原理源码分析”,在日常操作中,相信很多人在java线程池的实现原理源码分析问题上存在疑惑,小编查阅了各式资 ... [详细]
    • RabbitMq的最终一致性分布式事务
      RabbitMq的最终一致性分布式事务使用rabbitmq的步骤1.运行安装在服务器上的rabbit服务2.在项目中安装依赖3.编写对应的配置文件4.创建对应配置并加上启动注解5. ... [详细]
    • OpenStack 的 Nova 和 Glance 组件
      简单回顾一下OpenStack三大组件的用途:OpenStackCompute(Nova),为云组织的控制器,它提供一个工具来部署云&#x ... [详细]
    •  Rabbitmq是对AMQP协议的一种实现。使用范围也比较广泛,主要用于消息异步通讯。 ... [详细]
    • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
    • 栈和队列的共同处和不同处
      本文主要介绍了栈和队列的共同处和不同处。栈和队列都是由几个数据特性相同的元素组成的有限序列,也就是线性表。队列是限定仅在表的一端插入元素、在另一端删除元素的线性表,遵循先进先出的原则。栈是限定仅在表尾进行插入或删除操作的线性表,遵循后进先出的原则。 ... [详细]
    • 重入锁(ReentrantLock)学习及实现原理
      本文介绍了重入锁(ReentrantLock)的学习及实现原理。在学习synchronized的基础上,重入锁提供了更多的灵活性和功能。文章详细介绍了重入锁的特性、使用方法和实现原理,并提供了类图和测试代码供读者参考。重入锁支持重入和公平与非公平两种实现方式,通过对比和分析,读者可以更好地理解和应用重入锁。 ... [详细]
    • 本文介绍了在MFC下利用C++和MFC的特性动态创建窗口的方法,包括继承现有的MFC类并加以改造、插入工具栏和状态栏对象的声明等。同时还提到了窗口销毁的处理方法。本文详细介绍了实现方法并给出了相关注意事项。 ... [详细]
    • 第七课主要内容:多进程多线程FIFO,LIFO,优先队列线程局部变量进程与线程的选择线程池异步IO概念及twisted案例股票数据抓取 ... [详细]
    • 一、死锁现象与递归锁进程也是有死锁的所谓死锁:是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作 ... [详细]
    • mapreduce源码分析总结
      这篇文章总结的非常到位,故而转之一MapReduce概述MapReduce是一个用于大规模数据处理的分布式计算模型,它最初是由Google工程师设计并实现的ÿ ... [详细]
    • 项目需要将音视频文件上传服务器,考虑并发要求高,通过七牛来实现。直接上代码usingQiniu.IO;usingQiniu.IO.Resumable;usingQiniu.RPC; ... [详细]
    • 阿里首席架构师科普RPC框架
      RPC概念及分类RPC全称为RemoteProcedureCall,翻译过来为“远程过程调用”。目前,主流的平台中都支持各种远程调用技术,以满足分布式系统架构中不同的系统之间的远程 ... [详细]
    author-avatar
    铭娟佑廷雅冰
    这个家伙很懒,什么也没留下!
    PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
    Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有