我想在我的combineByKey
/reduceByKey
/foldByKey
中有依賴於當前正在操作的鍵的邏輯。從方法特徵中我可以看出,傳遞給這些方法的唯一參數是組合/縮小/摺疊的值。爲什麼我不能在減少邏輯中引用鍵?
用一個簡單的例子,我只是有一個RDD是(int, int)
元組,我想要的結果是tuple[0]
鍵入一個RDD其中值最接近鍵int
。
例如:
(1, 8)
(1, 3)
(1, -1)
(2, 4)
(2, 5)
(2, 2)
(3, 2)
(3, 4)
應減少到:
(1, 3)
(2, 2)
(3, 2)
注意,在比較(1, 3)
和(1, -1)
我不在乎哪一個是挑選,因爲它們都是相同的距離。 「3」鍵相同。
我可以想象這樣做的方法是沿着線的東西:
rdd.reduceByKey(lambda key, v1, v2: v1 if abs(key - v1) < abs(key - v2) else v2)
但reduce
功能只需要兩個參數:要合併兩個值。看起來最簡單的方法是參考我的減速器中的鑰匙以實現我的目標;這可能嗎?
如果我試試這個,我得到一個錯誤:
rdd = sc.parallelize([(1, 8), (1, 3), (1, -1), (2, 4), (2, 5), (2, 2), (3, 2), (3, 4)])
rdd.reduceByKey(lambda key, v1, v2: v1 if abs(key - v1) < abs(key - v2) else v2).collect()
TypeError:() takes exactly 3 arguments (2 given)
我真的不尋找一個解決方案,這個例子中的問題。我想知道的是,如果有一個原因,鑰匙沒有傳遞給reduceByKey
函數?我認爲這是我遺失的地圖縮減哲學的一些基本原理。
注我可以通過插入一個映射步驟,其每個值映射到由該值和從鑰匙的距離的元組解決我例如:
rdd = sc.parallelize([(1, 8), (1, 3), (1, -1), (2, 4), (2, 5), (2, 2), (3, 2), (3, 4)])
rdd = rdd.map(lambda tup: (tup[0], tuple([tup[1], abs(tup[0] - tup[1])])))
rdd.reduceByKey(lambda v1, v2: v1 if v1[1] < v2[1] else v2).mapValues(lambda x: x[0]).collectAsMap()
尼斯的答案。我的問題的真正答案很可能僅僅是「因爲這不是API」。但無論如何我都在想這件事。 – FGreg