我有两个通过pika连接到Rabbitmq的python进程。每个主题消耗一组主题,另一个主题作为响应发布。一种使用SelectConnection,另一种使用TornadoConnection。
目前,这两个都是测试程序,它们模拟用户与服务器之间的对话,并且每个程序的on_message()都经过简单的硬编码,以分支到接收到的routing_key上,并发布相应的响应。
最初,经过一段随机的时间(通常不超过2分钟),我会收到类似以下的错误消息:
UnexpectedFrame: Basic.publish: (505) UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead
在搜索了有关堆栈溢出和其他地方的大量文章之后,我逐渐了解到此错误与竞态条件有关,竞态条件是在basic_publish完成之前消耗了一些东西。
我对代码进行了更改,因此,我不进行立即的basic_publish()而是将回调传递给connection.add_timeout(),延迟为1秒。进行此更改后,我可以进行大量运行,其中两个进程之间相互进行“对话”的时间大于1小时,而不会重现错误。
我的问题是,这仅仅是因为我在模拟一个用户而起作用?我需要两个消费和发布渠道吗?
def on_message(self, unused_channel, basic_deliver, properties, body): if self._sibling_app_id == properties.app_id: self.dispatch_message(basic_deliver, properties, body) def dispatch_message(self, basic_deliver, properties, body): (user_id, msg_type) = basic_deliver.routing_key.rsplit('.', 1) if "login-response" == msg_type: print body elif "gid-assignment" == msg_type: print body elif "tutor-logout" == msg_type: print body elif "tutor-turn" == msg_type: message = "i don't know" routing_key = "%s.input" % user_id callback = self.delayed_publish_message(routing_key, message) self.schedule_next_message(callback, 1) elif "nlu" == msg_type: message = "dnk" routing_key = "%s.nlu-response" % user_id callback = self.delayed_publish_message(routing_key, message) self.schedule_next_message(callback, 1) else: print "invalid message-type: %s" % msg_type print body def delayed_publish_message(self, routing_key, message): """returns a callback which can be passed to schedule_next_message()""" def delayed_publish_cb(): self.publish_message(routing_key, message) return delayed_publish_cb def schedule_next_message(self, cb, publish_interval=None): if self._stopping: return if publish_interval is None: publish_interval = self.PUBLISH_INTERVAL if -1 == publish_interval: return self._connection.add_timeout(publish_interval, cb) def publish_message(self, routing_key, message): if self._stopping: return properties = pika.BasicProperties(app_id=self._app_id, content_type='text/plain') self._channel.basic_publish(self.EXCHANGE, routing_key, message, properties)
itsafire.. 6
通道将被单向使用。该AMQP协议规范约为说的很清楚:
AMQP会话将两个单向通道相关联,以在两个容器之间形成双向的顺序对话。单个连接可能同时具有多个活动的独立会话,直至协商的通道限制。每个对等方都将“连接”和“会话”都建模为端点,这些端点存储与所讨论的“连接”或“会话”有关的本地和上次已知的远程状态。
因此,您应该为应用程序使用输入和输出通道。
通道将被单向使用。该AMQP协议规范约为说的很清楚:
AMQP会话将两个单向通道相关联,以在两个容器之间形成双向的顺序对话。单个连接可能同时具有多个活动的独立会话,直至协商的通道限制。每个对等方都将“连接”和“会话”都建模为端点,这些端点存储与所讨论的“连接”或“会话”有关的本地和上次已知的远程状态。
因此,您应该为应用程序使用输入和输出通道。