
We have a use case where we read files containing JSON from S3. Then, based on the value of a particular JSON node, we want to group the data and write it back to S3.

I am able to read the data, but I can't find a good example of how to partition the data on a JSON key and then upload it to S3. Can anyone provide an example or point me to a tutorial that covers this use case?
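
(For context, a minimal sketch of the read step, assuming a SparkSession named spark and an illustrative s3a path, neither of which is from the original post:)

// Sketch: read newline-delimited JSON from S3 into a DataFrame 
val df = spark.read.json("s3a://my-bucket/path/to/input/") 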

This is the schema of my data after creating the DataFrame:

root 
|-- customer: struct (nullable = true) 
| |-- customerId: string (nullable = true) 
|-- experiment: string (nullable = true) 
|-- expiryTime: long (nullable = true) 
|-- partitionKey: string (nullable = true) 
|-- programId: string (nullable = true) 
|-- score: double (nullable = true) 
|-- startTime: long (nullable = true) 
|-- targetSets: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- featured: array (nullable = true) 
| | | |-- element: struct (containsNull = true) 
| | | | |-- data: struct (nullable = true) 
| | | | | |-- asinId: string (nullable = true) 
| | | | |-- pk: string (nullable = true) 
| | | | |-- type: string (nullable = true) 
| | |-- reason: array (nullable = true) 
| | | |-- element: string (containsNull = true) 
| | |-- recommended: array (nullable = true) 
| | | |-- element: string (containsNull = true) 

I want to partition the data based on a random hash of the customerId column. But when I do this:

df.write.partitionBy("customerId").save("s3/bucket/location/to/save"); 

it gives the error:

org.apache.spark.sql.AnalysisException: Partition column customerId not found in schema StructType(StructField(customer,StructType(StructField(customerId,StringType,true)),true), StructField(experiment,StringType,true), StructField(expiryTime,LongType,true), StructField(partitionKey,StringType,true), StructField(programId,StringType,true), StructField(score,DoubleType,true), StructField(startTime,LongType,true), StructField(targetSets,ArrayType(StructType(StructField(featured,ArrayType(StructType(StructField(data,StructType(StructField(asinId,StringType,true)),true), StructField(pk,StringType,true), StructField(type,StringType,true)),true),true), StructField(reason,ArrayType(StringType,true),true), StructField(recommended,ArrayType(StringType,true),true)),true),true)); 

Please let me know how I can access the customerId column.

Answer


Let's take a sample dataset, sample.json:

{"CUST_ID":"115734","CITY":"San Jose","STATE":"CA","ZIP":"95106"} 
{"CUST_ID":"115728","CITY":"Allentown","STATE":"PA","ZIP":"18101"} 
{"CUST_ID":"115730","CITY":"Allentown","STATE":"PA","ZIP":"18101"} 
{"CUST_ID":"114728","CITY":"San Mateo","STATE":"CA","ZIP":"94401"} 
{"CUST_ID":"114726","CITY":"Somerset","STATE":"NJ","ZIP":"8873"} 

Now let's work through it in Spark, partitioning by the "ZIP" column. First, read and inspect the data:

val jsonDf = spark.read 
    .format("json") 
    .load("path/of/sample.json") 

jsonDf.show() 

+---------+-------+-----+-----+ 
|  CITY|CUST_ID|STATE| ZIP| 
+---------+-------+-----+-----+ 
| San Jose| 115734| CA|95106| 
|Allentown| 115728| PA|18101| 
|Allentown| 115730| PA|18101| 
|San Mateo| 114728| CA|94401| 
| Somerset| 114726| NJ| 8873| 
+---------+-------+-----+-----+ 

Then partition the dataset by "ZIP" and write it to S3:

jsonDf.write 
    .partitionBy("ZIP") 
    .save("s3/bucket/location/to/save") 
    // one-liner authentication to S3 
    //.save("s3n://$accessKey:$secretKey" + "@" + s"$buckectName/location/to/save") 

Note: In order for this code to run successfully, the S3 access key and secret key have to be configured properly. Check this answer for Spark/Hadoop integration with S3.
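
One common way to supply the keys programmatically is through the Hadoop configuration of the SparkSession; a minimal sketch (the fs.s3a.* property names are standard Hadoop S3A settings, and reading the values from environment variables is just one possible choice for illustration):

// Sketch: set S3A credentials on the Hadoop configuration used by Spark. 
// The values here come from environment variables purely for illustration. 
val hadoopConf = spark.sparkContext.hadoopConfiguration 
hadoopConf.set("fs.s3a.access.key", sys.env.getOrElse("AWS_ACCESS_KEY_ID", "")) 
hadoopConf.set("fs.s3a.secret.key", sys.env.getOrElse("AWS_SECRET_ACCESS_KEY", "")) 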

EDIT: Resolution for "Partition column customerId not found in schema" (per the comments)

customerId lives inside the customer struct, so extract customerId into a top-level column first and then partition:

import spark.implicits._ // for the $"..." column syntax 

df.withColumn("customerId", $"customer.customerId") 
    .drop("customer") 
    .write.partitionBy("customerId") 
    .save("s3/bucket/location/to/save")
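
If, as the question mentions, you want to partition on a hash of customerId into a fixed number of buckets rather than on the raw value, here is a sketch using Spark's built-in hash and pmod functions (the bucket count of 16 and the column name customerBucket are arbitrary choices for illustration):

import org.apache.spark.sql.functions.{col, hash, lit, pmod} 

// Derive a bucket number from a hash of the nested customerId, then partition on it. 
df.withColumn("customerBucket", pmod(hash(col("customer.customerId")), lit(16))) 
    .write.partitionBy("customerBucket") 
    .save("s3/bucket/location/to/save") 

Either way, partitionBy writes one directory per distinct value of the partition column (e.g. customerBucket=0/, customerBucket=1/, ...) under the target location.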