2016-12-16 86 views
3

我現在遇到一個問題,我試圖從多個文件中使用燙傷讀取並使用單個文件創建輸出。我的代碼是這樣的:使用燙傷法讀取多個文件並輸出一個SINGLE文件

def getFilesSource (paths: Seq[String]) = { 
    new MultipleTextLineFiles(paths: _*) { 
     override protected def createHdfsReadTap(hdfsMode: Hdfs): Tap[JobConf, _, _] = { 
     val taps = goodHdfsPaths(hdfsMode).toList.map { 
      path => CastHfsTap (new Hfs (hdfsScheme, path, sinkMode)) 
     } 

     taps.size match { 
      case 0 => { 
      CastHfsTap (new Hfs(hdfsScheme, hdfsPaths.head, sinkMode)) 
      } 
      case 1 => taps.head 
      case _ => new ScaldingMultiSourceTap(taps) 
     } 
     } 
    } 
    } 

但是當我運行這段代碼,它分裂我的輸出到許多文件,但是裏面的數據非常少:短短K.相反,我希望能夠聚集所有輸出文件成一個單一的。

我燙碼是:

val source = getFilesSource(mapped) // where mapped is a Sequence of valid HDFS paths (Seq [String]) 

TypedPipe.from(source).map(a => Try{ 
    val json = JSON.parseObject(a) 
    (json.getInteger("prop1"), json.getInteger("prop2"), json.getBoolean("prop3")) 
}.toOption).filter(a => a.nonEmpty) 
    .map(a => a.get) 
    .filter(a => !a._3) 
    .map (that => MyScaldingType (that._1, that._2)) 
    .write(MyScaldingType.typedSink(typedArgs)) 

我想我必須重寫型ScaldingMultiSourceTap的「sourceConfInit」的方法,但我不知道怎麼寫裏面...

回答

0

您可以使用groupAll發送所有的地圖輸出(作業是一個只有地圖的作業)到一個reducer,考慮到數據很小,然後寫一個。輸出將被寫入單個文件。

. 
. 
. 
.filter(a => !a._3) 
.map (that => MyScaldingType (that._1, that._2)) 
.groupAll 
.write(MyScaldingType.typedSink(typedArgs)) 
+0

嗨@karthikcru,謝謝你的回答,聽起來很有希望。今天早上我會試試這個。我希望,因爲我在映射階段執行過濾器(使用聲明:.filter(a =>!a._3)),對於我的商業案例,將會有大量數據不會通過該過濾條件。剩餘的東西將被髮送到單個減速器。 –

相關問題