2017-07-26 104 views
0

我們最近試圖採用Airflow作爲我們的「數據工作流程」引擎,儘管我已經將大部分內容弄清楚了,但我仍然處於調度程序如何計算何時觸發DAG的灰色地帶。氣流DAG觸發

在這個簡單的DAG請看:

from airflow import DAG 
from datetime import datetime 
from airflow.operators.bash_operator import BashOperator 

dag_options = {     
      'owner':    'Airflow', 
      'depends_on_past':  False,  
      'start_date':   datetime.now() 
} 

with DAG('test_dag1', schedule_interval="5 * * * *", default_args=dag_options) as dag: 
       task1 = BashOperator(  
       task_id='task1', 
       bash_command='date',     
       dag=dag)  

時間表會挑這個,但不會執行它。現在,如果我將「start_date」更改爲:

​​

其中xxxx,yyyy,zzzz是今天的日期,它將開始執行。原因是調度程序不斷從源dag文件夾重新讀取此dag文件,每次執行datetime.now(),注意到開始日期與當前排隊不同,重新添加此dag並因此重新調度/推動執行日期前進(我的dag_dir_list_interval是300)?

  • 第一DAG執行:

    而且,在氣流,我理解,當一個DAG是取消暫停(或者與dags_are_paused_at_creation加入=假),則調度器將如下安排執行即時後(起始+間隔)

  • 第二DAG執行:時刻之後(開始+(間隔* 2))
  • 第三DAG執行:時刻之後(開始+(間隔* 3))

這是正確的假設嗎?

更新(2017年7月30日)

基於這樣的假設之上,我創造了這個DAG今日(2017年7月30日):

from airflow import DAG 
from datetime import datetime 
from airflow.operators.bash_operator import BashOperator 

dag_options = {     
      'owner':    'Airflow', 
      'depends_on_past': False,  
      'start_date': 
datetime(year=2017,month=7,day=30,hour=20,minute=10) 
} 

with DAG('test_dag_100', schedule_interval="*/10 * * * *", 
default_args=dag_options) as dag: 
       task1 = BashOperator(  
       task_id='task_100', 
       bash_command='date',     
       dag=dag)  

應該開始(UTC ):

  • 2017年7月30日20時20分00秒
  • 2017年7月30日二十點30分00秒
  • 2017/7/30 20:40:00

不幸的是,這沒有發生。 這裏是我的儀表板的一些屏幕截圖:

有人能解釋爲什麼二十點21分00秒達格沒有執行? 20:31:00之後它仍然沒有執行......我在這裏錯過了什麼?順便說一句,我還注意到,出於某種原因,我每次去 手動通過儀表板啓動一個DAG,它只是在「運行」階段。爲什麼是這樣?手動啓動它與任何啓動時間選項(start_date/interval/etc)有什麼關係?

謝謝您提供的任何說明

+0

計劃間隔是crontab中,你可以嘗試https://crontab.guru/測試對你意味着什麼區間。如果使用的是1.8 +,datetime.now()不被認爲是最好的做法,你可以在這裏找到https://github.com/apache/incubator-airflow/blob/master/UPDATING.md#less-forgiving-詳細調度程序上動態起始日期 – Chengzhi

回答

1

您的假設是正確的。從開始日期起,指定的時間間隔已過後,氣流將安排第一次DAG運行。使用datetime.now()作爲開始日期將很少導致Airflow觸發DAG。它在計劃文檔中提到。

如果你指定一個特定的開始日期,如日期時間(2017,7,27,1,0)以「5 * * * *」程序間隔,然後,時間是1:05上午7/27 DAG將首次觸發運行。之後每隔五分鐘就會繼續運行。