2015-02-23 113 views
2

我有一個存儲在S3存儲桶中的大型數據集,但它不是一個單獨的大文件,而是由許多(確切的說是113K)單個JSON文件組成的,其中每個包含100-1000個觀察值。這些觀察不是最高級別的,但需要在每個JSON中進行一些導航才能訪問。 即 json [「interact」]是一個詞典列表。PySpark:如何閱讀許多JSON文件,每個文件有多個記錄

我試圖利用Spark/PySpark(版本1.1.1)解析並減少這些數據,但是我找不到正確的方法將它加載到RDD中,因爲它既不是所有記錄>一個文件(在這種情況下,我會使用sc.textFile,雖然增加了JSON的複雜性),但每個記錄>一個文件(在這種情況下,我會使用sc.wholeTextFiles)。

是我最好的選擇使用sc.wholeTextFiles,然後使用一個地圖(或在這種情況下flatMap?)拉多個觀測從單個文件名鍵存儲到他們自己的密鑰?還是有更簡單的方法來做到這一點,我失蹤了?

我已經在這裏看到了答案,建議在通過sc.textFile加載的所有文件上使用json.loads(),但它看起來不像那樣對我有效,因爲JSON不是簡單的最高級別名單。

+0

我遇到了類似的問題。請讓我知道是否有解決方案。我剛開始嘗試pyspark,並且我在s3中有很多json文件需要分析 – user1652054 2015-04-16 04:25:41

回答

3

該名稱具有誤導性(因爲它是單數形式),但sparkContext.textFile()(至少在Scala案例中)也接受目錄名稱或通配符路徑,因此您只能說textFile("/my/dir/*.json")

2

如何使用數據框?

確實 testFrame = sqlContext.read.json('s3n://<bucket>/<key>') 給你你想要從一個文件?

每個觀察是否都有相同的「列」(鍵的數量)?

如果是這樣,你可以使用boto列出你想添加的每個對象,讀入它們並將它們相互結合。

from pyspark.sql import SQLContext 
import boto3 
from pyspark.sql.types import * 
sqlContext = SQLContext(sc) 

s3 = boto3.resource('s3') 
bucket = s3.Bucket('<bucket>') 

aws_secret_access_key = '<secret>' 
aws_access_key_id = '<key>' 

#Configure spark with your S3 access keys 
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", aws_access_key_id) 
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", aws_secret_access_key) 
object_list = [k for k in bucket.objects.all() ] 
key_list = [k.key for k in bucket.objects.all()] 

paths = ['s3n://'+o.bucket_name+'/'+ o.key for o in object_list ] 

dataframes = [sqlContext.read.json(path) for path in paths] 

df = dataframes[0] 
for idx, frame in enumerate(dataframes): 
    df = df.unionAll(frame) 

我是新來激發自己,所以我不知道是否有使用dataframes有很多的S3文件更好的辦法,但到目前爲止,這是爲我工作。

+0

StackOverflow是一個問答網站,不是論壇。所以我們喜歡把答案看成是解決方案,而不是充滿質疑的回答。特別是在回答老線程時適用,因爲OP不太可能讓你參與對話。請考慮重新編寫您的答案作爲確鑿的答案,並在必要時提出警告。 – APC 2015-11-20 22:18:52

3

以前的答案不會以分佈式方式讀取文件。爲此,您需要並行化s3鍵,然後在下面的flatMap步驟中讀入文件。

import boto3 
import json 
from pyspark.sql import Row 

def distributedJsonRead(s3Key): 
    s3obj = boto3.resource('s3').Object(bucket_name='bucketName', key=s3Key) 
    contents = json.loads(s3obj.get()['Body'].read().decode('utf-8')) 
    for dicts in content['interactions'] 
     yield Row(**dicts) 

pkeys = sc.parallelize(keyList) #keyList is a list of s3 keys 
dataRdd = pkeys.flatMap(distributedJsonRead) 

Boto3參考:http://boto3.readthedocs.org/en/latest/guide/quickstart.html