2016-05-29 56 views
4

我想在火花數據幀寫入到HDFS的位置,我希望,如果我加入了「partitionBy」符號火花將創建分區 (類似於實木複合地板的格式書寫) 文件夾的形式爲「partition_column_name = partition_value」 (即partition_date=2016-05-03)。 這樣做,我跑以下命令:寫火花數據幀爲CSV與分區

df.write.partitionBy('partition_date').mode('overwrite').format("com.databricks.spark.csv").save('/tmp/af_organic')

但尚未創建的分區的文件夾 任何想法要高度重視我爲了做火花DF自動創建這些文件夾?

感謝,

回答

13

星火2.0.0+

內置csv格式支持分區開箱即用,所以你應該能夠簡單地使用:

df.write.partitionBy('partition_date').mode(mode).format("csv").save(path) 

不包括任何額外的包裹

星火< 2.0.0

此時(V1.4.0)spark-csv不支持partitionBy(見),但你可以調整內置信號源,實現你想要什麼。

您可以嘗試兩種不同的方法。假設你的數據是相對簡單(沒有複雜的字符串和需要的字符轉義),看起來或多或少是這樣的:

df = sc.parallelize([ 
    ("foo", 1, 2.0, 4.0), ("bar", -1, 3.5, -0.1) 
]).toDF(["k", "x1", "x2", "x3"]) 

您可以手動寫入準備值:

from pyspark.sql.functions import col, concat_ws 

key = col("k") 
values = concat_ws(",", *[col(x) for x in df.columns[1:]]) 

kvs = df.select(key, values) 

,並使用text寫源

kvs.write.partitionBy("k").text("/tmp/foo") 

df_foo = (sqlContext.read.format("com.databricks.spark.csv") 
    .options(inferSchema="true") 
    .load("/tmp/foo/k=foo")) 

df_foo.printSchema() 
## root 
## |-- C0: integer (nullable = true) 
## |-- C1: double (nullable = true) 
## |-- C2: double (nullable = true) 

在更復雜的情況下,你可以嘗試使用正確的CSV解析器預處理值以類似的方式,或者通過使用UDF或超過映射RDD,但它將顯着更昂貴。

如果CSV格式不是硬性要求,您還可以使用JSON作家支持partitionBy外的開箱:

df.write.partitionBy("k").json("/tmp/bar") 

以及在讀取分區發現。

2

我建議使用下列內容:

df = your dataframe object 
df.coalesce(n).write.csv('name_of_outputfolder',header=True) 

其中n是分區的數量。

這應該做的伎倆。讓我知道事情的後續!

+0

它根本沒有解決這個問題。 – zero323