Good morning.
I am trying to set up a DAG that will:
- watch/sense for a file to hit a network folder
- process the file
- archive the file
Using the online tutorials and Stack Overflow I have been able to come up with the following DAG and operators that successfully achieve the objectives, but I would like the DAG to be rescheduled or rerun on completion so that it starts watching/sensing for another file.
I tried setting a variable max_active_runs: 1 and then schedule_interval: timedelta(seconds=5). This did reschedule the DAG, but it started queuing tasks and locked the file.
Any ideas on how I could rerun the DAG after archive_task would be welcome.
Thanks.
DAG code
from airflow import DAG
from airflow.operators import PythonOperator, OmegaFileSensor, ArchiveFileOperator
from datetime import datetime, timedelta
from airflow.models import Variable

default_args = {
    'owner': 'glsam',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'provide_context': True,
    'retries': 100,
    'retry_delay': timedelta(seconds=30),
    'max_active_runs': 1,
    'schedule_interval': timedelta(seconds=5),
}

dag = DAG('test_sensing_for_a_file', default_args=default_args)

filepath = Variable.get("soucePath_Test")
filepattern = Variable.get("filePattern_Test")
archivepath = Variable.get("archivePath_Test")

sensor_task = OmegaFileSensor(
    task_id='file_sensor_task',
    filepath=filepath,
    filepattern=filepattern,
    poke_interval=3,
    dag=dag)


def process_file(**context):
    file_to_process = context['task_instance'].xcom_pull(
        key='file_name', task_ids='file_sensor_task')
    file = open(filepath + file_to_process, 'w')
    file.write('This is a test\n')
    file.write('of processing the file')
    file.close()


process_task = PythonOperator(
    task_id='process_the_file',
    python_callable=process_file,
    provide_context=True,
    dag=dag)

archive_task = ArchiveFileOperator(
    task_id='archive_file',
    filepath=filepath,
    archivepath=archivepath,
    dag=dag)

sensor_task >> process_task >> archive_task
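The three Variable.get calls above assume that the soucePath_Test, filePattern_Test and archivePath_Test Variables already exist in the Airflow metadata database. A minimal sketch of seeding them with Variable.set follows; the paths and pattern are placeholders, not values from the original post:

from airflow.models import Variable

# Seed the Variables the DAG reads; trailing slashes matter because the DAG
# builds paths by simple string concatenation.
Variable.set("soucePath_Test", "/mnt/network_share/incoming/")
Variable.set("filePattern_Test", r".*\.txt$")
Variable.set("archivePath_Test", "/mnt/network_share/archive/")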
File sensor operator
import os
import re
from datetime import datetime
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
from airflow.operators.sensors import BaseSensorOperator


class ArchiveFileOperator(BaseOperator):
    @apply_defaults
    def __init__(self, filepath, archivepath, *args, **kwargs):
        super(ArchiveFileOperator, self).__init__(*args, **kwargs)
        self.filepath = filepath
        self.archivepath = archivepath

    def execute(self, context):
        file_name = context['task_instance'].xcom_pull(
            'file_sensor_task', key='file_name')
        os.rename(self.filepath + file_name, self.archivepath + file_name)


class OmegaFileSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, filepath, filepattern, *args, **kwargs):
        super(OmegaFileSensor, self).__init__(*args, **kwargs)
        self.filepath = filepath
        self.filepattern = filepattern

    def poke(self, context):
        full_path = self.filepath
        file_pattern = re.compile(self.filepattern)
        directory = os.listdir(full_path)
        for files in directory:
            if re.match(file_pattern, files):
                context['task_instance'].xcom_push('file_name', files)
                return True
        return False


class OmegaPlugin(AirflowPlugin):
    name = "omega_plugin"
    operators = [OmegaFileSensor, ArchiveFileOperator]
Glenn Sampso.. (9):
Dmitri's method worked perfectly.
I also found in my reading that setting schedule_interval=None and then using a TriggerDagRunOperator works just as well.

trigger = TriggerDagRunOperator(
    task_id='trigger_dag_RBCPV99_rerun',
    trigger_dag_id="RBCPV99_v2",
    dag=dag)

sensor_task >> process_task >> archive_task >> trigger
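For completeness, a minimal sketch of how this answer could be wired up end to end, assuming the old Airflow 1.x import path for TriggerDagRunOperator (newer releases expose it as airflow.operators.trigger_dagrun) and a placeholder DAG id; the trigger call itself simply mirrors the snippet above:

from datetime import datetime
from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator  # Airflow 1.x path

# With schedule_interval=None the scheduler never starts runs on its own,
# so a run only happens when the DAG is triggered.
dag = DAG('file_watcher_example',  # placeholder DAG id
          default_args={'owner': 'glsam', 'start_date': datetime(2017, 1, 1)},
          schedule_interval=None)

# sensor_task, process_task and archive_task would be defined here as in the question.

# Re-trigger this same DAG once archiving has finished so it goes back to sensing.
trigger = TriggerDagRunOperator(
    task_id='trigger_rerun',
    trigger_dag_id=dag.dag_id,
    dag=dag)

# sensor_task >> process_task >> archive_task >> trigger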
Dmitri Safin.. (7):
Set schedule_interval=None and use the airflow trigger_dag command from a BashOperator to launch the next execution when the previous one completes.

trigger_next = BashOperator(
    task_id="trigger_next",
    bash_command="airflow trigger_dag 'your_dag_id'",
    dag=dag)

sensor_task >> process_task >> archive_task >> trigger_next

You can start your first run manually with the same airflow trigger_dag command, and then the trigger_next task will automatically trigger the next one. We have used this in production for several months and it runs perfectly.
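As a sketch of where that setting lives: schedule_interval (and max_active_runs) are arguments of the DAG itself, so putting them in default_args as in the question does not configure the DAG; they belong on the DAG constructor. The snippet below assumes the Airflow 1.x import path airflow.operators.bash_operator and reuses the question's DAG id:

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator  # Airflow 1.x path

default_args = {
    'owner': 'glsam',
    'start_date': datetime(2017, 1, 1),
}

# schedule_interval=None and max_active_runs=1 go on the DAG, not in default_args.
dag = DAG('test_sensing_for_a_file',
          default_args=default_args,
          schedule_interval=None,
          max_active_runs=1)

# The final task re-triggers this DAG via the CLI so that a fresh run starts
# sensing for the next file as soon as the current one has archived its file.
trigger_next = BashOperator(
    task_id='trigger_next',
    bash_command="airflow trigger_dag 'test_sensing_for_a_file'",
    dag=dag)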