2017-07-27 68 views
0

第三方向我發送日誌文件的每日上載到S3存儲桶。我試圖使用DataPipeline將它們轉換成與awk略有不同的格式,將新文件放回到S3上,然後將原始文件放在一邊,這樣我就不會再明天處理相同的文件了。AWS Datapipeline完成後刪除已處理的源文件

有沒有乾淨的方式做到這一點?目前我的shell命令看起來類似:

#!/usr/bin/env bash 
set -eu -o pipefail 

aws s3 cp s3://example/processor/transform.awk /tmp/transform.awk 

for f in "${INPUT1_STAGING_DIR}"/*; do 
    basename=${f//+(*\/|.*)} 
    unzip -p "$f" | awk -f /tmp/transform.awk | gzip > ${OUTPUT1_STAGING_DIR}/$basename.tsv.gz 
done 

我可以使用AWS CLI工具,一邊移動源文件在每次循環,但似乎古怪 - 如果我的循環中途加工模具,這些早期文件將會丟失。

回答

0

一些可能的解決方案:

  1. 你的S3存儲創建觸發器..每當任何對象增加桶 - >調用它可以是一個Python腳本執行轉換lambda函數 - >拷貝回到另一個桶。現在,在另一個桶上再次調用lambda函數,從第一個桶中刪除文件。

  2. 我個人覺得;你所取得的成果已經足夠好了。所有你需要的是shell腳本中的異常處理,並且只有當輸出文件被成功創建時纔會刪除文件(絕不會丟失數據)(可能你也可以檢查輸出文件的大小)

+0

對於(2),我擔心我依靠OUTPUT1_STAGING_DIR在運行結束時將文件同步回S3,所以如果我的管道在半途中途中斷,可能會丟失它們。或者OUTPUT1_STAGING_DIR是一個實際的S3掛載,並且在那裏寫入的任何內容在目標S3存儲桶中同步可用? –