2017-04-25 113 views
1

我有一個字典類型RDD我:PySpark:迭代過字典類型RDD

>>> a.collect() 

[{(1155718, 105): 14, (1155718, 1887): 2, (1155718, 1930): 12, (1155718, 927): 6, (1155718, 2783): 8, (1155718, 738): 4, (1155718, 952): 4, (1155718, 1196): 6, (1155718, 997): 4, (1155718, 2904): 38}]

只是爲了檢查:

>>> a.map(lambda x:type(x)).collect() 

[< type 'dict' >]

但是我不能遍歷通過使用map()的字典型RDD。我想:

>>> a.map(lambda x:(k,v) for k,v in x.iteritems()) 

要我說出驚訝它導致錯誤:

Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
NameError: name 'x' is not defined 

我錯過任何重要的一點在這裏。

編輯:代碼是所有權利限制與發電機的語法小bug正確的代碼應該是:

a.map(lambda x:[(k,v) for k,v in x.iteritems()]) 

回答

1

我嘗試這樣做:

data = [{(1155718, 105): 14, (1155718, 1887): 2, (1155718, 1930): 12, (1155718, 927): 6, (1155718, 2783): 8, (1155718, 738): 4, 
     (1155718, 952): 4, (1155718, 1196): 6, (1155718, 997): 4, (1155718, 2904): 38}] 

rdd = sc.parallelize(data) 
rdd.flatMap(lambda _: [(k,v) for (k,v) in _.items()]).collect() 

,並得到這個:

[((1155718, 105), 14), 
((1155718, 738), 4), 
((1155718, 2904), 38), 
((1155718, 1887), 2), 
((1155718, 1196), 6), 
((1155718, 1930), 12), 
((1155718, 927), 6), 
((1155718, 2783), 8), 
((1155718, 997), 4), 
((1155718, 952), 4)] 
+0

你實際上是我的代碼應該是'a.map(lambda x:[(k,v)for k,v in x.iteritems()])' – abhiieor

+0

只需要在這裏注意:對於Python 3,使用'items()'。對於Python 2,使用'iteritems()' – titipata