2017-07-17

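How to read rows from a column in Scala

I'm working on a small project that converts student data into intervals. The program reads the data, takes the marks (integers) from the mark column, sorts them in ascending order, and converts them into an interval. Can anyone help me with this particular part? Many thanks.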

Code:

import org.apache.spark.sql.functions._ // collect_list, count
import org.apache.spark.sql.Row
import spark.implicits._                // toDF and $"..." (auto-imported in spark-shell)


case class Rating(mark: Int, classes: String, schooles: String, name: String) 

val Result = sc.textFile("hdfs://schools:9000/input/marks.csv").map(_.split(",")).map(p => Rating(p(0).toInt, p(1).trim, p(2).trim, p(3).trim)).toDF 
// Collect all marks per (classes, schooles, name) group; keep only groups with more than 10 marks
val all_marks = Result.groupBy("classes", "schooles","name").agg(collect_list("mark") as "marks",count("*") as "cnt").where($"cnt" > 10)

val mrk=all_marks.select("marks") 
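To check the grouping logic on a handful of lines without a cluster, the same steps can be mirrored with plain Scala collections. This is a minimal sketch, assuming the CSV layout mark,classes,schooles,name implied by the parser above; the object name MarksPipelineSketch, the helper names, and the extra trim on the mark field are illustrative additions, not part of the original code.

```scala
object MarksPipelineSketch {
  // Same fields as the question's case class; "schooles" spelling kept to match.
  case class Rating(mark: Int, classes: String, schooles: String, name: String)

  // Parse one CSV line "mark,classes,schooles,name" (hypothetical helper).
  def parse(line: String): Rating = {
    val p = line.split(",")
    Rating(p(0).trim.toInt, p(1).trim, p(2).trim, p(3).trim)
  }

  // Local analogue of groupBy(...).agg(collect_list("mark")).where($"cnt" > minCount):
  // group rows by (classes, schooles, name), keep the marks, drop small groups.
  def groupMarks(lines: Seq[String], minCount: Int = 10): Map[(String, String, String), Seq[Int]] =
    lines.map(parse)
      .groupBy(r => (r.classes, r.schooles, r.name))
      .map { case (key, rows) => key -> rows.map(_.mark) }
      .filter { case (_, marks) => marks.size > minCount }
}
```

With eleven marks for one student and a single mark for another, only the first group survives the filter.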

The part I need help with:

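// Pseudocode for what I want to do per row (not valid Scala as written):
mrk.foreach(
    var ascending = mrk.sort                                        // sort this row's marks in ascending order
    var interval = ascending[0] + "-" + ascending[ascending.size]   // "first-last" interval
)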

How can I read the marks row by row, so that I can sort them and convert them into intervals?
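For a single row, the sort-then-take-ends step is ordinary Scala once the marks are a Seq[Int]. Indices start at 0, so the last element of a sorted sequence is at size - 1; reading index size, as in the pseudocode, would be out of bounds. The sketch below assumes that an empty row should yield no interval (returned as None); the object and method names are hypothetical.

```scala
object RowIntervalSketch {
  // Sort one row's marks and join the first and last element into "min-max".
  // The last element sits at index size - 1, not size.
  def toInterval(marks: Seq[Int]): Option[String] =
    if (marks.isEmpty) None // no marks: no interval to report (assumption of this sketch)
    else {
      val ascending = marks.sorted
      Some(s"${ascending(0)}-${ascending(ascending.size - 1)}")
    }
}
```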

Answer

You can create a user-defined function (UDF) that builds a new field from your list, holding the interval.

Here is a simple example, assuming you have already computed the marks column:

import org.apache.spark.sql.functions._
import spark.implicits._ // toDF and $"marks" (auto-imported in spark-shell)
val ddf1 = Seq(List(2,3,1), List(6,4,3)).toDF("marks")

val testUdf = udf((list: Seq[Int]) => { 
    val ascending = list.sorted //sorts in ascending order 
    s"${ascending(0)} - ${ascending(ascending.size - 1)}" 
}) 

ddf1.withColumn("marks", testUdf($"marks")).show() // overwrites "marks" with the interval string

Output:

+-----+ 
|marks| 
+-----+ 
|1 - 3| 
|3 - 6| 
+-----+ 

Hope this helps!
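The lambda passed to udf is an ordinary Scala function, so its logic can be checked without a SparkSession. As a sketch (object and method names are hypothetical), sorting and then taking the first and last element produces the same string as taking min and max directly, which is what the other answers do; min and max each need only one pass instead of a full sort. Both versions throw on an empty list.

```scala
object IntervalEquivalenceSketch {
  // The testUdf body: sort ascending, then first and last element.
  def viaSort(list: Seq[Int]): String = {
    val ascending = list.sorted
    s"${ascending(0)} - ${ascending(ascending.size - 1)}"
  }

  // Same string without sorting: min and max each scan the list once.
  def viaMinMax(list: Seq[Int]): String = s"${list.min} - ${list.max}"
}
```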

Comment: Thanks a lot, Shankar, but how do I convert the DataFrame val mrk = all_marks.select("marks") into a Seq like the one in val ddf1 = Seq(List(2,3,1), List(6,4,3)).toDF("marks")?

Comment: Why do you want to convert it to a list? I don't think that's a good idea.

Comment: If you really want to, mrk.rdd.map(r => r(0)).collect() is what you can do.

Answer

Another way to get this result is to convert the DataFrame to an RDD of lists, apply a map function, and convert the RDD back to a DataFrame:

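import scala.collection.JavaConversions._ // implicit java.util.List -> Scala conversion used by toList
import spark.implicits._                  // toDF on an RDD[String]

mrk.rdd.map(_.getList[Int](0).toList).map(l => s"${l.min} - ${l.max}").toDF("marks")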

Note: getList[Int] returns a java.util.List. To convert it to a Scala List with the toList method, you have to import scala.collection.JavaConversions._.
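The implicit conversions in scala.collection.JavaConversions were deprecated in Scala 2.12 and removed in 2.13. An alternative is the explicit asScala from scala.collection.JavaConverters (itself deprecated in 2.13 in favour of scala.jdk.CollectionConverters). The sketch below shows only the conversion step in plain Scala; the helper javaMarks is hypothetical and merely stands in for the java.util.List that Row.getList[Int](0) returns.

```scala
import scala.collection.JavaConverters._

object JavaListSketch {
  // Hypothetical stand-in for Row.getList[Int](0): builds a java.util.List.
  def javaMarks(values: Int*): java.util.List[Int] = {
    val jl = new java.util.ArrayList[Int]()
    values.foreach(v => jl.add(v))
    jl
  }

  // Explicit conversion with asScala instead of JavaConversions' implicit one.
  def toInterval(marks: java.util.List[Int]): String = {
    val l = marks.asScala.toList
    s"${l.min} - ${l.max}"
  }
}
```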

You can also use the Dataset API instead of the RDD:

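// Needs the same JavaConversions and spark.implicits imports; mapping straight to a String
// only requires the built-in String encoder, not an encoder for List[Int]
mrk.map(r => { val l = r.getList[Int](0).toList; s"${l.min} - ${l.max}" }).toDF("marks")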

Answer

I needed to use a WrappedArray to get the UDF working, like this:

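import org.apache.spark.sql.functions._ // udf, collect_list, count, col
import spark.implicits._                // toDF (auto-imported in spark-shell)

case class Rating(mark: Int, classes: String, schooles: String, name: String)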

val Result = sc.parallelize(Seq(
    Rating(56, "classA", "SchoolA", "English"), 
    Rating(57, "classB", "SchoolA", "English"), 
    Rating(58, "classA", "SchoolA", "English"), 
    Rating(59, "classB", "SchoolA", "English"), 
    Rating(60, "classA", "SchoolA", "English"), 
    Rating(61, "classA", "SchoolA", "English"))).toDF() 


// Spark 2.x hands an array column to a Scala UDF as a WrappedArray
val toInterval = udf((marks: scala.collection.mutable.WrappedArray[Int]) => s"${marks.min}-${marks.max}")

val all_marks = Result.groupBy("classes", "schooles","name").agg(collect_list("mark") as "marks",count("*") as "cnt") 

all_marks.select("marks").withColumn("interval", toInterval(col("marks"))).show() 

Output:

+----------------+--------+
|           marks|interval|
+----------------+--------+
|[56, 58, 60, 61]|   56-61|
|        [57, 59]|   57-59|
+----------------+--------+
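The printed intervals can be cross-checked by grouping the same six rows with plain Scala collections. Spark does not guarantee the order of groups after groupBy, so this sketch keys the results in a Map rather than relying on row order; the object name is hypothetical and the data is copied from the sc.parallelize call above.

```scala
object WrappedArrayOutputSketch {
  case class Rating(mark: Int, classes: String, schooles: String, name: String)

  // The six rows from the answer's sc.parallelize(...) call.
  val rows = Seq(
    Rating(56, "classA", "SchoolA", "English"),
    Rating(57, "classB", "SchoolA", "English"),
    Rating(58, "classA", "SchoolA", "English"),
    Rating(59, "classB", "SchoolA", "English"),
    Rating(60, "classA", "SchoolA", "English"),
    Rating(61, "classA", "SchoolA", "English"))

  // Group like groupBy("classes", "schooles", "name"), then build "min-max" per group.
  def intervals: Map[(String, String, String), String] =
    rows.groupBy(r => (r.classes, r.schooles, r.name))
      .map { case (key, rs) =>
        val marks = rs.map(_.mark)
        key -> s"${marks.min}-${marks.max}"
      }
}
```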