芹菜'入门'无法检索结果; 总是等待

 手机用户2602901497 发布于 2022-12-25 18:09

我一直在尝试遵循Celery 与Celery和后续步骤指南的第一步.我的设置是Windows 7 64位,Anaconda Python 2.7(32位),安装的Erlang 32位二进制文​​件,RabbitMQ服务器和芹菜(带pip install celery).

在指南之后,我使用init .py,tasks.py和celery.py 创建了一个proj文件夹.我的init .py是空的.这是celery.py:

from __future__ import absolute_import

from celery import Celery

app = Celery('proj',
             broker='amqp://',
             backend='amqp://',
             include=['proj.tasks'])

#Optional configuration, see the application user guide
app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=3600,
    CELERY_TASK_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],  # Ignore other content
    CELERY_RESULT_SERIALIZER='json',
)

if __name__ == '__main__':
    app.start()

这是task.py:

from __future__ import absolute_import

from .celery import app

@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)

首先我明白我应该确保RabbitMQ服务正在运行.Task Manger的Services选项卡显示RabbitMQ确实正在运行.要开始芹菜服务器并加载我的任务,我打开cmd.exe,导航到proj(我称之为celery_demo的文件夹)的父级,然后运行:

celery -A proj.celery worker -l debug

这给出了这个输出:

C:\Users\bnables\Documents\Python\celery_demo>celery -A proj.celery worker -l debug
[2014-08-25 17:00:09,308: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2014-08-25 17:00:09,313: DEBUG/MainProcess] | Worker: Building graph...
[2014-08-25 17:00:09,315: DEBUG/MainProcess] | Worker: New boot order: {Timer, Hub, Queues (intra), Pool, Autoreloader, Autoscaler, StateDB, Beat, Con
sumer}
[2014-08-25 17:00:09,322: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2014-08-25 17:00:09,322: DEBUG/MainProcess] | Consumer: Building graph...
[2014-08-25 17:00:09,332: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Heart, Mingle, Gossip, Tasks, Control, Agent, event loop
}

 -------------- celery@MSSLW40013047 v3.1.13 (Cipater)
---- **** -----
--- * ***  * -- Windows-7-6.1.7601-SP1
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         proj:0x3290370
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     amqp
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery


[tasks]
  . celery.backend_cleanup
  . celery.chain
  . celery.chord
  . celery.chord_unlock
  . celery.chunks
  . celery.group
  . celery.map
  . celery.starmap
  . proj.tasks.add
  . proj.tasks.mul
  . proj.tasks.xsum

[2014-08-25 17:00:09,345: DEBUG/MainProcess] | Worker: Starting Pool
[2014-08-25 17:00:09,417: DEBUG/MainProcess] ^-- substep ok
[2014-08-25 17:00:09,420: DEBUG/MainProcess] | Worker: Starting Consumer
[2014-08-25 17:00:09,421: DEBUG/MainProcess] | Consumer: Starting Connection
[2014-08-25 17:00:09,457: DEBUG/MainProcess] Start from server, version: 0.9, properties: {u'information': u'Licensed under the MPL.  See http://www.r
abbitmq.com/', u'product': u'RabbitMQ', u'copyright': u'Copyright (C) 2007-2014 GoPivotal, Inc.', u'capabilities': {u'exchange_exchange_bindings': Tru
e, u'connection.blocked': True, u'authentication_failure_close': True, u'basic.nack': True, u'per_consumer_qos': True, u'consumer_priorities': True, u
'consumer_cancel_notify': True, u'publisher_confirms': True}, u'cluster_name': u'rabbit@MSSLW40013047.ndc.nasa.gov', u'platform': u'Erlang/OTP', u'ver
sion': u'3.3.5'}, mechanisms: [u'AMQPLAIN', u'PLAIN'], locales: [u'en_US']
[2014-08-25 17:00:09,460: DEBUG/MainProcess] Open OK!
[2014-08-25 17:00:09,460: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2014-08-25 17:00:09,461: DEBUG/MainProcess] ^-- substep ok
[2014-08-25 17:00:09,461: DEBUG/MainProcess] | Consumer: Starting Events
[2014-08-25 17:00:09,516: DEBUG/MainProcess] Start from server, version: 0.9, properties: {u'information': u'Licensed under the MPL.  See http://www.r
abbitmq.com/', u'product': u'RabbitMQ', u'copyright': u'Copyright (C) 2007-2014 GoPivotal, Inc.', u'capabilities': {u'exchange_exchange_bindings': Tru
e, u'connection.blocked': True, u'authentication_failure_close': True, u'basic.nack': True, u'per_consumer_qos': True, u'consumer_priorities': True, u
'consumer_cancel_notify': True, u'publisher_confirms': True}, u'cluster_name': u'rabbit@MSSLW40013047.ndc.nasa.gov', u'platform': u'Erlang/OTP', u'ver
sion': u'3.3.5'}, mechanisms: [u'AMQPLAIN', u'PLAIN'], locales: [u'en_US']
[2014-08-25 17:00:09,519: DEBUG/MainProcess] Open OK!
[2014-08-25 17:00:09,520: DEBUG/MainProcess] using channel_id: 1
[2014-08-25 17:00:09,522: DEBUG/MainProcess] Channel open
[2014-08-25 17:00:09,523: DEBUG/MainProcess] ^-- substep ok
[2014-08-25 17:00:09,523: DEBUG/MainProcess] | Consumer: Starting Heart
[2014-08-25 17:00:09,530: DEBUG/MainProcess] ^-- substep ok
[2014-08-25 17:00:09,533: DEBUG/MainProcess] | Consumer: Starting Mingle
[2014-08-25 17:00:09,538: INFO/MainProcess] mingle: searching for neighbors
[2014-08-25 17:00:09,539: DEBUG/MainProcess] using channel_id: 1
[2014-08-25 17:00:09,540: DEBUG/MainProcess] Channel open
[2014-08-25 17:00:10,552: INFO/MainProcess] mingle: all alone
[2014-08-25 17:00:10,552: DEBUG/MainProcess] ^-- substep ok
[2014-08-25 17:00:10,552: DEBUG/MainProcess] | Consumer: Starting Gossip
[2014-08-25 17:00:10,553: DEBUG/MainProcess] using channel_id: 2
[2014-08-25 17:00:10,555: DEBUG/MainProcess] Channel open
[2014-08-25 17:00:10,559: DEBUG/MainProcess] ^-- substep ok
[2014-08-25 17:00:10,559: DEBUG/MainProcess] | Consumer: Starting Tasks
[2014-08-25 17:00:10,566: DEBUG/MainProcess] ^-- substep ok
[2014-08-25 17:00:10,566: DEBUG/MainProcess] | Consumer: Starting Control
[2014-08-25 17:00:10,568: DEBUG/MainProcess] using channel_id: 3
[2014-08-25 17:00:10,569: DEBUG/MainProcess] Channel open
[2014-08-25 17:00:10,572: DEBUG/MainProcess] ^-- substep ok
[2014-08-25 17:00:10,573: DEBUG/MainProcess] | Consumer: Starting event loop
[2014-08-25 17:00:10,575: WARNING/MainProcess] celery@MSSLW40013047 ready.
[2014-08-25 17:00:10,575: DEBUG/MainProcess] basic.qos: prefetch_count->32

-A告诉芹菜在哪里可以找到我的芹菜应用实例.使用也只是proj工作,但因为它只会搜索proj.celery,在这里更明确是很好的. worker是给芹菜的命令告诉它产生一些工人来执行它从proj.celery加载的任务.最后-l debug告诉celery将日志级别设置为debug以便我获得大量信息.这通常是-l info.

为了测试我的任务服务器,我打开一个IPython Qt控制台并导航到该celery_demo文件夹(包含proj).然后我输入from proj.tasks import add.如预期的那样,只需调用add(1, 2)返回3而不使用服务器.当我调用add.delay时会发生这种情况:

add.delay(2, 3)

哪个回报:


在我的cmd.exe窗口中,我得到:

[2014-08-25 17:20:38,109: INFO/MainProcess] Received task: proj.tasks.add[42123ff3-e94e-4673-808a-ec6c847679d8]
[2014-08-25 17:20:38,109: DEBUG/MainProcess] TaskPool: Apply  (args:(u'proj.tasks.add', u'42123ff3-e94e-4673-
808a-ec6c847679d8', [2, 3], {}, {u'timelimit': [None, None], u'utc': True, u'is_eager': False, u'chord': None, u'group': None, u'args': [2, 3], u'retr
ies': 0, u'delivery_info': {u'priority': None, u'redelivered': False, u'routing_key': u'celery', u'exchange': u'celery'}, u'expires': None, u'hostname
': 'celery@MSSLW40013047', u'task': u'proj.tasks.add', u'callbacks': None, u'correlation_id': u'42123ff3-e94e-4673-808a-ec6c847679d8', u'errbacks': No
ne, u'reply_to': u'70ed001d-193c-319c-9447-8d77c231dc10', u'taskset': None, u'kwargs': {}, u'eta': None, u'id': u'42123ff3-e94e-4673-808a-ec6c847679d8
', u'headers': {}}) kwargs:{})
[2014-08-25 17:20:38,124: DEBUG/MainProcess] Task accepted: proj.tasks.add[42123ff3-e94e-4673-808a-ec6c847679d8] pid:4052
[2014-08-25 17:20:38,125: INFO/MainProcess] Task proj.tasks.add[42123ff3-e94e-4673-808a-ec6c847679d8] succeeded in 0.0130000114441s: 5

因此,如最后一行所示,正在计算5的结果.接下来我想存储AsyncResult对象并检查它的状态并获取结果值:

result = add.delay(3, 4)

但是result.state和result.get(timeout = 1)不能按预期工作:

In:  result.state
Out: 'Pending'
In:  result.status
Out: 'Pending'

In: result.get(timeout=1)
---------------------------------------------------------------------------
TimeoutError                              Traceback (most recent call last)
 in ()
----> 1 result.get(timeout=1)

C:\Anaconda32\lib\site-packages\celery\result.pyc in get(self, timeout, propagate, interval, no_ack, follow_parents)
    167                 interval=interval,
    168                 on_interval=on_interval,
--> 169                 no_ack=no_ack,
    170             )
    171         finally:

C:\Anaconda32\lib\site-packages\celery\backends\amqp.pyc in wait_for(self, task_id, timeout, cache, propagate, no_ack, on_interval, READY_STATES, PROPAGATE_STATES, **kwargs)
    155                                     on_interval=on_interval)
    156             except socket.timeout:
--> 157                 raise TimeoutError('The operation timed out.')
    158 
    159         if meta['status'] in PROPAGATE_STATES and propagate:

TimeoutError: The operation timed out.

result.state或result.status的预期结果是'SUCCESSFUL',结果result.get(timeout=1)应该是5.

结果存储或消息传递似乎无法正常工作.本教程简单地说backend在调用Celery()CELERY_RESULT_BACKEND配置设置中设置命名参数.在"入门"中,它具有backend='amqp'"后续步骤"所在的位置,backend='amqp://'这也用于github示例中.

我一直在敲打这个问题一段时间,对于如何继续进行一点点线索.关于下一步尝试的任何想法?谢谢!

撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有