2016-09-21 59 views
2

我有一個(鍵,值)元素的RDD。鍵是NumPy數組。 NumPy數組不可散列,並且在嘗試執行reduceByKey操作時會導致問題。Spark:當鍵是不可排列的numpy數組時,如何「reduceByKey」?

有沒有辦法給我的手動散列函數提供Spark上下文?或者有沒有其他解決這個問題的方法(除了實際上將數組散列爲「離線」並將Spark傳遞給散列鍵)?

下面是一個例子:

import numpy as np 
from pyspark import SparkContext 

sc = SparkContext() 

data = np.array([[1,2,3],[4,5,6],[1,2,3],[4,5,6]]) 
rd = sc.parallelize(data).map(lambda x: (x,np.sum(x))).reduceByKey(lambda x,y: x+y) 
rd.collect() 

錯誤是:

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.

...

TypeError: unhashable type: 'numpy.ndarray'

回答

2

最簡單的解決方案是將其轉換爲一個對象,它是可哈希。例如:

from operator import add 

reduced = sc.parallelize(data).map(
    lambda x: (tuple(x), x.sum()) 
).reduceByKey(add) 

並在需要時再轉換回來。

Is there a way to supply the Spark context with my manual hash function

不是一個簡單的。整個機制取決於事實對象實現了一個__hash__方法和C擴展名不能被猴子修補。您可以嘗試使用調度來覆蓋pyspark.rdd.portable_hash,但即使考慮轉換成本,我也懷疑這是否值得。