2016-10-11 66 views
0

我想在我的combineByKey/reduceByKey/foldByKey中有依賴於當前正在操作的鍵的邏輯。從方法特徵中我可以看出,傳遞給這些方法的唯一參數是組合/縮小/摺疊的值。爲什麼我不能在減少邏輯中引用鍵?

用一個簡單的例子,我只是有一個RDD是(int, int)元組,我想要的結果是tuple[0]鍵入一個RDD其中值最接近鍵int

例如:

(1, 8) 
(1, 3) 
(1, -1) 
(2, 4) 
(2, 5) 
(2, 2) 
(3, 2) 
(3, 4) 

應減少到:

(1, 3) 
(2, 2) 
(3, 2) 

注意,在比較(1, 3)(1, -1)我不在乎哪一個是挑​​選,因爲它們都是相同的距離。 「3」鍵相同。

我可以想象這樣做的方法是沿着線的東西:

rdd.reduceByKey(lambda key, v1, v2: v1 if abs(key - v1) < abs(key - v2) else v2) 

reduce功能只需要兩個參數:要合併兩個值。看起來最簡單的方法是參考我的減速器中的鑰匙以實現我的目標;這可能嗎?

如果我試試這個,我得到一個錯誤:

rdd = sc.parallelize([(1, 8), (1, 3), (1, -1), (2, 4), (2, 5), (2, 2), (3, 2), (3, 4)]) 
rdd.reduceByKey(lambda key, v1, v2: v1 if abs(key - v1) < abs(key - v2) else v2).collect() 

TypeError:() takes exactly 3 arguments (2 given)

我真的不尋找一個解決方案,這個例子中的問題。我想知道的是,如果有一個原因,鑰匙沒有傳遞給reduceByKey函數?我認爲這是我遺失的地圖縮減哲學的一些基本原理。


注我可以通過插入一個映射步驟,其每個值映射到由該值和從鑰匙的距離的元組解決我例如:

rdd = sc.parallelize([(1, 8), (1, 3), (1, -1), (2, 4), (2, 5), (2, 2), (3, 2), (3, 4)]) 
rdd = rdd.map(lambda tup: (tup[0], tuple([tup[1], abs(tup[0] - tup[1])]))) 
rdd.reduceByKey(lambda v1, v2: v1 if v1[1] < v2[1] else v2).mapValues(lambda x: x[0]).collectAsMap() 

回答

0

我認爲沒有很強的理由不要傳遞鑰匙。
但是,我覺得reduceByKey API是爲通用用例設計的 - 計算每個鍵值的總和。到目前爲止,我從來都不需要在計算值時使用鍵。但那只是我的個人意見。

另外你解決的問題似乎是簡單的聚合問題。 min()groupByKey可以找到答案。我知道你不是在尋找解決方案,而是在於如何寫作。

from pyspark import SparkContext 

sc = SparkContext() 
rdd = sc.parallelize([(1, 8), (1, 3), (1, -1), (2, 4), (2, 5), (2, 2), (3, 2), (3, 4)]) 
reduced = rdd.groupByKey().map(lambda (k, v): (k, min(v, key=lambda e:abs(e-k)))) 
print(reduced.collectAsMap()) 

結果

{1: 3, 2: 2, 3: 2} 
+0

尼斯的答案。我的問題的真正答案很可能僅僅是「因爲這不是API」。但無論如何我都在想這件事。 – FGreg

相關問題