2017-04-18 80 views
0

我想將我的風暴拓撲中傳入的所有原始數據存儲在HDFS羣集中。 這是JSON或二進制數據,以2k /秒的速率傳入。風暴 - 使用壓縮技術寫入HDFS

我試圖使用HDFS螺栓(http://storm.apache.org/releases/0.10.0/storm-hdfs.htmlà,但使用正常的HDFS不允許壓緊螺栓使用序列文件博爾特 壓縮纔是可能的。 我不想使用序列文件,因爲我沒有一個真正的關鍵。

另外,我已經卡桑德拉用於存儲我的鍵/值的東西,服務我的要求。 它只是使用卡桑德拉我的原始數據(沒有這個職位需要太多的磁盤(開銷)客觀辯論這個)

誰能幫我一下嗎? 我可以用java Hadoop驅動客戶端來實現這一點? 有沒有人的代碼片段?

+0

並不是說我會建議順序文件但缺少密鑰不應阻止你。您可以使用NullWritable作爲鍵。 – Venkat

回答

0

好吧,沒有辦法像我想要的那樣快速壓縮。 但我找到了一個解決方案,如果有人需要它,我可以在這裏分享。

這個問題不僅與Storm有關,而且是一個更一般的Hadoop問題。

我所有的數據均是採用HdfsBolt創作的:

RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("|"); 

    //Synchronize data buffer with the filesystem every 1000 tuples 
    // Need to be configurable 
    SyncPolicy syncPolicy = new CountSyncPolicy(1000); 

    // Rotate data files when they reach five MB 
    // need to be configuration 
    FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(10.0f, FileSizeRotationPolicy.Units.MB); 

    // Use default, Storm-generated file names 
    FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/datadir/in_progress") ; 

    // Instantiate the HdfsBolt 
    HdfsBolt bolt = new HdfsBolt() 
     .withFsUrl("hdfs://"+dfsHost+":"+dfsPort) 
     .withFileNameFormat(fileNameFormat) 
     .withRecordFormat(format) 
     .withRotationPolicy(rotationPolicy) 
     .withSyncPolicy(syncPolicy) 
     .addRotationAction(new MoveFileAction().withDestination("/datadir/finished")); 

這是給我按我的螺栓執行一個文件..不容易處理,但它的好:)

然後我安排自動壓縮使用Hadoop流(在NameNode的或像這樣一個cron):

hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar \ 
     -Dmapred.reduce.tasks=0 \ 
     -Dmapred.output.compress=true \ 
     -Dmapred.compress.map.output=true \ 
     -Dmapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec \ 
     -input /datadir/finished \ 
     -output /datadir/archives \ 
     -mapper /bin/cat \ 
     -inputformat org.apache.hadoop.mapred.TextInputFormat \ 
     -outputformat org.apache.hadoop.mapred.TextOutputFormat 

在這裏我還有一個問題: 一個輸入文件壓縮成一個檔案。 因此,我的10MB輸入文件(每個工作人員)壓縮到1MB的gzip(或bzip) - >這是生產這麼多小文件,這是一個問題在hadoop

要解決此問題,我會試着看看hadoop檔案(HAR)的功能。

我還需要在/ DATADIR清除已經壓縮的文件/成品

希望我會有反饋從你們 保持聯繫

問候, 巴斯蒂安