2016-08-23 528 views
0

在我的應用程序中,我從S3上不同位置的數據創建不同的數據幀,然後嘗試將數據幀合併到單個數據幀中。現在我正在爲此使用for循環。但我有一種感覺,這可以通過在pyspark中使用map和reduce功能的更有效的方式來完成。這裏是我的代碼:將for循環與map並行化並使用pyspark減少火花

from pyspark import SparkConf, SparkContext 
from pyspark.sql import SQLContext, GroupedData 
import pandas as pd 
from datetime import datetime 


sparkConf = SparkConf().setAppName('myTestApp') 
sc = SparkContext(conf=sparkConf) 
sqlContext = SQLContext(sc) 

filepath = 's3n://my-s3-bucket/report_date=' 

date_from = pd.to_datetime('2016-08-01',format='%Y-%m-%d') 
date_to = pd.to_datetime('2016-08-22',format='%Y-%m-%d') 
datelist = pd.date_range(date_from, date_to) 

First = True 

#THIS is the for-loop I want to get rid of 
for dt in datelist: 
    date_string = datetime.strftime(dt, '%Y-%m-%d') 
    print('Running the pyspark - Data read for the date - '+date_string) 
    df = sqlContext.read.format("com.databricks.spark.csv").options(header = "false", inferschema = "true", delimiter = "\t").load(filepath + date_string + '/*.gz') 

    if First: 
     First=False 
     df_Full = df 
    else: 
     df_Full = df_Full.unionAll(df) 
+0

你有使用Spark的限制嗎?如果沒有,你考慮使用dask來代替? dask被設計成其他很好的東西來實現你想要做的事 – Boud

+0

我需要使用spark,因爲在這之後,我將在其中運行Spark algos。而且數據的大小也相當大。 – nishant

+0

http://dask.pydata.org/zh/latest/spark.html – Boud

回答

1

其實迭代union,儘管次優,是不是最大的問題在這裏。模式推理引入了更爲嚴重的問題(inferschema = "true")。

它不僅使數據幀的創建不是懶惰,而且還需要單獨的數據掃描來進行推斷。如果你知道了架構前,你應該將它作爲一個參數爲DataFrameReader

schema = ... 

df = sqlContext.read.format("com.databricks.spark.csv").schema(schema) 

否則,你可以從第一DataFrame提取出來。結合良好的調優並行性,它應該工作得很好,但如果你獲取的文件數量很大,你應該考慮比迭代聯合有點更聰明的方法。你會在我對Spark union of multiple RDDs的回答中找到一個例子。它比較昂貴,但具有更好的綜合性能。

關於你的想法,不可能在分佈式數據結構上嵌套操作,所以如果你想讀取map中的數據,你將不得不直接使用S3客戶端而不使用SQLContext