我是新來的卡夫卡,我試圖爲原型簡單的消費者 - 生產者消息隊列使用Apache卡夫卡(傳統隊列)模型0.9.0 Java客戶端。卡夫卡0.9.0新的Java API消費者獲取重複記錄
從生產者進程,我推100條隨機消息與3個分區構成的主題。這看起來很好。
我創建了同一組ID3消費者線程,訂閱了同一主題。啓用自動提交。由於所有3個消費者線程都訂閱了相同的主題,因此我假設每個消費者都將獲得一個分區來消費,並將提交每個分區的偏移日誌。
但我在這裏面臨奇怪的問題。我所有的信息都是重複的。我從每個線程獲得x時間更多的消費者記錄。由於我的每個消費者線程都會無限循環地從主題輪詢,所以我必須終止進程。
我甚至用單個線程嘗試和我仍然得到重複記錄x次,並仍在繼續。
可以在任何請幫我鑑定我在做什麼錯在這裏。
我張貼我的消費者代碼供您參考。
public class ConsumerDemo {
public static void main(String[] args) {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("Consumer-%d").build();
ExecutorService executor = Executors.newFixedThreadPool(3, threadFactory);
executor.submit(new ConsumerThread("topic1", "myThread-1"));
executor.submit(new ConsumerThread("topic1", "myThread-2"));
executor.submit(new ConsumerThread("topic1", "myThread-3"));
//executor shutdown logic is skipped
}
}
消費主題:
public class ConsumerThread implements Runnable {
private static final String KAFKA_BROKER = "<<IP:port>>";
private final KafkaConsumer<String, String> consumer;
public ConsumerThread(String topic, String name) {
Properties props = new Properties();
props.put("bootstrap.servers", ConsumerThread.KAFKA_BROKER);
props.put("group.id", "DemoConsumer");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "6000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
this.consumer = new KafkaConsumer(props);
this.consumer.subscribe(Collections.singletonList(topic));
}
public void run() {
try {
boolean isRunning = true;
while (isRunning) {
ConsumerRecords<String,String> records= consumer.poll(10L);
System.out.println("Partition Assignment to this Consumer: "+consumer.assignment());
Iterator it = records.iterator();
while(it.hasNext()) {
ConsumerRecord record = (ConsumerRecord)it.next();
System.out.println("Received message from thread : "+Thread.currentThread().getName()+"(" + record.key() + ", " + (String)record.value() + ") at offset " + record.offset());
}
}
consumer.close();
}
catch (Exception e) {
e.printStackTrace();
}
}
}
而且非常重要的是,我的目標整整一次語義。我知道那是1000英里。任何幫助真的很感激。
觀察:調試系統輸出打印所有3個tpoics。這是否意味着分區不分配給每個消費者?
分區分配給該消費者:[topic1-1,topic1-0,topic1-2]
卡夫卡專家,除了上述問題我尋找其他2個輸入。
- 請幫我理解上面代碼中的錯誤。
- 一般來說,一次電路圖可以如何實現。如果可能的話。
- 異常情況,如消費者下降。如何處理而不會丟失消息。
在此先感謝。
不是。我發現了問題和可能的解決方案。感謝您的回答 – devThoughts