热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RabbitMQ(二):入门案例

一、功能功能描述:生产者将消息发送到队列(队列的名字为hello)中,消费者从队列中获取消息。二、生产者定义队列名publicfinalstatic

一、功能

功能描述:生产者将消息发送到队列(队列的名字为hello)中,消费者从队列中获取消息。

这里写图片描述

二、生产者
    // 定义队列名
    public final static String QUEUE_NAME="rabbitMQ.zhxb";
    public void rabbitMQ (){
        try {
            // 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            // 设置RabbitMQ参数
            factory.setHost("IP");
            factory.setUsername("*");
            factory.setPassword("*");
            factory.setPort(*);
            // 创建一个连接
            Connection cOnnection= factory.newConnection();
            // 创建一个通道(大连接中的一根线)
            Channel channel = connection.createChannel();
            // 声明一个队列
            // 第一个参数表示队列名称、第二个参数为是否持久化、第三个参数为是否是独占队列、第四个参数为当所有消费者客户端连接断开时是否自动删除队列、第五个参数为队列的其他参数
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 定义一个消息
            String message = "Hello RabbitMQ";
            // 发送消息到队列
       // 第一个参数为交换机名称、第二个参数为队列映射的路由key、第三个参数为消息的其他属性、第四个参数为发送信息的主体
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println("我向"+QUEUE_NAME+"发送一个消息:+'" + message); // 关闭通道和连接 channel.close(); connection.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } }
步骤说明:

连接工厂:生产Connection的的工厂

连接:RabbitMQ的socket链接,它封装了socket协议

通道:把Connection比作一条光纤电缆的话,那么Channel信道就比作成光纤电缆中的其中一束光纤

声明队列:rabbitMQ.zhxb,储存消息的容器(非持久化)

发送消息:将消息发送到交换机,然后交换机通过路由策略(规则)将消息路由到匹配的队列中去

步骤详解:

1.声明一个队列

// 声明一个队列

// 第一个参数表示队列名称、第二个参数为是否持久化、第三个参数为是否是独占队列、第四个参数为当所有消费者客户端连接断开时是否自动删除队列、第五个参数为队列的其他参数

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

参数详解
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments);
  • queue: 队列名称
  • durable:是否持久化(true表示是,队列将在服务器重启时生存), 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失,如果想重启之后还存在就要使队列持久化,保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库
  • exclusive:是否排外的/是否是独占队列,有两个作用(创建者可以使连接断开后自动删除,可使其变为某一消费者的私有队列)一:当连接关闭时connection.close()该队列是否会自动删除;二:该队列是否是私有的private,如果不是排外的,可以使用两个消费者都访问同一个队列,没有任何问题,如果是排外的,会对当前队列加锁,其他通道channel是不能访问的,如果强制访问会报异常:com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'queue_name' in vhost '/', class-id=50, method-id=20)一般等于true的话用于一个队列只能有一个消费者来消费的场景*/
  • autoDelete:是当所有消费者客户端连接断开时是否自动删除队列,当最后一个消费者断开连接之后队列是否自动被删除,可以通过RabbitMQ Management,查看某个队列的消费者数量,当cOnsumers= 0时队列就会自动删除
  • arguments:其他属性(消息什么时候会自动被删除、自动过期、最长长度、最大长度字节、死信交换、死信路由键、最高优先级、懒惰模式)
  • 注意:关于队列的声明,如果使用同一套参数进行声明了,就不能再使用其他参数来声明,要么删除该队列重新删除,可以使用命令行删除也可以在RabbitMQ Management上删除,要么给队列重新起一个名字。
arguments具体说明
  • Message TTL:队列中的消息什么时候会自动被删除,(x-message-ttl)
  • Auto expire:自动过期,(x-expires):
  • Max length:最长长度,(x-max-length)
  • Max length bytes:最大长度字节,(x-max-length-bytes)
  • Dead letter exchange:死信交换,(x-dead-letter-exchange)
  • Dead letter routing key:死信路由键,(x-dead-letter-routing-key)
  • Maximum priority:最高优先级,(x-max-priority)
  • Lazy mode:懒惰模式,(x-queue-mode=lazy)
  • Master locator(x-queue-master-locator)
官方解释:
  • 发布到队列的消息在丢弃之前可以存活多长时间(毫秒)。(设置“ x-message-ttl ”参数。)
  • 在自动删除队列(毫秒)之前,队列可以使用多长时间。(设置“ x-expires ”参数。)
  • 在队列开始从队列中删除之前,队列可以包含多少(就绪)消息。(设置“ x-max-length ”参数。)
  • 队列在开始从头部删除之前可以包含的就绪消息的总体大小。(设置“ x-max-length-bytes ”参数。)
  • 如果邮件被拒绝或过期,将重新发布邮件的交换的可选名称。(设置“ x-dead-letter-exchange ”参数。)
  • 邮件是无效的时候使用的可选替换路由密钥。如果未设置,将使用消息的原始路由密钥。(设置“ x-dead-letter-routing-key ”参数。)
  • 要支持的队列的最大优先级数; 如果未设置,则队列将不支持消息优先级。(设置“ x-max-priority ”参数。)
  • 将队列设置为延迟模式,在磁盘上保留尽可能多的消息以减少RAM使用; 如果未设置,队列将保留内存缓存以尽快传递消息。(设置“ x-queue-mode ”参数。)
增加解释:
  • 设置队列中的所有消息的生存周期(统一为整个队列的所有消息设置生命周期),单位毫秒, 类似于redis中的ttl,生存时间到了,消息会被从队里中删除,注意是消息被删除,而不是队列被删除, 特性Features=TTL,也可以在发布消息的时候单独为某个消息指定剩余生存时间(但在消息发送的设置里)
  • 当队列在指定的时间没有被访问(consume, basicGet, queueDeclare…)就会被删除,Features=Exp
  • 限定队列的消息的最大值长度,超过指定长度将会把最早的几条删除掉, 类似于mongodb中的固定集合,例如保存最新的100条消息, Feature=Lim
  • 限定队列最大占用的空间大小, 一般受限于内存、磁盘的大小, Features=Lim B
  • 当队列消息长度大于最大长度、或者过期的等,将从队列中删除的消息推送到指定的交换机中去而不是丢弃掉,Features=DLX
  • 将删除的消息推送到指定交换机的指定路由键的队列中去, Feature=DLK
  • 优先级队列,声明队列时先定义最大优先级值(定义最大值一般不要太大),在发布消息的时候指定该消息的优先级, 优先级更高(数值更大的)的消息先被消费,
  • 先将消息保存到磁盘上,不放在内存中,当消费者开始消费的时候才加载到内存中

2.发送消息到队列

// 发送消息到队列
// 第一个参数为交换机名称、第二个参数为队列映射的路由key、第三个参数为消息的其他属性、第四个参数为发送信息的主体
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
void basicPublish(String neme, String routingKey, boolean mandatory, boolean immediate, BasicProperties properties, byte[] message) throws IOException;
  • neme:为交换机名称
  • routingKey:路由键(队列映射的路由key),#匹配0个或多个单词,*匹配一个单词,在topic exchange做消息转发用
  • mandatory:true:如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者。false:出现上述情形broker会直接将消息扔掉
  • immediate:true:如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。(简单来说:mandatory标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者;immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。)
  • properties :需要注意的是BasicProperties.deliveryMode,0:不持久化 1:持久化 这里指的是消息的持久化,配合channel(durable=true),queue(durable)可以实现,即使服务器宕机,消息仍然保留
  • message:消息主体
// 给某个消息设置存在时间
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().expiration("6000");
channel.basicPublish("", QUEUE_NAME, properties.build(), message.getBytes("UTF-8"));

3.强调一个概念:

  • 持久化
  • 队列持久化:重启RabbitMQ服务器队列还在
  • 消息持久化:重启RabbitMQ服务器消息还在(设置消息持久化必须先设置队列持久化,要不然队列不持久化,队列都不存在了,消息存在还有什么意义。消息持久化需要将交换机持久化、队列持久化、消息持久化,才能最终达到持久化的目的)
三、消费者
    // 定义队列名
    private final static String QUEUE_NAME = "rabbitMQ.zhxb";
    public void rabbitMQCustomer() {
        try {
            // 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            // 设置RabbitMQ参数
            factory.setHost("IP");
            factory.setUsername("*");
            factory.setPassword("*");
            factory.setPort(*);
            // 创建一个连接
            Connection cOnnection= factory.newConnection();
            // 创建一个通道
            Channel channel = connection.createChannel();
            // 声明一个队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println("消费者一,等待消息产生");
            // DefaultConsumer类实现了Consumer接口,通过传入一个频道,
            // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
            // 消费一个消息
            Consumer cOnsumer= new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("我是消费者一,我消费了:" + message);
                    try {
                        Thread thread = Thread.currentThread();
                        thread.sleep(5000);//暂停1.5秒后程序继续执行
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            // 自动应答回复队列(RabbitMQ中的消息确认机制)
            channel.basicConsume(QUEUE_NAME, true, consumer);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
步骤详解:

1.声明一个队列

// 声明一个队列
// 第一个参数表示队列名称、第二个参数为是否持久化、第三个参数为是否是独占队列、第四个参数为当所有消费者客户端连接断开时是否自动删除队列、第五个参数为队列的其他参数
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 规则一单指定,必须保持一致

2.消费一个消息(订阅方式)

 
 
// 消费一个消息
Consumer cOnsumer= new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("我消费了:" + message);
}
};

订阅方式:
  • 订阅方式其实是向queue注册consumer,通过rpc向queue server发送注册consumer的消息,
  • rabbitMQ Server在收到消息后,根据消息的内容类型判断这是一个订阅消息,
  • 当MQ 中queue有消息时,会自动把消息通过该socket(长连接)通道发送出去。
消费原理:
  • DefaultConsumer类实现了Consumer接口,
  • consumer通过建立到queue的连接connection,创建channel对象,
  • 通过传入这个频道,告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery来消费这个消息

3.RabbitMQ中的消息确认机制

// 自动应答回复队列(RabbitMQ中的消息确认机制)
channel.basicConsume(QUEUE_NAME, true, consumer);

原理:
  • true:自动应答,即消费者获取到消息,该消息就会从队列中删除掉,
  • false:手动应答,当从队列中取出消息后,需要程序员手动调用方法应答,如果没有应答,该消息还会再放进队列中,就会出现该消息一直没有被消费掉的现象

  


推荐阅读
  • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
  • Java 中优先级队列的轮询方法详解与应用 ... [详细]
  • 本文深入探讨了 MXOTDLL.dll 在 C# 环境中的应用与优化策略。针对近期公司从某生物技术供应商采购的指纹识别设备,该设备提供的 DLL 文件是用 C 语言编写的。为了更好地集成到现有的 C# 系统中,我们对原生的 C 语言 DLL 进行了封装,并利用 C# 的互操作性功能实现了高效调用。此外,文章还详细分析了在实际应用中可能遇到的性能瓶颈,并提出了一系列优化措施,以确保系统的稳定性和高效运行。 ... [详细]
  • 本题库精选了Java核心知识点的练习题,旨在帮助学习者巩固和检验对Java理论基础的掌握。其中,选择题部分涵盖了访问控制权限等关键概念,例如,Java语言中仅允许子类或同一包内的类访问的访问权限为protected。此外,题库还包括其他重要知识点,如异常处理、多线程、集合框架等,全面覆盖Java编程的核心内容。 ... [详细]
  • Go语言中Goroutine与通道机制及其异常处理深入解析
    在Go语言中,Goroutine可视为一种轻量级的并发执行单元,其资源消耗远低于传统线程,初始栈大小仅为2KB,而普通线程则通常需要几MB。此外,Goroutine的调度由Go运行时自动管理,能够高效地支持成千上万个并发任务。本文深入探讨了Goroutine的工作原理及其与通道(channel)的配合使用,特别是在异常处理方面的最佳实践,为开发者提供了一套完整的解决方案,以确保程序的稳定性和可靠性。 ... [详细]
  • 本文将深入探讨Java编程语言中顶级类`Object`的源码实现,旨在为Java新手提供进阶指导。`Object`类是所有Java类的基类,了解其内部机制对于提升编程技能至关重要。文章首先介绍了API文档的使用方法,这对于有开发经验的Java程序员来说是不可或缺的工具。通过详细解析`Object`类的关键方法和属性,读者可以更好地理解Java的核心原理和设计思想。此外,文章还提供了实际代码示例,帮助读者在实践中掌握这些知识。 ... [详细]
  • 深入解析JWT的实现与应用
    本文深入探讨了JSON Web Token (JWT) 的实现机制及其应用场景。JWT 是一种基于 RFC 7519 标准的开放性认证协议,用于在各方之间安全地传输信息。文章详细分析了 JWT 的结构、生成和验证过程,并讨论了其在现代 Web 应用中的实际应用案例,为开发者提供了全面的理解和实践指导。 ... [详细]
  • 深入解析零拷贝技术(Zerocopy)及其应用优势
    零拷贝技术(Zero-copy)是Netty框架中的一个关键特性,其核心在于减少数据在操作系统内核与用户空间之间的传输次数。通过避免不必要的内存复制操作,零拷贝显著提高了数据传输的效率和性能。本文将深入探讨零拷贝的工作原理及其在实际应用中的优势,包括降低CPU负载、减少内存带宽消耗以及提高系统吞吐量等方面。 ... [详细]
  • 深入理解Spark框架:RDD核心概念与操作详解
    RDD是Spark框架的核心计算模型,全称为弹性分布式数据集(Resilient Distributed Dataset)。本文详细解析了RDD的基本概念、特性及其在Spark中的关键操作,包括创建、转换和行动操作等,帮助读者深入理解Spark的工作原理和优化策略。通过具体示例和代码片段,进一步阐述了如何高效利用RDD进行大数据处理。 ... [详细]
  • Android ListView 自定义 CheckBox 实现列表项多选功能详解
    本文详细介绍了在Android开发中如何在ListView的每一行添加CheckBox,以实现列表项的多选功能。用户不仅可以通过点击复选框来选择项目,还可以通过点击列表的任意一行来完成选中操作,提升了用户体验和操作便捷性。同时,文章还探讨了相关的事件处理机制和布局优化技巧,帮助开发者更好地实现这一功能。 ... [详细]
  • 掌握并发编程的关键:深入解析三大核心挑战
    掌握并发编程的关键:深入解析三大核心挑战 ... [详细]
  • 深入解析十大经典排序算法:动画演示、原理分析与代码实现
    本文深入探讨了十种经典的排序算法,不仅通过动画直观展示了每种算法的运行过程,还详细解析了其背后的原理与机制,并提供了相应的代码实现,帮助读者全面理解和掌握这些算法的核心要点。 ... [详细]
  • 本文深入剖析了ScheduledThreadPoolExecutor的并发执行机制及其源代码,详细解读了该线程池如何在指定延时或定期执行任务,探讨了其内部的工作原理和优化策略,为开发者提供了宝贵的参考和实践指导。 ... [详细]
  • IIS 7及7.5版本中应用程序池的最佳配置策略与实践
    在IIS 7及7.5版本中,优化应用程序池的配置是提升Web站点性能的关键步骤。具体操作包括:首先定位到目标Web站点的应用程序池,然后通过“应用程序池”菜单找到对应的池,右键选择“高级设置”。在一般优化方案中,建议调整以下几个关键参数:1. **基本设置**: - **队列长度**:默认值为1000,可根据实际需求调整队列长度,以提高处理请求的能力。此外,还可以进一步优化其他参数,如处理器使用限制、回收策略等,以确保应用程序池的高效运行。这些优化措施有助于提升系统的稳定性和响应速度。 ... [详细]
  • Java 并发容器 ConcurrentLinkedQueue 的 peek() 方法解析与应用 ... [详细]
author-avatar
kiki俏佳人2502909673
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有