
apache airflow scheduler not scheduling jobs

I am using Apache Airflow 1.8.0.

Here is the output when I backfill a job:

[2017-04-13 09:42:55,857] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:43:00  [scheduled]> 
[2017-04-13 09:42:55,857] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:43:00 [scheduled]> 
[2017-04-13 09:42:55,857] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:45:00 [scheduled]> 
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:45:00 [scheduled]> 
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:43:00 [scheduled]> 
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:43:00 [scheduled]> 
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:44:00 [scheduled]> 
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:44:00 [scheduled]> 
[2017-04-13 09:42:55,864] {models.py:1120} INFO - Dependencies not met for <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:44:00 [scheduled]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 3 non-success(es). upstream_tasks_state={'skipped': Decimal('0'), 'successes': Decimal('0'), 'done': 0, 'upstream_failed': Decimal('0'), 'failed': Decimal('0')}, upstream_task_ids=['runme_0', 'runme_1', 'runme_2'] 

When I try to schedule any DAG, it throws this error:

Traceback (most recent call last):
  File "/anaconda3/bin/airflow", line 28, in <module>
    args.func(args)
  File "/anaconda3/lib/python3.5/site-packages/airflow/bin/cli.py", line 167, in backfill
    pool=args.pool)
  File "/anaconda3/lib/python3.5/site-packages/airflow/models.py", line 3330, in run
    job.run()
  File "/anaconda3/lib/python3.5/site-packages/airflow/jobs.py", line 200, in run
    self._execute()
  File "/anaconda3/lib/python3.5/site-packages/airflow/jobs.py", line 2021, in _execute
    raise AirflowException(err)
airflow.exceptions.AirflowException: ---------------------------------------------------

And this is the output regarding the tasks:

BackfillJob is deadlocked. These tasks have succeeded: 
set() 
These tasks have started: 
{} 
These tasks have failed: 
set() 
These tasks are skipped: 
set() 
These tasks are deadlocked: 
{<TaskInstance: example_bash_operator.runme_0 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_0 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_0 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.run_this_last 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.run_this_last 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.run_this_last 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_0 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.run_this_last 2017-04-13 13:44:00 [scheduled]>} 

Tested with Python 2.7 and Python 3.5.

Using both the SequentialExecutor and the LocalExecutor.

P.S. If I backfill the DAG at the current time, it executes once and then throws the above error for all the scheduled tasks.

Answer


Your Airflow instance is in a deadlocked state. Failed tasks are not allowing future tasks to run.

Airflow launches each task of every DAG run in a new process, and when tasks fail to run, the resulting deadlock situation is not handled automatically.

To resolve this, you can do one of the following:

  1. Use **airflow clear** <<dag_id>>. This will resolve the deadlock and allow future runs of the DAG/tasks (see the example after this list).
  2. If the above does not resolve the issue, you will need to use airflow resetdb. This will clear the Airflow database and hence resolve the issue.
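
For reference, the commands would look roughly like this (the dag_id example_bash_operator is taken from the logs above; substitute your own):

```bash
# Clear the stuck task instances of the DAG so they can be scheduled again
airflow clear example_bash_operator

# Only if clearing does not help: wipe and re-initialise the Airflow metadata database
airflow resetdb
```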

Going forward,

  • Try to set a timeout with execution_timeout=timedelta(minutes=2) so that you have explicit control over the operator
  • Also, provide an on_failure_callback=handle_failure, which will cleanly exit the operator on failure (a sketch follows below)
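
A minimal sketch of how those two arguments could be wired into a DAG file (the DAG id, task and the handle_failure callback below are made up purely for illustration):

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator


def handle_failure(context):
    # Called by Airflow with the task context when the task fails;
    # do any cleanup/notification here so the operator exits cleanly.
    print("Task failed: {}".format(context['task_instance']))


dag = DAG(
    dag_id='example_with_timeout',           # hypothetical DAG id
    start_date=datetime(2017, 4, 13),
    schedule_interval='@daily',
)

runme = BashOperator(
    task_id='runme',
    bash_command='echo hello',
    execution_timeout=timedelta(minutes=2),  # fail the task if it runs longer than 2 minutes
    on_failure_callback=handle_failure,      # invoked on failure
    dag=dag,
)
```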

Hope this helps,

Cheers!