如何跟踪从多处理池返回的异步结果

 JasonXie 发布于 2023-02-10 15:05

我试图将多处理添加到一些代码中,这些代码具有我无法修改的功能.我想将这些函数作为作业异步提交到多处理池.我正在做的事情很像这里显示的代码.但是,我不确定如何跟踪结果.如何知道返回结果对应的应用函数?

要强调的重点是我无法修改现有函数(其他依赖于它们的东西保持原样),并且结果可以按照与函数作业应用于池的顺序不同的顺序返回.

感谢您的任何想法!

编辑:一些尝试代码如下:

import multiprocessing
from multiprocessing import Pool
import os
import signal
import time
import inspect

def multiply(multiplicand1=0, multiplicand2=0):
    return multiplicand1*multiplicand2

def workFunctionTest(**kwargs):
    time.sleep(3)
    return kwargs

def printHR(object):
    """
    This function prints a specified object in a human readable way.
    """
    # dictionary
    if isinstance(object, dict):
        for key, value in sorted(object.items()):
            print u'{a1}: {a2}'.format(a1=key, a2=value)
    # list or tuple
    elif isinstance(object, list) or isinstance(object, tuple):
        for element in object:
            print element
    # other
    else:
        print object

class Job(object):
    def __init__(
        self,
        workFunction=workFunctionTest,
        workFunctionKeywordArguments={'testString': "hello world"},
        workFunctionTimeout=1,
        naturalLanguageString=None,
        classInstance=None,
        resultGetter=None,
        result=None
        ):
        self.workFunction=workFunction
        self.workFunctionKeywordArguments=workFunctionKeywordArguments
        self.workFunctionTimeout=workFunctionTimeout
        self.naturalLanguageString=naturalLanguageString
        self.classInstance=self.__class__.__name__
        self.resultGetter=resultGetter
        self.result=result
    def description(self):
        descriptionString=""
        for key, value in sorted(vars(self).items()):
            descriptionString+=str("{a1}:{a2} ".format(a1=key, a2=value))
        return descriptionString
    def printout(self):
        """
        This method prints a dictionary of all data attributes.
        """
        printHR(vars(self))

class JobGroup(object):
    """
    This class acts as a container for jobs. The data attribute jobs is a list of job objects.
    """
    def __init__(
        self,
        jobs=None,
        naturalLanguageString="null",
        classInstance=None,
        result=None
        ):
        self.jobs=jobs
        self.naturalLanguageString=naturalLanguageString
        self.classInstance=self.__class__.__name__
        self.result=result
    def description(self):
        descriptionString=""
        for key, value in sorted(vars(self).items()):
            descriptionString+=str("{a1}:{a2} ".format(a1=key, a2=value))
        return descriptionString
    def printout(self):
        """
        This method prints a dictionary of all data attributes.
        """
        printHR(vars(self))

def initialise_processes():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def execute(
        jobObject=None,
        numberOfProcesses=multiprocessing.cpu_count()
        ):
        # Determine the current function name.
    functionName=str(inspect.stack()[0][3])
    def collateResults(result):
        """
        This is a process pool callback function which collates a list of results returned.
        """
        # Determine the caller function name.
        functionName=str(inspect.stack()[1][3])
        print("{a1}: result: {a2}".format(a1=functionName, a2=result))
        results.append(result)
    def getResults(job):
        # Determine the current function name.
        functionName=str(inspect.stack()[0][3])
        while True:
            try:
                result=job.resultGetter.get(job.workFunctionTimeout)
                break
            except multiprocessing.TimeoutError:
                print("{a1}: subprocess timeout for job".format(a1=functionName, a2=job.description()))
        #job.result=result
        return result
    # Create a process pool.
    pool1 = multiprocessing.Pool(numberOfProcesses, initialise_processes)
    print("{a1}: pool {a2} of {a3} processes created".format(a1=functionName, a2=str(pool1), a3=str(numberOfProcesses)))
    # Unpack the input job object and submit it to the process pool.
    print("{a1}: unpacking and applying job object {a2} to pool...".format(a1=functionName, a2=jobObject))
    if isinstance(jobObject, Job):
        # If the input job object is a job, apply it to the pool with its associated timeout specification.
        # Return a list of results.
        job=jobObject
        print("{a1}: job submitted to pool: {a2}".format(a1=functionName, a2=job.description()))
        # Apply the job to the pool, saving the object pool.ApplyResult to the job object.
        job.resultGetter=pool1.apply_async(
                func=job.workFunction,
                kwds=job.workFunctionKeywordArguments
        )
        # Get results.
        # Acquire the job result with respect to the specified job timeout and apply this result to the job data attribute result.
        print("{a1}: getting results for job...".format(a1=functionName))
        job.result=getResults(job)
        print("{a1}: job completed: {a2}".format(a1=functionName, a2=job.description()))
        print("{a1}: job result: {a2}".format(a1=functionName, a2=job.result))
        # Return the job result from execute.
        return job.result
        pool1.terminate()
        pool1.join()
    elif isinstance(jobObject, JobGroup):
        # If the input job object is a job group, cycle through each job and apply it to the pool with its associated timeout specification.
        for job in jobObject.jobs:
            print("{a1}: job submitted to pool: {a2}".format(a1=functionName, a2=job.description()))
            # Apply the job to the pool, saving the object pool.ApplyResult to the job object.
            job.resultGetter=pool1.apply_async(
                    func=job.workFunction,
                    kwds=job.workFunctionKeywordArguments
            )
        # Get results.
        # Cycle through each job and and append the result for the job to a list of results.
        results=[]
        for job in jobObject.jobs:
            # Acquire the job result with respect to the specified job timeout and apply this result to the job data attribute result.
            print("{a1}: getting results for job...".format(a1=functionName))
            job.result=getResults(job)
            print("{a1}: job completed: {a2}".format(a1=functionName, a2=job.description()))
            #print("{a1}: job result: {a2}".format(a1=functionName, a2=job.result))
            # Collate the results.
            results.append(job.result)
        # Apply the list of results to the job group data attribute results.
        jobObject.results=results
        print("{a1}: job group results: {a2}".format(a1=functionName, a2=jobObject.results))
        # Return the job result list from execute.
        return jobObject.results
        pool1.terminate()
        pool1.join()
    else:
        # invalid input object
        print("{a1}: invalid job object {a2}".format(a1=functionName, a2=jobObject))

def main():
    print('-'*80)
    print("MULTIPROCESSING SYSTEM DEMONSTRATION\n")

    # Create a job.
    print("# creating a job...\n")
    job1=Job(
            workFunction=workFunctionTest,
            workFunctionKeywordArguments={'testString': "hello world"},
            workFunctionTimeout=4
    )
    print("- printout of new job object:")
    job1.printout()
    print("\n- printout of new job object in logging format:")
    print job1.description()

    # Create another job.
    print("\n# creating another job...\n")
    job2=Job(
            workFunction=multiply,
            workFunctionKeywordArguments={'multiplicand1': 2, 'multiplicand2': 3},
            workFunctionTimeout=6
    )
    print("- printout of new job object:")
    job2.printout()
    print("\n- printout of new job object in logging format:")
    print job2.description()

    # Create a JobGroup object.
    print("\n# creating a job group (of jobs 1 and 2)...\n")
    jobGroup1=JobGroup(
            jobs=[job1, job2],
    )
    print("- printout of new job group object:")
    jobGroup1.printout()
    print("\n- printout of new job group object in logging format:")
    print jobGroup1.description()

    # Submit the job group.
    print("\nready to submit job group")
    response=raw_input("\nPress Enter to continue...\n")
    execute(jobGroup1)

    response=raw_input("\nNote the results printed above. Press Enter to continue the demonstration.\n")

    # Demonstrate timeout.
    print("\n # creating a new job in order to demonstrate timeout functionality...\n")
    job3=Job(
            workFunction=workFunctionTest,
            workFunctionKeywordArguments={'testString': "hello world"},
            workFunctionTimeout=1
    )
    print("- printout of new job object:")
    job3.printout()
    print("\n- printout of new job object in logging format:")
    print job3.description()
    print("\nNote the timeout specification of only 1 second.")

    # Submit the job.
    print("\nready to submit job")
    response=raw_input("\nPress Enter to continue...\n")
    execute(job3)

    response=raw_input("\nNote the recognition of timeouts printed above. This concludes the demonstration.")
    print('-'*80)

if __name__ == '__main__':
    main()

编辑:由于以下原因,此问题已被搁置[暂停]:

"要求代码的问题必须表明对正在解决的问题的最小理解.包括尝试的解决方案,为什么它们不起作用,以及预期的结果.另请参阅:Stack Overflow问题清单 "

这个问题不是要求代码; 它要求思想,一般指导.对所考虑问题的最小理解得到了证明(注意正确使用术语"多处理","池"和"异步"并注意对先前代码的引用).关于尝试的解决方案,我承认在解决方案上的尝试努力将是有益的.我现在已经添加了这样的代码.我希望我已经解决了导致[暂停]状态的问题.

1 个回答
  • 没有看到实际代码,我只能回答一般性问题.但有两种一般的解决方案.

    首先,不要使用callback和忽略AsyncResults ,而是将它们存储在某种集合中.然后你就可以使用那个集合了.例如,如果您希望能够使用该函数作为键查找函数的结果,只需创建一个dict键入函数的函数:

    def in_parallel(funcs):
        results = {}
        pool = mp.Pool()
        for func in funcs:
            results[func] = pool.apply_async(func)
        pool.close()
        pool.join()
        return {func: result.get() for func, result in results.items()}
    

    或者,您可以更改回调函数以按键将结果存储在集合中.例如:

    def in_parallel(funcs):
        results = {}
        pool = mp.Pool()
        for func in funcs:
            def callback(result, func=func):
                results[func] = result
            pool.apply_async(func, callback=callback)
        pool.close()
        pool.join()
        return results
    

    我正在使用函数本身作为密钥.但是你想要使用索引,这同样容易.您拥有的任何价值,都可以用作关键.


    同时,您链接的示例实际上只是在一堆参数上调用相同的函数,等待所有这些参数完成,并将结果保留为任意顺序的迭代.这正是imap_unordered它的作用,但更简单.你可以用链接代码替换整个复杂的东西:

    pool = mp.Pool()
    results = list(pool.imap_unordered(foo_pool, range(10)))
    pool.close()
    pool.join()
    

    然后,如果您希望结果按原始顺序而不是按任意顺序排列,则可以切换到imapmap替换.所以:

    pool = mp.Pool()
    results = pool.map(foo_pool, range(10))
    pool.close()
    pool.join()
    

    如果你需要类似但过于复杂的东西以适应map范式,concurrent.futures可能会让你的生活更轻松multiprocessing.如果您使用的是Python 2.x,则必须安装backport.但是,你可以做一些更难用AsyncResults或callbacks(或map)做的事情,比如将一大堆期货组合成一个大的未来.请参阅链接文档中的示例.


    最后一点:

    需要强调的重点是我无法修改现有功能......

    如果无法修改函数,则可以始终将其包装.例如,假设我有一个函数返回一个数字的平方,但我正在尝试异步地构建一个dict映射数字到它们的方块,所以我需要将原始数字作为结果的一部分.这很简单:

    def number_and_square(x):
        return x, square(x)
    

    现在,我可以apply_async(number_and_square)而不仅仅是square,并获得我想要的结果.

    我没有在上面的例子中这样做,因为在第一种情况下我将密钥存储在来自调用端的集合中,在第二种情况下,我将它绑定到回调函数中.但是将它绑定到函数周围的包装器就像这两者中的任何一个一样简单,并且当这两者都不合适时可能是合适的.

    2023-02-10 15:07 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有