2016-06-28 103 views
5

我正在嘗試下面的代碼,它將一個數字添加到RDD中的每一行,並使用PySpark返回一個RDD列表。PySpark評估

from pyspark.context import SparkContext 
file = "file:///home/sree/code/scrap/sample.txt" 
sc = SparkContext('local', 'TestApp') 
data = sc.textFile(file) 
splits = [data.map(lambda p : int(p) + i) for i in range(4)] 
print splits[0].collect() 
print splits[1].collect() 
print splits[2].collect() 

在輸入文件(sample.txt的)的含量爲:

1 
2 
3 

我(分別與0添加數字在RDD,1,2)期待像這樣的輸出:

[1,2,3] 
[2,3,4] 
[3,4,5] 

而實際產量爲:

[4, 5, 6] 
[4, 5, 6] 
[4, 5, 6] 

這意味着理解只使用變量i的值3,而不考慮範圍(4)

爲什麼會發生這種行爲?

回答

3

它發生是因爲Python後期綁定,並不是(Py)Spark特定的。 i將在使用lambda p : int(p) + i時查找,而不是在定義時查找。通常,它意味着什麼時候被調用,但在這個特定的上下文中,它是在序列化時發送給工作人員的。

例如,你可以像這樣做:

def f(i): 
    def _f(x): 
     try: 
      return int(x) + i 
     except: 
      pass 
    return _f 

data = sc.parallelize(["1", "2", "3"]) 
splits = [data.map(f(i)) for i in range(4)] 
[rdd.collect() for rdd in splits] 
## [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]] 
+0

我曾試圖通過「P」,以一個簡單的外部函數,以及內部函數(如一個在答案中)通過一個lambda調用,用於試驗和錯誤目的。 注意到正確的行爲,當我這樣做:http://pastebin.com/z7E7wGKx 謝謝你回答爲什麼發生這種情況的原因。 – srjit

+0

值得注意的是,這發生在幾乎任何語言的閉包/ lambdas,甚至C# –

2

這是由於這樣的事實:通過lambda表達式參考參考我!它與火花無關。 See this

你可以試試這個:

a =[(lambda y: (lambda x: y + int(x)))(i) for i in range(4)] 
splits = [data.map(a[x]) for x in range(4)] 

或在一行

splits = [ 
    data.map([(lambda y: (lambda x: y + int(x)))(i) for i in range(4)][x]) 
    for x in range(4) 
] 
+1

如果你想使用'lambdas'有一個簡單的技巧,避免嵌套:'[lambda x,i = i:i + int(x )我在範圍內(4)]'。 – zero323