这是一个漫长的过程.
我有一个用户名和密码列表.对于每一个我想登录帐户并做一些事情.我想用几台机器来更快地做到这一点.我想这样做的方法是有一台主机,其工作只是有一个cron,它不时检查rabbitmq队列是否为空.如果是,请从文件中读取用户名和密码列表,然后将其发送到rabbitmq队列.然后有一堆订阅了该队列的机器,其作业正在接收用户/通行证,在其上做东西,确认它,然后转到下一个,直到队列为空,然后主机填满它再次.到目前为止,我认为我的一切都失败了.
现在来了我的问题.我已经检查过每个用户/传递要做的事情不是那么密集,因此我可以让每台机器使用python的线程同时执行其中三个.实际上对于一台机器,我实现了这个,我将用户/传递加载到python Queue(),然后让三个线程使用Queue().现在我想做类似的事情,但不是从python Queue()中消耗,每个机器的每个线程都应该从rabbitmq队列中消耗.这就是我被困住的地方.为了运行测试,我开始使用rabbitmq的教程.
send.py:
import pika, sys connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='hello') message = ' '.join(sys.argv[1:]) channel.basic_publish(exchange='', routing_key='hello', body=message) connection.close()
worker.py
import time, pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print ' [x] received %r' % (body,) time.sleep( body.count('.') ) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='hello', no_ack=False) channel.start_consuming()
对于上面的内容,您可以运行两个worker.py,它将订阅rabbitmq队列并按预期使用.
我没有rabbitmq的线程是这样的:
runit.py
class Threaded_do_stuff(threading.Thread): def __init__(self, user_queue): threading.Thread.__init__(self) self.user_queue = user_queue def run(self): while True: login = self.user_queue.get() do_stuff(user=login[0], pass=login[1]) self.user_queue.task_done() user_queue = Queue.Queue() for i in range(3): td = Threaded_do_stuff(user_queue) td.setDaemon(True) td.start() ## fill up the queue for user in list_users: user_queue.put(user) ## go! user_queue.join()
这也可以按预期工作:你填满队列并有3个线程订阅它.现在我想要做的是像runit.py,而不是使用python Queue(),使用像worker.py这样的队列实际上是一个rabbitmq队列.
这是我尝试过但没有用的东西(我不明白为什么)
rabbitmq_runit.py
import time, threading, pika class Threaded_worker(threading.Thread): def callback(self, ch, method, properties, body): print ' [x] received %r' % (body,) time.sleep( body.count('.') ) ch.basic_ack(delivery_tag = method.delivery_tag) def __init__(self): threading.Thread.__init__(self) self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) self.channel = self.connection.channel() self.channel.queue_declare(queue='hello') self.channel.basic_qos(prefetch_count=1) self.channel.basic_consume(self.callback, queue='hello') def run(self): print 'start consuming' self.channel.start_consuming() for _ in range(3): print 'launch thread' td = Threaded_worker() td.setDaemon(True) td.start()
我希望这会启动三个线程,每个线程都被.start_consuming()阻塞,它只是停留在那里等待rabbitmq队列发送它们.相反,该程序启动,执行一些打印,然后退出.存在的模式也很奇怪:
launch thread launch thread start consuming launch thread start consuming
特别注意到有一个"开始消耗"缺失.
这是怎么回事?
编辑:我找到一个类似问题的答案就是在这里 使用带有多个线程(Python Kombu)的rabbitmq消息队列, 答案是"使用芹菜",无论这意味着什么.我不买,我不需要像芹菜那样复杂的东西.特别是,我不是要设置RPC,而是不需要读取do_stuff例程的回复.
编辑2:我期望的打印模式如下.我做
python send.py first message...... python send.py second message. python send.py third message. python send.py fourth message.
并且打印图案将是
launch thread start consuming [x] received 'first message......' launch thread start consuming [x] received 'second message.' launch thread start consuming [x] received 'third message.' [x] received 'fourth message.'
dano.. 14
问题是你正在制作线程守护程序:
td = Threaded_worker() td.setDaemon(True) # Shouldn't do that. td.start()
一旦主线程退出 s,守护线程将被终止:
线程可以标记为"守护程序线程".这个标志的意义在于,当只剩下守护进程线程时,整个Python程序都会退出.初始值继承自创建线程.可以通过守护程序属性设置该标志.
退出setDaemon(True)
,您应该看到它的行为符合您的预期.
此外,pika常见问题解答还有一个关于如何在线程中使用它的说明:
Pika在代码中没有任何线程概念.如果要将Pika与线程一起使用,请确保在该线程中创建每个线程的Pika连接.跨线程共享一个Pika连接是不安全的.
这表明你应该将你正在做的所有内容都移动__init__()
到这里run()
,以便在你从队列中实际消耗的同一个线程中创建连接.