2016-07-29 61 views
0

這裏是我的卡夫卡消息生產者:僅消耗特定partion消息

 ProducerRecord producerRecord = new ProducerRecord(topic, "k1", message); 
     producer.send(producerRecord); 

這裏是我的消費者

TopicPartition partition0 = new TopicPartition(topic, 0); 
consumer.assign(Arrays.asList(partition0)); 
    final int minBatchSize = 200; 
    List<ConsumerRecord<String, byte[]>> buffer = new ArrayList<>(); 
    while (true) { 
     ConsumerRecords<String, byte[]> records = consumer.poll(100); 
     for (ConsumerRecord<String, byte[]> record : records) { 
      buffer.add(record); 
      System.out.println(record.key() + "KEY: " + record.value()); 

怎麼可能消耗爲​​作爲分區鍵只topic消息

+0

用「partition key'k1'」我猜你實際上是指「message key'k1'」。卡夫卡確實爲您選擇了鑰匙。在使用分區時,必須使用該分區的每條消息。無論你做什麼或只是放棄它,如果它沒有正確的鑰匙取決於你。 – Harald

+0

你是說在接收郵件的同時我應該做'if(msg.key()。equals(「k1」)' – manish

+0

是的,那就是要走的路。 – Harald

回答

1

我看到實現這種行爲的唯一方法是讓分區的數量==可能的密鑰數量,並有一個自定義的分區程序來維護密鑰唯一性的分區(默認散列分區將工作,我認爲)。但是這個解決方案遠非最佳,我不能推薦它。除此之外,你不能使用任何內置的機制來實現類似的行爲 - 你必須在客戶端過濾郵件

+0

不是你我認爲這個功能並不好,比如我發佈了一個消息給'topic',所有的'topic'用戶都會收到消息,儘管它並不打算髮送消息。cos過濾應該在客戶端基於消息密鑰 – manish

0

一個建議是要記住partition和具體的消息offset, 使用assignseekpoll(同時設置消費者max.poll.records = 1,一次獲取一條消息)。

  • 指定給消費者分配特定分區;
  • 尋求特定抵消,然後下一次民意調查將得到您的預期消息K1

注意:它的工作原理與「隨機」查找相似,但會降低消息的消耗性能。
0.10新消費者和新配置max.poll.records是必需的。