2017-10-18 338 views
1

我正在嘗試使用ExternalTask​​Sensor並且卡在另一個DAG的任務中,該任務已成功完成。Airflow ExternalTask​​Sensor卡住

這裏,第一個DAG「a」完成其任務,然後通過ExternalTask​​Sensor第二個DAG「b」應該被觸發。相反,它被卡在a.first_task中。

首先DAG:

import datetime 
from airflow import DAG 
from airflow.operators.python_operator import PythonOperator 

dag = DAG(
    dag_id='a', 
    default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()}, 
    schedule_interval=None 
) 

def do_first_task(): 
    print('First task is done') 

PythonOperator(
    task_id='first_task', 
    python_callable=do_first_task, 
    dag=dag) 

二DAG:

import datetime 
from airflow import DAG 
from airflow.operators.python_operator import PythonOperator 
from airflow.operators.sensors import ExternalTaskSensor 

dag = DAG(
    dag_id='b', 
    default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()}, 
    schedule_interval=None 
) 

def do_second_task(): 
    print('Second task is done') 

ExternalTaskSensor(
    task_id='wait_for_the_first_task_to_be_completed', 
    external_dag_id='a', 
    external_task_id='first_task', 
    dag=dag) >> \ 
PythonOperator(
    task_id='second_task', 
    python_callable=do_second_task, 
    dag=dag) 

缺少什麼我在這裏?

回答

1

ExternalTaskSensor假定您依賴於具有相同執行日期的DAG運行中的任務。

這意味着在您的情況下,要求ab需要按照相同的計劃運行(例如每天早上9:00或w/e)。

否則在實例化ExternalTaskSensor時,您需要使用execution_deltaexecution_date_fn

這是運營商自身內部的文件,以幫助進一步澄清:

:param execution_delta: time difference with the previous execution to 
    look at, the default is the same execution_date as the current task. 
    For yesterday, use [positive!] datetime.timedelta(days=1). Either 
    execution_delta or execution_date_fn can be passed to 
    ExternalTaskSensor, but not both. 

:type execution_delta: datetime.timedelta 


:param execution_date_fn: function that receives the current execution date 
    and returns the desired execution date to query. Either execution_delta 
    or execution_date_fn can be passed to ExternalTaskSensor, but not both. 

:type execution_date_fn: callable