2017-05-08 48 views
1

我正在嘗試採用包含整數範圍對的RDD,並對其進行轉換,以便每對都有第三項,它會遍歷範圍內的可能值。基本上,我有這樣的:在Spark中使用Lambdas列表轉換

[[1,10], [11,20], [21,30]] 

而且我想這個落得:

[[1,1,10], [2,1,10], [3,1,10], [4,1,10], [5,1,10]...] 

我想轉換的文件是非常大的,這就是爲什麼我我希望在PySpark上做這件事,而不是在本地機器上使用Python(我有一種方法可以在CSV文件上本地執行此操作,但在文件大小的情況下,該過程需要幾個小時)。到目前爲止,我有這樣的:

a = [[1,10], [11,20], [21,30]] 
b = sc.parallelize(a) 
c = b.map(lambda x: [range(x[0], x[1]+1), x[0], x[1]]) 
c.collect() 

其中產量:

>>> c.collect() 
[[[1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 1, 10], [[11, 12, 13, 14, 15, 16, 17, 18, 19, 20], 11, 20], [[21, 22, 23, 24, 25, 26, 27, 28, 29, 30], 21, 30]] 

我無法找出下一步需要從這裏,遍歷擴大範圍,用範圍分隔符將每個對進行配對。

任何想法?

編輯2017年5月8日下午3:00

的Python的技術,即對一個CSV輸入的工作原理是:

import csv 
import gzip 
csvfile_expanded = gzip.open('C:\output.csv', 'wb') 
ranges_expanded = csv.writer(csvfile_expanded, delimiter=',', quotechar='"') 
csvfile = open('C:\input.csv', 'rb') 
ranges = csv.reader(csvfile, delimiter=',', quotechar='"') 
for row in ranges: 
    for i in range(int(row[0]),int(row[1])+1): 
     ranges_expanded.writerow([i,row[0],row[1]) 

的PySpark腳本我質疑始於CSV文件已經被加載到HDFS中並作爲RDD進行投射。

回答

2

試試這個:

c = b.flatMap(lambda x: ([y, x[0], x[1]] for y in xrange(x[0], x[1]+1))) 

flatMap()確保您能獲得每範圍的元素一個輸出記錄。還請注意外()連同xrange - 這是一個生成器表達式,避免實現執行程序的內存中的整個範圍。

注意:xrange()是Python2。如果您正在運行Python3,請使用range()

+0

這非常完美!非常感謝您的幫助和解釋。我無法弄清楚如何在lambda中插入'for'循環,但看到你的解決方案很有意義。 – nxl4