2016-08-25 71 views
2

我嘗試使用Java中的Apache Spark來解決大規模數據處理問題。我的輸入是一大組相對較小的自定義Java對象。Apache Spark中的不同縮減器

我的地圖步驟對每個對象執行一些小的改動。完成後,它會識別對象所屬的一個或多個等價類。綜合起來,可能有數十億等價類/對象對。

我的問題是我需要對每個等價類的元素執行不同的操作。這個項目需要支持插件架構,所以我不知道等價類是什麼,或者每個類需要發生的不同操作。

我的直覺是使用類似如下:

//Get the input set. 
JavaRDD<MyType> input = ... //Not important 

//Transform the input into (Equivalence Class, MyType) pairs, 
//using strings to store the equivalence class. 
JavaPairRDD<String, MyType> classedInput = input.flatMapToPair(

    new PairFlatMapFunction<MyType, String, MyType>() { 

     Iterator<Tuple2<String, MyType>> call(MyType arg) { 

      List<Tuple2<String, MyType>> out = new ArrayList<>(); 

      //Compute equivalence classes for arg. 
      for(String eqClz: getEquivalenceClasses(arg)) { 
       out.add(new Tuple2<String, MyType>(equClz, arg)); 
      } 

      return out.iterator(); 
     } 
}); 

//Collapse the results for each equivalence class. 
JavaPairRDD<String, MyType> output = classedInput.reduceByKey(

    new Function2<MyType, MyType, MyType>() { 

     MyType call(MyType a, MyType b) { 
      String eqClz = ??? //<= Problem 
      List<MyModule> modules = MyFramework.getModulesForEqClz(eqClz); 
      for(MyModule m: modules) { 
       a = m.merge(a, b); 
      } 
      return a; 
     } 
    } 

); 

我希望能夠等價類通成reduceByKey的功能,以便用它來確定哪些模塊需要調用。問題在於我沒有發現Spark的鍵控組合函數可以將密鑰傳遞給它們的回調函數。

由於classedInput的大小,我想避免用MyType對象保存密鑰,或者在地圖之後添加太多額外的分佈式操作。

有沒有類似Spark的方式來完成我正在嘗試的?

回答

0

看來你的問題是"secondary sort"問題的反面。它可以通過逆解來解決,我認爲(下面的2.)。

  1. 一種方法是使用reduce函數(或它的更完整的版本,aggregate),這只是需要一個關聯的操作爲您彙總數據的結果,而不管的關鍵。但是在相同的等價類中表達項目分組的細節可能會有點複雜。
  2. 更簡單的方式來保持對等價類針對的項目相匹配是簡單地重複等價類中值的參考:

Tuple2<String, MyType> outValue = new Tuple2<String, MyType>(eqClz, arg); out.add(new Tuple2<String, Tuple2<String, MyType>>(equClz, outValue));

如果你在留言中提到,當您傳輸數據時,您擔心隨機播放的大小,也許您想限制的是用作鍵控構造的表示的大小。我的意思是,重複上述建議的關鍵值可以得到equClz變量的兩個副本。但是,如果是十幾個字節,那麼您想縮小的大小就是關鍵位置中的副本。要做到這一點,您可能需要選擇正確長度的非加密散列。

您提到每個記錄多打幾十字節會導致數據量增加千兆字節,這意味着您最多隻有幾億條記錄,因此最多隻有幾億「equClz」值。這很容易被32位非密碼哈希覆蓋(你可以很容易地找到這些實現,Murmur3,XXHash)。由於32位是4個字節,它應該將傳輸開銷減少至少一個數量級。

+0

要開始,很好的答案。對於我的特定用例,我擔心這些解決方案的內存和網絡使用情況。 對於1.,我對Spark的.reduce()的理解是它將單個結果返回給調用者。鑑於單獨的工件數量,我想避免一次將它們集中在一個地方。 對於2.我擔心關聯額外的數據與每個值。我的直覺是,每個記錄多幾十字節就等於在線路上發送千兆字節或更多的額外數據。這是合理的,還是我的基地? – PilotScape64

+0

@ PilotScape64 http://stackoverflow.com/help/someone-answers – huitseeker