In my code, I have two hypothetical tasks: one fetches URLs from a generator and batch-downloads them using Twisted's Cooperator, and the other takes a downloaded source and parses it asynchronously. I am trying to wrap all of the fetch and parse tasks in a single Deferred object that fires its callback once every page has been downloaded and every source has been parsed.

I came up with the following solution:
```python
from twisted.internet import defer, task, reactor, threads
from twisted.web.client import getPage

BATCH_SIZE = 5

def main_task():
    result = defer.Deferred()
    state = {'count': 0, 'done': False}

    def on_parse_finish(r):
        state['count'] -= 1
        if state['done'] and state['count'] == 0:
            result.callback(True)

    def process(source):
        deferred = parse(source)
        state['count'] += 1
        deferred.addCallback(on_parse_finish)

    def fetch_urls():
        for url in get_urls():
            deferred = getPage(url)
            deferred.addCallback(process)
            yield deferred

    def on_finish(r):
        state['done'] = True

    deferreds = []
    coop = task.Cooperator()
    urls = fetch_urls()
    for _ in xrange(BATCH_SIZE):
        deferreds.append(coop.coiterate(urls))

    main_tasks = defer.DeferredList(deferreds)
    main_tasks.addCallback(on_finish)
    return defer.DeferredList([main_tasks, result])

# `main_task` is meant to be used with `blockingCallFromThread`.
# The following should block until all fetch/parse tasks are completed:
# threads.blockingCallFromThread(reactor, main_task)
```
The code works, but I feel like I'm either missing something obvious or am unaware of a simple Twisted pattern that would make this much simpler. Is there a better way to return a single Deferred that fires once all of the fetching and parsing is done?
As currently written, I believe this code will have a limited number of parallel downloads, but an unlimited number of parallel parse jobs. Is that intentional? I'm going to assume "no", since if your network happens to be fast and your parser happens to be slow, then as the number of URLs approaches infinity, so does your memory usage :).
So here's something that will have limited parallelism, but carries out parsing sequentially along with downloading:
```python
from twisted.internet import defer, task
from twisted.web.client import getPage

BATCH_SIZE = 5

def main_task(reactor):
    def fetch_urls():
        for url in get_urls():
            yield getPage(url).addCallback(parse)

    coop = task.Cooperator()
    urls = fetch_urls()
    return (defer.DeferredList([coop.coiterate(urls)
                                for _ in xrange(BATCH_SIZE)])
            .addCallback(task_finished))

task.react(main_task)
```

This works because parse (apparently) returns a Deferred; adding it as a callback to the Deferred returned by getPage produces a Deferred whose callback added by coiterate will not be invoked until parse has finished its business.
Since you were asking about idiomatic Twisted code, I've also taken the liberty of modernizing this a bit (using task.react rather than running the reactor manually, inlining expressions to make things briefer, and so on).
If you really do want to have more parallel parses than parallel fetches, something like this might work better:
```python
from twisted.internet import defer, task
from twisted.web.client import getPage

PARALLEL_FETCHES = 5
PARALLEL_PARSES = 10

def main_task(reactor):
    parseSemaphore = defer.DeferredSemaphore(PARALLEL_PARSES)

    def parseWhenReady(r):
        def parallelParse(_):
            parse(r).addBoth(
                lambda result: parseSemaphore.release().addCallback(
                    lambda _: result
                )
            )
        return parseSemaphore.acquire().addCallback(parallelParse)

    def fetch_urls():
        for url in get_urls():
            yield getPage(url).addCallback(parseWhenReady)

    coop = task.Cooperator()
    urls = fetch_urls()
    return (defer.DeferredList([coop.coiterate(urls)
                                for _ in xrange(PARALLEL_FETCHES)])
            .addCallback(lambda done: defer.DeferredList(
                [parseSemaphore.acquire()
                 for _ in xrange(PARALLEL_PARSES)]))
            .addCallback(task_finished))

task.react(main_task)
```
You can see that parseWhenReady returns the Deferred returned by acquire, so parallel fetching will continue as soon as the parallel parse begins; this way, you won't keep fetching indiscriminately even while the parser is overloaded. However, parallelParse carefully abstains from returning the Deferred returned by parse or by release, since fetching should be able to continue while those are in progress.
(Note that since your initial example is not runnable, I haven't tested either of these at all. Hopefully the intent is clear even if there are bugs.)