2017-07-14 89 views
0

我們有一個直接與DAG API(DagBag()get_dag()然後dag_run()如何將參數提交給Airflow中的Python程序?

的DAG運行正常,問題是,我們能不能找到一種方法與執行這樣的DAG執行的DAG前端服務器具體參數。

最接近的解決方案是使用變量API,它使用set()get()方法,但這些變量是全局變量,並且在使用相同變量名稱的併發操作中工作時可能會發生衝突。

我們如何運行一個dag並設置可用的參數來執行它?我們主要使用PythonOperator。

編輯1: 我們的程序是一個Python Django前端服務器。所以,我們正在通過另一個Python程序與Airflow進行交流。這意味着我們通過Python觸發dags,因此,使用DagBag.get_dag()從airflow服務檢索信息。 run_dag()沒有辦法通過直接的參數雖然

回答

0

如果使用trigger_dag_run(通過命令行或從另一DAG)觸發DAG,你可以通過任何JSON作爲有效載荷。

另一種選擇是將參數列表存儲在文件中,並將該文件的位置存儲爲變量。然後DAG可以將該文件位置傳遞給python運算符,然後運算符可以處理讀取該文件並從中解析參數。

如果這兩個解決方案對您的用例不起作用,提供有關您的dag和參數類型的更多詳細信息可能會有所幫助。

+0

我編輯過這篇文章,我們使用了django前端,因此觸發器是由另一個python構成的。理想情況下,我們想傳遞Python參數(字典,列表等)。我們如何使用trigger_dag_run並傳遞一個有效載荷?如果它是一個JSON有效載荷,它意味着它可以很容易地成爲一個Python字典。這已經足夠了。對於第二種選擇,我們如何傳遞給dag文件路徑?如果其他用戶使用不同的參數觸發相同的DAG,則它必須工作。 – Saif

+0

對於您的用例,您是否查看了airflow中的json_client.py文件(它提供了實驗休息api)。你可以在POST請求中提供'conf'參數,它基本上是任何json對象。如果這能解決您的問題,我也會將其添加到答案中。 https://github.com/apache/incubator-airflow/blob/v1-8-stable/airflow/api/client/json_client.py – Him

相關問題