我有两个自定义任务(TaskA
和TaskB
),都继承自celery.Task
.计划推出TaskA
现在再逢,并TaskA
推出N
时间TaskB
有不同的参数,每次.但由于某种原因,有时相同TaskB
,具有相同的参数,同时执行两次,这会导致数据库出现不同的问题.
class TaskA(celery.Task): def run(self, *args, **kwargs): objects = MyModel.objects.filter(processed=False)\ .values_list('id', flat=True) task_b = TaskB() for o in objects: o.apply_async(args=[o, ]) class TaskB(celery.Task): def run(self, obj_id, *args, **kwargs): obj = MyModel.objects.get(id=obj_id) # do some stuff with obj
我尝试过的事情
我试图使用celery.group
它希望它能解决这些问题,但我得到的只是错误,说run
有2个参数而没有提供.
这就是我尝试TaskB
使用的方式celery.group
:
# somewhere in TaskA task_b = TaskB() g = celery.group([task_b.s(id) for id in objects]) g.apply_async()
我也尝试过这样:
# somewhere in TaskA task_b = TaskB() g = celery.group([task_b.run(id) for id in objects]) g.apply_async()
之前在那里执行任务g.apply_async()
.
题
这个问题是来自我如何启动任务还是其他什么?这是正常的行为吗?
附加信息
在我运行的本地计算机celery 3.1.13
上RabbitMQ 3.3.4
,在服务器上celery 3.1.13
运行Redis 2.8.9
.在本地机器上,我看不到这样的行为,每个任务都执行一次.在服务器上,我看到1到10个这样的任务连续执行两次.
这就是我在本地机器和服务器上运行celery的方法:
celery_beat: celery -A proj beat -l info celery1: celery -A proj worker -Q default -l info --purge -n default_worker -P eventlet -c 50 celery2: celery -A proj worker -Q long -l info --purge -n long_worker -P eventlet -c 200
有效的解决方法
我TaskB
根据收到的参数引入了锁定.经过大约10个小时的测试,我看到究竟正在执行两次,但是锁可以防止数据库发生冲突.这确实解决了我的问题,但我仍然想知道它为什么会发生.