作者:我细细 | 来源:互联网 | 2023-01-03 15:58
我似乎不明白如何将模块导入到apache airflow DAG定义文件中.我想这样做是为了能够创建一个库,例如,使用相似的设置声明任务更简洁.
这是我能想到的最简单的例子,它复制了这个问题:我修改了气流教程(https://airflow.apache.org/tutorial.html#recap),只需导入一个模块并从该模块运行一个定义.像这样:
目录结构:
- dags/
-- __init__.py
-- lib.py
-- tutorial.py
tutorial.py:
"""
Code that goes along with the Airflow located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
# Here is my added import
from lib import print_double
# And my usage of the imported def
print_double(2)
## -- snip, because this is just the tutorial code,
## i.e., some standard DAG defintion stuff --
print_double
只是一个简单的def,它将你给它的任何输入乘以2,并打印结果,但显然这甚至不重要,因为这是一个导入问题.
我能够airflow test tutorial print_date 2015-06-01
按照教程文档成功运行 - dag运行,而且print_double成功.4
按预期打印到控制台.一切都很顺利.
然后我去了网络用户界面,我受到了欢迎Broken DAG: [/home/airflow/airflow/dags/tutorial.py] No module named 'lib'
.取消暂停dag并尝试使用UI手动运行会导致"运行"状态,但它永远不会成功或失败.它只是永远地"奔跑".我可以按照自己的意愿排队,但他们都会坐在"跑步"状态.
我检查了气流日志,并没有看到任何有用的调试信息.
那我错过了什么?
1> Ash Berlin-T..:
您是否正在使用Airflow 1.9.0?这可能在此固定。
这个问题是由Airflow加载DAG的方式引起的:它不只是将它们导入为普通的python模块,因为它希望能够在不重新启动进程的情况下重新加载它。结果.
是不在python搜索路径中。
如果1.9.0无法解决此问题,最简单的更改是放入export PYTHOnPATH=/home/airflow/airflow/:$PYTHONPATH
启动脚本。具体的格式取决于您使用的内容(systemd与init脚本等)。
2> viru..:
再次添加sys路径对我有用,
import sys
sys.path.insert(0,os.path.abspath(os.path.dirname(__file__)))