pika,stop_consuming不起作用

 小志19841126_466 发布于 2022-12-04 02:01

我是rabbitmq和pika的新手,并且在停止消费方面遇到了麻烦.

通道和队列设置:

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue=new_task_id, durable=True, auto_delete=True)

基本上,消费者和生产者是这样的:

消费者:

def task(task_id):
    def callback(channel, method, properties, body):
        if body != "quit":
            print(body)
        else:
            print(body)
            channel.stop_consuming(task_id)

    channel.basic_consume(callback, queue=task_id, no_ack=True)
    channel.start_consuming()
    print("finish")
    return "finish"

制片人:

proc = Popen(['app/sample.sh'], shell=True, stdout=PIPE)
while proc.returncode is None:  # running
    line = proc.stdout.readline()
    if line:
        channel.basic_publish(
            exchange='',
            routing_key=self.request.id,
            body=line
        )
    else:
        channel.basic_publish(
            exchange='',
            routing_key=self.request.id,
            body="quit"
        )
        break

消费者task给了我输出:

# ... output from sample.sh, as expected

quit
?}q(UstatusqUSUCCESSqU  tracebackqNUresultqNUtask_idqU
1419350416qUchildrenq]u.

但是,"finish"没有打印,所以我猜它是因为channel.stop_consuming(task_id)没有停止消费.如果是这样,那么正确的方法是什么?谢谢.

1 个回答
  • 我有同样的问题.它似乎是由内部start_consuming呼叫引起的self.connection.process_data_events(time_limit=None).这time_limit=None使它挂起.

    我设法通过将调用替换channel.start_consuming()为其实现来解决此问题,黑客攻击:

    while channel._consumer_infos:
        channel.connection.process_data_events(time_limit=1) # 1 second
    

    2022-12-11 02:10 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有