2017-07-28 95 views
0

與此問題相關的原始代碼可以在here找到。在Airflow中生成多個任務時反向上游/下游關係

我對兩個bitshift運算符都感到困惑,並且set_upstream/set_downstream方法在我在DAG中定義的任務循環內工作。當DAG的主執行迴路被配置如下:

for uid in dash_workers.get_id_creds(): 
    clear_tables.set_downstream(id_worker(uid)) 

for uid in dash_workers.get_id_creds(): 
    clear_tables >> id_worker(uid) 

圖表看起來像這樣(字母數字序列是用戶ID,這也定義了任務ID ):

enter image description here

當我配置了DAG的主執行循環是這樣的:

for uid in dash_workers.get_id_creds(): 
    clear_tables.set_upstream(id_worker(uid)) 

for uid in dash_workers.get_id_creds(): 
    id_worker(uid) >> clear_tables 

圖如下所示:

enter image description here

第二個圖是我希望/我本來期望的代碼的前兩個片段有什麼根據我閱讀的文檔生成。如果我想clear_tables到觸發我批的數據分析任務,針對不同的用戶ID之前先執行我應該指出這是clear_tables >> id_worker(uid)

編輯 - 這裏是完整的代碼,因爲我張貼的最後幾個問題已更新,以供參考:

from datetime import datetime 
import os 
import sys 

from airflow.models import DAG 
from airflow.operators.python_operator import PythonOperator 

import ds_dependencies 

SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH') 
if SCRIPT_PATH: 
    sys.path.insert(0, SCRIPT_PATH) 
    import dash_workers 
else: 
    print('Define DASH_PREPROC_PATH value in environmental variables') 
    sys.exit(1) 

ENV = os.environ 

default_args = { 
    'start_date': datetime.now(), 
} 

DAG = DAG(
    dag_id='dash_preproc', 
    default_args=default_args 
) 

clear_tables = PythonOperator(
    task_id='clear_tables', 
    python_callable=dash_workers.clear_db, 
    dag=DAG) 

def id_worker(uid): 
    return PythonOperator(
     task_id=id, 
     python_callable=dash_workers.main_preprocess, 
     op_args=[uid], 
     dag=DAG) 

for uid in dash_workers.get_id_creds(): 
    preproc_task = id_worker(uid) 
    clear_tables << preproc_task 

實施@ LadislavIndra的建議後,我繼續有同樣的逆轉實現位位移操作,以獲得正確的依賴關係圖。

UPDATE @ AshBerlin-Taylor的回答是這裏發生了什麼。我認爲圖形視圖和樹視圖是做同樣的事情,但他們不是。下面是id_worker(uid) >> clear_tables看起來像圖形視圖:

enter image description here

我當然不希望在我的數據的最後一步前的準備程序將刪除所有數據表!

回答

3

樹狀視圖是「倒退」怎麼你(和我!)它的第一個念頭。在您的第一個屏幕截圖中,它顯示必須在「AAAG5608078M2」運行任務之前運行「clear_tables」。並且DAG狀態取決於每個id工作者任務。因此,而不是任務訂單,它是狀態鏈的一棵樹。如果這是有道理的。

(這聽起來有點怪,但它是因爲DAG可以分支出來,在分支回來。)

你可能有更好的運氣在尋找你的DAG中的圖形視圖。這個有箭頭並以更直觀的方式顯示執行順序。 (雖然我現在發現樹視圖很有用,但從開始就不太清楚)

+0

@ AshBerli-Taylor - 釘上它!使用圖形視圖的屏幕截圖發佈更新。 – Aaron

1

翻看你的其他代碼,看起來get_id_creds是你的任務,你試圖循​​環它,這是創造一些奇怪的互動。

,將工作模式是:在氣流

clear_tables = MyOperator() 

for uid in uid_list: 
    my_task = MyOperator(task_id=uid) 
    clear_tables >> my_task 
+0

謝謝@LadislavIndra,但仍然無法正常工作。我要用完整的代碼更新這個問題。 – Aaron