2016-08-23 75 views
0

我是Spark,Scala和Cassandra的新手。 使用Spark我想從MySQL獲取一些ID。Spark Shell:任務不可序列化

import org.apache.spark.rdd.JdbcRDD 
import java.sql.{Connection, DriverManager, ResultSet} 
Class.forName("com.mysql.jdbc.Driver").newInstance 

import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf 

val myRDD = new JdbcRDD(sc,() => DriverManager.getConnection(url,username,password) ,"select id from user limit ?, ?",1, 20, 10, r => r.getString("id")) ; 
myRDD.foreach(println) 

我能夠看到控制檯上打印的ID。

現在對於每個提取的ID,我需要對Cassandra中的表執行Sum操作。

我創建了我可以通過傳遞個人ID

object HelloWorld { 
     def sum(id : String): Unit = { 
     val each_spark_rdd = uplink_rdd.select("number").where("id=?",Id).as((c: Int) => c).sum 
     println(each_spark_rdd) 
     } 
    } 

調用一個函數,並宣佈uplink_rdd作爲

val uplink_rdd = sc.cassandraTable("keyspace", "table") 

我能夠通過將單獨的ID調用該函數,並且可以看到總和

scala> HelloWorld.sum("5") 
50 

當我試圖在每個fetc上運行相同的功能^ h id作爲

myRDD.map(HelloWorld.sum) 
or 
myRDD.foreach(HelloWorld.sum) 
or 
for (id <- myRDD) HelloWorld.sum(id) 

它賦予相同的例外,因爲例外

org.apache.spark.SparkException:任務不能序列在 org.apache.spark.util.ClosureCleaner $ .ensureSerializable(ClosureCleaner。 Scala:304) at org.apache.spark.util.ClosureCleaner $ .org $ apache $ spark $ util $ ClosureCleaner $$ clean(ClosureCleaner.scala:294) at org.apache.spark.util.ClosureCleaner $ .clean(ClosureCleaner.scala:122) at org.apache.spark.SparkContext.clea n(SparkContext.scala:2055)at org.apache.spark.rdd.RDD $$ anonfun $ foreach $ 1.apply(RDD.scala:911)at org.apache.spark.rdd.RDD $$ anonfun $ foreach $ 1.apply(RDD.scala:910)at org.apache.spark.rdd.RDDOperationScope $ .withScope(RDDOperationScope.scala:150) at org.apache.spark.rdd.RDDOperationScope $ .withScope(RDDOperationScope.scala :111) at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。(:54) at $ iwC $$ iwC $$ iwC $$ iwC $ $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。(:59) at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ IWC萬國表$$ $$ iwC。(:61) at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。(:63) at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。(:65)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。(:67)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。(:69)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC。(:71)at $ iwC $$ iwC $$:iwC $$ iwC。(:73)at $ iwC $$ iwC $$ iwC。(:75)at $ iwC $$ iwC。(:77) at $ iwC。(:79)at(:81 ): 。(:85)at。()at 。(:7)at。()at $ print() at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62) at sun.reflect.Delegating MethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498)at org.apache.spark.repl.SparkIMain $ ReadEvalPrint.call(SparkIMain。斯卡拉:1065) 在 org.apache.spark.repl.SparkIMain $ Request.loadAndRun(SparkIMain.scala:1346) 在 org.apache.spark.repl.SparkIMain.loadAndRunReq $ 1(SparkIMain.scala:840) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) at org.apache.spark.repl。 SparkILoop.reallyInterpret $ 1(SparkILoop.scala:857) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala: 814)at org.apache.spark.repl.SparkILoop.processLine $ 1(SparkILoop.scala:657) at org.apache.spark.repl.SparkILoop.innerLoop $ 1(SparkILoop.scala:665) at org.apache.spark.repl.SparkILoop.org $ apache $ spark $ repl $ SparkILoop $$ loop(SparkILoop.scala :670) 在 org.apache.spark.repl.SparkILoop $$ anonfun $ $組織阿帕奇$火花$ REPL $ SparkILoop $$過程$ 1.適用$ MCZ $ SP(SparkILoop.scala:997) 在 組織。 apache.spark.repl.SparkILoop $$ anonfun $ org $ apache $ spark $ repl $ SparkILoop $$進程$ 1.apply(SparkILoop.scala:945) at org.apache.spark.repl.SparkILoop $$ anonfun $ org $ apache $ spark $ repl $ SparkILoop $$進程$ 1.apply(SparkILoop.scala:945) at scala.tools.nsc.util.ScalaClassLoader $ .savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.org $ apache $ spark $ repl $ Sparkiloop $$進程(SparkILoop.scala:945) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059) at org.apache.spark.repl.Main $ .main(Main.scala:31)at org.apache.spark.repl.Main.main(Main.scala)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native方法)在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在java.lang.reflect.Method.invoke(Method.java :498)at org.apache.spark.deploy.SparkSubmit $ .org $ apache $ spark $ deploy $ SparkSubmit $$ runMain(SparkSubmit.scala:731) at org.apache.spark.deploy.SparkSubmit $ .doRunMain $ 1(SparkSubmit.scala:181) at org.apache.spark.deploy.SparkSubmit $ .submit(SparkSubmit.scala:206) at org.apache。 spark.deploy.SparkSubmit $ .main(SparkSubmit.scala:121) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)引發 作者:java.io.NotSerializableException:org.apache.spark.SparkConf

我嘗試添加到@Transient RDDS看完Apache Spark: "SparkException: Task not serializable" in spark-shell for RDD constructed manually

@transient val myRDD = new JdbcRDD ... 
@transient val uplink_rdd = sc.cassandra.... 

但仍然摹同樣的錯誤。

請讓我知道如何從Cassandara表中找到從Mysql中獲取的每個id的總和。

+0

的問題是,你基本上是試圖進行轉型中的作用 - 在星火變革和行動不能嵌套。當你調用'foreach'時,Spark試圖序列化'HelloWorld.sum'將它傳遞給每個執行者 - 但是爲了這樣做,它也必須序列化函數的閉包,其中包括'uplink_rdd'(並且這不是可序列化的)。 但是,當你發現自己試圖做這種事情時,通常只是表明你想要使用「join」或類似的東西。 – Alec

+0

你檢查了這個嗎? [鏈接](https://stackoverflow.com/questions/32661018/scala-spark-task-not-serializable?rq=1) – Cfuentes

回答

0

您的代碼正試圖在myRDD轉換中使用uplink_rdd。應用於RDD的閉包不能包含另一個RDD。

你應該做一些沿着joinWithCassandraTable的行,它將並行分佈(ly?)使用來自myRDD的信息從Cassandra中提取數據。這工作,如果你是從卡桑德拉

拉單分區鍵

the Docs

另一種選擇是使用手動連接從連接器使用的池提取。

val cc = CassandraConnector(sc.getConf) 
myRDD.mapPartitions { it => 
    cc.withSessionDo { session => 
    session.execute("whatever query you want") 
    } 
} 

如果你實際上是在多個分區上求和的卡桑德拉你需要 爲每個ID的新RDD。

喜歡的東西

myRDD.collect.foreach(HelloWorld.sum) 
相關問題