2017-04-15 185 views
0

我正在使用scala將json數據讀入spark數據框。 架構如下:使用scala從spark中的數組中抽取結構值

root 
|-- metadata: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- playerId: string (nullable = true) 
| | |-- sources: array (nullable = true) 
| | | |-- element: struct (containsNull = true) 
| | | | |-- matchId: long (nullable = true) 

的數據如下所示:

{ "metadata" : [ { "playerId" : "1234", "sources" : [ { "matchId": 1 } ] }, { "playerId": "1235", "sources": [ { "matchId": 1 } ] } ] } 
{ "metadata" : [ { "playerId" : "1234", "sources" : [ { "matchId": 2 } ] }, { "playerId": "1248", "sources": [ { "score": 12.2 , "matchId": 1 } ] } ] } 
{ "metadata" : [ { "playerId" : "1234", "sources" : [ { "matchId": 3 } ] }, { "playerId": "1248", "sources": [ { "matchId": 3 } ] } ] } 

的目標是找出是否playerId是1234和matchId爲1,則返回isPlayed爲真。來源的結構不固定。可能有其他字段不是matchId。

我寫了一個UDF考慮對象元數據類型WrappedArray [字符串]和我能夠讀取playerId列

def hasPlayer = udf((metadata: WrappedArray[String], playerId: String) => { 
    metadata.contains(playerId) 
    }) 

df.withColumn("hasPlayer", hasPlayer(col("metadata"), col("superPlayerId"))) 

但我無法弄清楚如何查詢給出playerId的matchId領域。我嘗試將該字段作爲WrappedArray [WrappedArray [Long]]讀取,但它在metadata.sources.matchId列的withColumn中提供了類型轉換異常。

我對Spark比較新。任何幫助將深表感謝。

乾杯!

回答

0

當您處理JSON時,請了解內置函數explode,該函數將包含WrappedArray的單個單元格變成表示內部結構的多行。我認爲它有助於在這裏(兩次):

df.select(explode($"metadata").as("metadata")) 
    .select($"metadata.playerId", explode($"metadata.sources.matchId").as("matchId")) 
    .filter($"matchId".equalTo(1)) 
    .select($"matchId", lit(true).as("isPlayed")) 

基本上我用explode創建多個行(和重命名爲方便),瀏覽對象樹的JSON領域我想,重複explode /爲重命名過程在matchId,並篩選出所有不1這讓我終於用​​3210功能爲「硬編碼」的true值一個全新的列名爲isPlayed因爲這是不1消失了一切。

如果這不是你正在尋找的東西,希望它能給你一些指示。當你瞭解Spark時,functionslibrary對你很有幫助。

+0

嘿謝謝你的答案。這會是一個代價高昂的操作嗎?還有一種方法可以保留行數。爆炸我認爲會乘以陣列1中爆炸元素的數量*陣列2中爆炸元素的數量。將這個數據與原始表格連接起來效率高嗎? –

+0

它使用了所有更高級別的Spark功能,這是儘可能讓Catalyst優化事情的一種方法。但是,你爲什麼不試試看它是否「高效」? – Vidya