我已經開始了我的動物園管理員和卡夫卡服務器。 我開始了我的卡夫卡製作人,他發送了10封主題爲'xxx'的信息。然後停下了我的卡夫卡製片人。 現在我開始了我的卡夫卡消費者並訂閱了主題'xxx'。我的消費者使用我的Kafka製作人發送的10條消息,該消息現在不在運行。 我需要我的Kafka使用者應該只使用運行Kafka服務器的消息。 有什麼辦法可以做到這一點? 以下是我的消費屬性中的內容。在卡夫卡消費活的消息
props.put("bootstrap.servers", "localhost:9092");
String consumeGroup = "cg1";
props.put("group.id", consumeGroup);
props.put("enable.auto.commit", "true");
props.put("auto.offset.reset", "earliest");
props.put("auto.commit.interval.ms", "100");
props.put("heartbeat.interval.ms", "3000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
您可以丟棄任何正在等待的消息,只能在程序啓動後到達。如果您只想要非持久性消息傳遞幾乎任何其他消息傳遞解決方案可能是更好的選擇,則Kafka旨在持久保留消息(因爲您需要此功能,因此使用Kafka)。 –
是否有任何標誌可以識別等待很長時間的消息? – Prasath
刪除此屬性'props.put(「auto.offset.reset」,「earliest」);',它與ConsumerConfig.AUTO_OFFSET_RESET_CONFIG相同。只保留最新的屬性。 – Rambler