與此問題相關的原始代碼可以在here找到。在Airflow中生成多個任務時反向上游/下游關係
我對兩個bitshift運算符都感到困惑,並且set_upstream
/set_downstream
方法在我在DAG中定義的任務循環內工作。當DAG的主執行迴路被配置如下:
for uid in dash_workers.get_id_creds():
clear_tables.set_downstream(id_worker(uid))
或
for uid in dash_workers.get_id_creds():
clear_tables >> id_worker(uid)
圖表看起來像這樣(字母數字序列是用戶ID,這也定義了任務ID ):
當我配置了DAG的主執行循環是這樣的:
for uid in dash_workers.get_id_creds():
clear_tables.set_upstream(id_worker(uid))
或
for uid in dash_workers.get_id_creds():
id_worker(uid) >> clear_tables
圖如下所示:
第二個圖是我希望/我本來期望的代碼的前兩個片段有什麼根據我閱讀的文檔生成。如果我想clear_tables
到觸發我批的數據分析任務,針對不同的用戶ID之前先執行我應該指出這是clear_tables >> id_worker(uid)
編輯 - 這裏是完整的代碼,因爲我張貼的最後幾個問題已更新,以供參考:
from datetime import datetime
import os
import sys
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
import ds_dependencies
SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
sys.path.insert(0, SCRIPT_PATH)
import dash_workers
else:
print('Define DASH_PREPROC_PATH value in environmental variables')
sys.exit(1)
ENV = os.environ
default_args = {
'start_date': datetime.now(),
}
DAG = DAG(
dag_id='dash_preproc',
default_args=default_args
)
clear_tables = PythonOperator(
task_id='clear_tables',
python_callable=dash_workers.clear_db,
dag=DAG)
def id_worker(uid):
return PythonOperator(
task_id=id,
python_callable=dash_workers.main_preprocess,
op_args=[uid],
dag=DAG)
for uid in dash_workers.get_id_creds():
preproc_task = id_worker(uid)
clear_tables << preproc_task
實施@ LadislavIndra的建議後,我繼續有同樣的逆轉實現位位移操作,以獲得正確的依賴關係圖。
UPDATE @ AshBerlin-Taylor的回答是這裏發生了什麼。我認爲圖形視圖和樹視圖是做同樣的事情,但他們不是。下面是id_worker(uid) >> clear_tables
看起來像圖形視圖:
我當然不希望在我的數據的最後一步前的準備程序將刪除所有數據表!
@ AshBerli-Taylor - 釘上它!使用圖形視圖的屏幕截圖發佈更新。 – Aaron