将Kombu与RabbitMQ结合使用以实现经典的发布/订阅设计模式。我创建了一个创建主题的生产者:
from kombu import Connection, Exchange, Queue media_exchange = Exchange('media', 'topic', durable=False) video_queue = Queue('video', exchange=media_exchange, routing_key='video') with Connection('amqp://guest:guest@localhost//') as conn: producer = conn.Producer(serializer='json') producer.publish('Hello World!', exchange=media_exchange, routing_key='video', declare=[video_queue])
然后,我创建了一个要从发布者那里消费的消费者:
from kombu import Connection, Exchange, Queue media_exchange = Exchange('media', type='topic', durable=False) video_queue = Queue('video', exchange=media_exchange, routing_key='video') def process_media(body, message): print(body) #message.ack() with Connection('amqp://guest:guest@localhost//') as conn: with conn.Consumer(video_queue, callbacks=[process_media]) as consumer: # Process messages and handle events on all channels while True: conn.drain_events()
然后启动两个消费者,每个消费者在一个单独的终端中;两者都等待消息:
terminal 1: python consumer.py terminal 2: python consumer.py
当我运行生产者时,只有一个消费者收到该消息。
生产者在交换中发布,而不是在队列中发布。队列由使用者定义。当为每个使用者使用不同的队列名称时,所有人都会收到消息。当使用多个使用者使用同一队列时,这就是负载平衡,这就是为什么只有一个使用者可以收到消息的原因。