2017-08-05 47 views
0

我正在寫一個方法,需要一個rdd並將其保存爲一個avro文件。問題是,如果我使用的特定類型比我能做的.toDF()但我不能在通用rdd上調用.toDF()!這裏有一個例子:如何將通用rdd轉換爲數據框?

case class Person(name: String) 

def f(x: RDD[Person]) = x.toDF() 
def g[T](x: RDD[T]) = x.toDF() 

f(p) //works 
g(p) //fails!! 

有誰知道爲什麼我不能在普通的RDD撥打.toDF()如果在它周圍有什麼辦法?

+0

調用'.toDf'就很困難,你需要編寫'g()'函數嗎? –

+0

雖然你是如何獲得RDD的?是否沒有替代方案來獲取DataFrame(或Spark2中的DataSet) –

+0

@ cricket_007'f'和'g'不是我正在編寫的實際函數。我想編寫一個包含RDD的函數,其中包含檢查點等附加功能,因此如果已經生成了數據,我可以加載數據,否則我將觸發DAG生成數據集並將其保存到磁盤。 Dataframe有幾種可以保存的格式,但RDD只有'saveAsTextFile'和'saveAsObject'(無論'對象'的含義)。 – anthonybell

回答

1

如果使用的是星火2,

import org.apache.spark.sql.Encoder 

def g[T: Encoder](x: RDD[T]) = x.toDF() 

會工作。

toDF是由implicit conversion

implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = { 
    DatasetHolder(_sqlContext.createDataset(rdd)) 
} 

org.apache.spark.sql.SQLImplicits

添加的方法來完成,簽名應該是相同的。