扭曲:等待子任务完成

 韵丽雅舍现代专业女子养生馆 发布于 2023-02-13 13:07

在我的代码中,我有两个假设的任务:一个从生成器获取URL并使用Twisted的Cooperator批量下载它们,另一个获取下载的源并异步解析它.我正在尝试将所有获取和解析任务封装到一个Deferred对象中,该对象在下载所有页面并解析所有源时回调.

我想出了以下解决方案:

from twisted.internet import defer, task, reactor, threads
from twisted.web.client import getPage


BATCH_SIZE = 5

def main_task():
    result = defer.Deferred()
    state = {'count': 0, 'done': False}

    def on_parse_finish(r):
        state['count'] -= 1
        if state['done'] and state['count'] == 0:
            result.callback(True)

    def process(source):
        deferred = parse(source)
        state['count'] += 1
        deferred.addCallback(on_parse_finish)

    def fetch_urls():
        for url in get_urls():
            deferred = getPage(url)
            deferred.addCallback(process)
            yield deferred

    def on_finish(r):
        state['done'] = True

    deferreds = []

    coop = task.Cooperator()
    urls = fetch_urls()
    for _ in xrange(BATCH_SIZE):
        deferreds.append(coop.coiterate(urls))

    main_tasks = defer.DeferredList(deferreds)
    main_tasks.addCallback(on_finish)

    return defer.DeferredList([main_tasks, result])

# `main_task` is meant to be used with `blockingCallFromThread`
# The following should block until all fetch/parse tasks are completed:
# threads.blockingCallFromThread(reactor, main_task)

代码可以工作,但我觉得好像我要么丢失一些明显的东西,要么忽略了一个简单的扭曲模式,这会让这更简单.是否有更好的方法可以返回单个Deferred,在所有提取和解析完成后回调?

1 个回答
  • 正如目前所写,我认为这个代码将具有有限数量的并行下载,但是无限数量的并行解析作业.这是故意的吗?我将假设"不",因为如果您的网络碰巧很快并且您的解析器变得很慢,因为URL的数量接近无穷大,您的内存使用情况也是如此:).

    所以这里有一个限制并行性的东西,但是依次执行下载解析:

    from twisted.internet import defer, task
    from twisted.web.client import getPage
    
    BATCH_SIZE = 5
    
    def main_task(reactor):
        def fetch_urls():
            for url in get_urls():
                yield getPage(url).addCallback(parse)
    
        coop = task.Cooperator()
        urls = fetch_urls()
    
        return (defer.DeferredList([coop.coiterate(urls)
                                   for _ in xrange(BATCH_SIZE)])
                .addCallback(task_finished))
    
    task.react(main_task)
    

    这是有效的,因为parse(显然)返回a Deferred,将其作为回调添加到由getPage结果返回的结果中Deferred,该结果将不会调用添加的回调,coiterate直到parse完成其业务.

    既然你问的是惯用的Twisted代码,我也可以自由地对它进行一些现代化(使用task.react而不是手动运行反应器,内联表达式使事情更简洁等等).

    如果你确实想要比并行读取更多的并行解析,那么这样的事情可能会更好:

    from twisted.internet import defer, task
    from twisted.web.client import getPage
    
    PARALLEL_FETCHES = 5
    PARALLEL_PARSES = 10
    
    def main_task(reactor):
        parseSemaphore = defer.DeferredSemaphore(PARALLEL_PARSES)
    
        def parseWhenReady(r):
            def parallelParse(_):
                parse(r).addBoth(
                    lambda result: parseSemaphore.release().addCallback(
                        lambda _: result
                    )
                )
            return parseSemaphore.acquire().addCallback(parallelParse)
    
        def fetch_urls():
            for url in get_urls():
                yield getPage(url).addCallback(parseWhenReady)
    
        coop = task.Cooperator()
        urls = fetch_urls()
    
        return (defer.DeferredList([coop.coiterate(urls)
                                   for _ in xrange(PARALLEL_FETCHES)])
                .addCallback(lambda done:
                             defer.DeferredList(
                                [parseSemaphore.acquire()
                                 for _ in xrange(PARALLEL_PARSES)]
                             ))
                .addCallback(task_finished))
    
    task.react(main_task)
    

    您可以看到parseWhenReady返回Deferred返回的内容acquire,因此一旦并行解析开始,并行提取将继续,因此即使解析器过载,您也不会继续不加区分地提​​取.但是,parallelParse小心地弃权Deferred返回parse或者返回release,因为抓取应该能够继续进行.

    (请注意,由于您的初始示例不可运行,我根本没有测试过其中任何一个.希望即使存在错误,目的也很明确.)

    2023-02-13 13:10 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有