2017-06-13 81 views
0

我使用Window.sum函數來獲取RDD中的值的總和,但是當我將DataFrame轉換爲RDD時,我發現結果只有一個分區。重新分區何時發生? ?將RDD轉換爲DataFrame時會導致重新分區的原因是什麼?

val rdd = sc.parallelize(List(1,3,2,4,5,6,7,8), 4) 
    val df = rdd.toDF("values"). 
     withColumn("csum", sum(col("values")).over(Window.orderBy("values"))) 
    df.show() 
    println(s"numPartitions ${df.rdd.getNumPartitions}") 
    // 1 
    //df is: 
// +------+----+ 
// |values|csum| 
// +------+----+ 
// |  1| 1| 
// |  2| 3| 
// |  3| 6| 
// |  4| 10| 
// |  5| 15| 
// |  6| 21| 
// |  7| 28| 
// |  8| 36| 
// +------+----+ 

我添加partitionBy在窗口,但結果是錯誤,我應該怎麼做,這是我改變代碼:

 val rdd=sc.parallelize(List(1,3,2,4,5,6,7,8),4) 
     val sqlContext = new SQLContext(m_sparkCtx) 
     import sqlContext.implicits._ 
     val df = rdd.toDF("values").withColumn("csum", sum(col("values")).over(Window.partitionBy("values").orderBy("values"))) 
     df.show() 
     println(s"numPartitions ${df.rdd.getNumPartitions}") 
     //1 
//df is: 
// +------+----+ 
// |values|csum| 
// +------+----+ 
// |  1| 1| 
// |  6| 6| 
// |  3| 3| 
// |  5| 5| 
// |  4| 4| 
// |  8| 8| 
// |  7| 7| 
// |  2| 2| 
// +------+----+ 

回答

1

Window功能有partitionBy API用於分組的dataframeorderBy訂購按升序或降序分組rows

在您的第一個案例中,您尚未定義partitionBy,因此所有值都歸入一個dataframe以進行排序,從而將數據混合到一個分區中。

但在第二種情況下,您在values本身上定義了partitionBy。因此,由於每個值都不相同,因此每個row都被分組爲單個組。

的在第二種情況下partition是200,因爲這是當你還沒有定義分區和洗牌發生

當您第一種情況下得到同樣的結果在spark定義的默認分區,則需要添加其他column與分組價值,以便您可以將它們按照您的第一種情況分組,即分組到一個組中。

val rdd=sc.parallelize(List(1,3,2,4,5,6,7,8),4) 
val df = rdd.toDF("values").withColumn("grouping", lit("group")) 
    df.withColumn("csum", sum(col("values")).over(Window.partitionBy("grouping").orderBy("values"))).drop("grouping").show(false) 

通過這樣做,我看到您的原始分區被保留。

+0

我該怎麼辦? – mentongwu

+0

「我該怎麼辦?」是什麼意思? –

+0

我該怎麼做才能得到與分區相同的結果? – mentongwu

相關問題