2017-10-09 128 views
0

我正在使用spark 1.6.1。如何將GroupedDataset保存爲實木複合地板或將其轉換爲DF

是否有任何API可用於將GroupDataset保存爲實木複合地板文件。 或將其轉換爲DataFrame。

E.g.我有一個自定義的對象「過程」,我已經將Dataframe轉換爲過程對象。 之後,我正在通過patientID進行分組。 我想分組爲文件或將其作爲數據框傳遞給其他函數。 我沒有獲得任何存儲API或將其轉換爲Dataframe。

val procedureDs: Dataset[Procedure] = joinDf.select("patientid", "patientprocedureid", "procedurecode").as[Procedure] 
val groupedDs:GroupedDataset[Long, Procedure] = procedureDs.groupBy{ x => x.patientid } 

應用mapGroups後

val a = groupedDs.mapGroups{ case (k,vs) => { (k, vs.toSeq)}} 

它提供了以下錯誤:

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for com.....PatientDiagnosis 
- array element class: "com....PatientDiagnosis" 
- field (class: "scala.collection.Seq", name: "_2") 
- root class: "scala.Tuple2" 

我曾試圖給出明確的編碼器

val a = groupedDigDs.mapGroups((k,vs) => (k, vs.toSeq))(org.apache.spark.sql.Encoders.bean(classOf[(Long, Seq[com....PatientDiagnosis])])) 

然後錯誤更改爲:

java.lang.UnsupportedOperationException: Cannot infer type for class scala.Tuple2 because it is not bean-compliant 

回答

1

GroupedDataRelationalGroupedDataset在火花2.X),GroupedDataset(在火花2.X KeyValueGroupedDataset)具有被聚合,然後才能保存。

如果你的目標是到另一個groupByKey您可以使用mapGroups

val groupedDs: GroupedDataset[K, V] = ??? 
// ... { case (k, xs) => (k, xs.toSeq) } to preserve key as well 
groupedDs.mapGroups { case (_, xs) => xs.toSeq } 

並將結果寫入。

相關問題