Multiprocessing is a powerful tool in Python, and I want to understand it in more depth. I want to know when to use regular Locks and Queues and when to use a multiprocessing Manager to share these among all processes.
I came up with the following test scenarios with four different conditions for multiprocessing:
1. Using a pool and NO Manager
2. Using a pool and a Manager
3. Using individual processes and NO Manager
4. Using individual processes and a Manager
All conditions execute the job function the_job. the_job consists of some printing secured by a lock. Moreover, the input to the function is simply put into a queue (to see whether it can be recovered from the queue). This input is simply an index idx from range(10), created in the main script start_scenario (shown at the bottom).
```python
def the_job(args):
    """The job for multiprocessing.

    Prints some stuff secured by a lock and finally puts
    the input into a queue.
    """
    idx = args[0]
    lock = args[1]
    queue = args[2]

    lock.acquire()
    print 'I'
    print 'was '
    print 'here '
    print '!!!!'
    print '1111'
    print 'einhundertelfzigelf\n'
    who = ' By run %d \n' % idx
    print who
    lock.release()

    queue.put(idx)
```
The success of a condition is defined as perfectly recalling the input from the queue; see the function read_queue at the bottom.
Conditions 1 and 2 are rather self-explanatory. Condition 1 involves creating a lock and a queue and passing these to a process pool:
```python
def scenario_1_pool_no_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITHOUT a Manager for the lock and queue.

    FAILS!
    """
    mypool = mp.Pool(ncores)
    lock = mp.Lock()
    queue = mp.Queue()
    iterator = make_iterator(args, lock, queue)
    mypool.imap(jobfunc, iterator)
    mypool.close()
    mypool.join()
    return read_queue(queue)
```
(The helper function make_iterator is given at the bottom of this post.) Condition 1 fails with RuntimeError: Lock objects should only be shared between processes through inheritance.
Condition 2 is rather similar, but now the lock and queue are under the supervision of a manager:
```python
def scenario_2_pool_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITH a Manager for the lock and queue.

    SUCCESSFUL!
    """
    mypool = mp.Pool(ncores)
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()
    iterator = make_iterator(args, lock, queue)
    mypool.imap(jobfunc, iterator)
    mypool.close()
    mypool.join()
    return read_queue(queue)
```
In condition 3, new processes are started manually, and the lock and queue are created without a manager:
```python
def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITHOUT a Manager,

    SUCCESSFUL!
    """
    lock = mp.Lock()
    queue = mp.Queue()
    iterator = make_iterator(args, lock, queue)
    do_job_single_processes(jobfunc, iterator, ncores)
    return read_queue(queue)
```
Condition 4 is similar, but again it makes use of a manager:
```python
def scenario_4_single_processes_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITH a Manager,

    SUCCESSFUL!
    """
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()
    iterator = make_iterator(args, lock, queue)
    do_job_single_processes(jobfunc, iterator, ncores)
    return read_queue(queue)
```
In both of these conditions - 3 and 4 - I start a new process for each of the 10 tasks of the_job, with at most ncores processes operating at the same time. This is achieved with the following helper function:
```python
def do_job_single_processes(jobfunc, iterator, ncores):
    """Runs a job function by starting individual processes for every task.

    At most `ncores` processes operate at the same time.

    :param jobfunc: Job to do
    :param iterator:
        Iterator over different parameter settings,
        contains a lock and a queue
    :param ncores:
        Number of processes operating at the same time
    """
    keep_running = True
    process_dict = {}  # Dict containing all subprocesses

    while len(process_dict) > 0 or keep_running:
        terminated_procs_pids = []
        # First check if some processes did finish their job
        for pid, proc in process_dict.iteritems():
            # Remember the terminated processes
            if not proc.is_alive():
                terminated_procs_pids.append(pid)
        # And delete these from the process dict
        for terminated_proc in terminated_procs_pids:
            process_dict.pop(terminated_proc)
        # If we have fewer active processes than ncores and there is still
        # a job to do, add another process
        if len(process_dict) < ncores and keep_running:
            try:
                task = iterator.next()
                proc = mp.Process(target=jobfunc, args=(task,))
                proc.start()
                process_dict[proc.pid] = proc
            except StopIteration:
                # All tasks have been started
                keep_running = False
        time.sleep(0.1)
```
Only condition 1 fails (RuntimeError: Lock objects should only be shared between processes through inheritance) whereas the other 3 conditions are successful. I am trying to wrap my head around this outcome.
Why does the pool need to share a lock and queue between all processes, but the individual processes from condition 3 don't?
What I know is that for the pool conditions (1 and 2) all data from the iterators is passed via pickling, whereas in the single-process conditions (3 and 4) all data from the iterators is passed by inheritance from the main process (I am using Linux). I guess that as long as the memory is not changed from within a child process, the child accesses the same memory the parent process uses (copy-on-write). But as soon as one calls lock.acquire(), that memory should be changed, and the child processes should then be using different locks placed somewhere else in memory, shouldn't they? How does one child process know that a sibling has activated a lock that is not shared via a manager?
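As an aside, here is a minimal sketch probing exactly this point (my own illustration, not part of the four scenarios; it assumes Linux-style fork inheritance, and hold_lock is a made-up helper). If the inherited lock were a private copy, the parent could not be blocked by the child holding it:

```python
import multiprocessing as mp
import time

def hold_lock(lck):
    # The child acquires the inherited lock and holds it for two seconds
    lck.acquire()
    time.sleep(2.0)
    lck.release()

if __name__ == '__main__':
    lock = mp.Lock()
    proc = mp.Process(target=hold_lock, args=(lock,))
    proc.start()
    time.sleep(0.5)  # give the child time to grab the lock
    start = time.time()
    lock.acquire()   # blocks until the child releases, roughly 1.5 s later
    lock.release()
    proc.join()
    print 'Parent waited %.1f seconds' % (time.time() - start)
```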
Finally, somewhat related is the question of how different conditions 3 and 4 really are. Both use individual processes, but they differ in their usage of a manager. Are both considered valid code? Or should one avoid a manager if there is actually no need for one?
For those who just want to copy and paste all the code and run it, here is the full script:
```python
__author__ = 'Me and myself'

import multiprocessing as mp
import time

def the_job(args):
    """The job for multiprocessing.

    Prints some stuff secured by a lock and finally puts
    the input into a queue.
    """
    idx = args[0]
    lock = args[1]
    queue = args[2]

    lock.acquire()
    print 'I'
    print 'was '
    print 'here '
    print '!!!!'
    print '1111'
    print 'einhundertelfzigelf\n'
    who = ' By run %d \n' % idx
    print who
    lock.release()

    queue.put(idx)

def read_queue(queue):
    """Turns a queue into a normal python list."""
    results = []
    while not queue.empty():
        result = queue.get()
        results.append(result)
    return results

def make_iterator(args, lock, queue):
    """Makes an iterator over args and passes the lock and queue to each element."""
    return ((arg, lock, queue) for arg in args)

def start_scenario(scenario_number=1):
    """Starts one of four multiprocessing scenarios.

    :param scenario_number: Index of scenario, 1 to 4
    """
    args = range(10)
    ncores = 3
    if scenario_number == 1:
        result = scenario_1_pool_no_manager(the_job, args, ncores)
    elif scenario_number == 2:
        result = scenario_2_pool_manager(the_job, args, ncores)
    elif scenario_number == 3:
        result = scenario_3_single_processes_no_manager(the_job, args, ncores)
    elif scenario_number == 4:
        result = scenario_4_single_processes_manager(the_job, args, ncores)

    if result != args:
        print 'Scenario %d fails: %s != %s' % (scenario_number, args, result)
    else:
        print 'Scenario %d successful!' % scenario_number

def scenario_1_pool_no_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITHOUT a Manager for the lock and queue.

    FAILS!
    """
    mypool = mp.Pool(ncores)
    lock = mp.Lock()
    queue = mp.Queue()
    iterator = make_iterator(args, lock, queue)
    mypool.map(jobfunc, iterator)
    mypool.close()
    mypool.join()
    return read_queue(queue)

def scenario_2_pool_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITH a Manager for the lock and queue.

    SUCCESSFUL!
    """
    mypool = mp.Pool(ncores)
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()
    iterator = make_iterator(args, lock, queue)
    mypool.map(jobfunc, iterator)
    mypool.close()
    mypool.join()
    return read_queue(queue)

def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITHOUT a Manager,

    SUCCESSFUL!
    """
    lock = mp.Lock()
    queue = mp.Queue()
    iterator = make_iterator(args, lock, queue)
    do_job_single_processes(jobfunc, iterator, ncores)
    return read_queue(queue)

def scenario_4_single_processes_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITH a Manager,

    SUCCESSFUL!
    """
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()
    iterator = make_iterator(args, lock, queue)
    do_job_single_processes(jobfunc, iterator, ncores)
    return read_queue(queue)

def do_job_single_processes(jobfunc, iterator, ncores):
    """Runs a job function by starting individual processes for every task.

    At most `ncores` processes operate at the same time.

    :param jobfunc: Job to do
    :param iterator:
        Iterator over different parameter settings,
        contains a lock and a queue
    :param ncores:
        Number of processes operating at the same time
    """
    keep_running = True
    process_dict = {}  # Dict containing all subprocesses

    while len(process_dict) > 0 or keep_running:
        terminated_procs_pids = []
        # First check if some processes did finish their job
        for pid, proc in process_dict.iteritems():
            # Remember the terminated processes
            if not proc.is_alive():
                terminated_procs_pids.append(pid)
        # And delete these from the process dict
        for terminated_proc in terminated_procs_pids:
            process_dict.pop(terminated_proc)
        # If we have fewer active processes than ncores and there is still
        # a job to do, add another process
        if len(process_dict) < ncores and keep_running:
            try:
                task = iterator.next()
                proc = mp.Process(target=jobfunc, args=(task,))
                proc.start()
                process_dict[proc.pid] = proc
            except StopIteration:
                # All tasks have been started
                keep_running = False
        time.sleep(0.1)

def main():
    """Runs 1 out of 4 different multiprocessing scenarios"""
    start_scenario(1)

if __name__ == '__main__':
    main()
```
Answer by dano (score 32):
multiprocessing.Lock is implemented using a Semaphore object provided by the OS. On Linux, the child just inherits a handle to the Semaphore from the parent via os.fork. This isn't a copy of the semaphore; it's actually inheriting the same handle the parent has, the same way file descriptors can be inherited. Windows, on the other hand, doesn't support os.fork, so it has to pickle the Lock. It does this by creating a duplicate handle to the Windows Semaphore that the multiprocessing.Lock object uses internally, using the Windows DuplicateHandle API, which states:

> The duplicate handle refers to the same object as the original handle. Therefore, any changes to the object are reflected through both handles.

The DuplicateHandle API allows you to give ownership of the duplicated handle to the child process, so that the child process can actually use it after unpickling. By creating a duplicated handle owned by the child, you can effectively "share" the lock object.
Here's the semaphore object in multiprocessing/synchronize.py:
```python
class SemLock(object):

    def __init__(self, kind, value, maxvalue):
        sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
        debug('created semlock with handle %s' % sl.handle)
        self._make_methods()

        if sys.platform != 'win32':
            def _after_fork(obj):
                obj._semlock._after_fork()
            register_after_fork(self, _after_fork)

    def _make_methods(self):
        self.acquire = self._semlock.acquire
        self.release = self._semlock.release
        self.__enter__ = self._semlock.__enter__
        self.__exit__ = self._semlock.__exit__

    def __getstate__(self):  # This is called when you try to pickle the `Lock`.
        assert_spawning(self)
        sl = self._semlock
        return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)

    def __setstate__(self, state):  # This is called when unpickling a `Lock`
        self._semlock = _multiprocessing.SemLock._rebuild(*state)
        debug('recreated blocker with handle %r' % state[0])
        self._make_methods()
```
Note the assert_spawning call in __getstate__, which gets called while pickling the object. Here's how it is implemented:
```python
#
# Check that the current thread is spawning a child process
#

def assert_spawning(self):
    if not Popen.thread_is_spawning():
        raise RuntimeError(
            '%s objects should only be shared between processes'
            ' through inheritance' % type(self).__name__
            )
```
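You can watch this check fire directly: pickling a Lock by hand, outside of process spawning, raises exactly the RuntimeError from condition 1 (a minimal sketch of my own, assuming the Python 2 code quoted above):

```python
import multiprocessing as mp
import pickle

lock = mp.Lock()
try:
    # We are not in the middle of spawning a child process here,
    # so assert_spawning raises.
    pickle.dumps(lock)
except RuntimeError as err:
    print err  # Lock objects should only be shared between processes through inheritance
```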
That function is what makes sure you're "inheriting" the Lock, by calling thread_is_spawning. On Linux, that method just returns False:
```python
@staticmethod
def thread_is_spawning():
    return False
```
That's because Linux doesn't need to pickle to inherit the Lock, so if __getstate__ is actually getting called on Linux, we must not be inheriting. On Windows, there is more going on:
```python
def dump(obj, file, protocol=None):
    ForkingPickler(file, protocol).dump(obj)

class Popen(object):
    '''
    Start a subprocess to run the code of a process object
    '''
    _tls = thread._local()

    def __init__(self, process_obj):
        ...
        # send information to child
        prep_data = get_preparation_data(process_obj._name)
        to_child = os.fdopen(wfd, 'wb')
        Popen._tls.process_handle = int(hp)
        try:
            dump(prep_data, to_child, HIGHEST_PROTOCOL)
            dump(process_obj, to_child, HIGHEST_PROTOCOL)
        finally:
            del Popen._tls.process_handle
            to_child.close()

    @staticmethod
    def thread_is_spawning():
        return getattr(Popen._tls, 'process_handle', None) is not None
```
Here, thread_is_spawning returns True if the Popen._tls object has a process_handle attribute. We can see that the process_handle attribute gets created in __init__, then the data we want inherited is passed from parent to child using dump, and then the attribute is deleted. So thread_is_spawning will only be True during __init__. According to this python-ideas mailing list thread, this is actually an artificial limitation added to simulate the same behavior as os.fork on Linux; Windows actually could support passing the Lock at any time, because DuplicateHandle can be run at any time.
All of the above applies to the Queue object as well, because it uses a Lock internally.
I would say that inheriting Lock objects is preferable to using a Manager.Lock(), because when you use a Manager.Lock, every single call you make on the Lock must be sent via IPC to the Manager process, which is going to be much slower than using a shared Lock that lives inside the calling process. Both approaches are perfectly valid, though.
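To get a feeling for that overhead, here is a rough micro-benchmark of my own (the helper time_lock is a made-up name, and absolute numbers will vary by machine; the point is only the relative gap):

```python
import multiprocessing as mp
import time

def time_lock(lck, n=1000):
    # Acquire and release the given lock n times and report the elapsed time
    start = time.time()
    for _ in xrange(n):
        lck.acquire()
        lck.release()
    return time.time() - start

if __name__ == '__main__':
    plain = mp.Lock()              # lives inside this process
    managed = mp.Manager().Lock()  # every call is an IPC round-trip to the Manager
    print 'plain lock:   %.4f s' % time_lock(plain)
    print 'managed lock: %.4f s' % time_lock(managed)
```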
Finally, it is possible to pass a Lock to all the members of a Pool without using a Manager, by using the initializer/initargs keyword arguments:
```python
lock = None

def initialize_lock(l):
    global lock
    lock = l

def scenario_1_pool_no_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITHOUT a Manager for the lock and queue."""
    lock = mp.Lock()
    mypool = mp.Pool(ncores, initializer=initialize_lock, initargs=(lock,))
    queue = mp.Queue()
    # Don't pass the lock. It has to be used as a global in the child.
    # (This means `jobfunc` would need to be re-written slightly.)
    iterator = make_iterator(args, queue)
    mypool.imap(jobfunc, iterator)
    mypool.close()
    mypool.join()
    return read_queue(queue)
```
This works because arguments passed to initargs get passed to the __init__ method of the Process objects that run inside the Pool, so they end up being inherited rather than pickled.
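For completeness, here is a sketch of how the question's the_job and scenario could be rewritten around that idea (my own illustration; initialize_shared and scenario_1b_pool_initializer are made-up names, and read_queue is the helper from the question's script). Since the same pickling restriction applies to mp.Queue, the queue is passed through the initializer as well:

```python
import multiprocessing as mp

lock = None
queue = None

def initialize_shared(l, q):
    # Runs once inside each worker process; the lock and queue arrive
    # through Process.__init__ (inheritance, or spawn-time pickling on
    # Windows), not through task pickling.
    global lock, queue
    lock = l
    queue = q

def the_job(idx):
    """Variant of the_job that reads the globals set by initialize_shared."""
    lock.acquire()
    print ' By run %d \n' % idx
    lock.release()
    queue.put(idx)

def scenario_1b_pool_initializer(ncores=3):
    l = mp.Lock()
    q = mp.Queue()
    mypool = mp.Pool(ncores, initializer=initialize_shared, initargs=(l, q))
    mypool.map(the_job, range(10))
    mypool.close()
    mypool.join()
    return read_queue(q)  # read_queue as defined in the question's full script
```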