2017-02-28 15 views
3

比方說,我把芹菜併發到ñ,但我有>ñ)ExternalTask​​Sensor DAG中,它將檢查名爲do_sth另一DAG,這些ExternalTask​​Sensor將消耗所有的芹菜工人,這樣事實上沒有人會工作。如果傳感器數量大於併發性,氣流芹菜工作人員將被阻塞?

但我不能設置併發性太高(如2 *),因爲DAG do_sth可以啓動太多的進程,這將導致內存不足。

我很困惑什麼數字我應該設置爲芹菜併發?

回答

3

ETL best practices with Airflow's Gotchas section作者解決這個一般問題。其中一個建議是爲傳感器任務設置一個池,以便其他任務不會餓死。針對您的情況,確定您希望一次運行的傳感器任務數(小於併發級別),並以此爲限制設置池。一旦你的池被設置,將池參數傳遞給你的每個傳感器操作員。 有關泳池的更多信息,請參閱Airflow's documentation on concepts。以下是將一個池參數傳遞給運算符的示例:

aggregate_db_message_job = BashOperator( 
    task_id='aggregate_db_message_job', 
    execution_timeout=timedelta(hours=3), 
    pool='ep_data_pipeline_db_msg_agg', 
    bash_command=aggregate_db_message_job_cmd, dag=dag) 
+0

非常感謝。這就是我想找到的。 – MoreFreeze