热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

ApacheAirflow-完成时触发/计划DAG重新运行(文件传感器)

如何解决《ApacheAirflow-完成时触发/计划DAG重新运行(文件传感器)》经验,为你挑选了2个好方法。

早上好.

我正在尝试设置DAG

    观察/感知文件以命中网络文件夹

    处理文件

    归档文件

使用在线教程和stackoverflow我已经能够提出以下DAG和运算符成功实现目标,但是我希望DAG重新安排或在完成时重新运行,以便开始观察/感知另一个文件.

我试图设置一个变量max_active_runs:1,然后schedule_interval: timedelta(secOnds=5)这个是重新安排DAG,但开始排队任务并锁定文件.

欢迎任何关于如何在archive_task之后重新运行DAG的想法?

谢谢

DAG代码

from airflow import DAG
from airflow.operators import PythonOperator, OmegaFileSensor, ArchiveFileOperator
from datetime import datetime, timedelta
from airflow.models import Variable

default_args = {
    'owner': 'glsam',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'provide_context': True,
    'retries': 100,
    'retry_delay': timedelta(secOnds=30),
    'max_active_runs': 1,
    'schedule_interval': timedelta(secOnds=5),
}

dag = DAG('test_sensing_for_a_file', default_args=default_args)

filepath = Variable.get("soucePath_Test")
filepattern = Variable.get("filePattern_Test")
archivepath = Variable.get("archivePath_Test")

sensor_task = OmegaFileSensor(
    task_id='file_sensor_task',
    filepath=filepath,
    filepattern=filepattern,
    poke_interval=3,
    dag=dag)


def process_file(**context):
    file_to_process = context['task_instance'].xcom_pull(
        key='file_name', task_ids='file_sensor_task')
    file = open(filepath + file_to_process, 'w')
    file.write('This is a test\n')
    file.write('of processing the file')
    file.close()


proccess_task = PythonOperator(
    task_id='process_the_file', 
    python_callable=process_file,
    provide_cOntext=True,
    dag=dag
)

archive_task = ArchiveFileOperator(
    task_id='archive_file',
    filepath=filepath,
    archivepath=archivepath,
    dag=dag)

sensor_task >> proccess_task >> archive_task

文件传感器操作员

    import os
    import re

    from datetime import datetime
    from airflow.models import BaseOperator
    from airflow.plugins_manager import AirflowPlugin
    from airflow.utils.decorators import apply_defaults
    from airflow.operators.sensors import BaseSensorOperator


    class ArchiveFileOperator(BaseOperator):
        @apply_defaults
        def __init__(self, filepath, archivepath, *args, **kwargs):
            super(ArchiveFileOperator, self).__init__(*args, **kwargs)
            self.filepath = filepath
            self.archivepath = archivepath

        def execute(self, context):
            file_name = context['task_instance'].xcom_pull(
                'file_sensor_task', key='file_name')
            os.rename(self.filepath + file_name, self.archivepath + file_name)


    class OmegaFileSensor(BaseSensorOperator):
        @apply_defaults
        def __init__(self, filepath, filepattern, *args, **kwargs):
            super(OmegaFileSensor, self).__init__(*args, **kwargs)
            self.filepath = filepath
            self.filepattern = filepattern

        def poke(self, context):
            full_path = self.filepath
            file_pattern = re.compile(self.filepattern)

            directory = os.listdir(full_path)

            for files in directory:
                if re.match(file_pattern, files):
                    context['task_instance'].xcom_push('file_name', files)
                    return True
            return False


    class OmegaPlugin(AirflowPlugin):
        name = "omega_plugin"
        operators = [OmegaFileSensor, ArchiveFileOperator]

Glenn Sampso.. 9

德米特里斯方法运作得很好.

我也在我的阅读设置中找到了schedule_interval=None然后使用TriggerDagRunOperator同样工作

trigger = TriggerDagRunOperator(
    task_id='trigger_dag_RBCPV99_rerun',
    trigger_dag_id="RBCPV99_v2",
    dag=dag)

sensor_task >> proccess_task >> archive_task >> trigger


Dmitri Safin.. 7

设置schedule_interval=None 并使用airflow trigger_dag命令from从BashOperator上一个命令完成时启动下一个执行。

trigger_next = BashOperator(task_id="trigger_next", 
           bash_command="airflow trigger_dag 'your_dag_id'", dag=dag)

sensor_task >> proccess_task >> archive_task >> trigger_next

您可以使用相同的airflow trigger_dag命令手动开始首次运行,然后trigger_next任务将自动触发下一个运行。我们已经在生产中使用了数月,并且运行良好。



1> Glenn Sampso..:

德米特里斯方法运作得很好.

我也在我的阅读设置中找到了schedule_interval=None然后使用TriggerDagRunOperator同样工作

trigger = TriggerDagRunOperator(
    task_id='trigger_dag_RBCPV99_rerun',
    trigger_dag_id="RBCPV99_v2",
    dag=dag)

sensor_task >> proccess_task >> archive_task >> trigger



2> Dmitri Safin..:

设置schedule_interval=None 并使用airflow trigger_dag命令from从BashOperator上一个命令完成时启动下一个执行。

trigger_next = BashOperator(task_id="trigger_next", 
           bash_command="airflow trigger_dag 'your_dag_id'", dag=dag)

sensor_task >> proccess_task >> archive_task >> trigger_next

您可以使用相同的airflow trigger_dag命令手动开始首次运行,然后trigger_next任务将自动触发下一个运行。我们已经在生产中使用了数月,并且运行良好。


推荐阅读
  • android listview OnItemClickListener失效原因
    最近在做listview时发现OnItemClickListener失效的问题,经过查找发现是因为button的原因。不仅listitem中存在button会影响OnItemClickListener事件的失效,还会导致单击后listview每个item的背景改变,使得item中的所有有关焦点的事件都失效。本文给出了一个范例来说明这种情况,并提供了解决方法。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • Python正则表达式学习记录及常用方法
    本文记录了学习Python正则表达式的过程,介绍了re模块的常用方法re.search,并解释了rawstring的作用。正则表达式是一种方便检查字符串匹配模式的工具,通过本文的学习可以掌握Python中使用正则表达式的基本方法。 ... [详细]
  • 展开全部下面的代码是创建一个立方体Thisexamplescreatesanddisplaysasimplebox.#Thefirstlineloadstheinit_disp ... [详细]
  • 本文介绍了iOS数据库Sqlite的SQL语句分类和常见约束关键字。SQL语句分为DDL、DML和DQL三种类型,其中DDL语句用于定义、删除和修改数据表,关键字包括create、drop和alter。常见约束关键字包括if not exists、if exists、primary key、autoincrement、not null和default。此外,还介绍了常见的数据库数据类型,包括integer、text和real。 ... [详细]
  • 本文介绍了在处理不规则数据时如何使用Python自动提取文本中的时间日期,包括使用dateutil.parser模块统一日期字符串格式和使用datefinder模块提取日期。同时,还介绍了一段使用正则表达式的代码,可以支持中文日期和一些特殊的时间识别,例如'2012年12月12日'、'3小时前'、'在2012/12/13哈哈'等。 ... [详细]
  • Python爬虫中使用正则表达式的方法和注意事项
    本文介绍了在Python爬虫中使用正则表达式的方法和注意事项。首先解释了爬虫的四个主要步骤,并强调了正则表达式在数据处理中的重要性。然后详细介绍了正则表达式的概念和用法,包括检索、替换和过滤文本的功能。同时提到了re模块是Python内置的用于处理正则表达式的模块,并给出了使用正则表达式时需要注意的特殊字符转义和原始字符串的用法。通过本文的学习,读者可以掌握在Python爬虫中使用正则表达式的技巧和方法。 ... [详细]
  • 使用圣杯布局模式实现网站首页的内容布局
    本文介绍了使用圣杯布局模式实现网站首页的内容布局的方法,包括HTML部分代码和实例。同时还提供了公司新闻、最新产品、关于我们、联系我们等页面的布局示例。商品展示区包括了车里子和农家生态土鸡蛋等产品的价格信息。 ... [详细]
  • 如何实现织梦DedeCms全站伪静态
    本文介绍了如何通过修改织梦DedeCms源代码来实现全站伪静态,以提高管理和SEO效果。全站伪静态可以避免重复URL的问题,同时通过使用mod_rewrite伪静态模块和.htaccess正则表达式,可以更好地适应搜索引擎的需求。文章还提到了一些相关的技术和工具,如Ubuntu、qt编程、tomcat端口、爬虫、php request根目录等。 ... [详细]
  • Java实战之电影在线观看系统的实现
    本文介绍了Java实战之电影在线观看系统的实现过程。首先对项目进行了简述,然后展示了系统的效果图。接着介绍了系统的核心代码,包括后台用户管理控制器、电影管理控制器和前台电影控制器。最后对项目的环境配置和使用的技术进行了说明,包括JSP、Spring、SpringMVC、MyBatis、html、css、JavaScript、JQuery、Ajax、layui和maven等。 ... [详细]
  • Android开发实现的计时器功能示例
    本文分享了Android开发实现的计时器功能示例,包括效果图、布局和按钮的使用。通过使用Chronometer控件,可以实现计时器功能。该示例适用于Android平台,供开发者参考。 ... [详细]
  • 本文介绍了Oracle存储过程的基本语法和写法示例,同时还介绍了已命名的系统异常的产生原因。 ... [详细]
  • REVERT权限切换的操作步骤和注意事项
    本文介绍了在SQL Server中进行REVERT权限切换的操作步骤和注意事项。首先登录到SQL Server,其中包括一个具有很小权限的普通用户和一个系统管理员角色中的成员。然后通过添加Windows登录到SQL Server,并将其添加到AdventureWorks数据库中的用户列表中。最后通过REVERT命令切换权限。在操作过程中需要注意的是,确保登录名和数据库名的正确性,并遵循安全措施,以防止权限泄露和数据损坏。 ... [详细]
author-avatar
缘无音iy
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有