Spark 2.0: parallel queries and JobProgressListener failures

I have a scenario where I need to fire many SQL queries in parallel from a for loop and collect the resulting row lists into a ListBuffer. However, I get many errors while the loop runs, and the results are incomplete. To illustrate, I put together a very simple reproducible example:

import scala.collection.mutable.ListBuffer 
val dummy = List("a","b").toDF.createOrReplaceTempView("df") 
spark.catalog.cacheTable("df") 
val dig = (0 to 9).par 
var counter: Int = 0 
var results = ListBuffer[List[org.apache.spark.sql.Row]]() 

for (i1 <- dig) { 
    for (i2 <- dig) { 
    for (i3 <- dig) { 
     println("||==="+i1+"=="+i2+"=="+i3+"===="+(i1*100+i2*10+i3*1)+"===="+counter+"=======||") 
     counter +=1 
     results += spark.sql("select 'trial','"+i1+"','"+i2+"','"+i3+"','"+(i1*100+i2*10+i3*1)+"','"+counter+"',* from df ").collect().toList 
     } 
    } 
    } 
results(0).take(2).foreach(println) 
results.size 
results.flatten.size 

The code above simply counts from 0 to 999 and, for each count, appends a two-row list to the ListBuffer.

Here is the output of a run, with the counter values printed alongside the "serial" values:

||===9==8==3====983====969=======|| 
||===9==8==5====985====969=======|| 
||===9==8==1====981====969=======|| 
||===9==8==2====982====969=======|| 
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 784 
||===9==8==7====987====974=======|| 
||===5==8==9====589====975=======|| 
||===9==8==4====984====976=======|| 
||===9==8==6====986====976=======|| 
||===9==8==9====989====977=======|| 
||===9==8==8====988====977=======|| 
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 773 
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 790 
||===5==9==0====590====980=======|| 
||===5==9==2====592====980=======|| 
||===5==9==5====595====980=======|| 
||===5==9==1====591====980=======|| 
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 795 
||===5==9==3====593====984=======|| 
||===5==9==7====597====985=======|| 
||===5==9==8====598====985=======|| 
||===5==9==6====596====987=======|| 
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 798 
||===5==9==9====599====988=======|| 
||===5==9==4====594====989=======|| 
||===9==9==0====990====990=======|| 
||===9==9==5====995====991=======|| 
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 784 
||===9==9==2====992====992=======|| 
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 789 
||===9==9==3====993====993=======|| 
||===9==9==1====991====994=======|| 
||===9==9==4====994====995=======|| 
||===9==9==7====997====996=======|| 
||===9==9==8====998====997=======|| 
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 790 
||===9==9==6====996====998=======|| 
||===9==9==9====999====999=======|| 
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 805 
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 798 

scala> results(0).take(2).foreach(println) 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 802 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 805 
[trial,0,0,0,0,16,a] 
[trial,0,0,0,0,16,b] 

scala> results.size 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 839 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 840 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 839 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 842 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 855 
res3: Int = 1000 

scala> results.flatten.size 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 860 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 854 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 860 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 868 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 874 
res4: Int = 2000 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 882 

scala> 

[Stage 589:=(28 + 0)/28][Stage 590:>(27 + 1)/28][Stage 591:>(20 + 7)/28]16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 888 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 895 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 898 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 898 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 905 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 906 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 907 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 902 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 905 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 913 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 915 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 916 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 913 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 920 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 942 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 946 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 942 
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 946 
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 948 
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 956 
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 952 
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 965 
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 965 
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 966 
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 976 
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 976 
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 990 
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 999 


scala> 

and these are only some of the WARN messages I get.

As you can see, the counter "stutters" at times, and there are lots of warnings, but results.size = 1000 and results.flatten.size = 2000 as expected.

**This is where the trouble begins.** Trying to count to 10000 in the same way produces even more warnings:

import scala.collection.mutable.ListBuffer 
val dummy = List("a","b").toDF.createOrReplaceTempView("df") 
spark.catalog.cacheTable("df") 
val dig = (0 to 9).par 
var counter: Int = 0 
var results = ListBuffer[List[org.apache.spark.sql.Row]]() 

for (i1 <- dig) { 
    for (i2 <- dig) { 
    for (i3 <- dig) { 
     for (i4 <- dig) { 
     println("||==="+i1+"=="+i2+"=="+i3+"=="+i4+"===="+(i1*1000+i2*100+i3*10+i4*1)+"===="+counter+"=======||") 
     counter +=1 
     results += spark.sql("select 'trial','"+i1+"','"+i2+"','"+i3+"', '"+i4+"','"+(i1*1000+i2*100+i3*10+i4*1)+"','"+counter+"',* from df ").collect().toList 
     } 
    } 
    } 
} 
results(0).take(2).foreach(println) 
results.size 
results.flatten.size 

Output:

16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8797 
||===0==9==4==3====943====9998=======|| 
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8799 
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8801 
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8802 
||===0==9==4==4====944====9999=======|| 
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8803 
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8804 
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8805 
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8806 

and the results:

scala> results(0).take(2).foreach(println) 
[trial,3,0,0,0,3000,7,a] 
[trial,3,0,0,0,3000,7,b] 

scala> results.size 
res3: Int = 9999 

scala> results.flatten.size 
res4: Int = 19998 

One value is missing.

Now try the following code, counting to 100000:

import scala.collection.mutable.ListBuffer 
val dummy = List("a","b").toDF.createOrReplaceTempView("df") 
spark.catalog.cacheTable("df") 
val dig = (0 to 9).par 
var counter: Int = 0 
var results = ListBuffer[List[org.apache.spark.sql.Row]]() 

for (i0 <- dig) { 
    for (i1 <- dig) { 
    for (i2 <- dig) { 
     for (i3 <- dig) { 
     for (i4 <- dig) { 
      println("============="+i0+"=="+i1+"=="+i2+"=="+i3+"=="+i4+"===="+(i0*10000+i1*1000+i2*100+i3*10+i4*1)+"===="+counter+"=========") 
      counter +=1 
      results += spark.sql("select 'trial','"+i0+"','"+i1+"','"+i2+"','"+i3+"', '"+i4+"','"+(i0*10000+i1*1000+i2*100+i3*10+i4*1)+"','"+counter+"',* from df ").collect().toList 
     } 
     } 
    } 
    } 
} 

Not only do I get tons of JobProgressListener warnings during the run, the results are also incomplete and non-deterministic:

scala> results(0).take(2).foreach(println) 
[trial,8,5,0,0,0,85000,13,a] 
[trial,8,5,0,0,0,85000,13,b] 

scala> results.size 
res3: Int = 99999 

scala> results.flatten.size 
res4: Int = 192908 

In my real-life example I frequently get a "spark.sql.execution.id is already set" exception at random points during the run.

How can I solve this problem?

I have already tried

spark.conf.set("spark.extraListeners","org.apache.spark.scheduler.StatsReportListener,org.apache.spark.scheduler.EventLoggingListener") 

and reading:

Spark 1.6: java.lang.IllegalArgumentException: spark.sql.execution.id is already set

Apache Spark: network errors between executors

http://docs.scala-lang.org/overviews/parallel-collections/overview.html regarding side-effecting operations, but there seem to be too many directions to go in.

The issue that seems most relevant to this problem, IMHO, is https://issues.apache.org/jira/browse/SPARK-10548, which was supposedly resolved in Spark 1.6.

Can anyone provide some hints on resolving this situation? My real-world case is similar in complexity to the 100000 count and fails at random stages of execution.

I deploy a GCS dataproc cluster:

gcloud dataproc clusters create clusTest --zone us-central1-b --master-machine-type n1-highmem-16 --num-workers 2 --worker-machine-type n1-highmem-8 --num-worker-local-ssds 2 --num-preemptible-workers 8 --scopes 'https://www.googleapis.com/auth/cloud-platform' --project xyz-analytics

(a screenshot of the Spark UI during the run)


What version of Spark are you on? –


spark 2.0 on google gcs –


java.lang.IllegalArgumentException: spark.sql.execution.id is already set at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:81) at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532) at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2182) at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collect$1.apply(Dataset.scala:2187) at ... –

Answer


the results are also incomplete and non-deterministic

The non-deterministic part should give a hint. You are adding results to your ListBuffer concurrently while racing: it is not thread-safe for parallel updates, so if you run long enough you will eventually lose some results.

I tried it locally and could reproduce the incomplete-results problem. Simply adding a synchronized block around the append to the buffer makes the results complete. You could also use another data structure that does the job for you, so you don't have to write an explicit synchronized block, e.g. java.util.concurrent.ConcurrentLinkedQueue or similar.
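Stripped of Spark, the locking pattern looks like this. This is a minimal sketch with plain threads (the thread and iteration counts are arbitrary, chosen just for illustration):

```scala
import scala.collection.mutable.ListBuffer

object SyncAppendDemo {
  // Appends from several threads, holding a lock around each append,
  // so no element is lost. Returns the final buffer size.
  def run(): Int = {
    val results = ListBuffer[Int]()
    val lock = new Object
    val threads = (0 until 8).map { t =>
      new Thread(new Runnable {
        def run(): Unit =
          for (i <- 0 until 1000)
            lock.synchronized { results += t * 1000 + i }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    results.size
  }

  def main(args: Array[String]): Unit =
    println(run()) // always 8000 with the lock held
}
```

Without the synchronized block, the same code would typically report fewer than 8000 elements, which is exactly the kind of loss seen in the question.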

So the following solves the problem:

for (i1 <- dig) { 
    for (i2 <- dig) { 
    for (i3 <- dig) { 
     for (i4 <- dig) { 
     counter +=1 
     val result = spark.sql("select 'trial','"+i1+"','"+i2+"','"+i3+"', '"+i4+"','"+(i1*1000+i2*100+i3*10+i4*1)+"','"+counter+"',* from df ").collect().toList 
     synchronized { 
      results += result 
     } 
     } 
    } 
    } 
} 
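If you'd rather avoid the explicit lock, the same collection can be done with java.util.concurrent.ConcurrentLinkedQueue, as mentioned above. A Spark-free sketch (the row strings and counts here are made up purely for illustration):

```scala
import java.util.concurrent.ConcurrentLinkedQueue

object QueueCollectDemo {
  // Collects two-element row lists from several threads into a
  // lock-free queue; add() is thread-safe, no synchronized needed.
  def run(): Int = {
    val queue = new ConcurrentLinkedQueue[List[String]]()
    val threads = (0 until 4).map { t =>
      new Thread(new Runnable {
        def run(): Unit =
          for (i <- 0 until 250)
            queue.add(List(s"row-$t-$i-a", s"row-$t-$i-b"))
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    queue.size
  }

  def main(args: Array[String]): Unit =
    println(run()) // 1000 lists, nothing lost
}
```

Note that ConcurrentLinkedQueue's size() is O(n), so in a real job you may want to drain the queue into a List at the end instead of querying its size repeatedly.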

As for the "spark.sql.execution.id is already set" exception: I could not reproduce it with the example given above. (However, I ran the code above on a local Spark installation.) Can you reproduce it on a local installation?
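A side note on the "stuttering" counter values visible in the question's output: counter += 1 on a shared var is a non-atomic read-modify-write and has the same race. If exact counts matter, java.util.concurrent.atomic.AtomicInteger is the usual fix; a minimal sketch:

```scala
import java.util.concurrent.atomic.AtomicInteger

object CounterDemo {
  // Increments a shared counter from several threads; incrementAndGet()
  // is an atomic read-modify-write, so no increment is lost.
  def run(): Int = {
    val counter = new AtomicInteger(0)
    val threads = (0 until 8).map { _ =>
      new Thread(new Runnable {
        def run(): Unit =
          for (_ <- 0 until 1000) counter.incrementAndGet()
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    counter.get
  }

  def main(args: Array[String]): Unit =
    println(run()) // always 8000
}
```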
