2016-05-12 99 views
0

我試圖在我正在編寫的一個scala應用程序中創建一個DataFrame時遇到了問題。Spark 1.6.1:從RDD創建DataFrame [Array [Error]]

問題我遇到的問題是,編譯scala退出的錯誤是toDF不是RDD的一部分。我已經看到了一些答案,提示在sqlContext聲明之後將case類定義移出main並導入implicits,但即使這樣也不適用於我。

這是我目前有:

import scala.collection.mutable.ArrayBuffer 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark.SparkConf 
import org.apache.spark.sql._ 

object ErrorParser { 

    case class Error(time: String, status: String, statusType: String, host: String, message: String) 

    def splitError(line: String) : Array[String] = { 

     var array:Array[String] = new Array[String](5) 

     ... 

     return array 

    } 

    def filterErrors(errors: Array[Array[String]]) : Array[Array[String]] = { 

     var filteredErrors = ArrayBuffer[Array[String]]() 

     ... 

     return filteredErrors.toArray 
    } 

    def main(args: Array[String]) { 

     val conf = new SparkConf().setAppName("ErrorParserAPI") 
     val sc = new SparkContext(conf) 

     val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
     import sqlContext.implicits._ 

     var logs = sc.textFile("hdfs://hadoop-master:9000/logs/data/logs/server.*") 
     var errors = logs.filter(line => line.contains("ERROR")) 

     val errors1 = errors.map(line => splitError(line)) 
     val filteredErrors = filterErrors(errors1.collect) 

     val dfErrors = filteredErrors.map(p => Error(p(0).split(":")(0) + ":" + p(0).split(":")(1), p(1), p(2), p(3), p(4))) 
     val filteredRDD = sc.parallelize(dfErrors) 
     var errorDF = filteredRDD.toDF() 

     errorDF.write.json("hdfs://hadoop-master:9000/results/errorParserResult") 

    } 

} 

我在火花殼的事情,因爲難倒像這樣工作。

我也看到了一些答案暗示改變RDD到RDD的一個實例[行],然後使用

sc.createDataFrame(rdd, scheme) 

但我不能換我周圍的頭,我怎麼會到處去這樣做。

任何幫助將不勝感激!

這是我的名.bst文件:

name := "ErrorParserAPI" 
version := "1.0" 
scalaVersion := "2.11.7" 
libraryDependencies ++= Seq(
     "org.apache.spark" % "spark-core_2.10" % "1.6.1", 
     "org.apache.spark" % "spark-sql_2.10" % "1.6.1" 
) 

編輯:一個錯字

+0

您是否嘗試將對象'ErrorParser'的錯誤'定義移出?不管問題出在哪裏,這是個好主意,因爲內部類捕獲外部範圍,這可能會導致序列化問題。另外,儘量避免使用像'filterErrors(errors1.collect)'這樣的代碼 - 最好以predicate的形式編寫filterErrors,並像'errors'.filter(isError)' –

+0

一樣使用它'感謝filterErrors實現,將嘗試移動案例類,看看它如何去 – Xranna

+0

悲哀地移動案例類對象的外部沒有解決問題@VitaliyKotlyarenko – Xranna

回答

0

我只是複製你的代碼,並在我的日食粘貼及其工作正常,沒有任何編譯錯誤。如果你使用eclipse,你可以嘗試清理和刷新你的項目。

import scala.Array.canBuildFrom 
import scala.collection.mutable.ArrayBuffer 
import scala.reflect.runtime.universe 

import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 

object ErrorParser { 


    def filterErrors(errors: Array[Array[String]]): Array[Array[String]] = { 

    var filteredErrors = ArrayBuffer[Array[String]]() 

    return filteredErrors.toArray 
    } 

    def main(args: Array[String]) { 



    val conf = new SparkConf().setAppName("ErrorParserAPI") 
    val sc = new SparkContext(conf) 

    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    import sqlContext.implicits._ 

    var logs = sc.textFile("hdfs://hadoop-master:9000/logs/data/logs/server.*") 
    var errors = logs.filter(line => line.contains("ERROR")) 

    val errors1 = errors.map(line => splitError(line)) 
    val filteredErrors = filterErrors(errors1.collect) 

    val dfErrors = filteredErrors.map(p => Error(p(0).split(":")(0) + ":" + p(0).split(":")(1), p(1), p(2), p(3), p(4))) 
    val filteredRDD = sc.parallelize(dfErrors) 
    var errorDF = filteredRDD.toDF() 
    } 

    case class Error(time: String, status: String, statusType: String, host: String, message: String) 

    def splitError(line: String): Array[String] = { 

    var array: Array[String] = new Array[String](5) 

    return array 

    } 
} 
+0

在eclipse中建立.jar文件,然後將其部署到火花工作。仍然不知道核心問題是什麼,但我現在將它標記爲答案。 – Xranna

相關問題