airflow

    1熱度

    1回答

    函數我想執行一個函數,我從任務傳遞一個參數。 這裏是我的功能與狀態參數: def sns_notify(state): client = boto3.client('sns') if state == "failed": message = config.get('sns', 'message') + state else: message =

    1熱度

    1回答

    我試圖通過傳遞一個不起作用的Bash行(thisshouldnotrun)來故意排除故障並排除錯誤。氣流正在輸出以下內容: [2017-06-15 17:44:17,869] {bash_operator.py:94} INFO - /tmp/airflowtmpLFTMX7/run_bashm2MEsS: line 7: thisshouldnotrun: command not found

    0熱度

    1回答

    我們有許多DAG計劃每天使用氣流運行。依賴關係已使用ExternalTask​​Sensor,TriggerDagRunOperator和運營商定製啓用 樣品: 任務1在DAG中的依賴於任務2在DAG乙 任務3中DAG甲取決於任務4在DAGÇ 任務5在DAG甲在DAG依賴於任務6 d ... 任務2在DAG B是依賴於任務7在DAGë 任務4在DAG B在DAG依賴於任務8 F .. 在檢查UI中

    0熱度

    2回答

    如何配置Airflow,以便DAG中的任何故障將(立即)導致鬆弛消息? 此時此刻我通過創建一個slack_failed_task對其進行管理: slack_failed_task = SlackAPIPostOperator( task_id='slack_failed', channel="#datalabs", trigger_rule='one_failed',

    2熱度

    1回答

    我的想法是有一個任務foo,它生成輸入列表(用戶,報告,日誌文件等),併爲輸入列表中的每個元素啓動任務。目標是利用Airflow的重試和其他邏輯,而不是重新實現它。 所以,理想情況下,我應該DAG看起來是這樣的: 這裏唯一的變量是生成的任務數。在完成所有這些任務之後,我想做更多的任務,因此爲每項任務啓動新的DAG似乎並不合適。 這是我的代碼: default_args = { 'owne

    2熱度

    1回答

    我有三個運營商的一個簡單的DAG。第一個是PythonOperator與我們自己的功能,另外兩個是標準的運營商從airflow.contrib(FileToGoogleCloudStorageOperator和GoogleCloudStorageToBigQueryOperator要準確)。他們按順序工作。我們的自定義任務會生成許多文件,通常在2到5之間,具體取決於參數。所有這些文件都必須由後續任

    0熱度

    1回答

    我不明白我需要運行哪些命令才能獲得DAG預定。假設我使用airflow test dag_name task_id_1 2017-06-22測試了DAG,第二項任務使用了airflow test dag_name task_id_2 2017-06-22。 我跑airflow trigger_dag dag_name,但那是爲了實例化DAG恰好那一刻嗎? 比方說,我想dag_name的定時/調度的

    2熱度

    2回答

    我想在不與Airflow GUI交互的情況下創建S3連接。有沒有可能通過airflow.cfg或命令行? 我們正在使用AWS的作用,下面的連接參數爲我們工作: { 「aws_account_id」: 「XXXX」, 「role_arn」: 「YYYYY」} 所以,手動創建的GUI爲S3連接工作,現在我們希望自動執行此流程,並希望將其添加爲Airflow部署流程的一部分。任何工作?

    1熱度

    1回答

    我是新來的氣流和意外啓動的守護程序模式下的氣流調度程序。現在,我想殺死調度器並可能重新啓動它。我試着做 sudo kill -9 <list of pids> pkill <name> 什麼都沒發生。當我運行 ps aux | grep 'airflow scheduler' 我看到這些項: user1 2907 6.0 1.0 329788 62996 ? Sl 17:37

    3熱度

    1回答

    我想用docker和rabbitMQ來建立我的氣流。我正在使用rabbitmq:3管理映像。我可以訪問rabbitMQ UI和API。 在氣流中我建立氣流webserver,氣流調度程序,氣流工作者和氣流花。 Airflow.cfg文件用於配置氣流。 當我使用broker_url = amqp://user:[email protected]:5672/和celery_result_backend