2016-09-30 164 views
2

我已經開始了我的動物園管理員和卡夫卡服務器。 我開始了我的卡夫卡製作人,他發送了10封主題爲'xxx'的信息。然後停下了我的卡夫卡製片人。 現在我開始了我的卡夫卡消費者並訂閱了主題'xxx'。我的消費者使用我的Kafka製作人發送的10條消息,該消息現在不在運行。 我需要我的Kafka使用者應該只使用運行Kafka服務器的消息。 有什麼辦法可以做到這一點? 以下是我的消費屬性中的內容。在卡夫卡消費活的消息

props.put("bootstrap.servers", "localhost:9092"); 
    String consumeGroup = "cg1"; 
    props.put("group.id", consumeGroup); 
    props.put("enable.auto.commit", "true"); 
    props.put("auto.offset.reset", "earliest"); 
    props.put("auto.commit.interval.ms", "100"); 
    props.put("heartbeat.interval.ms", "3000"); 
    props.put("session.timeout.ms", "30000"); 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); 
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); 
+0

您可以丟棄任何正在等待的消息,只能在程序啓動後到達。如果您只想要非持久性消息傳遞幾乎任何其他消息傳遞解決方案可能是更好的選擇,則Kafka旨在持久保留消息(因爲您需要此功能,因此使用Kafka)。 –

+1

是否有任何標誌可以識別等待很長時間的消息? – Prasath

+0

刪除此屬性'props.put(「auto.offset.reset」,「earliest」);',它與ConsumerConfig.AUTO_OFFSET_RESET_CONFIG相同。只保留最新的屬性。 – Rambler

回答

1

設置以下屬性:

consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); 

它告訴消費者只閱讀最新的消息,那就是,這是消費者以後公佈消息開始。

+0

爲我的消費者使用提供的財產。 Stil我正在收到非現場製作人的數據。 – Prasath

+0

您正在使用哪個kafka api版本? – Rambler

+0

以下依賴關係我使用過。此外,我更新了我的問題中的消費者屬性。 \t \t \t ch.qos.logback \t \t \t 的logback經典 \t \t \t 1.0.13 \t \t \t \t \t \t \t org.apache。卡夫卡 \t \t \t 卡夫卡客戶 \t \t \t 0.9.0.0 \t \t \t \t \t \t \t org.apache.avro \t \t \t 的Avro \t \t \t 1.7.7 \t \t Prasath