我是Spark,Scala和Cassandra的新手。 使用Spark我想從MySQL獲取一些ID。Spark Shell:任務不可序列化
import org.apache.spark.rdd.JdbcRDD
import java.sql.{Connection, DriverManager, ResultSet}
Class.forName("com.mysql.jdbc.Driver").newInstance
import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf
val myRDD = new JdbcRDD(sc,() => DriverManager.getConnection(url,username,password) ,"select id from user limit ?, ?",1, 20, 10, r => r.getString("id")) ;
myRDD.foreach(println)
我能夠看到控制檯上打印的ID。
現在對於每個提取的ID,我需要對Cassandra中的表執行Sum操作。
我創建了我可以通過傳遞個人ID
object HelloWorld {
def sum(id : String): Unit = {
val each_spark_rdd = uplink_rdd.select("number").where("id=?",Id).as((c: Int) => c).sum
println(each_spark_rdd)
}
}
調用一個函數,並宣佈uplink_rdd作爲
val uplink_rdd = sc.cassandraTable("keyspace", "table")
我能夠通過將單獨的ID調用該函數,並且可以看到總和
scala> HelloWorld.sum("5")
50
當我試圖在每個fetc上運行相同的功能^ h id作爲
myRDD.map(HelloWorld.sum)
or
myRDD.foreach(HelloWorld.sum)
or
for (id <- myRDD) HelloWorld.sum(id)
它賦予相同的例外,因爲例外
org.apache.spark.SparkException:任務不能序列在 org.apache.spark.util.ClosureCleaner $ .ensureSerializable(ClosureCleaner。 Scala:304) at org.apache.spark.util.ClosureCleaner $ .org $ apache $ spark $ util $ ClosureCleaner $$ clean(ClosureCleaner.scala:294) at org.apache.spark.util.ClosureCleaner $ .clean(ClosureCleaner.scala:122) at org.apache.spark.SparkContext.clea n(SparkContext.scala:2055)at org.apache.spark.rdd.RDD $$ anonfun $ foreach $ 1.apply(RDD.scala:911)at org.apache.spark.rdd.RDD $$ anonfun $ foreach $ 1.apply(RDD.scala:910)at org.apache.spark.rdd.RDDOperationScope $ .withScope(RDDOperationScope.scala:150) at org.apache.spark.rdd.RDDOperationScope $ .withScope(RDDOperationScope.scala :111) at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。(:54) at $ iwC $$ iwC $$ iwC $$ iwC $ $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。(:59) at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ IWC萬國表$$ $$ iwC。(:61) at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。(:63) at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。(:65)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。(:67)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。(:69)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC。(:71)at $ iwC $$ iwC $$:iwC $$ iwC。(:73)at $ iwC $$ iwC $$ iwC。(:75)at $ iwC $$ iwC。(:77) at $ iwC。(:79)at(:81 ): 。(:85)at。()at 。(:7)at。()at $ print() at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62) at sun.reflect.Delegating MethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498)at org.apache.spark.repl.SparkIMain $ ReadEvalPrint.call(SparkIMain。斯卡拉:1065) 在 org.apache.spark.repl.SparkIMain $ Request.loadAndRun(SparkIMain.scala:1346) 在 org.apache.spark.repl.SparkIMain.loadAndRunReq $ 1(SparkIMain.scala:840) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) at org.apache.spark.repl。 SparkILoop.reallyInterpret $ 1(SparkILoop.scala:857) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala: 814)at org.apache.spark.repl.SparkILoop.processLine $ 1(SparkILoop.scala:657) at org.apache.spark.repl.SparkILoop.innerLoop $ 1(SparkILoop.scala:665) at org.apache.spark.repl.SparkILoop.org $ apache $ spark $ repl $ SparkILoop $$ loop(SparkILoop.scala :670) 在 org.apache.spark.repl.SparkILoop $$ anonfun $ $組織阿帕奇$火花$ REPL $ SparkILoop $$過程$ 1.適用$ MCZ $ SP(SparkILoop.scala:997) 在 組織。 apache.spark.repl.SparkILoop $$ anonfun $ org $ apache $ spark $ repl $ SparkILoop $$進程$ 1.apply(SparkILoop.scala:945) at org.apache.spark.repl.SparkILoop $$ anonfun $ org $ apache $ spark $ repl $ SparkILoop $$進程$ 1.apply(SparkILoop.scala:945) at scala.tools.nsc.util.ScalaClassLoader $ .savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.org $ apache $ spark $ repl $ Sparkiloop $$進程(SparkILoop.scala:945) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059) at org.apache.spark.repl.Main $ .main(Main.scala:31)at org.apache.spark.repl.Main.main(Main.scala)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native方法)在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在java.lang.reflect.Method.invoke(Method.java :498)at org.apache.spark.deploy.SparkSubmit $ .org $ apache $ spark $ deploy $ SparkSubmit $$ runMain(SparkSubmit.scala:731) at org.apache.spark.deploy.SparkSubmit $ .doRunMain $ 1(SparkSubmit.scala:181) at org.apache.spark.deploy.SparkSubmit $ .submit(SparkSubmit.scala:206) at org.apache。 spark.deploy.SparkSubmit $ .main(SparkSubmit.scala:121) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)引發 作者:java.io.NotSerializableException:org.apache.spark.SparkConf
我嘗試添加到@Transient RDDS看完Apache Spark: "SparkException: Task not serializable" in spark-shell for RDD constructed manually爲
@transient val myRDD = new JdbcRDD ...
@transient val uplink_rdd = sc.cassandra....
但仍然摹同樣的錯誤。
請讓我知道如何從Cassandara表中找到從Mysql中獲取的每個id的總和。
的問題是,你基本上是試圖進行轉型中的作用 - 在星火變革和行動不能嵌套。當你調用'foreach'時,Spark試圖序列化'HelloWorld.sum'將它傳遞給每個執行者 - 但是爲了這樣做,它也必須序列化函數的閉包,其中包括'uplink_rdd'(並且這不是可序列化的)。 但是,當你發現自己試圖做這種事情時,通常只是表明你想要使用「join」或類似的東西。 – Alec
你檢查了這個嗎? [鏈接](https://stackoverflow.com/questions/32661018/scala-spark-task-not-serializable?rq=1) – Cfuentes