作者:好宝贝蛋_282 | 来源:互联网 | 2022-12-08 22:51
我使用以下堆栈:
Python 3.6
Celery v4.2.1(经纪人:RabbitMQ v3.6.0)
Django v2.0.4.
根据Celery的文档,在不同队列上运行计划任务应该像在CELERY_ROUTES上为任务定义相应队列一样简单,但是所有任务似乎都在Celery的默认队列上执行.
这是my_app/settings.py上的配置:
CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_ROUTES = {
'app1.tasks.*': {'queue': 'queue1'},
'app2.tasks.*': {'queue': 'queue2'},
}
CELERY_BEAT_SCHEDULE = {
'app1_test': {
'task': 'app1.tasks.app1_test',
'schedule': 15,
},
'app2_test': {
'task': 'app2.tasks.app2_test',
'schedule': 15,
},
}
任务只是用于测试路由的简单脚本:
文件app1/tasks.py:
from my_app.celery import app
import time
@app.task()
def app1_test():
print('I am app1_test task!')
time.sleep(10)
文件app2/tasks.py:
from my_app.celery import app
import time
@app.task()
def app2_test():
print('I am app2_test task!')
time.sleep(10)
当我使用所有必需的队列运行Celery时:
celery -A my_app worker -B -l info -Q celery,queue1,queue2
RabbitMQ将显示只有默认队列" celery "正在运行任务:
sudo rabbitmqctl list_queues
# Tasks executed by each queue:
# - celery 2
# - queue1 0
# - queue2 0
有人知道如何解决这种意外行为吗?
问候,
1> Ander..:
我有它工作,这里有几点需要注意:
根据Celery的4.2.0文档,CELERY_ROUTES应该是定义队列路由的变量,但它只适用于我使用CELERY_TASK_ROUTES.任务路由似乎独立于Celery Beat,因此这仅适用于手动安排的任务:
app1_test.delay()
app2_test.delay()
要么
app1_test.apply_async()
app2_test.apply_async()
为了使它与Celery Beat一起使用,我们只需要明确定义队列.文件my_app/settings.py的最终设置如下:
CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_TASK_ROUTES = {
'app1.tasks.*': {'queue': 'queue1'},
'app2.tasks.*': {'queue': 'queue2'},
}
CELERY_BEAT_SCHEDULE = {
'app1_test': {
'task': 'app1.tasks.app1_test',
'schedule': 15,
'options': {'queue': 'queue1'}
},
'app2_test': {
'task': 'app2.tasks.app2_test',
'schedule': 15,
'options': {'queue': 'queue2'}
},
}
我希望这可以为其他开发人员节省一些时间.
"CELERY_ROUTES"与"CELERY_TASK_ROUTES"混淆的解释:`CELERY_ROUTES`是旧的芹菜设置名称,现在已被`task_routes`取代.但是_django_设置文件中的_celery_设置必须是大写的(例如`TASK_ROUTES`).为了避免与其他django设置冲突,建议使用`CELERY_`为celery设置添加前缀,从而产生`CELERY_TASK_ROUTES`.通过执行以下操作来加载:`app.config_from_object('django.conf:settings',namespace ='CELERY')`.因此`CELERY_TASK_ROUTES`只是新设置名称的上限和前缀更改.
2> JPG..:
向queue
装饰器添加参数可能会帮助您,
@app.task(queue='queue1')
def app1_test():
print('I am app1_test task!')
time.sleep(10)
感谢@JPG,这将是一个有效的选择,尽管如此,我还是更喜欢在Django设置文件上定义队列,以获得更大的灵活性。这样,我可以根据环境使用不同的队列名称:测试,分段,生产