2017-05-04 221 views
1

我正在嘗試從kafka火花流的基本示例。我很新火花並沒有什麼經驗。卡夫卡流火花不減少計數

我的程序如下(在Apache的火花的例子複製):

if (args.length < 4) { 
     System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>"); 
     System.exit(1); 
    } 

    String zkQuorum = args[0]; 
    String groupId = args[1]; 
    String topicsToListen = args[2]; 
    String numOfThread = args[3]; 

    StreamingExamples.setStreamingLogLevels(); 
    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount"); 
    // Create the context with 2 seconds batch size 
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); 

    int numThreads = Integer.parseInt(numOfThread); 
    Map<String, Integer> topicMap = new HashMap<>(); 
    String[] topics = topicsToListen.split(","); 
    for (String topic : topics) { 
     topicMap.put(topic, numThreads); 
    } 

    JavaPairReceiverInputDStream<String, String> messages = 
      KafkaUtils.createStream(jssc, zkQuorum, groupId, topicMap); 

    JavaDStream<String> lines = messages.map(Tuple2::_2); 

    JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator()); 

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1)) 
      .reduceByKey((i1, i2) -> i1 + i2); 

    wordCounts.print(); 
    jssc.start(); 
    jssc.awaitTermination(); 

然後我開始我的卡夫卡經紀人和生產以下命令運行內置的jar:

$ SPARK_HOME/bin/spark-submit --class「JavaKafkaWordCount」--master local [2] PATH_TO_JAR/kafka-spark-streaming-1.0-SNAPSHOT -jar -with-dependencies.jar localhost:2181 test-consumer組測試1

當我產生從卡夫卡製片我期待已發佈多次以增加單詞的計數的一些話,但我看到的是1打印的每一個新單詞和計數發佈:

(你好,1)

我期待數增加時,我發佈了同一個詞不止一次,

(你好,2)

但這並沒有發生。我究竟在這裏理解了什麼錯誤,這與我傳遞給他的論點或者工作的目的有什麼關係?

有人可以提供一些見解嗎?

感謝 沙比爾

回答

0

讀代碼幾次之後,我設法確定爲什麼我總是得到每字爲僅1而不是合計總計數

在下面的行:

// Create the context with 2 seconds batch size 
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); 

我設置之間連續從流中2秒讀取間隔。我意識到我沒有在生產者端的間隔(2秒)內生成相同字符串的足夠內容以獲得彙總結果。

然而,當我增加此時間間隔爲喜歡10000米利斯(10秒),然後我可以產生從卡夫卡製片數據的多條線。這些行由作業正確處理,並且類似的字符串計數在該特定時間段內很好地彙總。

(你好,4)

(世界,6)

非常感謝 沙比爾