This is a slightly confusing question. If your data is already sitting in an Array[(String, Int)] collection (presumably after a collect() to the driver), then you don't need any RDD transformations at all. In fact, there's a nifty trick you can run with fold*() over the collection to grab the average:
val average = arr.foldLeft(0.0) { case (sum: Double, (_, count: Int)) => sum + count } /
              arr.foldLeft(0.0) { case (sum: Double, (word: String, count: Int)) => sum + count / word.length }
It's a bit of a mouthful, but it essentially aggregates the total number of characters in the numerator and the number of word occurrences in the denominator. Run on your example, I see the following:
scala> val arr = Array(("I",1), ("have",4), ("a",1), ("cat",6), ("The", 3), ("looks", 5), ("very" ,4), ("cute",4))
arr: Array[(String, Int)] = Array((I,1), (have,4), (a,1), (cat,6), (The,3), (looks,5), (very,4), (cute,4))
scala> val average = ...
average: Double = 3.111111111111111
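As a side note, if you'd rather walk the array only once, the numerator and denominator can be accumulated together in a single foldLeft over a pair of running sums. This is just a sketch of the same idea, not part of the snippet above; the totalChars/totalWords names are mine, and I use toDouble to guard against integer division:

// Single pass: accumulate (total characters, total word occurrences) together.
val (totalChars, totalWords) = arr.foldLeft((0.0, 0.0)) {
  case ((charSum, wordSum), (word, count)) =>
    (charSum + count, wordSum + count.toDouble / word.length)
}
val average = totalChars / totalWords   // 3.111111111111111 for the array above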
If you have your (String, Int) tuples distributed across an RDD[(String, Int)], you can use accumulators to solve this problem quite easily:
val chars = sc.accumulator(0.0)
val words = sc.accumulator(0.0)
wordsRDD.foreach { case (word: String, count: Int) =>
chars += count; words += count/word.length
}
val average = chars.value/words.value
When run on the example above (placed into an RDD), I see the following:
scala> val arr = Array(("I",1), ("have",4), ("a",1), ("cat",6), ("The", 3), ("looks", 5), ("very" ,4), ("cute",4))
arr: Array[(String, Int)] = Array((I,1), (have,4), (a,1), (cat,6), (The,3), (looks,5), (very,4), (cute,4))
scala> val wordsRDD = sc.parallelize(arr)
wordsRDD: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:14
scala> val chars = sc.accumulator(0.0)
chars: org.apache.spark.Accumulator[Double] = 0.0
scala> val words = sc.accumulator(0.0)
words: org.apache.spark.Accumulator[Double] = 0.0
scala> wordsRDD.foreach { case (word: String, count: Int) =>
| chars += count; words += count/word.length
| }
...
scala> val average = chars.value/words.value
average: Double = 3.111111111111111
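For what it's worth, the same average can also be computed without accumulators, by mapping each tuple to its (characters, occurrences) contribution and summing both with a reduce; also note that on Spark 2.x and later sc.accumulator is deprecated in favour of sc.doubleAccumulator. The following is only a sketch of that alternative, not part of the answer above:

// Map each (word, count) to (characters, occurrences) and sum both components.
val (totalChars, totalWords) = wordsRDD
  .map { case (word, count) => (count.toDouble, count.toDouble / word.length) }
  .reduce { case ((c1, w1), (c2, w2)) => (c1 + c2, w1 + w2) }
val average = totalChars / totalWords   // 3.111111111111111 for the RDD above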
I am looking for the average length of each word (not at the level of the whole text), i.e. if a word occurs more times, it should count more times toward the average. For example, the word cat occurs twice in my paragraph, and thus the average length for that word is 6/3 = 2; for words like "The", the average length is 3/3 = 1 – VRK