我寫了一個類,獲取DataFrame,做一些計算,並可以導出結果。數據框由鍵列表生成。我知道我是在一個非常效率不高的方式,現在這樣做:並行/避免在火花foreach循環
var l = List(34, 32, 132, 352) // Scala List
l.foreach{i =>
val data:DataFrame = DataContainer.getDataFrame(i) // get DataFrame
val x = new MyClass(data) // initialize MyClass with new Object
x.setSettings(...)
x.calcSomething()
x.saveResults() // writes the Results into another Dataframe that is saved to HDFS
}
我覺得斯卡拉名單上的foreach不平行,所以怎麼能我儘量避免使用這裏的foreach? DataFrames的計算可能會並行進行,因爲計算結果不是爲下一個DataFrame輸入的 - 我如何實現這一點?
非常感謝你!
__edit:
什麼我試圖做的:
val l = List(34, 32, 132, 352) // Scala List
var l_DF:List[DataFrame] = List()
l.foreach{ i =>
DataContainer.getDataFrame(i)::l //append DataFrame to List of Dataframes
}
val rdd:DataFrame = sc.parallelize(l)
rdd.foreach(data =>
val x = new MyClass(data)
)
但給人
Invalid tree; null:
null
編輯2: 好吧,我不弄evrything是如何工作的引擎蓋下。 ...
1)一切工作正常,當我在火花外殼執行這個
spark-shell –driver-memory 10g
//...
var l = List(34, 32, 132, 352) // Scala List
l.foreach{i =>
val data:DataFrame = AllData.where($"a" === i) // get DataFrame
val x = new MyClass(data) // initialize MyClass with new Object
x.calcSomething()
}
2)錯誤,當我開始同與
spark-shell --master yarn-client --num-executors 10 –driver-memory 10g
// same code as above
java.util.concurrent.RejectedExecutionException: Task [email protected] rejected from [email protected][Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1263]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at scala.concurrent.Promise$class.complete(Promise.scala:55)
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at scala.concurrent.Promise$class.complete(Promise.scala:55)
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
3)當我嘗試並行化,我得到一個錯誤,太
spark-shell --master yarn-client --num-executors 10 –driver-memory 10g
//...
var l = List(34, 32, 132, 352).par
// same code as above, just parallelized before calling foreach
// i can see the parallel execution by the console messages (my class gives some and they are printed out parallel now instead of sequentielly
scala.collection.parallel.CompositeThrowable: Multiple exceptions thrown during a parallel computation: java.lang.IllegalStateException: SparkContext has been shutdown
org.apache.spark.SparkContext.runJob(SparkContext.scala:1816)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:215)
org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
.
.
.
java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext org.apache.spark.SparkContext.org$apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:104)
org.apache.spark.SparkContext.broadcast(SparkContext.scala:1320)
org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:104)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
org.apache.spark.sql.execution.SparkStrategies$EquiJoinSelection$.makeBroadcastHashJoin(SparkStrategies.scala:92)
org.apache.spark.sql.execution.SparkStrategies$EquiJoinSelection$.apply(SparkStrategies.scala:104)
實際上有更超過10個執行者,但有4個節點。我從不配置spark-context。它已經在啓動時給出。
請提供錯誤完整的堆棧跟蹤。另外'DataContainer.getDataFrame(i):: l'行看起來不正確。 –