2016-08-05 61 views
6

我想要RDD表現的動作,如reduce,但不需要操作符可交換。即我希望result將始終是"123456789"RDD中是否有任何操作保持順序?

scala> val rdd = sc.parallelize(1 to 9 map (_.toString)) 
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[24] at parallelize at <console>:24 

scala> val result = rdd.someAction{ _+_ } 

首先,我找到了fold。的RDD#fold的醫生說:

DEF摺疊(零值:T)(OP:(T,T)⇒T):T骨料 每個分區的所有分區中的元件,並且然後將結果,使用一個 給聯想功能和中性「零值」

注意,沒有可交換在doc需要。但是,預期的結果並不:

scala> rdd.fold(""){ _+_ } 
res10: String = 312456879 

編輯我曾嘗試通過@ DK14提到,沒有運氣:

scala> val rdd = sc.parallelize(1 to 9 map (_.toString)) 
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[48] at parallelize at <console>:24 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res22: String = 341276895 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res23: String = 914856273 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res24: String = 742539618 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res25: String = 271468359 
+0

您錯過了文檔的下一部分,它描述了您所看到的內容:*「這與在Scala等函數語言中爲非分佈式集合實現的摺疊操作有些不同,這種摺疊操作可能適用於然後將這些結果摺疊到最終結果中,而不是按照某些定義的順序將摺疊應用於每個元素。對於不可交換的函數,結果可能與應用於非分佈式集合的摺疊結果不同。「* –

回答

2

沒有內置降低滿足在斯卡拉這一標準動作,但你可以很容易地通過結合mapPartitionscollect和地方減少實現自己的:

import scala.reflect.ClassTag 

def orderedFold[T : ClassTag](rdd: RDD[T])(zero: T)(f: (T, T) => T): T = { 
    rdd.mapPartitions(iter => Iterator(iter.foldLeft(zero)(f))).collect.reduce(f) 
} 

使用的collectreduce組合用於合併,而不是由所使用fold異步和無序方法確保全局順序被保留。

這當然帶有包括一些額外的成本:對駕駛員

  • 略高的內存佔用。
  • 顯着更高的延遲 - 我們明確地等待所有任務完成,然後再開始本地減少。
+0

感謝您的幫助,這是否意味着每個分區**始終是整個RDD的連續子序列**?有沒有提到的文件? – Eastsun

+0

關於文檔 - 我沒有意識到。儘管如此,它或多或少受到某些有序方法的模型和契約的約束。 Spark中真正的問題是如何確定整個序列。一般情況下,有兩種情況是當您對訂單進行推理時a)使用顯式排序(按合同)b)當您有輸入時生成確定性有序分割並且在輸入和當前點之間沒有混洗和其他數據移動。 – zero323

1

正如指出的@YuvalItzchakov fold不保留排序在組合結果時分區爲RDD。爲了說明這一點考慮合併原RDD一個唯一的分區,

scala> val rdd = sc.parallelize(1 to 9 map (_.toString)).coalesce(1) 
rdd: org.apache.spark.rdd.RDD[String] = CoalescedRDD[27] at coalesce at <console>:27 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res4: String = 123456789 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res5: String = 123456789 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res6: String = 123456789 
+0

應該指出的是,這樣做會遭受完全失去計算並行能力的缺點。 –

+0

@YuvalItzchakov確定;使用'fold',排序可能不會保存在分區的'RDD'中。 – elm

+0

是的,我明白了。但是OP應該意識到這一點。 –

相關問題