2016-11-19 670 views
2

我想在foreachPartition中使用SparkContext和SQLContext,但由於序列化錯誤無法完成。我知道,無論對象是不可序列化,但我認爲foreachPartition在主服務器,其中兩個星火語境和SQLContext可供執行..如何在foreachPartition中使用SQLContext和SparkContext

符號:

`msg -> Map[String,String]` 
`result -> Iterable[Seq[Row]]` 

這是我當前的代碼(UtilsDM是extends Serializable的對象)。代碼失敗的部分從val schema =...開始,我想將result寫入DataFrame,然後將其保存到Parquet。也許我組織代碼的方式效率不高,那麼我想在這裏提出你的建議。謝謝。

// Here I am creating df from parquet file on S3 
val exists = FileSystem.get(new URI("s3n://" + bucketNameCode), sc.hadoopConfiguration).exists(new Path("s3n://" + bucketNameCode + "/" + pathToSentMessages)) 
var df: DataFrame = null 
if (exists) { 
    df = sqlContext 
    .read.parquet("s3n://bucket/pathToParquetFile") 
} 
UtilsDM.setDF(df) 

// Here I process myDStream 
myDStream.foreachRDD(rdd => { 
    rdd.foreachPartition{iter => 
    val r = new RedisClient(UtilsDM.getHost, UtilsDM.getPort) 
    val producer = UtilsDM.createProducer 
    var df = UtilsDM.getDF 
    val result = iter.map{ msg => 
     // ... 
     Seq(msg("key"),msg("value")) 
    } 

    // HERE I WANT TO WRITE result TO S3, BUT IT FAILS 
    val schema = StructType(
        StructField("key", StringType, true) :: 
        StructField("value", StringType, true) 

    result.foreach { row => 
     val rdd = sc.makeRDD(row) 
     val df2 = sqlContext.createDataFrame(rdd, schema) 

     // If the parquet file is not created, then create it 
     var df_final: DataFrame = null 
     if (df != null) { 
      df_final = df.unionAll(df2) 
     } else { 
      df_final = df2 
     } 
     df_final.write.parquet("s3n://bucket/pathToSentMessages) 
} 
    } 
}) 

編輯:

我使用星火1.6.2和Scala 2.10.6。

+0

這火花的版本? – mrsrinivas

+0

@MRSrinivas:我使用Spark 1.6.2和Scala 2.10.6。對不起,不提。 – duckertito

回答

2

這是不可能的。 SparkContextSQLContextSparkSession只能在驅動程序上使用。您可以在foreachRDD頂層使用sqlContext:

myDStream.foreachRDD(rdd => { 
    val df = sqlContext.createDataFrame(rdd, schema) 
    ... 
}) 

不能在轉型/動作使用它:

myDStream.foreachRDD(rdd => { 
    rdd.foreach { 
     val df = sqlContext.createDataFrame(...) 
     ... 
    } 
}) 

你可能想的相同的:

myDStream.foreachRDD(rdd => { 
    val foo = rdd.mapPartitions(iter => doSomethingWithRedisClient(iter)) 
    val df = sqlContext.createDataFrame(foo, schema) 
    df.write.parquet("s3n://bucket/pathToSentMessages) 
}) 
+0

好的,謝謝。這意味着,而不是'foreachPartition',我應該使用'mapPartitions'來返回'result'?你能告訴我該怎麼做嗎? – duckertito

+0

它應該是這樣嗎?它不爲我編譯:'myDStream.foreachRDD(rdd => {val finalResult = rdd.mapPartitions(iter => val r = new RedisClient(UtilsDM.getHost,UtilsDM.getPort)val result = iter.map {msg = > ...}))})' – duckertito

+0

我不完全理解你的代碼,但無論你做什麼,你都不能在RDD.foreach,RDD.map,RDD.mapPartitions等內部使用sqlContext – 2016-11-19 09:31:53

0

我發現,使用現有的SparkContext(假設我已經創建了一個sparkContext事先SC)內的循環工作,即您使用

// this works 
stream.foreachRDD(_ => { 
    // update rdd 
    .... = SparkContext.getOrCreate().parallelize(...) 
}) 

// this doesn't work - throws a SparkContext not serializable error 
stream.foreachRDD(_ => { 
    // update rdd 
    .... = sc.parallelize(...) 
}) 
相關問題