1

如何在多個類別上並行運行/實例化Dag(氣流)?在Python中並行實時調度DAG

例如: 我有一個氣流(DAG),我定期 上運行如何我可以調度DAG在不同Batchnames上並行基礎上運行(在parallet):

  1. 運行batch1的dag(在args中傳遞批名稱)
  2. 運行batch2的dag(傳遞args中的批名稱)應與 平行運行。 。 。

等等

我使用環境varibale傳遞Batchnames,然後使用多個服務器上TMUX會話跑並行DAG但它搞砸了。

有沒有更好的方法,我可以使用,並與我可以節省時間並行運行多個batchnames DAG?

謝謝你的時間。

回答

0

由於airflow運行代表bash-shell命令圖形的python類,因此可以通過創建兩個獨立的DAG在氣流中執行此操作。這裏有一個輕微修改的tutorial

dag = DAG(dag_id='batch') 
task = [ BashOperator(
      task_id='templated', 
      bash_command=templated_command, 
      params={'batch_name': batch_name}, 
      dag=dag) 
     for batch_name in ["batch one", "batch two"]] 

dag.add_task(task[0]) 
dag.add_task(task[1]) 

由於沒有依賴性,他們應該同時只要氣流已經設置了這種方式運行。如果您需要設置shell環境變量,請在模板中的某處添加VAR = {{params.batch_name}}。

假設你的程序使用sys.argv中,你也可以使用正常作業控制推出:

python ~/airflow/dags/tutorial.py "batch one" & 
python ~/airflow/dags/tutorial.py "batch two" & 
wait 
相關問題