2017-08-02 79 views
0

我玩的卡夫卡流API(Kakfa版本:0.10.2.0)試圖做一個簡單的wordcount示例工作:Wordcount App gist。我同時運行生產者和消費者的控制檯:卡夫卡流字數統計應用程序

./kafka-console-producer.sh -topic input-topic --broker-list localhost:9092

./kafka-console-consumer.sh --topic output-topic --bootstrap-server localhost:9092 --from-beginning 

啓動應用程序,一切似乎是工作的罰款,但是當我在控制檯內生產一些字符串類型,消費者接受什麼都沒有。如果我改變了應用程序做對消費者接收流輸入一個簡單的toUppercase(修改爲大寫)罰款:

//The following code works fine: val uppercasedWithMapValues: KStream[String, String] = textLines.mapValues(_.toUpperCase()) uppercasedWithMapValues.to("output-topic")

有誰知道爲什麼我的字計數例如接收什麼?我應該在消費者上指定任何序列化程序嗎?在我的最後一次測試控制檯消費者處理,我通過控制檯發送,但並沒有表現出他們的消息,請參閱下面的輸出:

➜ bin ./kafka-console-consumer.sh \ 
      --topic output-topic \ 
      --bootstrap-server localhost:9092 \ 
      --from-beginning                     
[2017-08-02 07:48:20,187]WARN Error while fetching metadata with correlation id 2 : 
{output-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) 
[2017-08-02 07:48:20,197] WARN The following subscribed topics are not assigned 
to any members in the group console-consumer-91651 : [output-topic] 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) 

^CProcessed共有7級的消息

回答

2

KStream的作品,因爲它不使用緩存。對於KTable你必須等一下,或者將cache.max.bytes.buffering設置爲0(但不是在生產代碼!)

+0

太棒了!這就是訣竅!非常感謝你!我想我需要閱讀更多關於kafka流內部的內容。再次感謝你@Arek – ardlema

+0

我很高興幫助你@ardlema :) – Arek