2017-05-29 89 views
0

我是新來的spark和scala。我想讀取包含json文件的目錄。該文件具有名爲「EVENT_NAME」的屬性,可以有20個不同的值。我需要根據屬性值分開事件。即EVENT_NAME = event_A事件。將這些寫入配置單元外部表結構中,例如:/ apps/hive/warehouse/db/event_A/dt = date/hour = hr火花數據框被寫入分區

這裏我有20個不同的表,用於所有事件類型和與每個事件相關的數據應該去到各自的桌子。 我已經設法編寫了一些代碼,但需要幫助才能正確寫入我的數據。

{ 
import org.apache.spark.sql._ 
import sqlContext._ 

val path = "/source/data/path" 
val trafficRep = sc.textFile(path) 

val trafficRepDf = sqlContext.read.json(trafficRep) 
trafficRepDf.registerTempTable("trafficRepDf") 

trafficRepDf.write.partitionBy("EVENT_NAME").save("/apps/hive/warehouse/db/sample") 
} 

最後一行創建分區輸出,但不是我確切需要它。請建議我怎樣才能得到它正確或任何其他代碼來做到這一點。

回答

1

我假設你的意思是你想保存數據放入單獨的目錄中,而不使用Spark/Hive的{column}={value}格式。

您將無法使用Spark的partitionBy,因爲Spark分區迫使您使用該格式。

相反,你要打破你的DataFrame成組成的分區,並保存它們一個接一個,像這樣:

{ 
    import org.apache.spark.sql._ 
    import sqlContext._ 

    val path = "/source/data/path" 
    val trafficRep = sc.textFile(path) 

    val trafficRepDf = sqlContext.read.json(trafficRep) 
    val eventNames = trafficRepDf.select($"EVENT_NAME").distinct().collect() // Or if you already know what all 20 values are, just hardcode them. 
    for (eventName <- eventNames) { 
    val trafficRepByEventDf = trafficRepDef.where($"EVENT_NAME" === eventName) 
    trafficRepByEventDf.write.save(s"/apps/hive/warehouse/db/sample/${eventName}") 
    } 
} 
0

我想,你希望像/apps/hive/warehouse/db/EVENT_NAME=xx/dt=yy/hour=zz表結構,那麼你需要通過EVENT_NAMEdthour分區,所以試試這個:

trafficRepDf.write.partitionBy("EVENT_NAME","dt","hour").save("/apps/hive/warehouse/db/sample") 
+0

數據沒有日期和時間信息在裏面。我需要在外部提供它。 – Anup

1

您可以添加使用日期和時間列到您的數據幀。

import org.apache.spark.sql._ 
import sqlContext._ 

val path = "/source/data/path" 
val trafficRep = sc.textFile(path) 

val trafficRepDf = sqlContext.read.json(trafficRep) 
trafficRepDf.withColumn("dt", lit("dtValue")).withColumn("hour", lit("hourValue")) 

trafficRepDf.write.partitionBy("EVENT_NAME","dt","hour").save("/apps/hive/warehouse/db/sample")