2016-09-14 70 views
1

將應用程序exe寫入sysDF.write.partitionBy,併成功寫出第一個parquet文件。但在此之後,應用程序掛起,所有執行者死亡,直到發生一些加班。行動碼是如下:使用DF寫出時Spark作業掛起

import sqlContext.implicits._ 

val systemRDD = basicLogRDD.map(basicLog => if (basicLog.isInstanceOf[SystemLog]) basicLog.asInstanceOf[SystemLog] else null).filter(_ != null) 
val sysDF = systemRDD.toDF() 
sysDF.write.partitionBy("appId").parquet(outputPath + "/system/date=" + dateY4M2D2) 

val customRDD = basicLogRDD.map(basicLog => if (basicLog.isInstanceOf[CustomLog]) basicLog.asInstanceOf[CustomLog] else null).filter(_ != null) 
val customDF = customRDD.toDF() 
customDF.write.partitionBy("appId").parquet(outputPath + "/custom/date=" + dateY4M2D2) 

val illegalRDD = basicLogRDD.map(basicLog => if (basicLog.isInstanceOf[IllegalLog]) basicLog.asInstanceOf[IllegalLog] else null).filter(_ != null) 
val illegalDF = illegalRDD.toDF() 
illegalDF.write.partitionBy("appId").parquet(outputPath + "/illegal/date=" + dateY4M2D2) 
+0

你能提供一些更多的信息,那裏有多少行,還有多少不同的'appId'值? –

+0

有大約1億行和500個'appId' –

回答

0

首先,在地圖上可以與濾波器,這應該優化查詢位被組合:

val rdd = basicLogRDD.cache() 

rdd.filter(_.isInstanceOf[SystemLog]).write.partitionBy("appId").parquet(outputPath + "/system/date=" + dateY4M2D2) 
rdd.filter(_.isInstanceOf[CustomLog]).write.partitionBy("appId").parquet(outputPath + "/custom/date=" + dateY4M2D2) 
rdd.filter(_.isInstanceOf[IllegalLog]).write.partitionBy("appId").parquet(outputPath + "/illegal/date=" + dateY4M2D2) 

首先,它是緩存一個好主意因爲它被多次使用。 .cache()運營商將keep the RDD in memory。 其次,不需要明確地將RDD轉換爲DataFrame,因爲它是implicitly converted to a DataFrame的暗示,允許使用Parquet存儲它(您需要定義import sqlContext.implicits._)。

+0

感謝您的建議! –