2017-02-18 97 views
12

我想一個DataFrame保存到HDFS使用DataFrameWriter木地板格式,三個值分割,就像這樣:如何在Spark中分區和寫入DataFrame而不刪除沒有新數據的分區?

dataFrame.write.mode(SaveMode.Overwrite).partitionBy("eventdate", "hour", "processtime").parquet(path) 

正如this question提到的,partitionBy將在path刪除分區的全部現有層次結構和用dataFrame中的分區替換它們。由於特定日期的新增量數據將週期性地發佈,我想要的是隻替換dataFrame有數據的層次結構中的那些分區,而其他分區不變。

要做到這一點看來我需要保存每個分區單獨使用它的完整路徑,像這樣:

singlePartition.write.mode(SaveMode.Overwrite).parquet(path + "/eventdate=2017-01-01/hour=0/processtime=1234567890") 

但是我無法理解來組織數據爲單分區的最好辦法DataFrame s,這樣我就可以用他們的完整路徑寫出來。一個想法是這樣的:

dataFrame.repartition("eventdate", "hour", "processtime").foreachPartition ... 

foreachPartitionIterator[Row]這是不理想的寫出來,以平面形式運行。

我還考慮使用select...distinct eventdate, hour, processtime來獲取分區列表,然後通過每個分區過濾原始數據幀並將結果保存到其完整分區路徑。但是,對於每個分區而言,獨特的查詢加上一個過濾器似乎並不是非常有效,因爲它會進行大量的過濾/寫入操作。

我希望有一個更清潔的方法來保存dataFrame沒有數據的現有分區?

感謝您的閱讀。

Spark版本:2.1

回答

0

您可以嘗試模式追加。

dataFrame.write.format("parquet") 
.mode("append") 
.partitionBy("year","month") 
.option("path",s"$path/table_name") 
.saveAsTable(s"stg_table_name") 
1

模式選項Append有一個趕上!

df.write.partitionBy("y","m","d") 
.mode(SaveMode.Append) 
.parquet("/data/hive/warehouse/mydbname.db/" + tableName) 

我測試過,看到這會保留現有的分區文件。然而,這次的問題如下:如果你運行相同的代碼兩次(使用相同的數據),那麼它將創建新的parquet文件,而不是用相同的數據替換現有的文件(Spark 1.6)。因此,我們仍然可以用Overwrite來解決這個問題,而不是使用Append。我們應該在分區級覆蓋,而不是在表級重寫。

df.write.mode(SaveMode.Overwrite) 
.parquet("/data/hive/warehouse/mydbname.db/" + tableName + "/y=" + year + "/m=" + month + "/d=" + day) 

請參閱以下鏈接瞭解更多信息:

Overwrite specific partitions in spark dataframe write method

(我suriyanto的評論後更新了我的答覆日Thnx。)

+0

你測試,如果當你寫的一樣數據兩次取代舊分區?從我的測試中,它實際上在分區目錄內創建了一個新的parquet文件,導致數據翻倍。我在Spark 2.2上。 – suriyanto