2016-11-16 109 views
2

後約1分鐘當我做​​爲SPART流的工作,我可以看到,它的approximatelly1分鐘期間運行,然後它停止了與最終地位SUCCEEDED星火流被停止,而錯誤

16/11/16 18:58:16 INFO yarn.Client: Application report for application_XXXX_XXX (state: RUNNING) 
16/11/16 18:58:17 INFO yarn.Client: Application report for application_XXXX_XXX (state: FINISHED) 

我不明白爲什麼它會停下來,但我期望它運行一個未定義的時間,並由從卡夫卡隊列收到的消息觸發。在日誌中,我可以看到所有println輸出,並且沒有錯誤。

這是從代碼中的一個片段:

val conf = new SparkConf().setAppName("MYTEST") 
val sc = new SparkContext(conf) 
sc.setCheckpointDir("~/checkpointDir") 

val ssc = new StreamingContext(sc, Seconds(batch_interval_seconds)) 

val rootLogger = Logger.getRootLogger() 
rootLogger.setLevel(Level.ERROR) 

println("Dividing the topic into partitions.") 
val inputKafkaTopicMap = inputKafkaTopic.split(",").map((_, kafkaNumThreads)).toMap 
val messages = KafkaUtils.createStream(ssc, zkQuorum, group, inputKafkaTopicMap).map(_._2) 

messages.foreachRDD(msg => { 
    msg.foreach(s => { 
    if (s != null) { 
     //val result = ... processing goes here 
     //println(result) 
    } 
    }) 
}) 

// Start the streaming context in the background. 
ssc.start() 

這是我​​命令:

/usr/bin/spark-submit --master yarn --deploy-mode cluster --driver-memory 10g --executor-memory 10g --num-executors 2 --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC \ 
-XX:+AlwaysPreTouch" --class org.test.StreamingRunner test.jar param1 param2 

當我打開資源管理器,我看有沒有工作是RUNNING和火花流工作被標記爲FINISHED

+0

我會嘗試註釋'rootLogger.setLevel(Level.ERROR)'以獲得更詳細的輸出。可能無論什麼是殺死你的工作沒有被標記爲'錯誤',所以它被過濾出日誌。此外,它看起來像你在代碼結束時錯過了對'ssc.awaitTermination'的調用。 –

+0

@EricM:好的,謝謝。讓我測試它沒有這行代碼。我會在幾分鐘之內告訴你結果。 – duckertito

+0

@EricM .:我試着用'ssc.awaitTermination'運行它,但是也有同樣的問題。但無論如何,讓我再次檢查它。 – duckertito

回答

0

您的代碼缺少對ssc.awaitTermination的調用來阻止驅動程序線程。

不幸的是,在控制檯上看到map函數內部的打印輸出並不容易,因爲這些函數調用發生在YARN執行程序中。 Cloudera Manager雖然提供了一個體面的日誌,但如果您確實需要在驅動程序上收集這些日誌,則可以在HDFS中寫入一個位置,然後自己從中擦除各種日誌。如果您想跟蹤的信息是純數字的,您可能還會考慮使用Accumulator