我是rabbitmq和pika的新手,并且在停止消费方面遇到了麻烦.
通道和队列设置:
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue=new_task_id, durable=True, auto_delete=True)
基本上,消费者和生产者是这样的:
消费者:
def task(task_id): def callback(channel, method, properties, body): if body != "quit": print(body) else: print(body) channel.stop_consuming(task_id) channel.basic_consume(callback, queue=task_id, no_ack=True) channel.start_consuming() print("finish") return "finish"
制片人:
proc = Popen(['app/sample.sh'], shell=True, stdout=PIPE) while proc.returncode is None: # running line = proc.stdout.readline() if line: channel.basic_publish( exchange='', routing_key=self.request.id, body=line ) else: channel.basic_publish( exchange='', routing_key=self.request.id, body="quit" ) break
消费者task
给了我输出:
# ... output from sample.sh, as expected quit ?}q(UstatusqUSUCCESSqU tracebackqNUresultqNUtask_idqU 1419350416qUchildrenq]u.
但是,"finish"
没有打印,所以我猜它是因为channel.stop_consuming(task_id)
没有停止消费.如果是这样,那么正确的方法是什么?谢谢.
我有同样的问题.它似乎是由内部start_consuming
呼叫引起的self.connection.process_data_events(time_limit=None)
.这time_limit=None
使它挂起.
我设法通过将调用替换channel.start_consuming()
为其实现来解决此问题,黑客攻击:
while channel._consumer_infos: channel.connection.process_data_events(time_limit=1) # 1 second