热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

rabbitMQ的消息可靠性投递,手动确认,消费端限流,队列过期时间的实现

1、消息可靠性投递当生产者向交换机发送消息的时候,可能会发生消息泄露。比如:当交换器重启的时候,生产者这事向交换机发送消息,
1、消息可靠性投递

在这里插入图片描述

  • 当生产者向交换机发送消息的时候,可能会发生消息泄露。比如:当交换器重启的时候,生产者这事向交换机发送消息,交换机没有接受到消息,那么消息就会被丢失
  • 当交换机向队列发送消息的时候,也可能发生消息的泄露。

为了确保消息的可靠性传递,提供了两种方式:

  • confirm 确认模式
  • return 返回模式

实现的代码如下:

  • 启动了中的配置:

server:port: 8080spring:rabbitmq:# rabbitmq的ip地址host: 192.168.31.70# 开启确认模式,默认为nonepublisher-confirm-type: correlated# 开启返回模式,默认为falsepublisher-returns: true

  • 测试类

@RestController
public class ProductController {&#64;Autowiredprivate RabbitTemplate rabbitTemplate;&#64;GetMapping("/hello")public String hello(){// 生产者向交换机传递消息的时候会走这个方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {&#64;Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {// ack表示传递的消息是否收到&#xff0c;false为未收到&#xff0c;true为收到if (!ack){System.out.println("aaaaaa");}}});// 交换机向队列传递消息的时候会走这个方法rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {//ReturnedMessage里面包含传递的消息内容、交换机的信息、路由key的信息&#64;Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println(returned);}});Map map &#61; new HashMap();map.put("id","1");map.put("num","10");String s &#61; JSON.toJSONString(map);for (int i &#61; 0; i < 10; i&#43;&#43;) {rabbitTemplate.convertAndSend("rabbit_exchange_Topic","aaa.orange.rabbit",s);}return "下单成功";}
}

2、Consumer ACK

ACK&#xff1a;表示消费者收到消息后的确认方式

  • 自动确认&#xff1a;一旦消息被消费者收到&#xff0c;则自动确认收到消息&#xff0c;并将信息从队列中移除
  • 手动确认&#xff1a;需要调用channel.basicAck()手动签收&#xff0c;如果出现异常&#xff0c;则调用channel.basicNack()方法&#xff0c;让自动重新发送消息

在实际开发过程中&#xff0c;如果在消费者消费消息的时候&#xff0c;业务处理的时候&#xff0c;出现异常&#xff0c;那么该消息就可能丢失。需要使用手动模式&#xff0c;在业务处理完之后再使用channel.basicAck()方法手动签收。

代码实现&#xff1a;

  • 配置文件

server:port: 8082
spring:rabbitmq:host: 192.168.31.70listener:simple:# 开启手动确认模式acknowledge-mode: manual

  • 代码测试&#xff1a;

&#64;RestController
public class ConsumerOneController {&#64;RabbitListener(queues &#61; {"rabbit_hello_topics_one"})public void test(Message message , Channel channel){long deliveryTag &#61; message.getMessageProperties().getDeliveryTag();byte[] body &#61; message.getBody();System.out.println(new String(body));try{//long deliveryTag, 消息发送的标志// boolean multiple 是否允许多确认&#xff0c;指的是&#xff0c;如果之前有消息没有被确认&#xff0c;那么这里设置为true&#xff0c;就可以将之前的消息一并确认System.out.println("业务逻辑");
// channel.basicAck(deliveryTag,true);}catch (Exception e){//long deliveryTag,// boolean multiple,// boolean requeue 是否允许队列重新发布信息try {channel.basicNack(deliveryTag,true,true);} catch (IOException ex) {ex.printStackTrace();}}}
}

3、消费端限流

服务器消费的信息是有限制的&#xff0c;假如队列里面有10000条信息&#xff0c;这10000条信息全部被消费者一次性消费&#xff0c;可能会导致消费者直接宕机

  • 必须设置为手动确认
  • 必须配置限流的个数
  • 配置文件如下&#xff1a;

spring:rabbitmq:host: 192.168.213.188listener:simple:#表示手动确认acknowledge-mode: manual# 表示自动确认模式# acknowledge-mode: none# 设置每次消费的个数。prefetch: 5

  • 测试如下&#xff1a;
  • 我们可以将手动设置为不确认&#xff0c;看是否只是收到5条信息

&#64;RestController
public class ConsumerOneController {&#64;RabbitListener(queues &#61; {"rabbit_hello_topics_one"})public void test(Message message , Channel channel){long deliveryTag &#61; message.getMessageProperties().getDeliveryTag();byte[] body &#61; message.getBody();System.out.println(new String(body));try{//long deliveryTag, 消息发送的标志// boolean multiple 是否允许多确认System.out.println("业务逻辑");//channel.basicAck(deliveryTag,true);}catch (Exception e){//long deliveryTag,// boolean multiple,// boolean requeue 是否允许队列重新发布信息try {channel.basicNack(deliveryTag,true,true);} catch (IOException ex) {ex.printStackTrace();}}}
}

4、TTL

  • 1、可以设置队列的过期时间&#xff0c;所有放到该队列中的消息&#xff0c;只要时间过了就直接消失&#xff0c;并且该消息必须在头部
  • 2、给消息设置过期时间&#xff0c;该消息时间到了之后&#xff0c;必须在队列的头部才能消失

代码测试&#xff1a;

//为队列设置过期时间 相当于该队列里面的消息都由过期时间&#64;Testpublic void testSend(){rabbitTemplate.convertAndSend("myexchange","","hello xiaoxi");}//设置消息的过期时间 如果由设置了队列的过期时间 也设置了消息的过期时间 谁的过期时间短 以谁为准。//该消息必须在头部才能从队列中移除。&#64;Testpublic void testSend02(){for(int i&#61;0;i<10;i&#43;&#43;) {if(i&#61;&#61;3){MessagePostProcessor messagePostProcessor &#61; new MessagePostProcessor() {&#64;Overridepublic Message postProcessMessage(Message message) throws AmqpException {//设置消息的过期时间 message.getMessageProperties().setExpiration("20000");return message;}};//String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessorrabbitTemplate.convertAndSend("myexchange", "", "hello xiaoxi"&#43;i, messagePostProcessor);}else {//String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessorrabbitTemplate.convertAndSend("myexchange", "", "hello xiaoxi"&#43;i);}}}

5、通过代码的方式去创建队列以及绑定

&#64;Configuration
public class RabbitConfig {private final static String EXCHANEG_NAME&#61;"kaiwanxiao";private final static String QUEUE_NAME&#61;"xiaokai";&#64;Beanpublic Exchange getExchange(){Exchange build &#61; ExchangeBuilder.directExchange(EXCHANEG_NAME).durable(true).autoDelete().build();return build;}&#64;Beanpublic Queue getQueue(){Queue build &#61; QueueBuilder.durable(QUEUE_NAME).autoDelete().withArgument("x-message-ttl", 20000).build();return build;}&#64;Beanpublic Binding getBinding(Queue queue ,Exchange exchange){Binding info &#61; BindingBuilder.bind(queue).to(exchange).with("info").noargs();return info;}}


推荐阅读
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • Hibernate延迟加载深入分析-集合属性的延迟加载策略
    本文深入分析了Hibernate延迟加载的机制,特别是集合属性的延迟加载策略。通过延迟加载,可以降低系统的内存开销,提高Hibernate的运行性能。对于集合属性,推荐使用延迟加载策略,即在系统需要使用集合属性时才从数据库装载关联的数据,避免一次加载所有集合属性导致性能下降。 ... [详细]
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • http:my.oschina.netleejun2005blog136820刚看到群里又有同学在说HTTP协议下的Get请求参数长度是有大小限制的,最大不能超过XX ... [详细]
  • 本文介绍了Web学习历程记录中关于Tomcat的基本概念和配置。首先解释了Web静态Web资源和动态Web资源的概念,以及C/S架构和B/S架构的区别。然后介绍了常见的Web服务器,包括Weblogic、WebSphere和Tomcat。接着详细讲解了Tomcat的虚拟主机、web应用和虚拟路径映射的概念和配置过程。最后简要介绍了http协议的作用。本文内容详实,适合初学者了解Tomcat的基础知识。 ... [详细]
  • 本文详细介绍了Spring的JdbcTemplate的使用方法,包括执行存储过程、存储函数的call()方法,执行任何SQL语句的execute()方法,单个更新和批量更新的update()和batchUpdate()方法,以及单查和列表查询的query()和queryForXXX()方法。提供了经过测试的API供使用。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • 标题: ... [详细]
  • 本文介绍了如何使用C#制作Java+Mysql+Tomcat环境安装程序,实现一键式安装。通过将JDK、Mysql、Tomcat三者制作成一个安装包,解决了客户在安装软件时的复杂配置和繁琐问题,便于管理软件版本和系统集成。具体步骤包括配置JDK环境变量和安装Mysql服务,其中使用了MySQL Server 5.5社区版和my.ini文件。安装方法为通过命令行将目录转到mysql的bin目录下,执行mysqld --install MySQL5命令。 ... [详细]
  • JDK源码学习之HashTable(附带面试题)的学习笔记
    本文介绍了JDK源码学习之HashTable(附带面试题)的学习笔记,包括HashTable的定义、数据类型、与HashMap的关系和区别。文章提供了干货,并附带了其他相关主题的学习笔记。 ... [详细]
  • 本文介绍了pack布局管理器在Perl/Tk中的使用方法及注意事项。通过调用pack()方法,可以控制部件在显示窗口中的位置和大小。同时,本文还提到了在使用pack布局管理器时,应注意将部件分组以便在水平和垂直方向上进行堆放。此外,还介绍了使用Frame部件或Toplevel部件来组织部件在窗口内的方法。最后,本文强调了在使用pack布局管理器时,应避免在中间切换到grid布局管理器,以免造成混乱。 ... [详细]
  • 开发笔记:spring boot项目打成war包部署到服务器的步骤与注意事项
    本文介绍了将spring boot项目打成war包并部署到服务器的步骤与注意事项。通过本文的学习,读者可以了解到如何将spring boot项目打包成war包,并成功地部署到服务器上。 ... [详细]
  • 从相邻元素对还原数组的解题思路和代码
    本文介绍了从相邻元素对还原数组的解题思路和代码。思路是使用HashMap存放邻接关系,并找出起始点,然后依次取。代码使用了HashMap来存放起始点所在的adjacentPairs中的位置,并对重复的起始点进行处理。 ... [详细]
  • 微信官方授权及获取OpenId的方法,服务器通过SpringBoot实现
    主要步骤:前端获取到code(wx.login),传入服务器服务器通过参数AppID和AppSecret访问官方接口,获取到OpenId ... [详细]
author-avatar
傻要傻到嗨样
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有