热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

python学习之rabbitmq

0、讲述rabbit中各部分的含义及作用https:www.jb51.netarticle75647.htm1、rabbitMQ的安装1)在安装rabbitmq之前需

0、讲述rabbit中各部分的含义及作用

https://www.jb51.net/article/75647.htm

1、rabbitMQ的安装

1)在安装rabbitmq之前需要先安装erlang,下载地址如下:

http://www.erlang.org/downloads根据系统选择,安装按提示一直下一步就OK,安装完后,再安装rabbitmq

2、rabbitmq的下载地址:http://www.rabbitmq.com/download.html

3、rabbitmq队列

假设现在需要从武汉到北京去见一个网友,哈哈哈,先的打电话约下,然后确定路线和交通工具吧,这就是下边这段代码的实际模型

import pika
import randomconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) #先打电话约下,看能否找到人
channel
= connection.channel() #确定路线
channel.queue_declare(queue
='task_queue', durable=True) #确定交通工具,而且交通工具的名称叫‘task_queue',durable = True表示就是你到了北京以后交通工具依然存在number = random.randint(1, 1000)
message
= 'hello world:{num}'.format(num = number)channel.basic_publish(exchange='', #交换机,此时没有交换机参与,所以参数为空,routing_key='task_queue', #交通工具的名称body=message, #要发送给的内容properties=pika.BasicProperties(delivery_mode=2,) #表示不管路通不通,你携带的消息都不会因为外界情况而消失)
print(" [x] Sent %r" % (message,))
connection.close()

import pika
import timehostname = 'localhost'
parameters
= pika.ConnectionParameters(hostname)
connection
= pika.BlockingConnection(parameters)channel = connection.channel()
channel.queue_declare(queue
='task_queue', durable=True)def callback(ch, method, properties, body):print(" [x] Received %r" % (body,))# time.sleep(5)print(" [x] Done")ch.basic_ack(delivery_tag=method.delivery_tag) #回调函数中需要给发布者发送的消息channel.basic_qos(prefetch_count=1)channel.basic_consume(callback, queue='task_queue', no_ack=False) #no_ack=False当消费者接到消息后,需要调用回掉函数告诉发布者,消息的接受情况
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

下边的代码是通过交换机来实现消息的发送的,具体如下:

import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel
= connection.channel()channel.exchange_declare(exchange='logs', #交换机的名称为logsexchange_type='fanout')message = "info: Hello World!"
channel.basic_publish(exchange
='logs',routing_key='',body=message)
print(" [x] Sent %r" % message)
connection.close()

import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel
= connection.channel()
channel.exchange_declare(exchange
='logs', #声明交换机的名称以及交换机发送消息的模式,这的名称要和publishor的交换机的名称相同,相当于publishor和consumor同时向exchange寻找,
#否则publishor和consumor相互找不到,会迷路!!exchange_type
='fanout') #这不同的版本有可能会出错,有的说的是可以写为type = 'fanout',在我的电脑上运行会出错,
#改为exchange_type = 'fanout'仍然会出错,后来运行cmd->service.msc->找到rabbitmq关闭后再启动就OK了,
#至于为啥,如果你找到了,跟我说一声,先谢谢啦
result
= channel.queue_declare(exclusive=True) # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
queue_name = result.method.queue #得到队列的名字
channel.queue_bind(exchange
='logs',queue=queue_name) #将交换机和队列的名字绑定,也就是说exchange = ‘logs'的交换机只能公国queue = queue_name来发送消息
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):print(" [x] %r" % body)channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming()

4、rabbitmq有选择的接受消息,模型如下

 

更过相关内容详见:http://www.cnblogs.com/alex3714/articles/5248247.html

 5、client发送给指令,server根据指令运行完毕后再将结果返回给client

import pika
import uuid
class FibonacciRpcClient(object):def __init__(self):self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))self.channel = self.connection.channel()result = self.channel.queue_declare(exclusive=True)self.callback_queue = result.method.queueself.channel.basic_consume(self.on_response, #类中定义的方法__init__()是为了建立链接,同时定义一个callback_queue,再basic_publish中传递给server端,用于存放运行的结果no_ack=True,queue=self.callback_queue) #这里的basic_consume只是声明,如果要取消息应该取callback__queue中取def on_response(self, ch, method, props, body):if self.corr_id == props.correlation_id: #确定发的指令和收到的结果是相互对应的self.response = bodydef call(self, n):self.response = Noneself.corr_id = str(uuid.uuid4())self.channel.basic_publish(exchange='',routing_key='rpc_queue',properties=pika.BasicProperties(reply_to=self.callback_queue,correlation_id=self.corr_id,),body=str(n))while self.response is None:self.connection.process_data_events() #去队列中取数据,不停的循环,非阻塞版的start_consuming(),去调用basic_consume()return int(self.response)fibonacci_rpc = FibonacciRpcClient()
print(
" [x] Requesting fib(30)")
response
= fibonacci_rpc.call(30)
print(
" [.] Got %r" % response)

import pika
import timeconnection
= pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel
= connection.channel()
channel.queue_declare(queue
='rpc_queue')def fib(n):if n == 0:return 0elif n == 1:return 1else:return fib(n - 1) + fib(n - 2)
def on_request(ch, method, props, body): #method存放的是bsic_consume中读取数据routing_key,props中存放的是从basic_consume中读取的protperties的数据,body存放的是从发送过来的消息。n
= int(body)print(" [.] fib(%s)" % n)response = fib(n)ch.basic_publish(exchange='',routing_key=props.reply_to,properties=pika.BasicProperties(correlation_id= props.correlation_id),body=str(response))ch.basic_ack(delivery_tag=method.delivery_tag)channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue
='rpc_queue')print(" [x] Awaiting RPC requests")
channel.start_consuming()

 

转:https://www.cnblogs.com/zhouzhe-blog/p/9445662.html



推荐阅读
  • 微软头条实习生分享深度学习自学指南
    本文介绍了一位微软头条实习生自学深度学习的经验分享,包括学习资源推荐、重要基础知识的学习要点等。作者强调了学好Python和数学基础的重要性,并提供了一些建议。 ... [详细]
  • 源码包安装RabbitMQ3.6
    先安装erlang依赖,也是门编程语言,下载源码包地址:https:www.erlang.orgdownloads20.1首先先安装个依赖 ... [详细]
  • EPICS Archiver Appliance存储waveform记录的尝试及资源需求分析
    本文介绍了EPICS Archiver Appliance存储waveform记录的尝试过程,并分析了其所需的资源容量。通过解决错误提示和调整内存大小,成功存储了波形数据。然后,讨论了储存环逐束团信号的意义,以及通过记录多圈的束团信号进行参数分析的可能性。波形数据的存储需求巨大,每天需要近250G,一年需要90T。然而,储存环逐束团信号具有重要意义,可以揭示出每个束团的纵向振荡频率和模式。 ... [详细]
  • 本文介绍了在CentOS上安装Python2.7.2的详细步骤,包括下载、解压、编译和安装等操作。同时提供了一些注意事项,以及测试安装是否成功的方法。 ... [详细]
  • Windows7 64位系统安装PLSQL Developer的步骤和注意事项
    本文介绍了在Windows7 64位系统上安装PLSQL Developer的步骤和注意事项。首先下载并安装PLSQL Developer,注意不要安装在默认目录下。然后下载Windows 32位的oracle instant client,并解压到指定路径。最后,按照自己的喜好对解压后的文件进行命名和压缩。 ... [详细]
  • 本文介绍了Codeforces Round #321 (Div. 2)比赛中的问题Kefa and Dishes,通过状压和spfa算法解决了这个问题。给定一个有向图,求在不超过m步的情况下,能获得的最大权值和。点不能重复走。文章详细介绍了问题的题意、解题思路和代码实现。 ... [详细]
  • RabbitMQ的消息持久化处理
    1、RabbitMQ的消息持久化处理,消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢——消息持久化。2、auto ... [详细]
  • 讨伐Java多线程与高并发——MQ篇
    本文是学习Java多线程与高并发知识时做的笔记。这部分内容比较多,按照内容分为5个部分:多线程基础篇JUC篇同步容器和并发容器篇线程池篇MQ篇本篇 ... [详细]
  • RabbitMq的最终一致性分布式事务
    RabbitMq的最终一致性分布式事务使用rabbitmq的步骤1.运行安装在服务器上的rabbit服务2.在项目中安装依赖3.编写对应的配置文件4.创建对应配置并加上启动注解5. ... [详细]
  • 【Hoxton.SR1版本】Spring Cloud Stream消息驱动
    目录一、简介二、搭建消息生产者端三、搭建消息消费者端四、消息重复消费问题五、消息持久化六、总结一、简介在实际项目中,服务与服务之间的通信往往我们会采用消 ... [详细]
  • rabbitmq 为什么是15672_RabbitMQ~消息的产生和管理(15672)
    上一讲说了rabbitmq在windows环境的部署,而今天主要说一下消息在产生后,如何去查看消息,事实上,rabbitmq为我们提供了功能强大的管理插件,我们只要开启这个插件即可 ... [详细]
  • 一、RabbitMQ是什么1、MQ的主要作用是:异步、消峰、解耦2、高并发、高可用的成熟方案,支持多种消息协议,易于部署和使用Rabbit ... [详细]
  • 这是原文链接:sendingformdata许多情况下,我们使用表单发送数据到服务器。服务器处理数据并返回响应给用户。这看起来很简单,但是 ... [详细]
  • 本文记录了作者对x265开源代码的实现与框架进行学习与探索的过程,包括x265的下载地址与参考资料,以及在Win7 32 bit PC、VS2010平台上的安装与配置步骤。 ... [详细]
  • 也可以直接用#opkginstalltftpd-hpa会直接先下载再自动安装。最后用#opkglist-installed|greptftpd-hpa来查看是不是 ... [详细]
author-avatar
汶汐_782
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有