我想在每天運行Hadoop作業時覆蓋/重用現有的輸出目錄。 實際上,輸出目錄將存儲每天作業運行結果的彙總輸出。 如果我指定相同的輸出目錄,它會給出錯誤「輸出目錄已存在」。如何覆蓋/重新使用現有的Hadoop作業輸出路徑和Agian
如何繞過此驗證?
我想在每天運行Hadoop作業時覆蓋/重用現有的輸出目錄。 實際上,輸出目錄將存儲每天作業運行結果的彙總輸出。 如果我指定相同的輸出目錄,它會給出錯誤「輸出目錄已存在」。如何覆蓋/重新使用現有的Hadoop作業輸出路徑和Agian
如何繞過此驗證?
如何在運行作業之前刪除目錄?
您可以通過外殼做到這一點:
hadoop fs -rmr /path/to/your/output/
或通過Java API:
// configuration should contain reference to your namenode
FileSystem fs = FileSystem.get(new Configuration());
// true stands for recursively deleting the folder you gave
fs.delete(new Path("/path/to/your/output"), true);
Jungblut的答案是你的直接解決方案。由於我從不信任自動化流程來刪除東西(我個人),所以我會提出一個替代方案:
而不是嘗試覆蓋,我建議您將作業的輸出名稱設置爲動態,包括其運行時間。
類似於「/path/to/your/output-2011-10-09-23-04/
」。這樣你就可以保持周圍的舊作業輸出的情況下,你需要在重新審視在我的系統,它運行日常工作10+,我們結構輸出是:/output/job1/2011/10/09/job1out/part-r-xxxxx
,/output/job1/2011/10/10/job1out/part-r-xxxxx
等
+1這是數據保持方法。但要確保有一個垃圾收集守護進程收集所有過時的目錄。否則你的HDFS將溢出;))。 –
確實!我們通常在一週後刪除東西。 –
感謝您的回覆。我唯一的問題是 - 我不想刪除現有的輸出目錄。每天當我運行我的工作時,我希望它的輸出與現有輸出(舊)合併。我正在考慮的解決方案是生成/將日常作業的輸出存儲到某個臨時目錄中,並通過某個腳本將該臨時文件夾結構複製到舊的輸出目錄中。我在s3上有像「Campaign_Id/Year/Month/day」這樣的文件夾結構 – yogesh
Hadoop的TextInputFormat
(我猜你正在使用)不允許覆蓋現有的目錄。可能爲了找到你錯誤地刪除你(和你的集羣)非常努力的東西的痛苦找到了你的藉口。
但是,如果您確定您希望您的輸出文件夾到作業被覆蓋,我相信最徹底的方法是改變TextOutputFormat
有點像這樣:
public class OverwriteTextOutputFormat<K, V> extends TextOutputFormat<K, V>
{
public RecordWriter<K, V>
getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException
{
Configuration conf = job.getConfiguration();
boolean isCompressed = getCompressOutput(job);
String keyValueSeparator= conf.get("mapred.textoutputformat.separator","\t");
CompressionCodec codec = null;
String extension = "";
if (isCompressed)
{
Class<? extends CompressionCodec> codecClass =
getOutputCompressorClass(job, GzipCodec.class);
codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
extension = codec.getDefaultExtension();
}
Path file = getDefaultWorkFile(job, extension);
FileSystem fs = file.getFileSystem(conf);
FSDataOutputStream fileOut = fs.create(file, true);
if (!isCompressed)
{
return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
}
else
{
return new LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)),keyValueSeparator);
}
}
}
現在正在創建的FSDataOutputStream
( fs.create(file, true)
),其中overwrite = true。
Hadoop已經支持您似乎試圖通過允許多個輸入路徑到達作業來實現的效果。不要嘗試添加更多文件的單個文件目錄,而是要添加新目錄的目錄目錄。要使用聚合結果作爲輸入,只需將輸入glob指定爲子目錄上的通配符(例如,my-aggregate-output/*
)。要將新數據「附加」到作爲輸出的聚合中,只需將聚合的新唯一子目錄指定爲輸出目錄,通常使用時間戳或從輸入數據派生的某個序列號(例如my-aggregate-output/20140415154424
)。
您可以爲每個執行的時間創建一個輸出子目錄。比如可以說你是從用戶期望的輸出目錄,然後進行如下設置:由以下行
FileOutputFormat.setOutputPath(job, new Path(args[1]);
更改此:
String timeStamp = new SimpleDateFormat("yyyy.MM.dd.HH.mm.ss", Locale.US).format(new Timestamp(System.currentTimeMillis()));
FileOutputFormat.setOutputPath(job, new Path(args[1] + "/" + timeStamp));
感謝托馬斯的答覆。我唯一的問題是我不想刪除現有的輸出目錄。每天當我運行我的工作時,我希望它的輸出與現有輸出(舊)合併。我正在考慮的解決方案是生成/將日常作業的輸出存儲到某個臨時目錄中,並通過某個腳本將該臨時文件夾結構複製到舊的輸出目錄中。 – yogesh
然後,您必須使用一個文件夾,您可以將所有作業輸出複製到該文件夾。這也可以用一個shellcript來完成,這個問題沒有內置的解決方案。 –