Rabbitmq-是否在同一频道上消费和发布?

 猪猪寂寞的王子_125 发布于 2022-12-31 11:18

我有两个通过pika连接到Rabbitmq的python进程。每个主题消耗一组主题,另一个主题作为响应发布。一种使用SelectConnection,另一种使用TornadoConnection。

目前,这两个都是测试程序,它们模拟用户与服务器之间的对话,并且每个程序的on_message()都经过简单的硬编码,以分支到接收到的routing_key上,并发布相应的响应。

最初,经过一段随机的时间(通常不超过2分钟),我会收到类似以下的错误消息:

UnexpectedFrame: Basic.publish: (505) UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead

在搜索了有关堆栈溢出和其他地方的大量文章之后,我逐渐了解到此错误与竞态条件有关,竞态条件是在basic_publish完成之前消耗了一些东西。

我对代码进行了更改,因此,我不进行立即的basic_publish()而是将回调传递给connection.add_timeout(),延迟为1秒。进行此更改后,我可以进行大量运行,其中两个进程之间相互进行“对话”的时间大于1小时,而不会重现错误。

我的问题是,这仅仅是因为我在模拟一个用户而起作用?我需要两个消费和发布渠道吗?

def on_message(self, unused_channel, basic_deliver, properties, body):
    if self._sibling_app_id == properties.app_id:
        self.dispatch_message(basic_deliver, properties, body)


def dispatch_message(self, basic_deliver, properties, body):
    (user_id, msg_type) = basic_deliver.routing_key.rsplit('.', 1)

    if "login-response" == msg_type:
        print body
    elif "gid-assignment" == msg_type:
        print body
    elif "tutor-logout" == msg_type:
        print body
    elif "tutor-turn" == msg_type:
        message = "i don't know"
        routing_key = "%s.input" % user_id
        callback = self.delayed_publish_message(routing_key, message)
        self.schedule_next_message(callback, 1)
    elif "nlu" == msg_type:
        message = "dnk"
        routing_key = "%s.nlu-response" % user_id
        callback = self.delayed_publish_message(routing_key, message)
        self.schedule_next_message(callback, 1)
    else:
        print "invalid message-type: %s" % msg_type
        print body

def delayed_publish_message(self, routing_key, message):
    """returns a callback which can be passed to schedule_next_message()"""
    def delayed_publish_cb():
        self.publish_message(routing_key, message)
    return delayed_publish_cb


def schedule_next_message(self, cb, publish_interval=None):
    if self._stopping:
        return
    if publish_interval is None:
        publish_interval = self.PUBLISH_INTERVAL
    if -1 == publish_interval:
        return
    self._connection.add_timeout(publish_interval, cb)


def publish_message(self, routing_key, message):
    if self._stopping:
        return
    properties = pika.BasicProperties(app_id=self._app_id,
                                                          content_type='text/plain')
    self._channel.basic_publish(self.EXCHANGE, routing_key,
                                                 message, properties)

itsafire.. 6

通道将被单向使用。该AMQP协议规范约为说的很清楚:

AMQP会话将两个单向通道相关联,以在两个容器之间形成双向的顺序对话。单个连接可能同时具有多个活动的独立会话,直至协商的通道限制。每个对等方都将“连接”和“会话”都建模为端点,这些端点存储与所讨论的“连接”或“会话”有关的本地和上次已知的远程状态。

因此,您应该为应用程序使用输入和输出通道。

1 个回答
  • 通道将被单向使用。该AMQP协议规范约为说的很清楚:

    AMQP会话将两个单向通道相关联,以在两个容器之间形成双向的顺序对话。单个连接可能同时具有多个活动的独立会话,直至协商的通道限制。每个对等方都将“连接”和“会话”都建模为端点,这些端点存储与所讨论的“连接”或“会话”有关的本地和上次已知的远程状态。

    因此,您应该为应用程序使用输入和输出通道。

    2022-12-31 11:21 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有