2017-03-10 86 views
0

我正在嘗試構建pub/sub應用程序,我正在探索最佳工具。我目前正在尋找卡夫卡並有一個小演示應用程序已經運行。但是,我正在遇到一個概念問題。沒有消費者連接時,卡夫卡經紀人是否可以保留消息?

我有一個製片人(的Java代碼):

String topicName = "MyTopic; 
    String key = "MyKey"; 

    Properties props = new Properties(); 
    props.put("bootstrap.servers", "localhost:9092,localhost:9093"); 
    props.put("acks", "all"); 
    props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); 
    Producer<String, byte[]> producer = new KafkaProducer <String, byte[]>(props); 

    byte[] data = <FROM ELSEWHERE>; 
    ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>(topicName, key, data); 

    try { 
     RecordMetadata result = producer.send(record).get(); 
    } 
    catch (Exception e) { 
     // Nothing for now 
    } 
    producer.close(); 

當我開始通過Kakfa命令行工具消費者:

kafka-console-consumer --bootstrap-server localhost:9092 --topic MyTopic 

,然後我執行製片人的代碼,我看到我的消費者終端上顯示的數據信息。

然而,如果我這樣做前執行製片人跑消費者,出現一則「丟失」。當我啓動消費者(執行生產者之後)時,消費者終端中不會顯示任何內容。

有誰知道是否有可能讓卡夫卡經紀人保留消息,而沒有消費者連接?如果是這樣,怎麼樣?

+0

嘗試附加'--from-beginning'來查看您的問題是否消失。這可能是由於默認的偏移重置策略。查看消費者配置中的'auto.offset.reset'的詳細信息。 – amethystic

+0

@amethystic是的,它做到了!感謝您的參考。把這個答案,我會接受。它也很高興地看到它顯示了我所有的其他「丟失」消息。 – Brett

+0

在學習期間不要錯過這些設置的一個很好的起點是https://kafka.apache.org/quickstart。 – randominstanceOfLivingThing

回答

4

--from-beginning附加到控制檯消費者命令,使其從最早的偏移量開始消耗。這實際上是由config auto.offset.reset控制的偏移復位策略。下面是這個配置是指:

怎麼辦時,沒有初始卡夫卡或者如果電流偏移不存在任何更多的服務器(例如因爲該數據已被刪除)偏移:

earliest:自動復位偏移,偏移最早

latest:自動復位偏移到最新偏移

none:如果沒有先前偏移發現爲C拋出異常給消費者onsumer的團體 其他:向消費者拋出異常。

+0

正是我需要的。 – Brett