2016-05-15 79 views
0

如果我有一個數據集與此類似:對RDD轉換

val list = List ((1,1), (1,2), (1,3), (2,2), (2,1), (3,1), (3,3))

而且我想找到每個鍵的平均所以輸出應該是:

(1, 2), (2, 3/2), (3, 2)我能做到這一點使用groupByKey, countByKey, and reduceByKey莫名其妙或我必須使用類似於下面的示例combineByKey方法:我嘗試使用groupByKey, countByKey, and reduceByKey但這種方法的組合不起作用,我想知道是否有人知道使用這三種方法做到這一點?

val result = input.combineByKey(
(v) => (v, 1), 
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), 
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)). 
map{ case (key, value) => (key, value._1/value._2.toFloat) } 

result.collectAsMap().map(println(_)) 

回答

4

你應該嘗試以下操作:

val sc: SparkContext = ... 
val input = sc.parallelize(List((1,1), (1,2), (1,3), (2,2), (2,1), (3,1), (3,3))) 
val averages = input.groupByKey.map { case (key, values) => 
    (key, values.sum/values.size.toDouble) 
} 

println(averages.collect().toList) // List((1,2.0), (2,1.5), (3,2.0)) 
+0

哦,對了,我忘當你groupByKey它把值在數組中,所以你可以使用之和大小在該陣列上!感謝您的幫助! – CapturedTree

1

那麼你可以簡單地使用PairRDDFunctions.groupByKey並計算你想要什麼。

val avgKey = input.groupByKey.map{ 
    case (k, v) => (k, v.sum.toDouble/v.size) 
} 
avgkey.collect 
//res2: Array[(Int, Double)] = Array((3,2.0), (1,2.0), (2,1.5)) 
1

使用reduceByKey,與二倍體之前轉化爲三胞胎

rdd.map{ case(k,v) => (k,(v,1)) }. 
    reduceByKey((a,v) => (a._1+v._1, a._2+v._2)). 
    map {case (k,v) => (k, v._1/v._2)} 
+0

你好榆木!對不起,對於遲到的回覆,但是你能解釋一下當你在map函數中使用'case'嗎?只是爲了讓你可以將參數寫成'(k,v)',而不必具體指定'k'和'​​v'的結構。例如,如果'k'是一個元組,在地圖中沒有情況下,我將不得不寫'((a1,a2),v)'?所以它在技術上只適用於模式匹配? – CapturedTree

+0

使用'case'我們*啓用*模式匹配,提取或分解數據結構,我們使用大括號來定義一個部分函數(並非所有的模式都需要定義)。另一方面,元組數據結構的使用涉及使用它自己的方法(._1和._2)來獲取(提取)數據項。 – elm