2017-07-20 258 views
0

我有如下推動XCOM值自定義操作:Apache Airflow如何將xcom_pull()值轉換爲DAG?

... 
task_instance = context['task_instance'] 
task_instance.xcom_push("list_of_files",file_list) 
... 

它工作正常。我有一個dag定義文件(my_dag.py),我使用自己的操作符創建了一個任務,它使用此xcom值推送XCOM值,然後我想在循環中執行此操作。如何拉它?

回答

0

您不能在您的dag中訪問XCOM變量,它僅在運營商中通過向運算符構造函數提供參數provide_context=True而可用。

如果您想在DAG結構本身中使用來自操作員的數據,則需要執行操作員正在執行的實際任務。

def get_file_list(): 
    hook = SomeHook() 
    hook.run('something to get file list') 

dag = DAG('tutorial', default_args=default_args) 

for file in get_file_list(): 
    task = SomeOperator(params={'file': file}) # Do something with the file passed as a parameter