好吧,沒有辦法像我想要的那樣快速壓縮。 但我找到了一個解決方案,如果有人需要它,我可以在這裏分享。
這個問題不僅與Storm有關,而且是一個更一般的Hadoop問題。
我所有的數據均是採用HdfsBolt創作的:
RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("|");
//Synchronize data buffer with the filesystem every 1000 tuples
// Need to be configurable
SyncPolicy syncPolicy = new CountSyncPolicy(1000);
// Rotate data files when they reach five MB
// need to be configuration
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(10.0f, FileSizeRotationPolicy.Units.MB);
// Use default, Storm-generated file names
FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/datadir/in_progress") ;
// Instantiate the HdfsBolt
HdfsBolt bolt = new HdfsBolt()
.withFsUrl("hdfs://"+dfsHost+":"+dfsPort)
.withFileNameFormat(fileNameFormat)
.withRecordFormat(format)
.withRotationPolicy(rotationPolicy)
.withSyncPolicy(syncPolicy)
.addRotationAction(new MoveFileAction().withDestination("/datadir/finished"));
這是給我按我的螺栓執行一個文件..不容易處理,但它的好:)
然後我安排自動壓縮使用Hadoop流(在NameNode的或像這樣一個cron):
hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar \
-Dmapred.reduce.tasks=0 \
-Dmapred.output.compress=true \
-Dmapred.compress.map.output=true \
-Dmapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec \
-input /datadir/finished \
-output /datadir/archives \
-mapper /bin/cat \
-inputformat org.apache.hadoop.mapred.TextInputFormat \
-outputformat org.apache.hadoop.mapred.TextOutputFormat
在這裏我還有一個問題: 一個輸入文件壓縮成一個檔案。 因此,我的10MB輸入文件(每個工作人員)壓縮到1MB的gzip(或bzip) - >這是生產這麼多小文件,這是一個問題在hadoop
要解決此問題,我會試着看看hadoop檔案(HAR)的功能。
我還需要在/ DATADIR清除已經壓縮的文件/成品
希望我會有反饋從你們 保持聯繫
問候, 巴斯蒂安
並不是說我會建議順序文件但缺少密鑰不應阻止你。您可以使用NullWritable作爲鍵。 – Venkat