2
後約1分鐘當我做爲SPART流的工作,我可以看到,它的approximatelly1分鐘期間運行,然後它停止了與最終地位SUCCEEDED
:星火流被停止,而錯誤
16/11/16 18:58:16 INFO yarn.Client: Application report for application_XXXX_XXX (state: RUNNING)
16/11/16 18:58:17 INFO yarn.Client: Application report for application_XXXX_XXX (state: FINISHED)
我不明白爲什麼它會停下來,但我期望它運行一個未定義的時間,並由從卡夫卡隊列收到的消息觸發。在日誌中,我可以看到所有println
輸出,並且沒有錯誤。
這是從代碼中的一個片段:
val conf = new SparkConf().setAppName("MYTEST")
val sc = new SparkContext(conf)
sc.setCheckpointDir("~/checkpointDir")
val ssc = new StreamingContext(sc, Seconds(batch_interval_seconds))
val rootLogger = Logger.getRootLogger()
rootLogger.setLevel(Level.ERROR)
println("Dividing the topic into partitions.")
val inputKafkaTopicMap = inputKafkaTopic.split(",").map((_, kafkaNumThreads)).toMap
val messages = KafkaUtils.createStream(ssc, zkQuorum, group, inputKafkaTopicMap).map(_._2)
messages.foreachRDD(msg => {
msg.foreach(s => {
if (s != null) {
//val result = ... processing goes here
//println(result)
}
})
})
// Start the streaming context in the background.
ssc.start()
這是我命令:
/usr/bin/spark-submit --master yarn --deploy-mode cluster --driver-memory 10g --executor-memory 10g --num-executors 2 --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC \
-XX:+AlwaysPreTouch" --class org.test.StreamingRunner test.jar param1 param2
當我打開資源管理器,我看有沒有工作是RUNNING
和火花流工作被標記爲FINISHED
。
我會嘗試註釋'rootLogger.setLevel(Level.ERROR)'以獲得更詳細的輸出。可能無論什麼是殺死你的工作沒有被標記爲'錯誤',所以它被過濾出日誌。此外,它看起來像你在代碼結束時錯過了對'ssc.awaitTermination'的調用。 –
@EricM:好的,謝謝。讓我測試它沒有這行代碼。我會在幾分鐘之內告訴你結果。 – duckertito
@EricM .:我試着用'ssc.awaitTermination'運行它,但是也有同樣的問題。但無論如何,讓我再次檢查它。 – duckertito