2014-08-30 77 views
0

我想在Scala中編寫簡單的Spark代碼。如何在DStream中應用RDD函數,同時在scala中編寫代碼

這裏我得到一個DStream。我成功地能夠打印此DStream。但是當我試圖在此DStream上執行任何種類的「foreach」,「foreachRDD」或「transform」函數時,在執行我的程序期間,我的控制檯正在凍結。在這裏,我沒有收到任何錯誤,但是直到我手動終止eclipse控制檯操作時,控制檯才變得無響應。我在這裏附上代碼。請告訴我我做錯了什麼。

我的主要目標是在DStream上應用RDD操作,並根據我的知識使用「foreach」,「foreachRDD」或「transform」函數將我的DStream轉換爲RDD。

我已經通過使用Java實現了相同。但在斯卡拉我有這個問題。

是否有其他人面臨同樣的問題?如果沒有,那麼請幫助我。由於

我寫的樣本代碼在這裏

object KafkaStreaming { 
    def main(args: Array[String]) { 
     if (args.length < 4) { 
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>") 
      System.exit(1) 
     } 

     val Array(zkQuorum, group, topics, numThreads) = args 
       val ssc = new StreamingContext("local", "KafkaWordCount", Seconds(2)) 
     val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap 
     val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2) 
     val splitLines:DStream[String] = lines.flatMap(_.split("\n")) 

     val pairAlarm = splitLines.map(

       x=>{ 
          //Some Code 
          val alarmPair = new Tuple2(key, value) 
          alarmPair 
       } 

       ) 


      //pairAlarm.print 


      pairAlarm.foreachRDD(x=>{ 
      println("1 : "+x.first) 
      x.collect       // When the execution reaching this part its getting freeze 
      println("2: "+x.first) 
      }) 


       ssc.start() 
       ssc.awaitTermination() 
    } 
} 

回答

3

我不知道這是不是你的問題,但我也有類似的一個。我的程序經過幾次迭代後才停止打印。 5-6次打印後,沒有例外情況等停止打印。

更改此:

val ssc = new StreamingContext("local", "KafkaWordCount", Seconds(2)) 

這樣:

val ssc = new StreamingContext("local[2]", "KafkaWordCount", Seconds(2)) 

解決了這個問題。 Spark至少需要2個線程才能運行,並且文檔示例也有誤導性,因爲它們也僅使用local

希望這會有所幫助!

+0

謝謝Serejja。在我看到你的答案後,我意識到了這個錯誤。這真是一個非常愚蠢的問題。 :D謝謝 – 2014-09-01 10:23:47