2016-09-28 157 views
1

我有一個看起來像這樣我已經用下面的代碼進口spark.implicits._未使用

val SomeCsv = spark.read.option("header", "true"). 
    csv(conf.getString("data.path.Somecsv")).toDF() 

我有一個函數(不執行任何到目前爲止)創建了一個數據幀。

def cleanUp(data: sql.DataFrame): sql.DataFrame = { 
    data.map({ 
    doc => 
     (
     doc 

     ) 
    }) 
} 

這對彙編中斷與錯誤:

「無法找到存儲在數據集型編碼器原始類型(int,字符串等)和產品類型(case類)上受支持導入spark.implicits._「

我有設置爲其他職位建議的導入語句。

​​

import語句是的IntelliJ

標記爲未使用我的猜測是,

1)的CSV加載代碼使用了一些編碼器是一個對象,而不是原語。

2.)和/或我需要在我的函數語句中指定數據框的數據類型,就像你對RDD的操作一樣?我在Spark文檔中找不到關於此的任何信息。

編輯

如果我改用

val SomeOtherCsv = SomeCsv.map(t => t(0) + "foobar") 

import語句觸發器,一切都很好地編譯。我現在的問題是相同數據的方法版本(上面)仍然中斷。

EDIT2

這裏是MCVE

import org.apache.spark._ 
import org.apache.spark.SparkContext 
import org.apache.spark.sql.SparkSession 
import org.apache.spark.sql._/*statement unused*/ 
import com.typesafe.config.ConfigFactory 

object main { 
    def main(args: Array[String]) = { 
    /*load spark conf*/ 
    val sparkConf = new SparkConf().setAppName("main") 
    val sc = new SparkContext(sparkConf) 
    /*load configure tool*/ 
    val conf = ConfigFactory.load() 
    /*load spark session*/ 
    val spark = SparkSession.builder. 
     master("local") 
     .appName("tester") 
     .getOrCreate() 
    import spark.implicits._/* is used for val ProcessedGenomeCsv but not testFunction*/ 
    /*load genome csv as dataframe, conf.getString points to application.conf which contains a local directory for the csv file*/ 
    val GenomeCsv = spark.read.option("header", "true"). 
     csv(conf.getString("data.path.genomecsv")).toDF() 
    /*cleans up segment names in csv so the can be matched to amino data*/ 
    def testFunctionOne(data: sql.DataFrame): sql.DataFrame = {/* breaks with import spark.implicits._ error, error points to next line "data.map"*/ 
     data.map({ 
     doc => 
      (
      doc 

      ) 
     }) 
    } 
    val ProcessedGenomeCsv = GenomeCsv.map(t => t(12) + "foobar")/* breaks when adding sqlContext and sqlContext.implicits._, is fine otherwise*/ 
    val FunctionProcessedGenomCsv = testFunctionOne(GenomeCsv) 
    ProcessedGenomeCsv.take(1).foreach(println) 
    FunctionProcessedGenomCsv.take(1).foreach(println) 
    } 
} 
+0

你能提供一個MVCE,我們可以試着幫忙嗎? http://stackoverflow.com/help/mcve – eliasah

+0

補充,謝謝。我新來堆棧 – Tengansui

+0

你有沒有嘗試在def中添加導入? –

回答

0

你想sqlContext.implicits._

要聲明它創建sqlContext(在火花已創建後 - 殼,但不在火花提交)

你想它看起來像這樣:

object Driver { 
    def main(args: Array[String]):Unit = { 
     val spark_conf = 
      new SparkConf() 
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
      .setAppName("Spark Tika HDFS") 
     val sc = new SparkContext(spark_conf) 

     import sqlContext.implicits._ 

     val df = .... 

    } 
} 
+0

使用導入sqlContext.implicits._不會修復編譯錯誤,現在也打破了我上面添加的csv.map函數。 我相信這是因爲sqlContext贊成SparkSession的方法已過時(我使用的) 我剛剛注意到,在線csv.map功能觸發器現在進口spark.implicits._聲明,這很好。當我將它作爲一種方法編寫時,它並不起作用。 – Tengansui