airflow

    0熱度

    1回答

    我有如下推動XCOM值自定義操作: ... task_instance = context['task_instance'] task_instance.xcom_push("list_of_files",file_list) ... 它工作正常。我有一個dag定義文件(my_dag.py),我使用自己的操作符創建了一個任務,它使用此xcom值推送XCOM值,然後我想在循環中執行此操作。

    0熱度

    1回答

    我正在運行此DAG。它從dash_workers.py導入函數(不包括,但是 - 這會有幫助嗎?)並實現這些函數作爲PythonOperator定義的任務。 我使用氣流版本1.8.0: from datetime import datetime, timedelta import os import sys import airflow.models as af_models from

    1熱度

    1回答

    我試圖使用Dask和Airflow實現數據管道。我希望能夠將節點添加到現有的DAG中,類似於NodeJS中的中間件。我的想法是醃製Dataframe,以便下一步可以在醃製之前拿起並應用任何轉換並轉到下一步。但通過Dask並行處理,下一個節點可以分配給任何工作人員。 我在想本地調度員和一些工人。當我有一份大工作時,我可以啓動一些EC2工作人員來處理這項工作。 有什麼建議嗎?

    2熱度

    2回答

    我最近安裝的Apache氣流1.8.1,我執行以下命令: airflow initdb 其返回以下錯誤: Traceback (most recent call last): File "/usr/bin/airflow", line 18, in <module> from airflow.bin.cli import CLIFactory File "/usr/l

    2熱度

    1回答

    我有使用芹菜和redis的氣流集羣。我有一個任務,我想要在所有工作人員上運行。我怎樣才能做到這一點 ? 原因:我們根據工人負荷增加和減少氣流工作人員。我必須在所有節點上運行一個任務。 感謝您的幫助。

    1熱度

    1回答

    我在氣流中製作了以下DAG,並執行一組EMRSteps來運行我的管道。 default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2017, 07, 20, 10, 00), 'email': ['[email protected]'],

    0熱度

    1回答

    我試圖安排每天以3小時爲間隔動態運行的作業,從13:45開始到14:30結束。 我的代碼如下: Test = datetime.now() current_date = datetime.now() default_args = { 'owner': 'abc', 'depends_on_past': False, 'start_date': datetime(Test.yea

    1熱度

    3回答

    我們使用airflow作爲調度程序。我想在DAG中調用一個簡單的bash操作符。 bash腳本需要密碼作爲參數才能做進一步處理。 如何在airflow(config/variables/connection)中安全地存儲密碼並在dag定義文件中進行訪問。 我是新來的氣流和Python,所以代碼片段將不勝感激。

    1熱度

    1回答

    我有一個使用celery executor在不同工作節點上運行的任務列表的dag。不過,我想運行主節點上的其中一個任務。那可能嗎?

    3熱度

    1回答

    有沒有什麼辦法可以在不進行多任務的情況下依次運行回填?例如,如果我使用多個日期運行回填,例如 氣流回填[dag] -s「2017-07-01」-e「2017-07-10」,有什麼方法可以在跑到下一個之前完成每個DAG天?現在,它正在完成每項任務的所有日子,然後再進行下一項任務。 謝謝。