2016-11-17 148 views
28

我已經啓動了Airflow網絡服務器並安排了一些dag。我可以在Web GUI上看到dag。氣流:如何刪除DAG?

如何從運行中刪除特定的DAG並在Web GUI中顯示?有沒有一個Airflow CLI命令來做到這一點?

我環顧四周,但無法找到一個簡單的方法來刪除DAG,一旦它已被加載和計劃的答案。

+0

沒有CLI這個刪除DAG文件。但是,如果您想要嘗試恢復它,那麼就會放棄pull請求:https://github.com/apache/incubator-airflow/pull/1344 – TheF1rstPancake

回答

2

Airflow沒有內置的功能可以爲您做到這一點。爲了刪除DAG,請將其從存儲庫中刪除,並刪除Airflow Metastore表中的數據庫條目 - dag。

+0

我還必須重新啓動計劃和網絡服務器所在的機器運行完成清理。簡單地重新啓動Web服務器和調度程序是不夠的。 –

7

我剛剛寫了一個腳本,刪除與特定dag相關的所有內容,但這僅適用於MySQL。如果您使用PostgreSQL,則可以編寫不同的連接器方法。最初由蘭斯發佈的命令是https://groups.google.com/forum/#!topic/airbnb_airflow/GVsNsUxPRC0 我只是把它放在腳本中。希望這可以幫助。格式:蟒蛇script.py dag_id

import sys 
import MySQLdb 

dag_input = sys.argv[1] 

query = {'delete from xcom where dag_id = "' + dag_input + '"', 
     'delete from task_instance where dag_id = "' + dag_input + '"', 
     'delete from sla_miss where dag_id = "' + dag_input + '"', 
     'delete from log where dag_id = "' + dag_input + '"', 
     'delete from job where dag_id = "' + dag_input + '"', 
     'delete from dag_run where dag_id = "' + dag_input + '"', 
     'delete from dag where dag_id = "' + dag_input + '"' } 

def connect(query): 
     db = MySQLdb.connect(host="hostname", user="username", passwd="password", db="database") 
     cur = db.cursor() 
     cur.execute(query) 
     db.commit() 
     db.close() 
     return 

for value in query: 
     print value 
     connect(value) 
10

不知道爲什麼Apache的氣流沒有明顯的和簡單的方法來刪除DAG

提起https://issues.apache.org/jira/browse/AIRFLOW-1002

+2

這是PR公開,但尚未合併。對於那些感興趣的鏈接 - https://github.com/apache/incubator-airflow/pull/2199。 –

14

這是使用PostgresHook我適應代碼默認的connection_id。

import sys 
from airflow.hooks.postgres_hook import PostgresHook 

dag_input = sys.argv[1] 
hook=PostgresHook(postgres_conn_id= "airflow_db") 

for t in ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag" ]: 
    sql="delete from {} where dag_id='{}'".format(t, dag_input) 
    hook.run(sql, True) 
+2

我認爲你也可以將'task_fail'和'dag_stats'添加到表 – marengaz

4

我已經編寫了一個腳本,用於刪除與默認SQLite數據庫相關的特定dag的所有元數據。這是基於耶穌的回答,但是從Postgres改編爲SQLite。用戶應將../airflow.db設置爲相對於默認airflow.db文件(通常爲~/airflow)存儲script.py的任何位置。要執行,請使用python script.py dag_id

import sqlite3 
import sys 

conn = sqlite3.connect('../airflow.db') 
c = conn.cursor() 

dag_input = sys.argv[1] 

for t in ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag" ]: 
    query = "delete from {} where dag_id='{}'".format(t, dag_input) 
    c.execute(query) 

conn.commit() 
conn.close() 
+0

這個表的列表中,這是一個很好的解決方案,至少在PR合併之前是這樣的 –

1

您可以清除一組任務實例,就好像他們從來沒有跑:

airflow clear dag_id -s 2017-1-23 -e 2017-8-31 

然後從DAG的文件夾

+1

這可能會導致'dag'表中有一些未清理的數據 – Chengzhi