2017-04-07 60 views
0

在我的Spark應用程序中。我從兩個Kafka主題創建兩個DStream。我這樣做,因爲我需要以不同的方式處理兩個DStream。以下是代碼示例:在Spark Streaming中創建兩個來自Kafka的DStream主題不起作用

object KafkaConsumerTest3 { 
    var sc:SparkContext = null 
    def main(args: Array[String]) { 



    Logger.getLogger("org").setLevel(Level.OFF); 
    Logger.getLogger("akka").setLevel(Level.OFF); 

    val Array(zkQuorum, group, topics1, topics2, numThreads) = Array("localhost:2181", "group3", "test_topic4", "test_topic5","5") 
    val sparkConf = new SparkConf().setAppName("SparkConsumer").setMaster("local[2]") 
    sc = new SparkContext(sparkConf) 
    val ssc = new StreamingContext(sc, Seconds(2)) 


    val topicMap1 = topics1.split(",").map((_, numThreads.toInt)).toMap 
    val topicMap2 = topics2.split(",").map((_, numThreads.toInt)).toMap 

    val lines2 = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap2).map(_._2) 
    val lines1 = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap1).map(_._2) 

    lines2.foreachRDD{rdd => 
     rdd.foreach { println }} 

    lines1.foreachRDD{rdd => 
     rdd.foreach { println }} 

    ssc.start() 
    ssc.awaitTermination() 
    } 
} 

這兩個主題都可能有也可能沒有數據。在我的情況下,第一個主題目前沒有獲取數據,但第二個主題正在獲得。但我的火花應用程序不打印任何數據。也沒有例外。 有什麼我失蹤?或者我如何解決這個問題。

+0

它是否適用於單流? – Natalia

+0

是的..它可以與一個流一起使用。 – Alok

+0

@Alok:你可以嘗試打印在foreachRDD方法裏面的rdd.count? – Shankar

回答

0

發現上述代碼的問題。問題是我們使用master作爲local [2],並且我們註冊了兩個receiver。增加線程的數量解決了這個問題。

相關問題