热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

简单谈谈python中的Queue与多进程

本文给大家简单总结了下再Python中的队列对象(queue)以及多进程(multiprocessing),非常的简单实用,有需要的小伙伴可以参考下

最近接触一个项目,要在多个虚拟机中运行任务,参考别人之前项目的代码,采用了多进程来处理,于是上网查了查python中的多进程

一、先说说Queue(队列对象)

Queue是python中的标准库,可以直接import 引用,之前学习的时候有听过著名的“先吃先拉”与“后吃先吐”,其实就是这里说的队列,队列的构造的时候可以定义它的容量,别吃撑了,吃多了,就会报错,构造的时候不写或者写个小于1的数则表示无限多

import Queue

q = Queue.Queue(10)

向队列中放值(put)

q.put(‘yang')

q.put(4)

q.put([‘yan','xing'])

在队列中取值get()

默认的队列是先进先出的

>>> q.get()
‘yang'
>>> q.get()
4
>>> q.get()
[‘yan', ‘xing']

当一个队列为空的时候如果再用get取则会堵塞,所以取队列的时候一般是用到

get_nowait()方法,这种方法在向一个空队列取值的时候会抛一个Empty异常

所以更常用的方法是先判断一个队列是否为空,如果不为空则取值

队列中常用的方法

Queue.qsize() 返回队列的大小
Queue.empty() 如果队列为空,返回True,反之False
Queue.full() 如果队列满了,返回True,反之False
Queue.get([block[, timeout]]) 获取队列,timeout等待时间
Queue.get_nowait() 相当Queue.get(False)
非阻塞 Queue.put(item) 写入队列,timeout等待时间
Queue.put_nowait(item) 相当Queue.put(item, False)

二、multiprocessing中使用子进程概念

from multiprocessing import Process

可以通过Process来构造一个子进程

p = Process(target=fun,args=(args))

再通过p.start()来启动子进程

再通过p.join()方法来使得子进程运行结束后再执行父进程

from multiprocessing import Process
import os
 
# 子进程要执行的代码
def run_proc(name):
 print 'Run child process %s (%s)...' % (name, os.getpid())
 
if __name__=='__main__':
 print 'Parent process %s.' % os.getpid()
 p = Process(target=run_proc, args=('test',))
 print 'Process will start.'
 p.start()
 p.join()
 print 'Process end.'

三、在multiprocessing中使用pool

如果需要多个子进程时可以考虑使用进程池(pool)来管理

from multiprocessing import Pool

from multiprocessing import Pool
import os, time
 
def long_time_task(name):
 print 'Run task %s (%s)...' % (name, os.getpid())
 start = time.time()
 time.sleep(3)
 end = time.time()
 print 'Task %s runs %0.2f seconds.' % (name, (end - start))
 
if __name__=='__main__':
 print 'Parent process %s.' % os.getpid()
 p = Pool()
 for i in range(5):
  p.apply_async(long_time_task, args=(i,))
 print 'Waiting for all subprocesses done...'
 p.close()
 p.join()
 print 'All subprocesses done.'

pool创建子进程的方法与Process不同,是通过

p.apply_async(func,args=(args))实现,一个池子里能同时运行的任务是取决你电脑的cpu数量,如我的电脑现在是有4个cpu,那会子进程task0,task1,task2,task3可以同时启动,task4则在之前的一个某个进程结束后才开始

上面的程序运行后的结果其实是按照上图中1,2,3分开进行的,先打印1,3秒后打印2,再3秒后打印3

代码中的p.close()是关掉进程池子,是不再向里面添加进程了,对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。

当时也可以是实例pool的时候给它定义一个进程的多少

如果上面的代码中p=Pool(5)那么所有的子进程就可以同时进行

三、多个子进程间的通信

多个子进程间的通信就要采用第一步中说到的Queue,比如有以下的需求,一个子进程向队列中写数据,另外一个进程从队列中取数据,

#coding:gbk

from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
 for value in ['A', 'B', 'C']:
  print 'Put %s to queue...' % value
  q.put(value)
  time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
 while True:
  if not q.empty():
   value = q.get(True)
   print 'Get %s from queue.' % value
   time.sleep(random.random())
  else:
   break

if __name__=='__main__':
 # 父进程创建Queue,并传给各个子进程:
 q = Queue()
 pw = Process(target=write, args=(q,))
 pr = Process(target=read, args=(q,))
 # 启动子进程pw,写入:
 pw.start() 
 # 等待pw结束:
 pw.join()
 # 启动子进程pr,读取:
 pr.start()
 pr.join()
 # pr进程里是死循环,无法等待其结束,只能强行终止:
 print
 print '所有数据都写入并且读完'


四、关于上面代码的几个有趣的问题

if __name__=='__main__': 
 # 父进程创建Queue,并传给各个子进程:
 q = Queue()
 p = Pool()
 pw = p.apply_async(write,args=(q,)) 
 pr = p.apply_async(read,args=(q,))
 p.close()
 p.join()
 
 print
 print '所有数据都写入并且读完'

如果main函数写成上面的样本,本来我想要的是将会得到一个队列,将其作为参数传入进程池子里的每个子进程,但是却得到

RuntimeError: Queue objects should only be shared between processes through inheritance

的错误,查了下,大意是队列对象不能在父进程与子进程间通信,这个如果想要使用进程池中使用队列则要使用multiprocess的Manager类

if __name__=='__main__':
 manager = multiprocessing.Manager()
 # 父进程创建Queue,并传给各个子进程:
 q = manager.Queue()
 p = Pool()
 pw = p.apply_async(write,args=(q,))
 time.sleep(0.5)
 pr = p.apply_async(read,args=(q,))
 p.close()
 p.join()
 
 print
 print '所有数据都写入并且读完'

这样这个队列对象就可以在父进程与子进程间通信,不用池则不需要Manager,以后再扩展multiprocess中的Manager类吧

关于锁的应用,在不同程序间如果有同时对同一个队列操作的时候,为了避免错误,可以在某个函数操作队列的时候给它加把锁,这样在同一个时间内则只能有一个子进程对队列进行操作,锁也要在manager对象中的锁

#coding:gbk
 
from multiprocessing import Process,Queue,Pool
import multiprocessing
import os, time, random
 
# 写数据进程执行的代码:
def write(q,lock):
 lock.acquire() #加上锁
 for value in ['A', 'B', 'C']:
  print 'Put %s to queue...' % value  
  q.put(value)  
 lock.release() #释放锁 
 
# 读数据进程执行的代码:
def read(q):
 while True:
  if not q.empty():
   value = q.get(False)
   print 'Get %s from queue.' % value
   time.sleep(random.random())
  else:
   break
 
if __name__=='__main__':
 manager = multiprocessing.Manager()
 # 父进程创建Queue,并传给各个子进程:
 q = manager.Queue()
 lock = manager.Lock() #初始化一把锁
 p = Pool()
 pw = p.apply_async(write,args=(q,lock)) 
 pr = p.apply_async(read,args=(q,))
 p.close()
 p.join()
 
 print
 print '所有数据都写入并且读完'


推荐阅读
  • YOLOv7基于自己的数据集从零构建模型完整训练、推理计算超详细教程
    本文介绍了关于人工智能、神经网络和深度学习的知识点,并提供了YOLOv7基于自己的数据集从零构建模型完整训练、推理计算的详细教程。文章还提到了郑州最低生活保障的话题。对于从事目标检测任务的人来说,YOLO是一个熟悉的模型。文章还提到了yolov4和yolov6的相关内容,以及选择模型的优化思路。 ... [详细]
  • 安装mysqlclient失败解决办法
    本文介绍了在MAC系统中,使用django使用mysql数据库报错的解决办法。通过源码安装mysqlclient或将mysql_config添加到系统环境变量中,可以解决安装mysqlclient失败的问题。同时,还介绍了查看mysql安装路径和使配置文件生效的方法。 ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • 本文介绍了Python异常的捕获、传递与抛出操作,并提供了相关的操作示例。通过异常的捕获和传递,可以有效处理程序中的错误情况。同时,还介绍了如何主动抛出异常。通过本文的学习,读者可以掌握Python中异常处理的基本方法和技巧。 ... [详细]
  • Java实战之电影在线观看系统的实现
    本文介绍了Java实战之电影在线观看系统的实现过程。首先对项目进行了简述,然后展示了系统的效果图。接着介绍了系统的核心代码,包括后台用户管理控制器、电影管理控制器和前台电影控制器。最后对项目的环境配置和使用的技术进行了说明,包括JSP、Spring、SpringMVC、MyBatis、html、css、JavaScript、JQuery、Ajax、layui和maven等。 ... [详细]
  • 本文是一位90后程序员分享的职业发展经验,从年薪3w到30w的薪资增长过程。文章回顾了自己的青春时光,包括与朋友一起玩DOTA的回忆,并附上了一段纪念DOTA青春的视频链接。作者还提到了一些与程序员相关的名词和团队,如Pis、蛛丝马迹、B神、LGD、EHOME等。通过分享自己的经验,作者希望能够给其他程序员提供一些职业发展的思路和启示。 ... [详细]
  • 开发笔记:加密&json&StringIO模块&BytesIO模块
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了加密&json&StringIO模块&BytesIO模块相关的知识,希望对你有一定的参考价值。一、加密加密 ... [详细]
  • Python字典推导式及循环列表生成字典方法
    本文介绍了Python中使用字典推导式和循环列表生成字典的方法,包括通过循环列表生成相应的字典,并给出了执行结果。详细讲解了代码实现过程。 ... [详细]
  • 本文讨论了Alink回归预测的不完善问题,指出目前主要针对Python做案例,对其他语言支持不足。同时介绍了pom.xml文件的基本结构和使用方法,以及Maven的相关知识。最后,对Alink回归预测的未来发展提出了期待。 ... [详细]
  • Centos7.6安装Gitlab教程及注意事项
    本文介绍了在Centos7.6系统下安装Gitlab的详细教程,并提供了一些注意事项。教程包括查看系统版本、安装必要的软件包、配置防火墙等步骤。同时,还强调了使用阿里云服务器时的特殊配置需求,以及建议至少4GB的可用RAM来运行GitLab。 ... [详细]
  • 本文介绍了使用Python根据字典中的值进行排序的方法,并给出了实验结果。通过将字典转化为记录项,可以按照字典中的值进行排序操作。实验结果显示,按照值进行排序后的记录项为[('b', 2), ('a', 3)]。 ... [详细]
  • Python如何调用类里面的方法
    本文介绍了在Python中调用同一个类中的方法需要加上self参数,并且规范写法要求每个函数的第一个参数都为self。同时还介绍了如何调用另一个类中的方法。详细内容请阅读剩余部分。 ... [详细]
  • Python语法上的区别及注意事项
    本文介绍了Python2x和Python3x在语法上的区别,包括print语句的变化、除法运算结果的不同、raw_input函数的替代、class写法的变化等。同时还介绍了Python脚本的解释程序的指定方法,以及在不同版本的Python中如何执行脚本。对于想要学习Python的人来说,本文提供了一些注意事项和技巧。 ... [详细]
  • 本文介绍了在Win10上安装WinPythonHadoop的详细步骤,包括安装Python环境、安装JDK8、安装pyspark、安装Hadoop和Spark、设置环境变量、下载winutils.exe等。同时提醒注意Hadoop版本与pyspark版本的一致性,并建议重启电脑以确保安装成功。 ... [详细]
  • 本文介绍了如何使用Python正则表达式匹配MATLAB的函数语法,包括多行匹配和跨行签名的处理方法。同时,作者还分享了自己遇到的问题和解决方案。 ... [详细]
author-avatar
好人___夏洁
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有