如果我以前不知道执行了哪个任务,如何提取任务的结果?这是设置:给定以下源('tasks.py'):
from celery import Celery app = Celery('tasks', backend="db+mysql://u:p@localhost/db", broker = 'amqp://guest:guest@localhost:5672//') @app.task def add(x,y): return x + y @app.task def mul(x,y): return x * y
RabbitMQ 3.3.2在本地运行:
marcs-mbp:sbin marcstreeter$ ./rabbitmq-server RabbitMQ 3.3.2. Copyright (C) 2007-2014 GoPivotal, Inc. ## ## Licensed under the MPL. See http://www.rabbitmq.com/ ## ## ########## Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log ###### ## /usr/local/var/log/rabbitmq/rabbit@localhost-sasl.log ########## Starting broker... completed with 10 plugins.
与Celery 3.1.12在本地运行:
-------------- celery@Marcs-MacBook-Pro.local v3.1.12 (Cipater) ---- **** ----- --- * *** * -- Darwin-13.2.0-x86_64-i386-64bit -- * - **** --- - ** ---------- [config] - ** ---------- .> app: tasks:0x105dea3d0 - ** ---------- .> transport: amqp://guest:**@localhost:5672// - ** ---------- .> results: disabled - *** --- * --- .> concurrency: 8 (prefork) -- ******* ---- --- ***** ----- [queues] -------------- .> celery exchange=celery(direct) key=celery
然后我可以导入方法并使用'task_id'检索结果:
from tasks import add, mul from celery.result import AsyncResult result = add.delay(2,2) task_id = result.task_id result.get() # 4 result = AsyncResult(id=task_id) result.get() # 4 result = add.AsyncResult(id=task_id) result.get() # 4 # and the same for the 'mul' task. Just imagine I put it here
在下一个示例中,我将在流程之间拆分这些步骤.在一个过程中,我像这样检索'task_id':
from tasks import add result = add.delay(5,5) task_id = result.task_id
在另一个过程中,如果我使用相同的'task_id'(复制并粘贴到另一个REPL,或在不同的HTTP请求中),如下所示:
from celery.result import AsyncResult result = AsyncResult(id="copied_task_id", backend="db+mysql://u:p@localhost/db") result.get() # AttributeError: 'str' object has no attribute 'get_task_meta' result.state # AttributeError: 'str' object has no attribute 'get_task_meta' result.status # AttributeError: 'str' object has no attribute 'get_task_meta'
如果我这样做,在另一个过程中:
from task import add # in this instance I know that an add task was performed result = add.AsyncResult(id="copied_task_id") result.status # "SUCCESSFUL" result.state # "SUCCESSFUL" result.get() # 10
我希望能够在不知道什么任务产生结果的情况下获得结果.在我的真实环境中,我计划将此task_id返回给客户端,让他们通过HTTP请求查询其作业的状态.
好的 - 所以我一直在寻找解决方案很长一段时间,现在我终于正式发布了这个并查看了我发现这个宝石的文档:
class celery.result.AsyncResult(id,backend = None,task_name = None,app = None,parent = None)
查询任务状态.
参数:
id - 见id.
后端 - 见后端.
异常 TimeoutError
超时引发错误.
AsyncResult.app =无
所以我没有提供后端参数,而是提供了"app"参数,而不是这样:
from celery.result import AsyncResult from task import app # Assuming add.delay(10,10) was called in another process # and that I'm using a 'task_id' I retrieved from that process result = AsyncResult(id='copied_task_id', app=app) result.state # 'SUCCESSFUL' result.get() # 20
这对很多人来说可能是显而易见 这不是我.现在我可以说的是,这个解决方案"只是有效",但如果我知道这是获得批准的方式,我会觉得更舒服.如果您知道文档中的一个部分使其更清晰,请将其发布在评论中或作为答案,如果可以,我会选择它作为答案.