2015-08-28 132 views
2

我試圖創建是在S3的EMR星火羣集上CSV源DataFrame,使用Databricks spark-csv包和flights dataset火花:火花CSV時間過長

from pyspark.sql import SQLContext 
sqlContext = SQLContext(sc) 

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('s3n://h2o-airlines-unpacked/allyears.csv') 

df.first() 

這確實不會終止在4個羣組中。我正在尋找建議,以在PySpark中從S3上的CSV文件創建DataFrame。另外,我曾嘗試將文件放在HDFS上,並從HFDS中讀取,但這也不會終止。該文件不是太大(12 GB)。

+0

如果spark-csv lib是1.2.0+版本,你可以嘗試將'parserLib'選項設置爲'univocity'嗎? – rchukh

+1

@rchukh是不是默認?今天已經從主人那裏建立了罐子。編輯:不,它不是。將嘗試。 – tchakravarty

回答

1

對於只讀12GB的行爲良好的csv文件,您可以將其複製到所有工人和驅動程序機器上,然後手動分割「,」。這可能不會解析任何RFC4180 csv,但它解析了我所擁有的。

  • 當您申請集羣時,爲每個工作人員添加至少12GB的輔助磁盤空間。
  • 使用至少具有12GB RAM的機器類型,例如c3.2xlarge。如果您不打算保持閒置狀態,並且可以承受較大的費用,則可以擴大規模。更大的機器意味着更少的磁盤文件複製即可開始。我經常在現貨市場上以每小時0.50美元的價格看到c3.8xlarge。

將文件複製到每個工人的每個工人的相同目錄中。這應該是物理連接的驅動器,即每臺計算機上的不同物理驅動器。

請確保您在驅動程序機器上也有相同的文件和目錄。

raw = sc.textFile("/data.csv") 

print "Counted %d lines in /data.csv" % raw.count() 

raw_fields = raw.first() 
# this regular expression is for quoted fields. i.e. "23","38","blue",... 
matchre = r'^"(.*)"$' 
pmatchre = re.compile(matchre) 

def uncsv_line(line): 
    return [pmatchre.match(s).group(1) for s in line.split(',')] 

fields = uncsv_line(raw_fields) 

def raw_to_dict(raw_line): 
    return dict(zip(fields, uncsv_line(raw_line))) 

parsedData = (raw 
     .map(raw_to_dict) 
     .cache() 
     ) 

print "Counted %d parsed lines" % parsedData.count() 

parsedData將類型的字典,其中,所述類型的字典的鍵是從第一行的CSV的字段名稱的RDD,並且值是當前行的CSV值。如果您的CSV數據中沒有標題行,這可能並不適合您,但應該清楚,您可以覆蓋讀取第一行的代碼並手動設置字段。

請注意,這對於創建數據框架或註冊Spark SQL表格並不是很有用。但是對於其他任何東西,都可以,如果需要將其轉儲到Spark SQL中,則可以進一步提取並將其轉換爲更好的格式。

我在沒有任何問題的情況下在一個7GB文件上使用它,除了我已經刪除了一些過濾器邏輯來檢測有效數據,這些數據的副作用是從解析數據中刪除標題。您可能需要重新實現一些過濾。

+0

Paul,感謝您的評論,並且我很欣賞您已經嘗試回答我所問的內容。但是,你會建議一個不同的方法嗎?就像先讀HDFS中的CSV到Hive中並從Hive表中創建DataFrame一樣?考慮到基礎設施什麼是讀取12 GB文件作爲DataFrame的最佳方式? – tchakravarty

+0

對不起,我們現在根本就不用這裏的HDFS/Hive基礎設施,所以沒有意見。 – Paul

+0

Paul,請您提供一些關於如何獲取RDD的字典並製作DataFrame的具體信息? – tchakravarty