星火2.0.0+:
內置csv格式支持分區開箱即用,所以你應該能夠簡單地使用:
df.write.partitionBy('partition_date').mode(mode).format("csv").save(path)
不包括任何額外的包裹。
星火< 2.0.0:
此時(V1.4.0)spark-csv
不支持partitionBy
(見),但你可以調整內置信號源,實現你想要什麼。
您可以嘗試兩種不同的方法。假設你的數據是相對簡單(沒有複雜的字符串和需要的字符轉義),看起來或多或少是這樣的:
df = sc.parallelize([
("foo", 1, 2.0, 4.0), ("bar", -1, 3.5, -0.1)
]).toDF(["k", "x1", "x2", "x3"])
您可以手動寫入準備值:
from pyspark.sql.functions import col, concat_ws
key = col("k")
values = concat_ws(",", *[col(x) for x in df.columns[1:]])
kvs = df.select(key, values)
,並使用text
寫源
kvs.write.partitionBy("k").text("/tmp/foo")
df_foo = (sqlContext.read.format("com.databricks.spark.csv")
.options(inferSchema="true")
.load("/tmp/foo/k=foo"))
df_foo.printSchema()
## root
## |-- C0: integer (nullable = true)
## |-- C1: double (nullable = true)
## |-- C2: double (nullable = true)
在更復雜的情況下,你可以嘗試使用正確的CSV解析器預處理值以類似的方式,或者通過使用UDF或超過映射RDD,但它將顯着更昂貴。
如果CSV格式不是硬性要求,您還可以使用JSON作家支持partitionBy
外的開箱:
df.write.partitionBy("k").json("/tmp/bar")
以及在讀取分區發現。
它根本沒有解決這個問題。 – zero323