2015-12-21 49 views
0

我是新來的卡夫卡,我試圖爲原型簡單的消費者 - 生產者消息隊列使用Apache卡夫卡(傳統隊列)模型0.9.0 Java客戶端。卡夫卡0.9.0新的Java API消費者獲取重複記錄

從生產者進程,我推100條隨機消息與3個分區構成的主題。這看起來很好。

我創建了同一組ID3消費者線程,訂閱了同一主題。啓用自動提交。由於所有3個消費者線程都訂閱了相同的主題,因此我假設每個消費者都將獲得一個分區來消費,並將提交每個分區的偏移日誌。

但我在這裏面臨奇怪的問題。我所有的信息都是重複的。我從每個線程獲得x時間更多的消費者記錄。由於我的每個消費者線程都會無限循環地從主題輪詢,所以我必須終止進程。

我甚至用單個線程嘗試和我仍然得到重複記錄x次,並仍在繼續。

可以在任何請幫我鑑定我在做什麼錯在這裏。

我張貼我的消費者代碼供您參考。

public class ConsumerDemo { 

public static void main(String[] args) { 

    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("Consumer-%d").build(); 
    ExecutorService executor = Executors.newFixedThreadPool(3, threadFactory); 

    executor.submit(new ConsumerThread("topic1", "myThread-1")); 
    executor.submit(new ConsumerThread("topic1", "myThread-2")); 
    executor.submit(new ConsumerThread("topic1", "myThread-3")); 

    //executor shutdown logic is skipped 
} 
} 

消費主題:

public class ConsumerThread implements Runnable { 

private static final String KAFKA_BROKER = "<<IP:port>>"; 

private final KafkaConsumer<String, String> consumer; 

    public ConsumerThread(String topic, String name) { 
     Properties props = new Properties(); 
     props.put("bootstrap.servers", ConsumerThread.KAFKA_BROKER); 
     props.put("group.id", "DemoConsumer"); 
     props.put("enable.auto.commit", "true"); 
     props.put("auto.commit.interval.ms", "6000"); 
     props.put("session.timeout.ms", "30000"); 
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 

     this.consumer = new KafkaConsumer(props); 
     this.consumer.subscribe(Collections.singletonList(topic)); 
    } 


    public void run() { 
     try { 
      boolean isRunning = true; 
      while (isRunning) { 
       ConsumerRecords<String,String> records= consumer.poll(10L); 
       System.out.println("Partition Assignment to this Consumer: "+consumer.assignment()); 
       Iterator it = records.iterator(); 
       while(it.hasNext()) { 
        ConsumerRecord record = (ConsumerRecord)it.next(); 
        System.out.println("Received message from thread : "+Thread.currentThread().getName()+"(" + record.key() + ", " + (String)record.value() + ") at offset " + record.offset()); 
       } 
      } 
      consumer.close(); 
     } 
     catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 
} 

而且非常重要的是,我的目標整整一次語義。我知道那是1000英里。任何幫助真的很感激。

觀察:調試系統輸出打印所有3個tpoics。這是否意味着分區不分配給每個消費者?

分區分配給該消費者:[topic1-1,topic1-0,topic1-2]

卡夫卡專家,除了上述問題我尋找其他2個輸入。

  1. 請幫我理解上面代碼中的錯誤。
  2. 一般來說,一次電路圖可以如何實現。如果可能的話。
  3. 異常情況,如消費者下降。如何處理而不會丟失消息。

在此先感謝。

回答

1

您是否在使用比會話超時慢的消息?在這種情況下,您有可能導致雙重消費的風險重新平衡。

+0

不是。我發現了問題和可能的解決方案。感謝您的回答 – devThoughts

2

嗯,我想出了什麼是錯的,我的code/undertanding。

在我開始進行原型設計之前,我應該完全閱讀Kafka文檔。

這是我發現的。

默認Kafka保證至少一次示意圖。這意味着消費者至少會收到一次消息(可能是多次),我假設如果我有3個分區並創建3個消費者,則Kafka API將負責爲一個消費者隨機分配一個分區,這是錯誤的。所以,我手動分配一個分區給每個消費者,以確保一個,我的消費者擁有的分區和控制失調像下面

consumer = new KafkaConsumer(props)  
TopicPartition partition = new TopicPartition(topic, partitionNum); 
consumer.assign(Collections.singletonList(partition)); 

恰好一次的情景: 爲了確保我們消耗的消息exaclty一次,我們需要儘管我還沒有嘗試過,但基於我從大量的搜索引擎中學到的東西是,它更好的方式來保存偏移量和數據。有利的相同交易。數據和偏移量都會保存或回退以供重試。

任何其他解決方案,讚賞。

+3

如果您希望至多一次投放,您還可以在處理每封郵件之前手動提交偏移量。它可能是一個替代方案。 – Ztyx

0

排除由於應用程序層內的生產者重試或重試導致的重複。

在生產者方面,如果「request.timeout.ms」未適當配置網絡/集羣,則有可能的是,由於慢啓動(初始化生產者,連接建立等),初始請求將超時在生產者身上,但實際上由經紀人/服務器處理。這將導致重複重複。