Celery/Redis同时执行多次并行任务

 烛光一米_530 发布于 2023-01-01 15:24

我有两个自定义任务(TaskATaskB),都继承自celery.Task.计划推出TaskA现在再逢,并TaskA推出N时间TaskB有不同的参数,每次.但由于某种原因,有时相同TaskB,具有相同的参数,同时执行两次,这会导致数据库出现不同的问题.

class TaskA(celery.Task):

    def run(self, *args, **kwargs):
        objects = MyModel.objects.filter(processed=False)\
                                 .values_list('id', flat=True)
        task_b = TaskB()
        for o in objects:
            o.apply_async(args=[o, ])

class TaskB(celery.Task):

    def run(self, obj_id, *args, **kwargs):
        obj = MyModel.objects.get(id=obj_id)
        # do some stuff with obj

我尝试过的事情

我试图使用celery.group它希望它能解决这些问题,但我得到的只是错误,说run有2个参数而没有提供.

这就是我尝试TaskB使用的方式celery.group:

# somewhere in TaskA
task_b = TaskB()
g = celery.group([task_b.s(id) for id in objects])
g.apply_async()

我也尝试过这样:

# somewhere in TaskA
task_b = TaskB()
g = celery.group([task_b.run(id) for id in objects])
g.apply_async()

之前在那里执行任务g.apply_async().

这个问题是来自我如何启动任务还是其他什么?这是正常的行为吗?

附加信息

在我运行的本地计算机celery 3.1.13RabbitMQ 3.3.4,在服务器上celery 3.1.13运行Redis 2.8.9.在本地机器上,我看不到这样的行为,每个任务都执行一次.在服务器上,我看到1到10个这样的任务连续执行两次.

这就是我在本地机器和服务器上运行celery的方法:

celery_beat: celery -A proj beat -l info

celery1: celery -A proj worker -Q default -l info --purge -n default_worker -P eventlet -c 50

celery2: celery -A proj worker -Q long -l info --purge -n long_worker -P eventlet -c 200

有效的解决方法

TaskB根据收到的参数引入了锁定.经过大约10个小时的测试,我看到究竟正在执行两次,但是锁可以防止数据库发生冲突.这确实解决了我的问题,但我仍然想知道它为什么会发生.

撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有