作者:journeylis-1998_246 | 来源:互联网 | 2023-01-28 13:05
请考虑以下DAG示例,其中第一个任务get_id_creds
是从数据库中提取凭据列表.此操作告诉我数据库中的哪些用户可以运行进一步的数据预处理,并将这些ID写入文件/tmp/ids.txt
.然后,我将这些ID扫描到我的DAG中,并使用它们生成upload_transaction
可以并行运行的任务列表.
我的问题是:使用气流是否有更正确,更动态的方式来做到这一点?我在这里感觉笨拙和脆弱.如何直接将有效ID列表从一个进程传递到定义后续下游进程?
from datetime import datetime, timedelta
import os
import sys
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
import ds_dependencies
SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
sys.path.insert(0, SCRIPT_PATH)
import dash_workers
else:
print('Define DASH_PREPROC_PATH value in environmental variables')
sys.exit(1)
default_args = {
'start_date': datetime.now(),
'schedule_interval': None
}
DAG = DAG(
dag_id='dash_preproc',
default_args=default_args
)
get_id_creds = PythonOperator(
task_id='get_id_creds',
python_callable=dash_workers.get_id_creds,
provide_cOntext=True,
dag=DAG)
with open('/tmp/ids.txt', 'r') as infile:
ids = infile.read().splitlines()
for uid in uids:
upload_transactiOns= PythonOperator(
task_id=uid,
python_callable=dash_workers.upload_transactions,
op_args=[uid],
dag=DAG)
upload_transactions.set_downstream(get_id_creds)
Aaron..
5
根据@Juan Riza的建议,我检查了以下链接:在Airflow中创建动态工作流的正确方法。尽管我能够简化解决方案,以至于我认为自己可以在此处提供自己的实现的修改版本,但这几乎是答案:
from datetime import datetime
import os
import sys
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
import ds_dependencies
SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
sys.path.insert(0, SCRIPT_PATH)
import dash_workers
else:
print('Define DASH_PREPROC_PATH value in environmental variables')
sys.exit(1)
ENV = os.environ
default_args = {
# 'start_date': datetime.now(),
'start_date': datetime(2017, 7, 18)
}
DAG = DAG(
dag_id='dash_preproc',
default_args=default_args
)
clear_tables = PythonOperator(
task_id='clear_tables',
python_callable=dash_workers.clear_db,
dag=DAG)
def id_worker(uid):
return PythonOperator(
task_id=uid,
python_callable=dash_workers.main_preprocess,
op_args=[uid],
dag=DAG)
for uid in capone_dash_workers.get_id_creds():
clear_tables >> id_worker(uid)
clear_tables
清理将由于该过程而重建的数据库。id_worker
是一项函数,该函数根据从返回的ID值数组动态生成新的预处理任务get_if_creds
。任务ID只是相应的用户ID,尽管它很容易成为索引i
,如上面提到的示例所示。
注意该位移位运算符(<<
)在我看来是后向的,因为该clear_tables
任务应该排在第一位,但是在这种情况下它似乎可以正常工作。
1> Aaron..:
根据@Juan Riza的建议,我检查了以下链接:在Airflow中创建动态工作流的正确方法。尽管我能够简化解决方案,以至于我认为自己可以在此处提供自己的实现的修改版本,但这几乎是答案:
from datetime import datetime
import os
import sys
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
import ds_dependencies
SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
sys.path.insert(0, SCRIPT_PATH)
import dash_workers
else:
print('Define DASH_PREPROC_PATH value in environmental variables')
sys.exit(1)
ENV = os.environ
default_args = {
# 'start_date': datetime.now(),
'start_date': datetime(2017, 7, 18)
}
DAG = DAG(
dag_id='dash_preproc',
default_args=default_args
)
clear_tables = PythonOperator(
task_id='clear_tables',
python_callable=dash_workers.clear_db,
dag=DAG)
def id_worker(uid):
return PythonOperator(
task_id=uid,
python_callable=dash_workers.main_preprocess,
op_args=[uid],
dag=DAG)
for uid in capone_dash_workers.get_id_creds():
clear_tables >> id_worker(uid)
clear_tables
清理将由于该过程而重建的数据库。id_worker
是一项函数,该函数根据从返回的ID值数组动态生成新的预处理任务get_if_creds
。任务ID只是相应的用户ID,尽管它很容易成为索引i
,如上面提到的示例所示。
注意该位移位运算符(<<
)在我看来是后向的,因为该clear_tables
任务应该排在第一位,但是在这种情况下它似乎可以正常工作。