2016-11-09 118 views
-1

我有一個像這樣的RDD:RDD[(Any, Array[(Any, Any)])] 我只是想將它轉換成一個DataFrame。因此,我用這個模式將Rdd轉換爲數據幀

val schema = StructType(Array (StructField("C1", StringType, true), StructField("C4", ArrayType(StringType, false), false))) 

val df = Seq(
    ("A",1,"12/06/2012"), 
    ("A",2,"13/06/2012"), 
    ("B",3,"12/06/2012"), 
    ("B",4,"17/06/2012"), 
    ("C",5,"14/06/2012")).toDF("C1", "C2","C3") 
df.show(false) 

val rdd = df.map(line => (line(0), (line(1), line(2)))) 
    .groupByKey() 
    .mapValues(i => i.toList).foreach(println) 

val output_df = sqlContext.createDataFrame(rdd, schema) 

我RDD這個樣子的:

(B,List((3,12/06/2012), (4,17/06/2012)))  
(A,List((1,12/06/2012), (2,13/06/2012)))  
(C,List((5,14/06/2012))) 

或類似這樣的

(A,[Lscala.Tuple2;@3e8f27c9) 
(C,[Lscala.Tuple2;@6f22defb) 
(B,[Lscala.Tuple2;@1b8692ec) 

如果我使用:

.mapValues(i => i.toArray) 

我已經嘗試此:

val output_df = sqlContext.createDataFrame(rdd, schema) 

,但我得到:

Error:(40, 32) overloaded method value createDataFrame with alternatives: 
    (data: java.util.List[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and> 
    (rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and> 
    (rdd: org.apache.spark.rdd.RDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and> 
    (rows: java.util.List[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and> 
    (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and> 
    (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame 
cannot be applied to (Unit, org.apache.spark.sql.types.StructType) 
    val output_df = sqlContext.createDataFrame(rdd, schema) 

拉斐爾·羅斯
嘗試第二種方法至極不起作用,我得到:

Error:(41, 24) No TypeTag available for MySchema 
    val newdf = rdd.map(line => MySchema(line._1.toString, line._2.asInstanceOf[List[(Int, String)]])).toDF() 

第一種方法的工作很好,但我失去了我的元組的第一個元素與.mapValues(i => i.map(_._2))

你知道我是否可以完成fi第一個方法,以保持兩個元素

我決定把它轉換我的元組字符串,但這不是根據我,因爲我將要分裂我的字符串元組讀取列優雅的解決方案:

val rdd = df.map(line => (line(0), (line(1), line(2)))).groupByKey() 
     .mapValues(i => i.map(w => (w._1,w._2).toString)) 
     .map(i=>Row(i._1,i._2)) 

謝謝你的幫助

+0

可能重複的[如何將rdd對象轉換爲火花中的數據幀](http://stackoverflow.com/questions/29383578/how-to-convert-rdd-object-to-dataframe-in-spark) – cheseaux

+3

我認爲這將有助於如果您將錯誤添加到問題 – maasg

+0

@a。 moussa解決'沒有TypeTag可用於MySchema',你必須定義主要方法以外的案例類(如果有的話) –

回答

0

GroupByKey給你一個元組的Seq,你沒有考慮到你的模式。此外,sqlContext.createDataFrame需要您沒有提供的RDD[Row]

這應該使用schema

val rdd = df.map(line => (line(0), (line(1), line(2)))) 
    .groupByKey() 
    .mapValues(i => i.map(_._2)) 
    .map(i=>Row(i._1,i._2)) 

val output_df = sqlContext.createDataFrame(rdd, schema) 

你也可以使用一個case class可用於映射元組(不知道元組架構可以編程方式創建的):

val df = Seq(
     ("A", 1, "12/06/2012"), 
     ("A", 2, "13/06/2012"), 
     ("B", 3, "12/06/2012"), 
     ("B", 4, "17/06/2012"), 
     ("C", 5, "14/06/2012")).toDF("C1", "C2", "C3") 
    df.show(false) 

    val rdd = df.map(line => (line(0), (line(1), line(2)))) 
     .groupByKey() 
     .mapValues(i => i.toList) 

    // this should be placed outside of main() 
    case class MySchema(C1: String, C4: List[(Int, String)]) 

    val newdf = rdd.map(line => MySchema(line._1.toString, line._2.asInstanceOf[List[(Int, String)]])).toDF() 
+0

嗨,謝謝你的回答,它不起作用,我用你的評論完成我的問題。如果你有任何想法,這是真正有用的。 –

+0

非常感謝你,當我將MySchema移動到我的方法之外時,它工作得非常好 –