2017-07-17 65 views
1

我是新來的火花。我有一些json數據來作爲HttpResponse。我需要將這些數據存儲在配置單元表中。每個HttpGet請求都會返回一個json,它將成爲表中的單個行。因此,我不得不將單行寫入配置單元表目錄中的文件。遞歸添加行到數據框

但是我覺得有太多的小文件會降低速度和效率。那麼是否有一種方法可以遞歸地將新行添加到Dataframe並一次性將其寫入到配置單元表目錄中。我覺得這也會減少我的spark代碼的運行時間。

實施例:

for(i <- 1 to 10){ 
newDF = hiveContext.read.json("path") 
df = df.union(newDF) 
} 
df.write() 

我明白dataframes是不可變的。有沒有辦法做到這一點?

任何幫助,將不勝感激。謝謝。

回答

1

你大部分都是在正確的軌道上,你想要做的是獲得多個單個記錄作爲Seq[DataFrame],然後通過聯合將它們減少爲單個DataFrame

從代碼去你提供:

val BatchSize = 100 
val HiveTableName = "table" 

(0 until BatchSize). 
map(_ => hiveContext.read.json("path")). 
reduce(_ union _). 
write.insertInto(HiveTableName) 

另外,如果你要執行的HTTP請求,你走了,我們也可以這樣做。讓我們假設你有一個確實的HTTP請求,並將其轉換成數據幀的功能:

def obtainRecord(...): DataFrame = ??? 

你可以做線沿線的東西:

val HiveTableName = "table" 
val OtherHiveTableName = "other_table" 
val jsonArray = ??? 

val batched: DataFrame = 
    jsonArray. 
    map { parameter => 
     obtainRecord(parameter) 
    }. 
    reduce(_ union _) 
batched.write.insertInto(HiveTableName) 
batched.select($"...").write.insertInto(OtherHiveTableName) 
+0

感謝您的回答。我正在努力實現這一點。爲了發佈我的get請求,我需要一個json數組的每個元素的參數(之前提取)。那麼,是否有更好的方法來實現for循環,以便我可以有一個隨着每次迭代而增加的變量(該變量用於訪問數組中每個元素的參數)? –

+0

你只需要一個從0到任何增量的索引? –

+0

我剛剛更新了答案,我認爲更好地反映了你正在嘗試做的事情。這假定您想要一次處理抓取的JSON數組(並寫入單個文件)。您也可以先拆分JSON數組,或者連接多個JSON數組,具體取決於您想要執行的操作。 –

0

你明顯濫用了Spark。 Apache Spark是分析系統,而不是數據庫API。像這樣使用Spark來修改Hive數據庫沒有任何好處。它只會帶來嚴重的性能損失,不會受益於任何Spark特性,包括分佈式處理。

相反,您應該直接使用Hive客戶端來執行事務操作。

+0

我很抱歉,但你明白我的問題?我正在尋找將我從Json獲得的數據框保存到配置單元表目錄。我不想使用spark來修改Hive表/數據庫。 –

+0

這不是問題的答案;它應該是對問題的評論。 – TriskalJM

0

如果您可以批量下載所有數據(例如使用curl或其他程序的腳本)並將其存儲在文件中(或多個文件,spark可以一次加載整個目錄),您可以然後將該文件(或多個文件)一次全部加載到spark中以進行處理。我還會檢查webapi作爲任何端點來獲取所需的所有數據,而不是一次只記錄一條記錄。

+0

感謝您的回答。你給的是一個好方法。但是,我無法一次從API獲取所有數據。我需要將數據框保存爲json文件。是否有可能創建一個包含所有記錄的json文件(使用java)?如果是,我可以將該文件讀入我的數據框。 –

+0

@HemanthAnnavarapu是的,這就是我所建議的,只是提前將所有數據下載到一個文件或幾個文件。類似於'curl -w「\ n」[url] >> [jsonfile]'在所有網址的循環中都應該有效。您也可以使用java或scala或任何其他您喜歡的語言進行下載。 – puhlen