Spark Streaming 1.6.0 - executor bouncing

We are running Spark Streaming 1.6.0 on AWS EMR 4.3.x, consuming data from a Kinesis stream. The same application worked fine on Spark 1.3.1. After migrating, it cannot survive sustained load: Ganglia shows the cluster's memory usage growing steadily until it hits some ceiling, with no GC bringing it back down. After that, a few micro-batches take extremely long (tens of minutes instead of seconds), and then Spark starts killing and bouncing executors, over and over.

Basically, the cluster becomes unusable. The problem is reproducible under load, repeatedly. What could be causing Spark to fail this way, if not the killing of executors? How can we keep the cluster running for weeks, when right now it cannot even stay up for a few hours?

Any input is welcome.

We define the job with the following settings:

sparkConf.set("spark.shuffle.consolidateFiles", "true"); 
sparkConf.set("spark.storage.memoryFraction", "0.5"); 
sparkConf.set("spark.streaming.backpressure.enabled", "true"); 
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); 

and create the Kinesis stream with:

KinesisUtils.createStream(streamingContext, appName, 
         kinesisStreamName, kinesisEndpoint, awsRegionName, initialPositionInStream, checkpointInterval, 
         StorageLevel.MEMORY_AND_DISK_2()); 

For testing, I stripped our application down to a skeleton: a union of the Kinesis streams, a map from the byte stream to a string stream, conversion to objects, filtering out irrelevant events, and finally persisting and saving to S3.

eventStream = eventStream.persist(StorageLevel.MEMORY_AND_DISK_SER_2());

eventStream = eventStream.repartition(configuration.getSparkOutputPartitions()); 
eventStream.foreachRDD(new RddByPartitionSaverFunction<>(new OutputToS3Function())); 
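
For context, here is a minimal end-to-end sketch of that skeleton, assuming the usual Spark Streaming and Kinesis imports. The receiver count, the Event type, its deserialization, and the isRelevant predicate are placeholders standing in for our real classes, not the actual implementation.

// Sketch only: build one receiver stream per shard/receiver and union them. 
List<JavaDStream<byte[]>> streams = new ArrayList<>(); 
for (int i = 0; i < numReceivers; i++) {   // numReceivers is a placeholder 
    streams.add(KinesisUtils.createStream(streamingContext, appName, 
            kinesisStreamName, kinesisEndpoint, awsRegionName, initialPositionInStream, 
            checkpointInterval, StorageLevel.MEMORY_AND_DISK_2())); 
} 
JavaDStream<byte[]> rawStream = 
        streamingContext.union(streams.get(0), streams.subList(1, streams.size())); 

// bytes -> String -> domain object, then drop events we do not care about. 
JavaDStream<Event> eventStream = rawStream 
        .map(bytes -> new String(bytes, StandardCharsets.UTF_8)) 
        .map(Event::fromJson)                 // placeholder deserialization 
        .filter(event -> event.isRelevant()); // placeholder predicate 

// Same persist/repartition/save steps as above. 
eventStream = eventStream.persist(StorageLevel.MEMORY_AND_DISK_SER_2()); 
eventStream = eventStream.repartition(configuration.getSparkOutputPartitions()); 
eventStream.foreachRDD(new RddByPartitionSaverFunction<>(new OutputToS3Function())); 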

The Spark job is submitted with the following configuration (copied from the default Spark configuration, with the memory sizes adjusted):

spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p' -XX:PermSize=256M -XX:MaxPermSize=256M 
spark.driver.extraJavaOptions -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p' -XX:PermSize=512M -XX:MaxPermSize=512M 

Adding the exceptions. First cluster:

16/03/06 13:54:52 WARN BlockManagerMaster: Failed to remove broadcast 1327 with removeFromMaster = true - Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout 
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout 
     at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48) 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63) 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) 
     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33) 
     at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185) 
     at scala.util.Try$.apply(Try.scala:161) 
     at scala.util.Failure.recover(Try.scala:185) 
     at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324) 
     at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324) 
     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) 
     at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293) 
     at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133) 
     at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) 
     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) 
     at scala.concurrent.Promise$class.complete(Promise.scala:55) 
     at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153) 
     at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235) 
     at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235) 
     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) 
     at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.processBatch$1(Future.scala:643) 
     at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply$mcV$sp(Future.scala:658) 
     at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635) 
     at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635) 
     at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) 
     at scala.concurrent.Future$InternalCallbackExecutor$Batch.run(Future.scala:634) 
     at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694) 
     at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:685) 
     at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) 
     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) 
     at scala.concurrent.Promise$class.tryFailure(Promise.scala:112) 
     at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:153) 
     at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:241) 
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) 
     at java.util.concurrent.FutureTask.run(FutureTask.java:262) 
     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178) 
     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     at java.lang.Thread.run(Thread.java:745) 
Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply in 120 seconds 
     at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:242) 
     ... 7 more 
16/03/06 13:54:52 ERROR ContextCleaner: Error cleaning broadcast 1327 
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout 
     at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48) 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63) 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) 
     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33) 
     at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76) 
     at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:136) 
     at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:228) 
     at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45) 
     at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:67) 
     at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:233) 
     at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:189) 
     at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:180) 
     at scala.Option.foreach(Option.scala:236) 
     at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:180) 
     at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1180) 
     at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:173) 
     at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:68) 
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds] 
     at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) 
     at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) 
     at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) 
     at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) 
     at scala.concurrent.Await$.result(package.scala:107) 
     at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75) 
     ... 12 more 

16/03/06 13:55:04 ERROR YarnClusterScheduler: Lost executor 6 on ip-***-194.ec2.internal: Container killed by YARN for exceeding memory limits. 11.3 GB of 11.3 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. 
16/03/06 13:55:10 ERROR YarnClusterScheduler: Lost executor 1 on ip-***-193.ec2.internal: Container killed by YARN for exceeding memory limits. 11.3 GB of 11.3 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. 
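
For completeness: nothing in the configuration above sets spark.yarn.executor.memoryOverhead, so as far as I can tell we are on the default value that this error tells us to boost. I assume it could be raised like this (the 2048 MB value is just an illustration, not something we have validated):

sparkConf.set("spark.yarn.executor.memoryOverhead", "2048"); // MB of off-heap headroom YARN accounts for per executor 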

From the attempt on the second cluster:

16/03/07 14:24:38 ERROR server.TransportChannelHandler: Connection to ip-***-22.ec2.internal/N.N.N.22:40791 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong. 
16/03/07 14:24:38 ERROR client.TransportResponseHandler: Still have 12 requests outstanding when connection from ip-***-22.ec2.internal/N.N.N.22:40791 is closed 
16/03/07 14:24:38 ERROR netty.NettyBlockTransferService: Error while uploading block input-47-1457357970366 
java.io.IOException: Connection from ip-***-22.ec2.internal/N.N.N.22:40791 closed 
     at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124) 
     at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739) 
     at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659) 
     at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) 
     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) 
     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) 
     at java.lang.Thread.run(Thread.java:745) 
16/03/07 14:24:38 ERROR netty.NettyBlockTransferService: Error while uploading block input-15-1457357969730 
java.io.IOException: Connection from ip-***-22.ec2.internal/N.N.N.22:40791 closed 
     at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124) 
     at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     a.io.IOException: Connection from ip-***-22.ec2.internal/N.N.N.22:40791 closed 
     at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124) 
     at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739) 
     at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659) 
     at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) 
     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) 
     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) 
     at java.lang.Thread.run(Thread.java:745) 
t io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739) 
     at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659) 
     at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) 
     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) 
     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) 
     at java.lang.Thread.run(Thread.java:745) 

Thanks in advance...


Did you ever find a solution to the problem? –


Hi. No, we still have not found a solution. The problem only reproduces with a large number of Kinesis shards (120). Once it happens, the executors start bouncing. Ideas? Thanks – visitor


The executor bouncing stopped with Spark 2.0.0 (EMR 5.0.0). There is now a new problem that prevents long runs of the same application: http://stackoverflow.com/questions/39289345/spark-streaming-2-0-0-freezes-after-several-days-under-load – visitor

Answers


I am seeing the same thing. Whenever a micro-batch takes more than 120 seconds to complete, it fires:

16/03/14 22:57:30 INFO SparkStreaming$: Batch size: 2500, total read: 4287800 
16/03/14 22:57:35 INFO SparkStreaming$: Batch size: 2500, total read: 4290300 
16/03/14 22:57:42 INFO SparkStreaming$: Batch size: 2500, total read: 4292800 
16/03/14 22:57:45 INFO SparkStreaming$: Batch size: 2500, total read: 4295300 
16/03/14 22:59:45 ERROR ContextCleaner: Error cleaning broadcast 11251 
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout 
     at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48) 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63) 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) 
     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33) 
     at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76) 
     at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:136) 
     at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:228) 
     at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45) 
     at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:67) 
     at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:233) 
     at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:189) 
     at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:180) 
     at scala.Option.foreach(Option.scala:236) 
     at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:180) 
     at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1180) 
     at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:173) 
     at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:68) 
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds] 
     at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) 
     at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) 
     at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) 
     at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) 
     at scala.concurrent.Await$.result(package.scala:107) 
     at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75) 
     ... 12 more 

I am running in local mode and consuming from Kinesis. I am not using any broadcast variables either.
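
The only stopgap I can think of is stretching the 120-second ceiling the message points at (spark.rpc.askTimeout falls back to spark.network.timeout unless set explicitly), though that only hides the slow batches and broadcast cleanup rather than fixing them. A sketch, with an arbitrary 300s value:

sparkConf.set("spark.network.timeout", "300s"); // spark.rpc.askTimeout inherits this unless overridden 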