
I am trying to create a new DataFrame from a single CSV-formatted column of my existing DataFrame. I don't know the schema in advance, so I am trying to use the spark.createDataFrame method without a schema argument (similar to method 1 in this example) to create a Spark DataFrame from a CSV column of arbitrary length.

I tried the code shown below, but got an exception:

var csvrdd = df.select(df("Body").cast("string"))
  .rdd
  .map { x: Row => x.getAs[String](0) }
  .map(x => x.split(",").toSeq)
var dfWithoutSchema = spark.createDataFrame(csvrdd)

Error:

error: overloaded method value createDataFrame with alternatives: 
    [A <: Product](data: Seq[A])(implicit evidence$3: reflect.runtime.universe.TypeTag[A])org.apache.spark.sql.DataFrame <and> 
    [A <: Product](rdd: org.apache.spark.rdd.RDD[A])(implicit evidence$2: reflect.runtime.universe.TypeTag[A])org.apache.spark.sql.DataFrame 
cannot be applied to (org.apache.spark.rdd.RDD[Seq[String]]) 
     var dfWithoutSchema = spark.createDataFrame(csvrdd) 

Let's say all the values of "Body" are CSVs with 4 columns (e.g. 'a,b,c,d') - would you then expect the resulting DataFrame to have 4 String columns, or a single Array column?

Answer


First, the reason for the failure can be seen clearly in the signature of createDataFrame:

def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame 

The type A is bounded to be a subclass of scala.Product. Your RDD contains Seq[String], which is not such a subclass. If you really want to, you can artificially wrap the array in a Tuple1 (which extends Product) and get this to work:

val csvrdd: RDD[Tuple1[Array[String]]] = df 
    .select(df("Body").cast("string")) 
    .rdd 
    .map{ x:Row => x.getAs[String](0)} 
    .map(x => Tuple1(x.split(","))) // wrapping with a Tuple1, which extends scala.Product 

val dfWithoutSchema = spark.createDataFrame(csvrdd) // now this overload works 

dfWithoutSchema.printSchema() 
// root 
//  |-- _1: array (nullable = true) 
//  |    |-- element: string (containsNull = true) 
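
As a side note, any case class also extends scala.Product, so wrapping the array in a small case class would satisfy the same bound while giving the column a readable name. A minimal sketch (CsvParts is a hypothetical name, not part of the original code):

// Hypothetical wrapper - case classes extend scala.Product, 
// so this satisfies the A <: Product bound just like Tuple1 does 
case class CsvParts(parts: Array[String]) 

val namedRdd = df 
    .select(df("Body").cast("string")) 
    .rdd 
    .map(row => CsvParts(row.getAs[String](0).split(","))) 

val dfNamed = spark.createDataFrame(namedRdd) 

dfNamed.printSchema() 
// root 
//  |-- parts: array (nullable = true) 
//  |    |-- element: string (containsNull = true) 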

However - this doesn't seem very useful. It would create a single-column DataFrame whose only column has type ArrayType. The same can be achieved with the much simpler split function from org.apache.spark.sql.functions:

import org.apache.spark.sql.functions.split 

val withArray = df.select(split(df("Body").cast("string"), ",") as "arr") 

withArray.printSchema() 
// root 
//  |-- arr: array (nullable = true) 
//  |    |-- element: string (containsNull = true) 
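
To illustrate, here is a minimal, self-contained sketch with made-up sample data (two rows standing in for the original "Body" column; the show() output is mine, for illustration):

import spark.implicits._ // for toDF on a local Seq 
import org.apache.spark.sql.functions.split 

// made-up stand-in for the original DataFrame 
val df = Seq("a,b,c,d", "x,y").toDF("Body") 

val withArray = df.select(split(df("Body").cast("string"), ",") as "arr") 
withArray.show(false) 
// +------------+ 
// |arr         | 
// +------------+ 
// |[a, b, c, d]| 
// |[x, y]      | 
// +------------+ 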

Alternatively, if what you want is a DataFrame in which each "CSV column" becomes a separate column, you will have to "decide" on a common schema for all the records (not all records necessarily have the same number of "CSV parts"). You can do that by adding another scan over the DataFrame to compute the maximum number of columns needed, and then letting Spark "fill the gaps" with null wherever the actual value contains fewer parts:

// first - split the String into an array of Strings 
import org.apache.spark.sql.functions.{size, split} 
import spark.implicits._ // for the $"..." column syntax 

val withArray = df.select(split(df("Body").cast("string"), ",") as "arr") 

// optional - calculate the *maximum* number of columns; 
// If you know it to begin with (e.g. "CSV cannot exceed X columns") - 
// you can skip this and use that known value 
val maxLength: Int = withArray.select(size($"arr") as "size") 
    .groupBy().max("size") 
    .first().getAs[Int](0) 

// Create the individual columns, with nulls where the arrays were shorter than maxLength 
val columns = (0 until maxLength).map(i => $"arr".getItem(i) as s"col$i") 

// select these columns 
val result = withArray.select(columns: _*) 

result.printSchema() // in my example, maxLength = 4 
// root 
//  |-- col0: string (nullable = true) 
//  |-- col1: string (nullable = true) 
//  |-- col2: string (nullable = true) 
//  |-- col3: string (nullable = true) 
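
On the made-up two-row sample from above, the shorter record gets padded with nulls, since getItem with an out-of-range index yields null (output sketched for illustration):

result.show() 
// +----+----+----+----+ 
// |col0|col1|col2|col3| 
// +----+----+----+----+ 
// |   a|   b|   c|   d| 
// |   x|   y|null|null| 
// +----+----+----+----+ 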