2016-12-02 100 views
0

到JSON我有一個pairRDD看起來像火花/斯卡拉字符串中的地圖

(1, {"id":1, "picture": "url1"}) 
(2, {"id":2, "picture": "url2"}) 
(3, {"id":3, "picture": "url3"}) 
... 

,其中第二個元素是一個字符串,我是從功能的get()從http://alvinalexander.com/scala/how-to-write-scala-http-get-request-client-source-fromurl了。這裏是該功能:

@throws(classOf[java.io.IOException]) 
@throws(classOf[java.net.SocketTimeoutException]) 
def get(url: String, 
     connectTimeout: Int = 5000, 
     readTimeout: Int = 5000, 
     requestMethod: String = "GET") = 
{ 
    import java.net.{URL, HttpURLConnection} 
    val connection = (new URL(url)).openConnection.asInstanceOf[HttpURLConnection] 
    connection.setConnectTimeout(connectTimeout) 
    connection.setReadTimeout(readTimeout) 
    connection.setRequestMethod(requestMethod) 
    val inputStream = connection.getInputStream 
    val content = io.Source.fromInputStream(inputStream).mkString 
    if (inputStream != null) inputStream.close 
    content 
} 

現在我想將該字符串轉換爲json以從中獲取圖片url。 (從這個https://stackoverflow.com/a/38271732/1456026

val step2 = pairRDD_1.map({case(x,y)=>{ 
val jsonStr = y 
val rdd = sc.parallelize(Seq(jsonStr)) 
val df = sqlContext.read.json(rdd) 
(x,y("picture")) 
}}) 

但我經常收到

異常線程 「main」 org.apache.spark.SparkException:任務不 序列化

當我打印出前20個元素,並嘗試將字符串轉換爲json,然後手動將它們一個接一個地外部化。

val rdd = sc.parallelize(Seq("""{"id":1, "picture": "url1"}""")) 
val df = sqlContext.read.json(rdd) 
println(df) 
>>>[id: string, picture: string] 

如何將字符串轉換爲spark/scala中的json inside .map?

回答

0

通常,當您看到此消息時,這是因爲您正在使用地圖函數中的資源(讀取匿名函數),該資源是在其外部定義的,並且無法序列化。

以集羣模式運行時,匿名函數將在不同的機器上運行。在這個獨立的機器上,你的應用程序的一個新實例被實例化,並且它的狀態(變量/值/ etc)由驅動程序序列化併發送到新實例的數據設置。如果你的匿名函數是一個閉包(即利用它範圍之外的變量),那麼這些資源必須是可序列化的,以便發送給工作節點。

例如,map函數可能會嘗試使用數據庫連接來獲取RDD中每個記錄的一些信息。該數據庫連接僅在創建它的主機上有效(當然,從網絡的角度來看),這通常是驅動程序,所以它不能被序列化,發送和從不同的主機使用。在這個特定的例子中,你會做一個mapPartitions()來實例化一個來自工作者本身的數據庫連接,然後map()該分區中的每個記錄來查詢數據庫。

如果沒有完整的代碼示例,我無法提供更多幫助,以查看哪些潛在值或變量無法序列化。

1

在分佈式操作中不能使用SparkContext。在上面的代碼中,您無法訪問map操作pairRDD_1上的SparkContext。

考慮使用JSON庫來執行轉換。

0

其中一個答案是使用json4s庫。 來源:http://muster.json4s.org/docs/jawn_codec.html

//case class defined outside main() 
case class Pictures(id: String, picture: String) 

// import library 
import muster._ 
import muster.codec.jawn._ 

// here all the magic happens 
val json_read_RDD = pairRDD_1.map({case(x,y) => 
     { 
      val json_read_to_case_class = JawnCodec.as[Pictures](y) 
      (x, json_read_to_case_class.picture) 
    }}) 

// add to build.sbt 
libraryDependencies ++= Seq(
"org.json4s" %% "muster-codec-json" % "0.3.0", 
"org.json4s" %% "muster-codec-jawn" % "0.3.0") 

學分去特拉維斯HEGNER,誰解釋了爲什麼原來的代碼沒有工作 和安東Okolnychyi使用JSON庫的建議。