Parallelize / avoid foreach loop in Spark

I wrote a class that takes a DataFrame, does some calculations on it, and can export the results. The DataFrame is generated from a list of keys. I know that I am doing this in a very inefficient way right now:

var l = List(34, 32, 132, 352)  // Scala List 

l.foreach{i => 
    val data:DataFrame = DataContainer.getDataFrame(i) // get DataFrame 
    val x = new MyClass(data)      // initialize MyClass with new Object 
    x.setSettings(...) 
    x.calcSomething() 
    x.saveResults()        // writes the Results into another Dataframe that is saved to HDFS 
} 

I think foreach over a Scala List is not parallel, so how can I avoid using foreach here? The computations on the DataFrames could run in parallel, since the result of one calculation is not input for the next DataFrame. How can I implement this?

Thank you very much!

Edit:

What I tried to do:

val l = List(34, 32, 132, 352)  // Scala List 
var l_DF:List[DataFrame] = List() 
l.foreach{ i => 
    DataContainer.getDataFrame(i)::l  //append DataFrame to List of Dataframes 
} 

val rdd:DataFrame = sc.parallelize(l) 
rdd.foreach(data => 
    val x = new MyClass(data) 
) 

But this gives

Invalid tree; null: 
null 

Edit 2: OK, I don't understand how everything works under the hood. ...

1) Everything works fine when I execute this in the spark-shell:

spark-shell --driver-memory 10g 
//... 
var l = List(34, 32, 132, 352)  // Scala List 

l.foreach{i => 
    val data:DataFrame = AllData.where($"a" === i) // get DataFrame 
    val x = new MyClass(data)      // initialize MyClass  with new Object 
    x.calcSomething() 
} 

2) Error, when I start the same with

spark-shell --master yarn-client --num-executors 10 --driver-memory 10g 
// same code as above 
java.util.concurrent.RejectedExecutionException: Task [email protected] rejected from [email protected][Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1263] 
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) 
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) 
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) 
    at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133) 
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) 
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) 
    at scala.concurrent.Promise$class.complete(Promise.scala:55) 
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153) 
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324) 
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324) 
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) 
    at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293) 
    at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133) 
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) 
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) 
    at scala.concurrent.Promise$class.complete(Promise.scala:55) 
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153) 
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235) 
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235) 
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) 

3) When I try to parallelize it, I get an error too:

spark-shell --master yarn-client --num-executors 10 --driver-memory 10g 
//... 
var l = List(34, 32, 132, 352).par 
// same code as above, just parallelized before calling foreach 
// I can see the parallel execution in the console messages (my class prints some, and they now come out in parallel instead of sequentially) 

scala.collection.parallel.CompositeThrowable: Multiple exceptions thrown during a parallel computation: java.lang.IllegalStateException: SparkContext has been shutdown 
org.apache.spark.SparkContext.runJob(SparkContext.scala:1816) 
org.apache.spark.SparkContext.runJob(SparkContext.scala:1837) 
org.apache.spark.SparkContext.runJob(SparkContext.scala:1850) 
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:215) 
    org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207) 
org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385) 
org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385) 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) 
org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903) 
org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384) 
. 
. 
. 

java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext 
org.apache.spark.SparkContext.org$apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:104) 
org.apache.spark.SparkContext.broadcast(SparkContext.scala:1320) 
    org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:104) 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) 
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59) 
org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54) 
org.apache.spark.sql.execution.SparkStrategies$EquiJoinSelection$.makeBroadcastHashJoin(SparkStrategies.scala:92) 
org.apache.spark.sql.execution.SparkStrategies$EquiJoinSelection$.apply(SparkStrategies.scala:104) 

Actually there are even more than 10 executors, but on 4 nodes. I never configure the SparkContext myself. It is already given at startup.


Please provide the complete stack trace of the error. Also, the line 'DataContainer.getDataFrame(i)::l' looks incorrect. –

Answers


You can use Scala's parallel collections to achieve foreach parallelism on the driver side.

val l = List(34, 32, 132, 352).par 
l.foreach { i => 
    // your code to be run in parallel for each i 
} 

However, a word of caution: is your cluster capable of running jobs in parallel? You can submit your jobs to the Spark cluster in parallel, but they may end up being queued on the cluster and executed sequentially.
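As a minimal sketch going beyond the answer itself: a parallel collection by default sizes its thread pool to the number of cores, so if you want to cap how many jobs are submitted concurrently, you can assign a custom task support (this assumes Scala 2.10/2.11, where ForkJoinPool lives under scala.concurrent.forkjoin):

import scala.collection.parallel.ForkJoinTaskSupport 
import scala.concurrent.forkjoin.ForkJoinPool 

val keys = List(34, 32, 132, 352).par 
// cap driver-side parallelism at 4 concurrently submitted jobs 
keys.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(4)) 
keys.foreach { i => 
    // submit one Spark job per key, at most 4 at a time 
} 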


Thank you! The cluster I am using has several executors. Is this the most efficient way? And what about my solution (see the edit)? – johntechendso


Please have a look at this from the Spark documentation - http://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application Here is the relevant quote: "By default, Spark's scheduler runs jobs in FIFO fashion. [...] Starting in Spark 0.8, it is also possible to configure fair sharing between jobs. Under fair sharing, Spark assigns tasks between jobs in a 'round robin' fashion, so that all jobs get a roughly equal share of cluster resources. To enable the fair scheduler, simply set the spark.scheduler.mode property to FAIR when configuring a SparkContext." –
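A minimal sketch of that configuration, assuming a Spark 1.x-style SparkConf (the app name is hypothetical):

import org.apache.spark.{SparkConf, SparkContext} 

// enable fair scheduling between concurrently submitted jobs 
val conf = new SparkConf() 
    .setAppName("ParallelJobs")          // hypothetical app name 
    .set("spark.scheduler.mode", "FAIR") 
val sc = new SparkContext(conf) 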


Are you using a Spark standalone cluster or YARN? –


You can use Scala's Future together with Spark's fair scheduling, e.g.:

import java.util.concurrent.atomic.AtomicInteger 

import scala.concurrent._ 
import scala.concurrent.duration._ 
import ExecutionContext.Implicits.global 

object YourApp extends App { 
    val sc = ... // SparkContext, be sure to set spark.scheduler.mode=FAIR 

    // this is to have a different pool per job; an AtomicInteger keeps the 
    // counter thread-safe, since runner() is called from several Futures. 
    // You can wrap it to limit the no. of pools. 
    val pool = new AtomicInteger(0) 
    def poolId = pool.incrementAndGet() 

    def runner(i: Int) = Future { 
        // setLocalProperty expects a String value, hence the toString 
        sc.setLocalProperty("spark.scheduler.pool", poolId.toString) 
        val data: DataFrame = DataContainer.getDataFrame(i) // get DataFrame 
        val x = new MyClass(data)      // initialize MyClass with new Object 
        x.setSettings(...) 
        x.calcSomething() 
        x.saveResults() 
    } 

    val l = List(34, 32, 132, 352)  // Scala List 
    val futures = l map (i => runner(i)) 

    // now you need to wait for all your futures to complete 
    futures foreach (f => Await.ready(f, Duration.Inf)) 
} 

With the FairScheduler and a different pool per job, each concurrent job will get a fair share of the Spark cluster's resources.

Some references on Scala Futures here. You may need to add the necessary callbacks on completion, success, and/or failure.
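As a rough sketch of such callbacks, assuming the runner and futures from the sketch above (the log messages are illustrative only):

import scala.concurrent._ 
import scala.concurrent.duration._ 
import scala.concurrent.ExecutionContext.Implicits.global 
import scala.util.{Failure, Success} 

// react to a single job finishing, without blocking 
runner(34).onComplete { 
    case Success(_) => println("job for key 34 finished") 
    case Failure(e) => println(s"job for key 34 failed: ${e.getMessage}") 
} 

// alternatively, collapse all futures into one and wait for it once 
Await.ready(Future.sequence(futures), Duration.Inf) 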