我們有一個直接與DAG API(DagBag()
,get_dag()
然後dag_run()
)如何將參數提交給Airflow中的Python程序?
的DAG運行正常,問題是,我們能不能找到一種方法與執行這樣的DAG執行的DAG前端服務器具體參數。
最接近的解決方案是使用變量API,它使用set()
和get()
方法,但這些變量是全局變量,並且在使用相同變量名稱的併發操作中工作時可能會發生衝突。
我們如何運行一個dag並設置可用的參數來執行它?我們主要使用PythonOperator。
編輯1: 我們的程序是一個Python Django前端服務器。所以,我們正在通過另一個Python程序與Airflow進行交流。這意味着我們通過Python觸發dags,因此,使用DagBag.get_dag()
從airflow服務檢索信息。 run_dag()
沒有辦法通過直接的參數雖然
我編輯過這篇文章,我們使用了django前端,因此觸發器是由另一個python構成的。理想情況下,我們想傳遞Python參數(字典,列表等)。我們如何使用trigger_dag_run並傳遞一個有效載荷?如果它是一個JSON有效載荷,它意味着它可以很容易地成爲一個Python字典。這已經足夠了。對於第二種選擇,我們如何傳遞給dag文件路徑?如果其他用戶使用不同的參數觸發相同的DAG,則它必須工作。 – Saif
對於您的用例,您是否查看了airflow中的json_client.py文件(它提供了實驗休息api)。你可以在POST請求中提供'conf'參數,它基本上是任何json對象。如果這能解決您的問題,我也會將其添加到答案中。 https://github.com/apache/incubator-airflow/blob/v1-8-stable/airflow/api/client/json_client.py – Him