我玩的卡夫卡流API(Kakfa版本:0.10.2.0)試圖做一個簡單的wordcount示例工作:Wordcount App gist。我同時運行生產者和消費者的控制檯:卡夫卡流字數統計應用程序
./kafka-console-producer.sh -topic input-topic --broker-list localhost:9092
./kafka-console-consumer.sh --topic output-topic --bootstrap-server localhost:9092 --from-beginning
啓動應用程序,一切似乎是工作的罰款,但是當我在控制檯內生產一些字符串類型,消費者接受什麼都沒有。如果我改變了應用程序做對消費者接收流輸入一個簡單的toUppercase(修改爲大寫)罰款:
//The following code works fine: val uppercasedWithMapValues: KStream[String, String] = textLines.mapValues(_.toUpperCase()) uppercasedWithMapValues.to("output-topic")
有誰知道爲什麼我的字計數例如接收什麼?我應該在消費者上指定任何序列化程序嗎?在我的最後一次測試控制檯消費者處理,我通過控制檯發送,但並沒有表現出他們的消息,請參閱下面的輸出:
➜ bin ./kafka-console-consumer.sh \
--topic output-topic \
--bootstrap-server localhost:9092 \
--from-beginning
[2017-08-02 07:48:20,187]WARN Error while fetching metadata with correlation id 2 :
{output-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-08-02 07:48:20,197] WARN The following subscribed topics are not assigned
to any members in the group console-consumer-91651 : [output-topic]
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
^CProcessed共有7級的消息
的
太棒了!這就是訣竅!非常感謝你!我想我需要閱讀更多關於kafka流內部的內容。再次感謝你@Arek – ardlema
我很高興幫助你@ardlema :) – Arek