2017-07-09 27 views
-1

我有一個相對簡單的任務,首先在1.2 mio文件上運行,併爲每個文件(每個都保存中間產品的多個步驟)都有一個管道。我已經在luigi中實現了這個功能:https://gist.github.com/wkerzendorf/395c85a2955002412be302d708329f7f。我喜歡Luigi使用文件系統來查看任務是否完成。 我還發現了一個可以刪除中間產品的實現,管道將重新創建所有相關產品(這樣我就可以更改管道)。 我該如何在氣流中做到這一點(或者我應該堅持Luigi?)?從luigi切換到氣流

回答

1

我真的不知道路易吉是如何工作的。我主要使用Apache Airflow。 Airflow是一個工作流程管理系統。這意味着它不會傳輸數據,轉換數據或生成一些數據(雖然它會生成日誌,並且有一個名爲Xcom的概念,允許在任務之間交換消息,從而允許更多細微的控制形式和共享狀態。 Apache Nifi。但是它定義了使用Operators實例化每個任務的依賴關係,例如。 BashOperator。爲了知道任務是否完成,它會檢查同一任務返回的信號。

以下是您想要在Airflow中實施的示例。

要在氣流使用
from airflow.operators.bash_operator import BashOperator 
from airflow.operators.python_operator import PythonOperator 
import glob 
import gzip 
import shutil 

args = { 
    'owner': 'airflow', 
    'start_date': airflow.utils.dates.days_ago(2) 
} 

dag = DAG(
    dag_id='example_dag', default_args=args, 
    schedule_interval='0 0 * * *', 
    dagrun_timeout=timedelta(minutes=60)) 


def extract_gzs(): 
    for filename in glob.glob('/1002/*.gz') 
     with gzip.open(filename, 'rb') as f_in, open(filename[:-3], 'wb') as f_out: 
      shutil.copyfileobj(f_in, f_out) 


extractGZ = PythonOperator(
    task_id='extract_gz', 
    provide_context=True, 
    python_callable=extract_gzs(), 
dag=dag) 


cmd_cmd=""" 
your sed script! 
""" 

sed_script = BashOperator(
    task_id='sed_script', 
    bash_command=cmd_cmd, 
    dag=dag) 


extractGZ.set_downstream(sed_script) 
  1. 進口經營者(當然,如果你需要其他類/庫)
  2. 定義你的達格。這裏在變量args中我定義了ownerstart_date參數。
  3. 然後實例化您的DAG。在這裏,我把它命名爲example_dag,歸功於它的定義變量,schedule_interval和之後的時間應該是超時(有更多的根據自己的需要使用參數)
  4. 創建一個Python函數extract_gzs()
  5. 實例化一個PythonOperator哪裏我打電話給我的蟒蛇FUNC
  6. 做同樣與bash的代碼
  7. 確定兩個任務之間的依賴關係intances

當然有更多的實施同樣的想法的方式。根據需要來適應! PS:Here有一些Apache Airflow的例子

+0

也許,我誤解了整個管道的事情。我假設你建立了一條管道,然後你給它一個數據集,然後沿着這條管道進行變換。這意味着一個管道工作在1.2 mio文件上。這不是正確的思考方式嗎?做一個在文件上運行sed的氣流管道,然後將其應用到1.2 mio文件應該是微不足道的,不是嗎? –

+0

@WolfgangKerzendorf查看我的修改答案。 – sdikby

+0

謝謝,我會試試看。這仍然不完全如我所想的那樣。我想爲一個文件構建一個管道,然後以某種方式通過這個東西來推動每個文件。 –