2016-03-28 125 views
-2

我有一些文件(part-00000.gz,part-00001.gz,part-00002.gz,...),每個部分都比較大。我需要使用每個部分的文件名,因爲它包含時間戳信息。據我所知,似乎在pyspark中只有整個文件文件可以讀取輸入(文件名,內容)。但是,使用wholeTextFiles時出現內存不足的錯誤。所以,我的猜測是wholeTextFiles將整個部分作爲映射器中的內容讀取,而不進行分區操作。我也找到了這個答案(How does the number of partitions affect `wholeTextFiles` and `textFiles`?)。如果是這樣,我怎麼能得到一個相當大的部分文件的文件名。謝謝在pyspark中使用wholeTextFiles,但得到內存不足的錯誤

回答

2

由於wholeTextFiles試圖將整個文件讀取到單個RDD中,所以出現錯誤。你最好逐行閱讀文件,只需編寫自己的生成器並使用flatMap函數即可完成。這是一個example這樣做讀取gzip文件:

import gzip 
def read_fun_generator(filename): 
    with gzip.open(filename, 'rb') as f: 
     for line in f: 
      yield line.strip() 

gz_filelist = glob.glob("/path/to/files/*.gz") 
rdd_from_bz2 = sc.parallelize(gz_filelist).flatMap(read_fun_generator) 
+0

我在亞馬遜s3工作。 glob.glob是否工作?你的答案似乎也是讀取文件中所有文件的行(part-00000)。我應該用sc.textfile替換bz2.open – ScutterKey

+0

我不僅想使用文件名,還要使用內容。看起來你仍然一行一行讀取RDD中的整個零件文件。 – ScutterKey

+0

你必須調整此代碼才能完成你想要的功能。 glob命令實際上只是爲了得到一個文件名列表,它被保存到'bz2_filelist'中。這個想法是創建一個文件名的RDD(這就是「並行化」),然後爲每個文件讀取該文件中的每一行。請注意,您可以訪問此生成器中的文件名。例如,你可以做'yield filename +「|」 + line.strip()'如果你想在每一行預先加上文件名。 – santon