2015-10-06 45 views
0

我是Spark的新手,任何人都可以幫助我嗎?Spark羣集上的DStrream [String] .foreachrdd

def streamStart() { 
val sparkConf = new SparkConf().setAppName("kafkaStreamingNew!!").setMaster("spark://husnain:7077").setJars(Array("/home/husnain/Downloads/ScalaWorkspace/KafkaStreaming/target/KafkaStreaming-1.1.0-jar-with-dependencies.jar")) //,"/home/husnain/.m2/repository/org/apache/spark/spark-streaming-kafka_2.10/1.4.1/spark-streaming-kafka_2.10-1.4.1.jar" , "/home/husnain/.m2/repository/org/apache/spark/spark-streaming_2.10/1.4.1/spark-streaming_2.10-1.4.1.jar" ,"/home/husnain/.m2/repository/org/apache/spark/spark-core_2.10/1.4.1/spark-core_2.10-1.4.1.jar")) 
val ssc = new StreamingContext(sparkConf, Seconds(1)) 

val topics = "test"; 
ssc.checkpoint("checkpoint") 
val lines = KafkaUtils.createStream(ssc, "localhost:2181", "spark", Map("test" -> 1)).map(_._2) 
lines.print() 
println("*****************************************************************************") 
lines.foreachRDD(
    iter => iter.foreach(
    x => println(x + "\n***-------------------------------------------------------***\n"))) 
println("-----------------------------------------------------------------------------") 
ssc.start() 
ssc.awaitTermination() 

在一個Spark獨立的集羣,代碼不工作,但對當地[*],它工作正常:

lines.foreachRDD(
    iter => iter.foreach(
    x => println(x + "\n***-------------------------------------------------------***\n") 
    ) 
    ) 
+2

什麼意思_it不WORK_?或者_works正確地處理這件事。 – zero323

+0

我懷疑你是以錯誤的方式提交你的應用程序。你在使用spark-submit嗎?從代碼提交應用程序到集羣是相當不可能的(事實上非常棘手)。看看這裏:http://spark.apache.org/docs/latest/submitting-applications.html –

+0

@dwysakowicz是的,我通過spark-submit –

回答

0

我認爲什麼它被稱爲「正常工作」是你看到控制檯上的println

當您向集羣提交相同的代碼時,控制檯的println在每個執行程序本地發生,所以如果其他所有操作都在工作,則缺少輸出僅僅是分佈式執行的結果。

查找範圍執行人的輸出爲那些println小號

+0

提交作業我在這裏寫了'println'來縮短代碼。我的實際任務是在進行一些計算後將傳入的數據保存到hbase中。在本地它的作品(意味着成功保存到hbase),但在火花獨立羣集模式下它不起作用:( –

+0

你提到hbase ..你確定你使用正確的配置嗎?你是在代碼中指定連接參數還是通過* .xml?在工作人員可能不會選擇正確的配置。 –