2017-03-08 86 views
3

很多時候,我下載的文件在文件名中有一個日期。在Airflow中使用參數的示例?

csat_surveys_2017_03_05.csv 
03062017_roster.csv 

我的代碼單獨處理這個問題。

  • 比較在處理的文件清單,其中應該存在的預期日期的日期(根據切片的文件名)(某些日期範圍,直到當前日期)
  • 對於每一個我處理文件,添加文件名數據庫表和只處理還沒有被添加到表

我可以(也應該I)使用氣流計劃日期,以取代其編寫這個邏輯需要新的文件嗎?每天,我的任務都會按計劃進行。我把這個計劃的日期(可能減去1天),並將該值作爲參數傳遞,作爲要讀取的文件名的一部分(在pandas中)。如果是這樣,我可以請看一個我可以用作模板的清晰示例嗎?

這是一個更好的方法,並且如果文件丟失或延遲了幾天會覆蓋我(我希望任務失敗,然後繼續嘗試每一天,直到它成功或直到我注意到它並可以向客戶提出問題)?

回答

0

我會說是的,使用execution_date可能是最佳做法。

要訪問它,您需要一個模板字段。一些默認的運營商有那些已經,或您可能希望創建自己的操作,那麼這將是這個樣子:

在你DAG,你有任務爲:

my_task = MyOperator(
    task_id='t1', 
    filename='prefix_{{ ds }}_suffix') 

ds是氣流宏用於訪問execution_date參數作爲日期的字符串表示形式。

和你MyOperator會是什麼樣子:

class MyOperator(BaseOperator): 
    template_fields = ('filename') 

    def __init__(self, filename) 
     self.filename = filename 

    def execute(self, context): 
     download_file(self.filename) 
     do_other_stuff() 

你可以找到更多有關如何在宏部分參數化任務https://airflow.incubator.apache.org/code.html#macros