如何在RabbitMQ中重新排队消息

 手机用户2502892557 发布于 2023-01-11 08:47

在消费者获得消息之后,消费者/工作者进行一些验证然后调用Web服务.在此阶段,如果发生任何错误或验证失败,我们希望将消息放回最初使用的队列.

我读过RabbitMQ文档.但我对reject,nack和cancel方法之间的区别感到困惑.

1 个回答
  • 简短回答:

    要重新排列特定消息,您可以选择两者basic.rejectbasic.nackmultiple标志设置为false.

    basic.consume 如果您正在使用消息确认,并且在特定时间和消费者退出消费者退出时消费者有未确认消息,则调用也可能导致重新传递消息.

    basic.recover 将在特定频道上重新发送所有未发送的消息.

    答案很长:

    basic.reject并且basic.nack两者都用于相同的目的 - 丢弃或重新排列特定消费者无法处理的消息(在给定时刻,在某些条件下或根本不可能).它们之间的主要区别在于basic.nack支持批量消息处理,basic.reject而不支持.

    官方RabbitMQ网站上的否定致谢文章中描述了这种差异:

    AMQP规范定义了basic.reject允许客户拒绝单个传递的消息,指示代理丢弃它们或重新排队它们的方法.遗憾的是,basic.reject不支持批量负面确认消息.

    为了解决这个问题,RabbitMQ支持basic.nack提供所有功能的方法,basic.reject同时还允许批量处理消息.

    要批量拒绝邮件,客户端会将方法的multiple标志设置basic.nacktrue.然后,代理将拒绝所有未确认的,已传递的消息,包括delivery_tagbasic.nack方法字段中指定的消息.在这方面,basic.nack补充了批量确认语义basic.ack.

    注意,该basic.nack方法是RabbitMQ特定的扩展,而basic.reject方法是AMQP 0.9.1规范的一部分.

    至于basic.cancel方法,它用于通知服务器客户端停止消息消费.注意,客户端可以在basic.cancel发送接收cancel-ok回复的方法之间接收任意消息号.如果客户端使用了消息确认,并且它有任何未确认的消息,则它们将被移回到最初消耗的队列中.

    basic.recover在RabbitMQ中有一些限制:它 - basic.recover with requeue = false - basic.recover synchronicity

    除了勘误表,根据RabbitMQ规格 basic.recover有部分支持(不支持Recovery with requeue = false.)

    注意事项basic.consume:

    basic.consume没有auto-ack(no­ack=false)的情况下启动并且有一些待处理的消息非acked消息,那么当消费者被取消(死亡,致命错误,异常,等等)时,将重新传递未决消息.从技术上讲,在消费者释放它们(ack/nack/reject/recover)之前,不会处理这些待处理的消息(甚至是死信函).只有在那之后它们才会被处理(例如,破坏).

    例如,假设我们连续发布了5条消息:

    Queue(main) (tail) { [4] [3] [2] [1] [0] } (head)
    

    然后消耗它们中的3个,但不要它们,然后取消消费者.我们会遇到这种情况:

    Queue(main) (tail) { [4] [3] [2*] [1*] [0*] } (head)
    

    其中star(*)注意到redelivered标志设置为true.

    假设我们有死信的交换集和死信函的队列的情况

    Exchange(e-main)                                   Exchange(e-dead) 
      Queue(main){x-dead-letter-exchange: "e-dead"}       Queue(dead) 
    

    并假设我们发布了5条消息,其expire属性设置为5000(5秒):

    Queue(main) (tail) { [4] [3] [2] [1] [0] } (head)
    Queue(dead) (tail) { }(head)
    

    然后我们从main队列中消耗3条消息并保持10秒钟:

    Queue(main) (tail) { [2!] [1!] [0!] } (head)
    Queue(dead) (tail) { [4*] [3*] } (head)
    

    其中感叹号(!)代表未包装的消息.此类消息无法传递给任何消费者,通常无法在管理面板中查看.但是,让我们取消消费者,请记住,它仍然保留3条未确认的消息:

    Queue(main) (tail) { } (head)
    Queue(dead) (tail) { [2*] [1*] [0*] [4*] [3*] } (head)
    

    所以现在头部中的3条消息被放回到原始队列中,但由于它们具有每个消息的TTL设置,因此它们已经死信至死信队列的尾部(当然,通过死信交换).

    PS:

    消费者称为侦听新消息与直接队列访问有所不同(获取一个或多个消息而不关心其他消息).请参阅basic.get方法说明了解更多信

    2023-01-11 08:54 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有