热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

5分钟快速掌握Python定时任务框架的实现

这篇文章主要介绍了5分钟快速掌握Python定时任务框架,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

APScheduler 简介

在实际开发中我们经常会碰上一些重复性或周期性的任务,比如像每天定时爬取某个网站的数据、一定周期定时运行代码训练模型等,类似这类的任务通常需要我们手动来进行设定或调度,以便其能够在我们设定好的时间内运行。

在 Windows 上我们可以通过计划任务来手动实现,而在 Linux 系统上往往我们会用到更多关于 crontab 的相关操作。但手动管理并不是一个很好的选择,如果我们需要有十几个不同的定时任务需要管理,那么每次通过人工来进行干预未免有些笨拙,那这时候就真的是「人工智能」了。

所以将这些定时任务的调度代码化才是能够让我们很好地从这种手动管理的纯人力操作中解脱出来。

在 Python 生态中对于定时任务的一些操作主要有那么几个:

  • schedule:第三方模块,该模块适合比较轻量级的一些调度任务,但却不适用于复杂时间的调度
  • APScheduler:第三方定时任务框架,是对 Java 第三方定时任务框架 Quartz 的模仿与移植,能提供比 schedule 更复杂的应用场景,并且各种组件都是模块化,易于使用与二次开发。
  • Celery Beat:属于 celery 这分布式任务队列第三方库下的一个定时任务组件,如果使用需要配合 RabbitMQ 或 Redis 这类的消息队列套件,需要花费一定的时间在环境搭建上,但在高版本中已经不支持 Windows。

所以为了满足能够相对复杂的时间条件,又不需要在前期的环境搭建上花费很多时间的前提下,选择 APScheduler 来对我们的调度任务或定时任务进行管理是个性价比极高的选择。而本文主要会带你快速上手有关 APScheduler 的使用。

APScheduler 概念与组件

虽然说官方文档上的内容不是很多,而且所列举的 API 不是很多,但这侧面也反映了这一框架的简单易用。所以在使用 APScheduler 之前,我们需要对这个框架的一些概念简单了解,主要有那么以下几个:

  • 触发器(trigger)
  • 任务持久化(job stores)
  • 执行器(executor)
  • 调度器(scheduler)

触发器(trigger)

所谓的触发器就是用以触发定时任务的组件,在 APScheduler 中主要是指时间触发器,并且主要有三类时间触发器可供使用:

  • date:日期触发器。日期触发器主要是在某一日期时间点上运行任务时调用,是 APScheduler 里面最简单的一种触发器。所以通常也适用于一次性的任务或作业调度。
  • interval:间隔触发器。间隔触发器是在日期触发器基础上扩展了对时间部分,比如时、分、秒、天、周这几个部分的设定。是我们用以对重复性任务进行设定或调度的一个常用调度器。设定了时间部分之后,从起始日期开始(默认是当前)会按照设定的时间去执行任务。
  • cron:cron 表达式触发器。cron 表达式触发器就等价于我们 Linux 上的 crontab,它主要用于更复杂的日期时间进行设定。但需要注意的是,APScheduler 不支持 6 位及以上的 cron 表达式,最多只支持到 5 位。

任务持久化(job stores)

任务持久化主要是用于将设定好的调度任务进行存储,即便是程序因为意外情况,如断电、电脑或服务器重启时,只要重新运行程序时,APScheduler 就会根据对存储好的调度任务结果进行判断,如果出现已经过期但未执行的情况会进行相应的操作。

APScheduler 为我们提供了多种持久化任务的途径,默认是使用 memory 也就是内存的形式,但内存并不是持久化最好的方式。最好的方式则是通过像数据库这样的载体来将我们的定时任务写入到磁盘当中,只要磁盘没有损坏就能将数据给恢复。

APScheduler 支持的且常用的数据库主要有:

  • sqlalchemy 形式的数据库,这里就主要是指各种传统的关系型数据库,如 MySQL、PostgreSQL、SQLite 等。
  • mongodb 非结构化的 Mongodb 数据库,该类型数据库经常用于对非结构化或版结构化数据的存储或操作,如 JSON。
  • redis 内存数据库,通常用作数据缓存来使用,当然通过一些主从复制等方式也能实现当中数据的持久化或保存。

通常我们可以在创建 Scheduler 实例时创建,或是单独为任务指定。配置的方式相对简单,我们只需要指定对应的数据库链接即可。

执行器(executor)

执行器顾名思义就是执行我们任务的对象,在计算机内通常要么是 CPU 调度任务,要么是单独维护一个线程来运行任务。所以 APScheduler 里的执行器通常就是 ThreadPoolExecutor 或 ProcessPoolExecutor 这样的线程池和进程池两种。

当然如果是和协程或异步相关的任务调度,还可以使用对应的 AsyncIOExecutor、TwistedExecutor 和 GeventExecutor 三种执行器。

调度器(scheduler)

调度器的选择主要取决于你当前的程序环境以及 APScheduler 的用途。根据用途的不同,APScheduler 又提供了以下几种调度器:

  • BlockingScheduler:阻塞调度器,当程序中没有任何存在主进程之中运行东西时,就则使用该调度器。
  • BackgroundScheduler:后台调度器,在不使用后面任何的调度器且希望在应用程序内部运行时的后台启动时才进行使用,如当前你已经开启了一个 Django 或 Flask 服务。
  • AsyncIOScheduler:AsyncIO 调度器,如果代码是通过 asyncio 模块进行异步操作,使用该调度器。
  • GeventScheduler:Gevent 调度器,如果代码是通过 gevent 模块进行协程操作,使用该调度器
  • TornadoScheduler:Tornado 调度器,在 Tornado 框架中使用
  • TwistedScheduler:Twisted 调度器,在基于 Twisted 的框架或应用程序中使用
  • QtScheduler:Qt 调度器,在构建 Qt 应用中进行使用。

通常情况下如果不是和 Web 项目或应用集成共存,那么往往都首选 BlockingScheduler 调度器来进行操作,它会在当前进程中启动相应的线程来进行任务调度与处理;反之,如果是和 Web 项目或应用共存,那么需要选择 BackgroundScheduler 调度器,因为它不会干扰当前应用的线程或进程状况。

基于对以上的概念和组件认识,我们就能基本上摸清 APScheduler 的运行流程:

  • 设定调度器(scheduler)用以对任务的调度与安排进行全局统筹
  • 对相应的函数或方法上设定相应的触发器(trigger),并添加到调度器中
  • 如有任务持久化(job stores)需要则需要设定对应的持久化层,否则默认使用内存存储任务
  • 当触发器被触发时,就将任务交由执行器(executor)进行执行

APScheduler 快速上手

虽然 APScheduler 里面的概念和组件看起来有点多,但在使用上并不算很复杂,我们可以通过本节的示例就能够很快使用。

选择对应的 scheduler

在使用之前我们需要先实例化一个 scheduler 对象,所有的 scheduler 对象都被放在了 apscheduler.schedulers 模块下,我们可以直接通过查看 API 文档或者借助 IDE 补全的提示来获取相应的 scheduler 对象。

这里我直接选取了最基础的 BlockingScheduler:

# main.py
 
from apscheduler.schedulers.blocking import BlockingScheduler
 
scheduler = BlockingScheduler()

配置 scheduler

对于 scheduler 的一些配置我们可以直接在实例化对象时就进行配置,当然也可以在创建实例化对象之后再进行配置。

实例化时进行参数配置:

# main.py
from datetime import datetime
 
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.blocking import BlockingScheduler
 
# 任务持久化 使用 SQLite
jobstores = {
  'default': SQLAlchemyJobStore(url = 'sqlite:///jobs.db')
}
# 执行器配置
executors = {
  'default': ThreadPoolExecutor(20),
}
# 关于 Job 的相关配置,见官方文档 API
job_defaults = {
  'coalesce': False,
  'next_run_time': datetime.now()
}
scheduler = BlockingScheduler(
 jobstores = jobstores,
 executors = executors,
 job_defaults = job_defaults,
 timezOne= 'Asia/Shanghai'
)

或是通过 scheduler.configure 方法进行同样的操作:

scheduler = BlockingScheduler()
scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezOne='Asia/Shanghai')

添加并执行你的任务

创建 scheduler 对象之后,我们需要调用其下的 add_job() 或是 scheduled_job() 方法来将我们需要执行的函数进行注册。前者是以传参的形式指定对应的函数名,而后者则是以装饰器的形式直接对我们要执行的函数进行修饰。

比如我现在有一个输出此时此刻时间的函数 now():

from datetime import datetime
 
def now(trigger):
  print(f"trigger:{trigger} -> {datetime.now()}")

然后我打算每 5 秒的时候运行一次,那我们使用 add_job() 可以这样写:

if __name__ == '__main__':
  scheduler.add_job(now, trigger = "interval", args = ("interval",), secOnds= 5)
  scheduler.start()

在调用 start() 方法之后调度器就会开始执行,并在控制台上看到对应的结果了:

trigger:interval -> 2021-01-16 21:19:43.356674
trigger:interval -> 2021-01-16 21:19:46.679849
trigger:interval -> 2021-01-16 21:19:48.356595

当然使用 @scheduled_job 的方式来装饰我们的任务或许会更加自由一些,于是上面的例子就可以写成这样:

@scheduler.scheduled_job(trigger = "interval", args = ("interval",), secOnds= 5)
def now(trigger):
  print(f"trigger:{trigger} -> {datetime.now()}")
 
if __name__ == '__main__':
  scheduler.start()

运行之后就会在控制台看到同样的结果了。

不过需要注意的是,添加任务一定要在 start() 方法执行前调用,否则会找不到任务或是抛出异常。

将 APScheduler 集成到 Web 项目中

如果你是正在做有关的 Web 项目且存在一些定时任务,那么得益于 APScheduler 由于多样的调度器,我们能够将其和我们的项目结合到一起。

如果你正在使用 Flask,那么 Flask-APScheduler 这一别人写好的第三方包装库就很适合你,虽然它没有相关的文档,但只要你了解了前面我所介绍的有关于 APScheduler 的概念和组件,你就能很轻易地看懂这个第三方库仓库里的示例代码。

如果你使用的不是 Flask 框架,那么 APScheduler 本身也提供了一些对任务或作业的增删改查操作,我们可以自己编写一套合适的 API。

这里我使用的是 FastAPI 这一目前流行的 Web 框架。demo 项目结构如下:

temp-scheduler
├── config.py    # 配置项
├── main.py     # API 文件
└── scheduler.py  # APScheduler 相关设置

安装依赖

这里我们需要的依赖不多,只需要简单几个即可:

pip install fastapi apscheduler sqlalchemy uvicorn

配置项

如果项目中模块过多,那么使用一个文件或模块来进行统一管理是最好的选择。这里的 config.py 我们主要像 Flask 的配置那样简单设定:

from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.blocking import BlockingScheduler
 
class SchedulerConfig:
 
  JOBSTORES = {"default": SQLAlchemyJobStore(url="sqlite:///job.db")}
  EXECUTORS = {"default": ThreadPoolExecutor(20)}
  JOB_DEFAULTS = {"coalesce": False}
 
  @classmethod
  def to_dict(cls):
    return {
      "jobstores": cls.JOBSTORES,
      "executors": cls.EXECUTORS,
      "job_defaults": cls.JOB_DEFAULTS,
    }

在 SchedulerConfig 配置项中我们可以自己实现一个 to_dict() 类方法,以便我们后续传参时通过解包的方式直接传入配置参数即可。

Scheduler 相关设置

scheduler.py 模块的设定也比较简单,即设定对应的 scheduler 调度器即可。由于是演示 demo 我还将要定期执行的任务也放在了这个模块当中:

import logging
from datetime import datetime
 
from apscheduler.schedulers.background import BackgroundScheduler
 
from config import SchedulerConfig
 
scheduler = BackgroundScheduler()
logger = logging.getLogger(__name__)
 
def init_scheduler() -> None:
  # config scheduler
  scheduler.configure(**SchedulerConfig.to_dict())
 
  logger.info("scheduler is running...")
 
  # schedule test
  scheduler.add_job(
    func=mytask,
    trigger="date",
    args=("APScheduler Initialize.",),
    next_run_time=datetime.now(),
  )
  scheduler.start()
 
def mytask(message: str) -> None:
  print(f"[{datetime.now()}] message: {message}")

在这一部分中:

  • init_scheduler() 方法主要用于在 API 服务启动时被调用,然后对 scheduler 对象的配置以及测试
  • mytask() 则是我们要定期执行的任务,后续我们可以通过 APScheduler 提供的方法来自行添加任务

API 设置

在 main.py 模块就主要存放着我们由 FastAPI 所构建的相关 API。如果在后续开发时存在多个接口,此时就需要将不同接口放在不同模块文件中,以达到路由的分发与管理,类似于 Flask 的蓝图模式。

import logging
import uuid
from datetime import datetime
from typing import Any, Dict, Optional, Sequence, Union
 
from fastapi import FastAPI
from pydantic import BaseModel
 
from scheduler import init_scheduler, mytask, scheduler
 
logger = logging.getLogger(__name__)
 
app = FastAPI(title="APScheduler API")
app.add_event_handler("startup", init_scheduler)
 
class Job(BaseModel):
  id: Union[int, str, uuid.UUID]
  name: Optional[str] = None
  func: Optional[str] = None
  args: Optional[Sequence[Optional[str]]] = None
  kwargs: Optional[Dict[str, Any]] = None
  executor: Optional[str] = None
  misfire_grace_time: Optional[str] = None
  coalesce: Optional[bool] = None
  max_instances: Optional[int] = None
  next_run_time: Optional[Union[str, datetime]] = None
 
@app.post("/add")
def add_job(
  message: str,
  trigger: str,
  trigger_args: Optional[dict],
  id: Union[str, int, uuid.UUID],
):
  try:
    scheduler.add_job(
      func=mytask,
      trigger=trigger,
      kwargs={"message": message},
      id=id,
      **trigger_args,
    )
  except Exception as e:
    logger.exception(e.args)
    return {"status_code": 0, "message": "添加失败"}
  return {"status_code": 1, "message": "添加成功"}
 
@app.delete("/delete/{id}")
def delete_job(id: Union[str, int, uuid.UUID]):
  """delete exist job by id"""
  try:
    scheduler.remove_job(job_id=id)
  except Exception:
    return dict(
      message="删除失败",
      status_code=0,
    )
  return dict(
    message="删除成功",
    status_code=1,
  )
 
@app.put("/reschedule/{id}")
def reschedule_job(
  id: Union[str, int, uuid.UUID], trigger: str, trigger_args: Optional[dict]
):
  try:
    scheduler.reschedule_job(job_id=id, trigger=trigger, **trigger_args)
  except Exception as e:
    logger.exception(e.args)
    return dict(
      message="修改失败",
      status_code=0,
    )
  return dict(
    message="修改成功",
    status_code=1,
  )
 
@app.get("/job")
def get_all_jobs():
  jobs = None
  try:
    job_list = scheduler.get_jobs()
    if job_list:
      jobs = [Job(**task.__getstate__()) for task in job_list]
  except Exception as e:
    logger.exception(e.args)
    return dict(
      message="查询失败",
      status_code=0,
      jobs=jobs,
    )
  return dict(
    message="查询成功",
    status_code=1,
    jobs=jobs,
  )
 
@app.get("/job/{id}")
def get_job_by_id(id: Union[int, str, uuid.UUID]):
  jobs = []
  try:
    job = scheduler.get_job(job_id=id)
    if job:
      jobs = [Job(**job.__getstate__())]
  except Exception as e:
    logger.exception(e.args)
    return dict(
      message="查询失败",
      status_code=0,
      jobs=jobs,
    )
  return dict(
    message="查询成功",
    status_code=1,
    jobs=jobs,
  )

以上代码看起来很多,其实核心的就那么几点:

FastAPI 对象 app 的初始化。这里用到的 add_event_handler() 方法就有点像 Flask 中的 before_first_request,会在 Web 服务请求伊始进行操作,理解为初始化相关的操作即可。

API 接口路由。路由通过 app 对象下的对应 HTTP 方法来实现,如 GET、POST、PUT 等。这里的装饰器用法其实也和 Flask 很类似,就不多赘述。

scheduler 对象的增删改查。从 scheduler.py 模块中引入我们创建好的 scheduler 对象之后就可以直接用来做增删改查的操作:

  • 增:使用 add_job() 方法,其主要的参数是要运行的函数(或方法)、触发器以及触发器参数等
  • 删:使用 delete_job() 方法,我们需要传入一个对应任务的 id 参数,用以能够查找到对应的任务
  • 改:使用 reschedule_job() 方法,这里也需要一个对应任务的 id 参数,以及需要重新修改的触发器及其参数
  • 查:使用 get_jobs() 和 get_job() 两个方法,前者是直接获取到当前调度的所有任务,返回的是一个包含了 APScheduler.job.Job 对象的列表,而后者是通过 id 参数来查找对应的任务对象;这里我通过底层源码使用 __getstate__() 来获取到任务的相关信息,这些信息我们通过事先设定好的 Job 对象来对其进行序列化,最后将信息从接口中返回。

运行

完成以上的所有操作之后,我们就可以打开控制台,进入到该目录下并激活我们的虚拟环境,之后运行:

uvicorn main:app 

之后我们就能在 FastAPI 默认的地址 http://127.0.0.1:8000/docs  中看到关于全部接口的 Swagger 文档页面了:

fastapi 集成的 swagger 页面

之后我们可以直接在文档里面或使用 Postman 来自己进行接口测试即可。

结尾

本文介绍了有关于 APScheduler 框架的概念及其用法,并进行了简单的实践。

得益于 APScheduler 的模块化设计才可以让我们更方便地去理解、使用它,并将其运用到我们实际的开发过程中。

从 APScheduler 目前的 Github 仓库代码以及 issue 来看,作者已经在开始重构 4.0 版本,当中的一些源代码和 API 也有较大的变动,相信在 4.0 版本中将会引入更多的新特性。

但如果现阶段你正打算使用或已经使用 APScheduler 用于实际生产中,那么希望本文能对会你有所帮助。

到此这篇关于5分钟快速掌握Python定时任务框架的实现的文章就介绍到这了,更多相关Python 定时任务内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!


推荐阅读
  • Centos下安装memcached+memcached教程
    本文介绍了在Centos下安装memcached和使用memcached的教程,详细解释了memcached的工作原理,包括缓存数据和对象、减少数据库读取次数、提高网站速度等。同时,还对memcached的快速和高效率进行了解释,与传统的文件型数据库相比,memcached作为一个内存型数据库,具有更高的读取速度。 ... [详细]
  • 基于layUI的图片上传前预览功能的2种实现方式
    本文介绍了基于layUI的图片上传前预览功能的两种实现方式:一种是使用blob+FileReader,另一种是使用layUI自带的参数。通过选择文件后点击文件名,在页面中间弹窗内预览图片。其中,layUI自带的参数实现了图片预览功能。该功能依赖于layUI的上传模块,并使用了blob和FileReader来读取本地文件并获取图像的base64编码。点击文件名时会执行See()函数。摘要长度为169字。 ... [详细]
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • 在说Hibernate映射前,我们先来了解下对象关系映射ORM。ORM的实现思想就是将关系数据库中表的数据映射成对象,以对象的形式展现。这样开发人员就可以把对数据库的操作转化为对 ... [详细]
  • Java验证码——kaptcha的使用配置及样式
    本文介绍了如何使用kaptcha库来实现Java验证码的配置和样式设置,包括pom.xml的依赖配置和web.xml中servlet的配置。 ... [详细]
  • 高质量SQL书写的30条建议
    本文提供了30条关于优化SQL的建议,包括避免使用select *,使用具体字段,以及使用limit 1等。这些建议是基于实际开发经验总结出来的,旨在帮助读者优化SQL查询。 ... [详细]
  • Oracle优化新常态的五大禁止及其性能隐患
    本文介绍了Oracle优化新常态中的五大禁止措施,包括禁止外键、禁止视图、禁止触发器、禁止存储过程和禁止JOB,并分析了这些禁止措施可能带来的性能隐患。文章还讨论了这些禁止措施在C/S架构和B/S架构中的不同应用情况,并提出了解决方案。 ... [详细]
  • 一句话解决高并发的核心原则
    本文介绍了解决高并发的核心原则,即将用户访问请求尽量往前推,避免访问CDN、静态服务器、动态服务器、数据库和存储,从而实现高性能、高并发、高可扩展的网站架构。同时提到了Google的成功案例,以及适用于千万级别PV站和亿级PV网站的架构层次。 ... [详细]
  • 本文介绍了Composer依赖管理的重要性及使用方法。对于现代语言而言,包管理器是标配,而Composer作为PHP的包管理器,解决了PEAR的问题,并且使用简单,方便提交自己的包。文章还提到了使用Composer能够避免各种include的问题,避免命名空间冲突,并且能够方便地安装升级扩展包。 ... [详细]
  • 本文介绍了操作系统的定义和功能,包括操作系统的本质、用户界面以及系统调用的分类。同时还介绍了进程和线程的区别,包括进程和线程的定义和作用。 ... [详细]
  • 篇首语:本文由编程笔记#小编为大家整理,主要介绍了软件测试知识点之数据库压力测试方法小结相关的知识,希望对你有一定的参考价值。 ... [详细]
  • 本文介绍了如何在MySQL中将零值替换为先前的非零值的方法,包括使用内联查询和更新查询。同时还提供了选择正确值的方法。 ... [详细]
  • 安卓select模态框样式改变_微软Office风格的多端(Web、安卓、iOS)组件库——Fabric UI...
    介绍FabricUI是微软开源的一套Office风格的多端组件库,共有三套针对性的组件,分别适用于web、android以及iOS,Fab ... [详细]
  • 本文介绍了解决二叉树层序创建问题的方法。通过使用队列结构体和二叉树结构体,实现了入队和出队操作,并提供了判断队列是否为空的函数。详细介绍了解决该问题的步骤和流程。 ... [详细]
  • 计算机存储系统的层次结构及其优势
    本文介绍了计算机存储系统的层次结构,包括高速缓存、主存储器和辅助存储器三个层次。通过分层存储数据可以提高程序的执行效率。计算机存储系统的层次结构将各种不同存储容量、存取速度和价格的存储器有机组合成整体,形成可寻址存储空间比主存储器空间大得多的存储整体。由于辅助存储器容量大、价格低,使得整体存储系统的平均价格降低。同时,高速缓存的存取速度可以和CPU的工作速度相匹配,进一步提高程序执行效率。 ... [详细]
author-avatar
mobiledu2502882733
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有