0

這個問題解釋了Spark的隨機分割是如何工作的,How does Sparks RDD.randomSplit actually split the RDD,但我不明白spark如何跟蹤一個分割的值,以便這些相同的值不會進入第二個分割。Spark如何跟蹤隨機分裂的分裂?

如果我們看一下randomSplit的實現:

def randomSplit(weights: Array[Double], seed: Long): Array[DataFrame] = { 
// It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its 
// constituent partitions each time a split is materialized which could result in 
// overlapping splits. To prevent this, we explicitly sort each input partition to make the 
// ordering deterministic. 

val sorted = Sort(logicalPlan.output.map(SortOrder(_, Ascending)), global = false, logicalPlan) 
val sum = weights.sum 
val normalizedCumWeights = weights.map(_/sum).scanLeft(0.0d)(_ + _) 
normalizedCumWeights.sliding(2).map { x => 
    new DataFrame(sqlContext, Sample(x(0), x(1), withReplacement = false, seed, sorted)) 
}.toArray 
} 

,我們可以看到,它創建一個共享相同的sqlContext並用兩種不同的樣品(RS)兩大DataFrames。

這兩個DataFrame是如何相互通信以便在第一個DataFrame中包含的值不包含在第二個DataFrame中?

並且是兩次獲取的數據? (假設sqlContext正在從數據庫中選擇,是兩次執行的選擇?)。

回答

2

這與採樣RDD完全相同。

假設您有權重數組(0.6, 0.2, 0.2),Spark將爲每個範圍(0.0, 0.6), (0.6, 0.8), (0.8, 1.0)生成一個DataFrame。

當需要讀取結果DataFrame時,Spark將會遍歷父DataFrame。對於每個項目,如果該數字落在指定範圍內,則生成一個隨機數字,然後發出該項目。所有子DataFrame共享相同的隨機數發生器(技術上,具有相同種子的不同發生器),所以隨機數序列是確定性的。

對於你的最後一個問題,如果你沒有緩存父DataFrame,那麼輸入DataFrame的數據將在每次輸出DataFrame被計算時重新獲取。

+0

我想強調一個技巧就是對每個'Sample'使用相同的種子。 – zero323

+0

非常好!這回答了我的問題。感謝您花時間解釋它! – James