2017-10-18 328 views
0

我想安排一個Python作業,每15分鐘啓動一次。我已經使用過氣流,並沒有遇到過任何問題。 我用今天早些時候的開始日期創建了一個dag,頻率爲15分鐘,兩個任務包括激活一個虛擬Python環境,然後啓動一個python腳本。Airflow不會啓動我的dags(激活python環境並啓動python腳本)

但是,我的dags不會執行自己,所以我啓動了一個web服務器來檢查它的狀態並且什麼也沒有發生。因此,我嘗試使用trigger_dag命令在外部啓動它,但其狀態保持運行狀態。我真的不明白問題是什麼,任何幫助將不勝感激。我附上顯示問題的Airflow網絡服務器的兩個屏幕截圖。

enter image description here

enter image description here

編輯:添加dags.py文件,這裏是我的DAG的定義:

import os 
from airflow import DAG 
from datetime import datetime, timedelta 
from airflow.operators.bash_operator import BashOperator 


default_args = { 
    'owner': 'test', 
    'depends_on_past': False, 
    'start_date': datetime(2017,10,13,0,0,0,0), 
    'email': ['[email protected]'], 
    'email_on_failure': False, 
    'email_on_retry': False, 
    'retries': 1, 
    'retry_delay': timedelta(minutes=15), 
    # 'queue': 'bash_queue', 
    # 'pool': 'backfill', 
    # 'priority_weight': 10, 
    #'end_date': datetime(2017, 9, 23), 
} 

dag = DAG('dbscan_integ', default_args=default_args) 


t_dbscan = BashOperator(
    task_id='job_batch_dbscan', 
     bash_command='/home/test/Documents/git_repo/analyser/algo_integ/integration_dbscan/python main_algo.py', 
dag=dag) 


t_virtual_dbscan = BashOperator(
    task_id='virtual_dbscan', 
    bash_command='source activate integdb', 
    dag=dag) 
t_dbscan.set_upstream(t_virtual_dbscan) 
+0

如果您也從DAG分享相關的代碼片段,這將有所幫助。如果您錯過了在定義它們時將任務分配給DAG,就會發生這種情況。 – Him

+0

@Him我添加了我的dags的代碼,但是我認爲我的任務被正確地分配給任務,這要歸功於參數dag = dag –

回答

1

我知道這是愚蠢的,但你有調度和工人正常運轉?


編輯1:

好吧,我想原因是,你不必在你的DAG schedule_interval,而不是你給timedelta(分鐘= 15)retry_delay。

+0

這不是我注意到並修復它的schedule_interval。我不明白調度器必須單獨運行,我會在星期一進行測試。非常感謝你 –