2017-06-01 182 views
1

我有一個火花數據集,根據我的需要對數據進行分組和縮減。我需要擺脫元組,並只保留Tuple2 :: _ 2。 我想如下映射數據集:如何創建RowEncoder以將Tuple <A,Row>映射到Row?

sparkSession.read() 
      .parquet("s3://stuff/*") 
      .groupByKey((MapFunction<Row, Long>) value -> { 
       long stamp = value.getAs("timeStamp"); 
       return stamp/600000; 
      }, Encoders.LONG()) 
      .reduceGroups((ReduceFunction<Row>) (v1, v2) -> { 
       int fare1 = v1.getAs("totalFare"); 
       int fare2 = v2.getAs("totalFare"); 
       return fare1 < fare2 ? v1 : v2; 
      }) 
      .map((MapFunction<Tuple2<Long, Row>, Row>) Tuple2::_2, RowEncoder.apply(null)) 

無法弄清楚如何架構提供給RowEncoder ::適用。 我正在閱讀this架構的鑲木地板文件。我沒有使用與Java的火花,所以我不能更具體的

回答

2

所以我最終這樣做了。基本上讀取1個元素以獲得所需的「ExpressionEncoder」。我需要最終輸出中的全部「行」,所以無法繼續@ Jacek的方法。

System.out.println("Starting"); 
System.out.println(Arrays.toString(args)); 
Row sampleRow = sparkSession.read().parquet(readFrom).head(); 
ExpressionEncoder<Row> rowEncoder = RowEncoder.apply(sampleRow.schema()); 

//read all elements, process and write back the result 
sparkSession.read() 
      .parquet(readFrom) 
      .groupByKey((MapFunction<Row, Long>) value -> { 
       long stamp = value.getAs("timeStamp"); 
       return stamp/600000; 
      }, Encoders.LONG()) 
      .reduceGroups((ReduceFunction<Row>) (v1, v2) -> { 
       int fare1 = v1.getAs("totalFare"); 
       int fare2 = v2.getAs("totalFare"); 
       return fare1 < fare2 ? v1 : v2; 
      }) 
      .map((MapFunction<Tuple2<Long, Row>, Row>) Tuple2::_2, rowEncoder) 
      .write() 
      .parquet(writeTo); 
System.out.println("Done !!!!"); 
1

,但是......

如果我沒有記錯的話,你只想與timeStamptotalFare下地幹活。 timeStamp是長類型,而totalFare是int類型。

我的第一個建議是使用as運營商留下無類型到Dataset[Long, Int](Scala中):

公開的數據集(編碼證據$ 2)返回每條記錄已經被映射到一個新的數據集到指定的類型。

這樣,你會避免處理這種不愉快對象和您的轉型將如下所示:

sparkSession.read() 
      .parquet("s3://stuff/*") 
      .as(Encoder...) // <-- I don't know how to write a tuple of (long, int) in Java 

已經這樣做了,你的問題有關map會得到「映射」使用Encoders.INT()如果我沒有弄錯(試圖將我的Scala思維映射到Java的)。

我提倡使用as操作的原因是,使用groupByKeyreduceGroups感覺對我來說是一個非常強烈的願望,留下無類型RelationalGroupedDataset API來輸入KeyValueGroupedDataset

相關問題