我试图将多处理添加到一些代码中,这些代码具有我无法修改的功能.我想将这些函数作为作业异步提交到多处理池.我正在做的事情很像这里显示的代码.但是,我不确定如何跟踪结果.如何知道返回结果对应的应用函数?
要强调的重点是我无法修改现有函数(其他依赖于它们的东西保持原样),并且结果可以按照与函数作业应用于池的顺序不同的顺序返回.
感谢您的任何想法!
编辑:一些尝试代码如下:
import multiprocessing from multiprocessing import Pool import os import signal import time import inspect def multiply(multiplicand1=0, multiplicand2=0): return multiplicand1*multiplicand2 def workFunctionTest(**kwargs): time.sleep(3) return kwargs def printHR(object): """ This function prints a specified object in a human readable way. """ # dictionary if isinstance(object, dict): for key, value in sorted(object.items()): print u'{a1}: {a2}'.format(a1=key, a2=value) # list or tuple elif isinstance(object, list) or isinstance(object, tuple): for element in object: print element # other else: print object class Job(object): def __init__( self, workFunction=workFunctionTest, workFunctionKeywordArguments={'testString': "hello world"}, workFunctionTimeout=1, naturalLanguageString=None, classInstance=None, resultGetter=None, result=None ): self.workFunction=workFunction self.workFunctionKeywordArguments=workFunctionKeywordArguments self.workFunctionTimeout=workFunctionTimeout self.naturalLanguageString=naturalLanguageString self.classInstance=self.__class__.__name__ self.resultGetter=resultGetter self.result=result def description(self): descriptionString="" for key, value in sorted(vars(self).items()): descriptionString+=str("{a1}:{a2} ".format(a1=key, a2=value)) return descriptionString def printout(self): """ This method prints a dictionary of all data attributes. """ printHR(vars(self)) class JobGroup(object): """ This class acts as a container for jobs. The data attribute jobs is a list of job objects. """ def __init__( self, jobs=None, naturalLanguageString="null", classInstance=None, result=None ): self.jobs=jobs self.naturalLanguageString=naturalLanguageString self.classInstance=self.__class__.__name__ self.result=result def description(self): descriptionString="" for key, value in sorted(vars(self).items()): descriptionString+=str("{a1}:{a2} ".format(a1=key, a2=value)) return descriptionString def printout(self): """ This method prints a dictionary of all data attributes. """ printHR(vars(self)) def initialise_processes(): signal.signal(signal.SIGINT, signal.SIG_IGN) def execute( jobObject=None, numberOfProcesses=multiprocessing.cpu_count() ): # Determine the current function name. functionName=str(inspect.stack()[0][3]) def collateResults(result): """ This is a process pool callback function which collates a list of results returned. """ # Determine the caller function name. functionName=str(inspect.stack()[1][3]) print("{a1}: result: {a2}".format(a1=functionName, a2=result)) results.append(result) def getResults(job): # Determine the current function name. functionName=str(inspect.stack()[0][3]) while True: try: result=job.resultGetter.get(job.workFunctionTimeout) break except multiprocessing.TimeoutError: print("{a1}: subprocess timeout for job".format(a1=functionName, a2=job.description())) #job.result=result return result # Create a process pool. pool1 = multiprocessing.Pool(numberOfProcesses, initialise_processes) print("{a1}: pool {a2} of {a3} processes created".format(a1=functionName, a2=str(pool1), a3=str(numberOfProcesses))) # Unpack the input job object and submit it to the process pool. print("{a1}: unpacking and applying job object {a2} to pool...".format(a1=functionName, a2=jobObject)) if isinstance(jobObject, Job): # If the input job object is a job, apply it to the pool with its associated timeout specification. # Return a list of results. job=jobObject print("{a1}: job submitted to pool: {a2}".format(a1=functionName, a2=job.description())) # Apply the job to the pool, saving the object pool.ApplyResult to the job object. job.resultGetter=pool1.apply_async( func=job.workFunction, kwds=job.workFunctionKeywordArguments ) # Get results. # Acquire the job result with respect to the specified job timeout and apply this result to the job data attribute result. print("{a1}: getting results for job...".format(a1=functionName)) job.result=getResults(job) print("{a1}: job completed: {a2}".format(a1=functionName, a2=job.description())) print("{a1}: job result: {a2}".format(a1=functionName, a2=job.result)) # Return the job result from execute. return job.result pool1.terminate() pool1.join() elif isinstance(jobObject, JobGroup): # If the input job object is a job group, cycle through each job and apply it to the pool with its associated timeout specification. for job in jobObject.jobs: print("{a1}: job submitted to pool: {a2}".format(a1=functionName, a2=job.description())) # Apply the job to the pool, saving the object pool.ApplyResult to the job object. job.resultGetter=pool1.apply_async( func=job.workFunction, kwds=job.workFunctionKeywordArguments ) # Get results. # Cycle through each job and and append the result for the job to a list of results. results=[] for job in jobObject.jobs: # Acquire the job result with respect to the specified job timeout and apply this result to the job data attribute result. print("{a1}: getting results for job...".format(a1=functionName)) job.result=getResults(job) print("{a1}: job completed: {a2}".format(a1=functionName, a2=job.description())) #print("{a1}: job result: {a2}".format(a1=functionName, a2=job.result)) # Collate the results. results.append(job.result) # Apply the list of results to the job group data attribute results. jobObject.results=results print("{a1}: job group results: {a2}".format(a1=functionName, a2=jobObject.results)) # Return the job result list from execute. return jobObject.results pool1.terminate() pool1.join() else: # invalid input object print("{a1}: invalid job object {a2}".format(a1=functionName, a2=jobObject)) def main(): print('-'*80) print("MULTIPROCESSING SYSTEM DEMONSTRATION\n") # Create a job. print("# creating a job...\n") job1=Job( workFunction=workFunctionTest, workFunctionKeywordArguments={'testString': "hello world"}, workFunctionTimeout=4 ) print("- printout of new job object:") job1.printout() print("\n- printout of new job object in logging format:") print job1.description() # Create another job. print("\n# creating another job...\n") job2=Job( workFunction=multiply, workFunctionKeywordArguments={'multiplicand1': 2, 'multiplicand2': 3}, workFunctionTimeout=6 ) print("- printout of new job object:") job2.printout() print("\n- printout of new job object in logging format:") print job2.description() # Create a JobGroup object. print("\n# creating a job group (of jobs 1 and 2)...\n") jobGroup1=JobGroup( jobs=[job1, job2], ) print("- printout of new job group object:") jobGroup1.printout() print("\n- printout of new job group object in logging format:") print jobGroup1.description() # Submit the job group. print("\nready to submit job group") response=raw_input("\nPress Enter to continue...\n") execute(jobGroup1) response=raw_input("\nNote the results printed above. Press Enter to continue the demonstration.\n") # Demonstrate timeout. print("\n # creating a new job in order to demonstrate timeout functionality...\n") job3=Job( workFunction=workFunctionTest, workFunctionKeywordArguments={'testString': "hello world"}, workFunctionTimeout=1 ) print("- printout of new job object:") job3.printout() print("\n- printout of new job object in logging format:") print job3.description() print("\nNote the timeout specification of only 1 second.") # Submit the job. print("\nready to submit job") response=raw_input("\nPress Enter to continue...\n") execute(job3) response=raw_input("\nNote the recognition of timeouts printed above. This concludes the demonstration.") print('-'*80) if __name__ == '__main__': main()
编辑:由于以下原因,此问题已被搁置[暂停]:
"要求代码的问题必须表明对正在解决的问题的最小理解.包括尝试的解决方案,为什么它们不起作用,以及预期的结果.另请参阅:Stack Overflow问题清单 "
这个问题不是要求代码; 它要求思想,一般指导.对所考虑问题的最小理解得到了证明(注意正确使用术语"多处理","池"和"异步"并注意对先前代码的引用).关于尝试的解决方案,我承认在解决方案上的尝试努力将是有益的.我现在已经添加了这样的代码.我希望我已经解决了导致[暂停]状态的问题.
没有看到实际代码,我只能回答一般性问题.但有两种一般的解决方案.
首先,不要使用callback
和忽略AsyncResult
s ,而是将它们存储在某种集合中.然后你就可以使用那个集合了.例如,如果您希望能够使用该函数作为键查找函数的结果,只需创建一个dict
键入函数的函数:
def in_parallel(funcs): results = {} pool = mp.Pool() for func in funcs: results[func] = pool.apply_async(func) pool.close() pool.join() return {func: result.get() for func, result in results.items()}
或者,您可以更改回调函数以按键将结果存储在集合中.例如:
def in_parallel(funcs): results = {} pool = mp.Pool() for func in funcs: def callback(result, func=func): results[func] = result pool.apply_async(func, callback=callback) pool.close() pool.join() return results
我正在使用函数本身作为密钥.但是你想要使用索引,这同样容易.您拥有的任何价值,都可以用作关键.
同时,您链接的示例实际上只是在一堆参数上调用相同的函数,等待所有这些参数完成,并将结果保留为任意顺序的迭代.这正是imap_unordered
它的作用,但更简单.你可以用链接代码替换整个复杂的东西:
pool = mp.Pool() results = list(pool.imap_unordered(foo_pool, range(10))) pool.close() pool.join()
然后,如果您希望结果按原始顺序而不是按任意顺序排列,则可以切换到imap
或map
替换.所以:
pool = mp.Pool() results = pool.map(foo_pool, range(10)) pool.close() pool.join()
如果你需要类似但过于复杂的东西以适应map
范式,concurrent.futures
可能会让你的生活更轻松multiprocessing
.如果您使用的是Python 2.x,则必须安装backport.但是,你可以做一些更难用AsyncResult
s或callback
s(或map
)做的事情,比如将一大堆期货组合成一个大的未来.请参阅链接文档中的示例.
最后一点:
需要强调的重点是我无法修改现有功能......
如果无法修改函数,则可以始终将其包装.例如,假设我有一个函数返回一个数字的平方,但我正在尝试异步地构建一个dict映射数字到它们的方块,所以我需要将原始数字作为结果的一部分.这很简单:
def number_and_square(x): return x, square(x)
现在,我可以apply_async(number_and_square)
而不仅仅是square
,并获得我想要的结果.
我没有在上面的例子中这样做,因为在第一种情况下我将密钥存储在来自调用端的集合中,在第二种情况下,我将它绑定到回调函数中.但是将它绑定到函数周围的包装器就像这两者中的任何一个一样简单,并且当这两者都不合适时可能是合适的.