热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

重学“消息队列”,详解RabbitMQ消息确认机制!

引言RabbitMQ的模型是生产者发送信息到Broker(代理),消费者从Broker中取出信息。但是生产者怎么知道消息是否真的发送到Broker中了呢?Br
引言

RabbitMQ的模型是生产者发送信息到 Broker (代理),消费者从 Broker 中取出信息。但是生产者怎么知道消息是否真的发送到 Broker 中了呢?Broker 又怎么知道消息到底有没有被消费者消费?

如果由于网络原因出现故障,生产者生产的消息未到达 Broker 或者 Broker 的消息被虚假消费,而它们又不知道,就会产生很严重的问题,如重复消费等。

重学“消息队列”,详解 RabbitMQ 消息确认机制!
image
01 RabbitMQ的消息确认流程
重学“消息队列”,详解 RabbitMQ 消息确认机制!
image

从图中可以看出:

消息确认机制分为生产者确认和消费者确认

  • ConfirmCallback 生产者
  • ReturnCallback 生产者
  • ACK 消费者
02 生产者确认
  • 消息到达RabbitMQ的Exchange:Exchange向生产者发送Confirm确认。成功抑或失败都会返回一个confirmCallback
  • 消息成功达到Exchange,但是从Exchange投递Queue失败:向生产者返回一个returnCallback。只有失败才会返回
03 消费者确认
  • 消费者收到消息后需要对 RabbitMQ Server 进行消息 ACK 确认,RabbitMQ 根据确认信息决定是删除队列中的该信息还是重新发送
04 代码实现 4.1 生产者确认

重点在于生产者重写下面两个方法

  • rabbitMQTemplate.setConfirmCallback
  • rabbitMQTemplate.setReturnCallback

1.开启生产者消息确认

spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /
    username: root
    password: root
    #    开启两个模式的生产者消息确认
    publisher-confirm-type: simple
    publisher-returns: true

2.声明交换机、队列,绑定交换机和队列

@Configuration
public class RabbitMQConfig {

    private static final String SB_TOPIC_EXCHANGE="sb_topic_exchange";
    private static final String SB_TOPIC_QUEUE="sb_topic_queue1";

    // 注入交换机 topic类型
    @Bean("topicExchange")
    public Exchange topicExchange(){
        return ExchangeBuilder.topicExchange(SB_TOPIC_EXCHANGE).durable(true)
                .autoDelete().build();
    }
    // 声明队列
    @Bean
    public Queue queue1(){
        return QueueBuilder.durable(SB_TOPIC_QUEUE).build();
    }

    // 绑定队列和交换机
    @Bean
    public Binding exchangQueue(@Qualifier("queue1") Queue queue, @Qualifier("topicExchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("user.#").noargs();
    }

3.创建消费者

@Component
@RabbitListener(queues = "sb_topic_queue1")
public class Consumer {

    @RabbitHandler
    public void testPublishConfirm(String msg) {
        System.out.println("收到的信息:"+msg);
    }
}

4.创建生产者

创建生产者发送消息到消息队列,模拟两种异常情况

@SpringBootTest
class RabiitmqSpringbootApplicationTests {

    @Autowired
    RabbitTemplate template;

    @Test
    void testConfirmTrue() {
        // 设置confirm回调函数
        template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean b, java.lang.String s) {
                if (b) System.out.println("消息发送成功");
                else System.out.println("消息发送失败");
            }

        });
        // 模拟生产者发送信息--正常情况
        template.convertAndSend("sb_topic_exchange","user.info","日志级别:info;日志模块:user;日志信息:*****");
    }

    @Test
    void testConfirmFalse() {
        // 设置confirm回调函数
        template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean b, java.lang.String s) {
                if (b) System.out.println("消息发送成功");
                else System.out.println("消息发送失败");
            }

        });
        // 模拟生产者发送信息
        // 不存在的交换机--异常情况
     template.convertAndSend("sb_topic_exchange_noexist","user.info","日志级别:info;日志模块:user;日志信息:*****");
    }

    @Test
    void testReturnFalse() {
        // 设置return回调函数
        template.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int i, java.lang.String s, java.lang.String s1, java.lang.String s2) {
                System.out.println(message.toString());
                System.out.println(s+"*********");
            }

        });
        template.setMandatory(true);
        // 模拟生产者发送信息
        // 正确的交换机 错误的routekey -- 异常情况
     template.convertAndSend("sb_topic_exchange","noexist.user.info","日志级别:info;日志模块:user;日志信息:*****");
    }

4.2 消费者确认

重点在于消费者的下面两个方法

  • channel.basicAck 消费者签收
  • channel.basicNAck 消费者拒绝签收

1.开启消费者确认模式

spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /
    username: root
    password: root
#    设置消费端手动签收
    listener:
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: manual

2.创建消费者

/**
 * 注入消费者--手动签到
 */
@Component
@RabbitListener(queues = "sb_topic_queue1")
public class Consumer2 {

    @RabbitHandler
    public void testComsumer(String msg, Channel channel, Message message) throws InterruptedException, IOException {
        // 消费端设置手动签收代码
        try {
            System.out.println(msg);
            // 正常签收,mq收到此消息被正常签收后即可从队列中删除vi信息
            // 是哟了那个channel的方法
            // 第一个参数是deliverytag 标识哪条信息 第二个参数是是否批量签收
            // int i=2/0; 模拟异常
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
            System.out.println("消费者签收了该信息,服务器你可以删了");
        }catch (Exception e){
            // 异常拒绝签收,让mq重发此信息
            System.out.println("该信息丢了,给我重发");
       channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,true);
           // 该信息丢了,但是不需要你重发
     // channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);
        }
    }
}

3.创建生产者

@SpringBootTest
class RabiitmqSpringbootApplicationTests {

    @Autowired
    RabbitTemplate template;

    @Test
    void testConsumerAck() {
     template.convertAndSend("sb_topic_exchange","noexist.user.info","日志级别:info;日志模块:user;日志信息:*****");
    }
}

原文链接:https://www.cnblogs.com/sang-bit/p/14793341.html


推荐阅读
  • RabbitMq之发布确认高级部分1.为什么会需要发布确认高级部分?在生产环境中由于一些不明原因,导致rabbitmq重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢 ... [详细]
  • 消息中间件RabbitMQ 高级特性之消费端ACK与重回队列
    什么是消费端的ACK和重回队列?消费端的手工ACK和NACK消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿如果由于服务器宕机等严重问题 ... [详细]
  • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
  • 本文介绍了在SpringBoot中集成thymeleaf前端模版的配置步骤,包括在application.properties配置文件中添加thymeleaf的配置信息,引入thymeleaf的jar包,以及创建PageController并添加index方法。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 目录1、将mysql数据导出到SQL文件中(数据库存在的情况)2、将现有的sql文件数据导入到数据库中(前提数据库存在) 3、利用Navicat导出SQL文件和导入SQL文件1)从 ... [详细]
  • 五、RabbitMQ Java Client基本使用详解
    JavaClient的5.x版本系列需要JDK8,用于编译和运行。在Android上,仅支持Android7.0或更高版本。4.x版本系列支持7.0之前 ... [详细]
  • 6(自)、交换机之关键字模式
    上一节中的我们的日志系统将所有消息广播给所有消费者,对此我们想做一些改变,例如我们希望将日志消息写入磁盘的程序仅接收严重错误(error),而不存储那些警告(warnning)或者 ... [详细]
  • java多线程获取线程返回结果
    我们在使用java多线程编写相关业务代码时,往往有这样一种情况,某个线程依赖于其他线程执行结果。也就是说,我们需要在一个线程中获取另一个线程的信息。可以分为两种情况,一种是轮询,一 ... [详细]
  • PHP设置MySQL字符集的方法及使用mysqli_set_charset函数
    本文介绍了PHP设置MySQL字符集的方法,详细介绍了使用mysqli_set_charset函数来规定与数据库服务器进行数据传送时要使用的字符集。通过示例代码演示了如何设置默认客户端字符集。 ... [详细]
  • Java序列化对象传给PHP的方法及原理解析
    本文介绍了Java序列化对象传给PHP的方法及原理,包括Java对象传递的方式、序列化的方式、PHP中的序列化用法介绍、Java是否能反序列化PHP的数据、Java序列化的原理以及解决Java序列化中的问题。同时还解释了序列化的概念和作用,以及代码执行序列化所需要的权限。最后指出,序列化会将对象实例的所有字段都进行序列化,使得数据能够被表示为实例的序列化数据,但只有能够解释该格式的代码才能够确定数据的内容。 ... [详细]
  • 本文介绍了在MacOS系统上安装MySQL的步骤,并详细说明了如何设置MySQL服务的开机启动和如何修改MySQL的密码。通过下载MySQL的macos版本并按照提示一步一步安装,在系统偏好设置中可以找到MySQL的图标进行设置。同时,还介绍了通过终端命令来修改MySQL的密码的具体操作步骤。 ... [详细]
  • MySQL语句大全:创建、授权、查询、修改等【MySQL】的使用方法详解
    本文详细介绍了MySQL语句的使用方法,包括创建用户、授权、查询、修改等操作。通过连接MySQL数据库,可以使用命令创建用户,并指定该用户在哪个主机上可以登录。同时,还可以设置用户的登录密码。通过本文,您可以全面了解MySQL语句的使用方法。 ... [详细]
  • 如何在php文件中添加图片?
    本文详细解答了如何在php文件中添加图片的问题,包括插入图片的代码、使用PHPword在载入模板中插入图片的方法,以及使用gd库生成不同类型的图像文件的示例。同时还介绍了如何生成一个正方形文件的步骤。希望对大家有所帮助。 ... [详细]
  • 本文介绍了如何使用PHP代码将表格导出为UTF8格式的Excel文件。首先,需要连接到数据库并获取表格的列名。然后,设置文件名和文件指针,并将内容写入文件。最后,设置响应头部,将文件作为附件下载。 ... [详细]
author-avatar
董丹一只花
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有