2015-07-19 145 views
1

我有一個Spark SQL將我的S3 JSON文件讀入DataFrame中。Spark SQL read.json讀取JSON輸入兩次

然後我在該DataFrame上運行2個SQL,並在執行每個SQL之前發現SparkSQL讀取我的S3 JSON文件兩次。

如果數據框對象不被重用,這將是非常昂貴的...

任何幫助表示讚賞。

這裏是我的代碼片段:

protected boolean doAggregations() throws IOException { 

    SQLContext sqlContext = getSQLContext(); 

    DataFrame edgeDataFrame = sqlContext.read().json(sourceDataDirectory); 


    edgeDataFrame.cache(); 

    getLogger().info("Registering and caching the table 'edgeData'"); 
    edgeDataFrame.registerTempTable("edgeData"); 

    String dateKey = DateTimeUtility.SECOND_FORMATTER.print(System.currentTimeMillis()); 

    for (Map.Entry<String, AggregationMetadata> entry : aggMetadataMap.entrySet()) { 
     String aggName = entry.getKey(); 
     String resultDir = getAggregationResultDirectory(aggName, dateKey); 
     String sql = entry.getValue().getSql(); 
     // The input file(s) are being read again and again instead of operating on the "edgeDataFrame" 
     DataFrame dataFrame = sqlContext.sql(sql); 
     dataFrame.write().format("json").save(resultDir); 
    } 
    return true; 
} 

回答

0

您的JSON文件被讀出兩次,因爲星火不知道JSON的架構和SQL需要一個已知的模式。因此,Spark採用了兩步法:

  1. 發現所有JSON記錄的模式爲每個JSON記錄模式的聯合。

  2. 將數據加載到適當配置的數據結構中。

想象一下,你有簡單的一行JSON文件:

{"category" : "A", "num" : 5} 

如果您在spark-shell執行

sqlContext.read.json(path).saveAsTable("test")

你會發現兩遍。

第一遍有一個映射階段,它收集每個分區發現的模式,reduce階段將模式組合爲所有分區的聯合模式。

對於map階段,你會看到類似這樣的:

​​

對於減少階段,你會看到類似這樣的:

INFO SparkContext: Starting job: reduce at JsonRDD.scala:54 

之後,當模式是已知的, JSON數據的實際加載將開始。這隻會涉及映射階段,因爲一旦發現模式,就不需要在分區處理器之間共享信息。

你可以看到星火如何對待在日誌中的數據列:

INFO ColumnChunkPageWriteStore: written 56B for [category] BINARY: 1 values, 11B raw, 29B comp, 1 pages, encodings: [PLAIN, RLE, BIT_PACKED] 
INFO ColumnChunkPageWriteStore: written 70B for [num] INT64: 1 values, 14B raw, 29B comp, 1 pages, encodings: [PLAIN, RLE, BIT_PACKED]