热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Celery源码解析三:Task对象的实现

序列文章:Celery源码解析二:Worker的执行引擎Celery源码解析三:Task对象的实现Celery源码解析四:定时任务的实现Celery源码解析五:远程控制管理Cele

序列文章:

  • Celery 源码解析二:Worker 的执行引擎

  • Celery 源码解析三: Task 对象的实现

  • Celery 源码解析四: 定时任务的实现

  • Celery 源码解析五: 远程控制管理

  • Celery 源码解析六:Events 的实现

  • Celery 源码解析七:Worker 之间的交互

  • Celery 源码解析八:State 和 Result

 

Task 的实现在 Celery 中你会发现有两处,一处位于 celery/app/task.py,这是第一个;第二个位于 celery/task/base.py 中,这是第二个。他们之间是有关系的,你可以认为第一个是对外暴露的接口,而第二个是具体的实现!所以,我们由简入繁,先来看看对外的接口:

其实这就是个我们声明 Task 的对象,例如我们使用这么一段代码:

Celery 源码解析三: Task 对象的实现

我们可以看看 add 对象是啥:

In [1]: add
Out[1]: <@task: worker.add of tasks:0x10c9b06d0>

你会发现其实他就是我们的一个 Task 对象,所以你就可以观察一下我们平时使用这个 add 的形式在里面是如何实现的了,例如我们最常使用的可能就两种方式了,分别是:

In [2]: add.delay()
In [3]: add.apply_async()

其他你看一下源码就会发现他们的实现是一样的,就像这样:

Celery 源码解析三: Task 对象的实现

我们现在很清楚,调用 apply_async 是将我们的 Task 提交到 MQ 中,然后获得一个 celery.result.AsyncResult 对象,那么具体都做了哪些工作,还是需要进一步查看的。apply_async 的参数有很多,所以我们需要先给他归个类,这样就好看多了,概括着看,可以分为这么几类:

  • AMQP 类:connection、queue、exchange、routing_key、producer、publisher、headers
  • MQ 策略类:countdown、eta、expires、retry、retry_policy、priority、
  • 管理类:shadow、serializer、compression、add_to_parent
  • 其他:args、kwargs、link、link_error、

这样一看就感觉一目了然多了,AMQP 类的我们就不关注了,毕竟都看了这么多了,应该大家都熟悉了。这里的主要关注点还是在 MQ 策略类和管理类上,着重在 MQ 策略类上,因为管理类的功能稍微比较简单一些。

async 发送消息

apply_async 中,我们可以看到有两处执行逻辑,第一处是:

Celery 源码解析三: Task 对象的实现

这里是直接调用 apply,然后这里的条件 task_always_eager 是什么意思我们还没见过,可以看一下文档:

Celery 源码解析三: Task 对象的实现

ok,了解,其实就是说这是个同步的接口,那么我们就可以对应着看到下面这处应该是异步的实现咯:

Celery 源码解析三: Task 对象的实现

既然如此,我们一个个得来看。

同步发送消息实现

同步执行消息的一层函数比较简单,只是简单的构建了一个 tracer,然后就从 tracer 调用中拿到调用结果,我们看上去会比较舒服:

Celery 源码解析三: Task 对象的实现

但是,这个 tracer 的内容就复杂啦,但是这个 build_tracer 的构建函数不需要太过关注,所以我们需要关注的是 build_tracer 返回的这个 tracer 函数,但是这个函数的内容很多,为了简约一下,所以给大家抽象了一番。同步调用过程中,可以分为几部分功能,分别是:

  • 信号处理:执行前/后/成功这几个时刻需要释放一些信号给感兴趣的成员
  • 失败处理:对于没有执行的情况需要进行细分处理,例如:reject/ignore/retry/exception 等
  • 依赖处理:因为 Celery 支持一些简单的依赖,所以执行完成之后需要执行被依赖的 tasks
  • 执行逻辑:这个就是正常的函数调用咯
  • 其他:例如统计执行时间,出入栈之类的

我们就看下任务的执行逻辑是怎么样的,在代码里面是很简单的一个函数调用,其实就是看 Task 对象有没有实现 __call__ 方法,如果没有就使用 run 方法:

Celery 源码解析三: Task 对象的实现

那 task 的 __call__ 实现也不是太复杂,其实最后调用的也是 run 方法,所以到最后都还是 run 方法的责任啦,但是,这里的基类是不实现 run 方法的,所以这个实现就落实到具体的实现类中了,那么你以为 run 方法会在 celery/app/base.py 中实现?我之前也是这么想的,但是,后来我发现不是的,这个实现其实就是我们在代码里面使用 @app.task 装饰的函数,其实就是讲我们自定义的函数封装成 run,这样调用 run 不就执行的我们的函数了吗?有意思吧,这个封装的方式我们后面再说,也就是说同步的方式我们就到此吧,也差不多了。

异步发送消息实现

看完同步的我们再来看看异步的,在说异步的之前,我们先思考一下,异步的应该是怎样?之前看的时候我猜想异步不就是把 Task 对象塞进 MQ 中么,就应该是这么简单,但是,看完之后发现还是 too young 了,因为从同步中我们就可以看出,还是有很多功课要完成的,不管怎样,一起来再看一遍。

从前边我们说有同步和异步两种形式那里我们可以发现同步和异步的除了功能不一样之外,还有调用的对象也不一样,同步的是调用 Task 自己的方法,也就是说消息被 Task 自己消化了;而异步的确实使用的 Celery 对象的方法,也就是说还得依赖于 Celery 这个 Boss 来实现。这是为啥呢?很明显嘛,Task 自身没有关于 MQ 的任何消息,而只有一个绑定的 Celery 对象,所以从抽象层面就只能交给 Celery 了,而 Celery 却包含了所有你需要的信息,是可以完成这个任务的。

所以,异步的消息到了 Celery 是这么被发出去的:

Celery 源码解析三: Task 对象的实现

这里出现了一个我们还没怎么接触过的 amqp,但是没关系,随着等下的了解,我们会认识到它的,这里的几个关键步骤都是通过 amqp 来完成的,所以我们应该着重看看他们

异步消息体的创建

在 Celery 中,异步消息体是通过 create_task_message 来创建的,我们可以发现,这里是传了一大堆参数进去,但是,无妨,对于这些参数,我们大部分都在前面见过了,不怵,主要还是需要关注一下内部都为消息体做了什么工作:

Celery 源码解析三: Task 对象的实现

这里可以发现两件事情

  1. 消息体的预处理都是在这里完成的,例如检验和转换参数格式
  2. 构建消息就用了四个属性:headerspropertiesbodysent_event

这里其实就是所有的构建消息体的代码了,为什么呢,因为 task_message 是一个 nametuple:

Celery 源码解析三: Task 对象的实现

异步消息的发送

异步消息的发送这里不是直接就调用的一个函数,而是动态得创建了一个 sender ,然后才调用这个 sender 发送的(没搞懂为啥,为了扩展?)。而创建 sender 的逻辑倒是比较简单,所以忽略了,直接来看真正的 send 操作是如何完成的,其实之前提过了,这里真正的 send 操作就像之前我们看同步的执行逻辑一样尿性,又臭又长,真的,又臭又长,而且作者自己都加注释承认了,他的理由是为了性能!

同样得,为了方便我们的理解,我还是采用抽丝剥茧的方式来给大家介绍一下,首先,我习惯性得分个类:

  • MQ 的各项功能:routing_key/exchange/delivery_mode/retry
  • 任务执行的前后处理:发送前/发送后
  • 真正的发送逻辑
  • 其他

其实重头戏应该在 MQ 的参数确定上,因为只要这些参数都确定了,消息的发送只是一个 producer.publish 就解决的事情,所以我们花些精力来看看 MQ 的参数都是怎么决定出来的:

  • queue_name
    1. 调用 task.delay 的时候传的,没传并且也没传 exchange 那就是 default
    2. 不会出现传了 exchange 但是不传 queue
  • routing_key
    1. 调用 task.delay 的时候传的,没传就看 exchange 有没有,没有就是 queue 的值了
    2. 如果参数传了 exchange,那么就是配置中的默认 routing_key
  • exchange:
    1. 调用 task.delay 的时候传的,没传但是 exchange_type 类型是 direct,那么就是 ""
    2. 如果类型不是 direct,那么 queue 有 exchange 就用,没有就使用默认的
  • delivery_mode
    1. 调用 task.delay 的时候传的,没传就看 queue 里面有没有,有就用
    2. 没有就使用默认的
  • retry:
    1. 调用 task.delay 的时候传了就用,没传就用默认的

等这些参数确认完之后,就使用这些参数发送了!

然后这样子就将消息发出去了,等待 Worker 的接收,而 worker 的接受逻辑我们之前已经看到了,其实还是注册的 Consumer 的 on_message

附加

在前面我们说如何构建异步消息体的时候,对于消息体只是简单的用几个 ... 忽略过,但是,对于整体理解来说,我们不应该忽略他们的实质内容,所以在最后我把他们都罗列出来,前后的会用到的。而且你会发现有点意思的是,对于我们的一个异步调用,task 名和 id 都是放在 headers 里头的,而参数什么的却是放在 body 里面,在我自己实现的异步 MQ 里面,这些都是放在 body 里面的,这点我倒是不太欣赏 Celery 的。

headers

Celery 源码解析三: Task 对象的实现

properties

Celery 源码解析三: Task 对象的实现

body

Celery 源码解析三: Task 对象的实现

send_event

Celery 源码解析三: Task 对象的实现


推荐阅读
  • RabbitMQ的消息持久化处理
    1、RabbitMQ的消息持久化处理,消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢——消息持久化。2、auto ... [详细]
  • 无处不在,详解iOS集成第三方登录(SSO授权登录<无需密码>)
    1.前言 不多说,第三登录无处不在!必备技能,今天以新浪微博为例。这是上次写的iOS第三方社交分享:http:www.cnblogs.comqingchep3727559.html ... [详细]
  • AFNetwork框架(零)使用NSURLSession进行网络请求
    本文介绍了AFNetwork框架中使用NSURLSession进行网络请求的方法,包括NSURLSession的配置、请求的创建和执行等步骤。同时还介绍了NSURLSessionDelegate和NSURLSessionConfiguration的相关内容。通过本文可以了解到AFNetwork框架中使用NSURLSession进行网络请求的基本流程和注意事项。 ... [详细]
  • 高质量SQL书写的30条建议
    本文提供了30条关于优化SQL的建议,包括避免使用select *,使用具体字段,以及使用limit 1等。这些建议是基于实际开发经验总结出来的,旨在帮助读者优化SQL查询。 ... [详细]
  • 本文介绍了SPOJ2829题目的解法及优化方法。题目要求找出满足一定条件的数列,并对结果取模。文章详细解释了解题思路和算法实现,并提出了使用FMT优化的方法。最后,对于第三个限制条件,作者给出了处理方法。文章最后给出了代码实现。 ... [详细]
  • rabbitmq杂谈
    rabbitmq中的consumerTag和deliveryTag分别是干啥的,有什么用?同一个会话,consumerTag是固定的可以做此会话的名字,deliveryTag每次接 ... [详细]
  • 本文介绍了在rhel5.5操作系统下搭建网关+LAMP+postfix+dhcp的步骤和配置方法。通过配置dhcp自动分配ip、实现外网访问公司网站、内网收发邮件、内网上网以及SNAT转换等功能。详细介绍了安装dhcp和配置相关文件的步骤,并提供了相关的命令和配置示例。 ... [详细]
  • Oracle分析函数first_value()和last_value()的用法及原理
    本文介绍了Oracle分析函数first_value()和last_value()的用法和原理,以及在查询销售记录日期和部门中的应用。通过示例和解释,详细说明了first_value()和last_value()的功能和不同之处。同时,对于last_value()的结果出现不一样的情况进行了解释,并提供了理解last_value()默认统计范围的方法。该文对于使用Oracle分析函数的开发人员和数据库管理员具有参考价值。 ... [详细]
  • 本文介绍了在Mac上搭建php环境后无法使用localhost连接mysql的问题,并通过将localhost替换为127.0.0.1或本机IP解决了该问题。文章解释了localhost和127.0.0.1的区别,指出了使用socket方式连接导致连接失败的原因。此外,还提供了相关链接供读者深入了解。 ... [详细]
  • 也就是|小窗_卷积的特征提取与参数计算
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了卷积的特征提取与参数计算相关的知识,希望对你有一定的参考价值。Dense和Conv2D根本区别在于,Den ... [详细]
  • ALTERTABLE通过更改、添加、除去列和约束,或者通过启用或禁用约束和触发器来更改表的定义。语法ALTERTABLEtable{[ALTERCOLUMNcolu ... [详细]
  • 本文讨论了如何使用IF函数从基于有限输入列表的有限输出列表中获取输出,并提出了是否有更快/更有效的执行代码的方法。作者希望了解是否有办法缩短代码,并从自我开发的角度来看是否有更好的方法。提供的代码可以按原样工作,但作者想知道是否有更好的方法来执行这样的任务。 ... [详细]
  • 本文讨论了在手机移动端如何使用HTML5和JavaScript实现视频上传并压缩视频质量,或者降低手机摄像头拍摄质量的问题。作者指出HTML5和JavaScript无法直接压缩视频,只能通过将视频传送到服务器端由后端进行压缩。对于控制相机拍摄质量,只有使用JAVA编写Android客户端才能实现压缩。此外,作者还解释了在交作业时使用zip格式压缩包导致CSS文件和图片音乐丢失的原因,并提供了解决方法。最后,作者还介绍了一个用于处理图片的类,可以实现图片剪裁处理和生成缩略图的功能。 ... [详细]
  • 本文讨论了一个数列求和问题,该数列按照一定规律生成。通过观察数列的规律,我们可以得出求解该问题的算法。具体算法为计算前n项i*f[i]的和,其中f[i]表示数列中有i个数字。根据参考的思路,我们可以将算法的时间复杂度控制在O(n),即计算到5e5即可满足1e9的要求。 ... [详细]
  • 本文讨论了微软的STL容器类是否线程安全。根据MSDN的回答,STL容器类包括vector、deque、list、queue、stack、priority_queue、valarray、map、hash_map、multimap、hash_multimap、set、hash_set、multiset、hash_multiset、basic_string和bitset。对于单个对象来说,多个线程同时读取是安全的。但如果一个线程正在写入一个对象,那么所有的读写操作都需要进行同步。 ... [详细]
author-avatar
命注定啊_894
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有