2016-03-03 85 views
6

高級消費者API似乎一次只讀一條消息。卡夫卡是否有批量消費者?

如果消費者想要處理消息並將這些消息提交給其他下游消費者,如Solr或Elastic-Search,則這可能會造成很大問題,因爲他們傾向於批量消息而不是一次一個。

將這些消息批量存儲在內存中也不是微不足道的,因爲只有當批處理已經被提交時,Kafka中的偏移量才需要同步,否則將崩潰的kafka-使用者提供未提交的下游消息(如在Solr或ES中)將有其補償已更新,因此消息鬆散。

如果消費者在向下遊提交消息之後但在更新消息偏移之前崩潰,消費者可能會多次消費消息。

如果Kafka批量使用消息,那麼一些指向代碼/文檔的指針將非常感謝。

謝謝!

+0

什麼版本的卡夫卡是你問?我假設你是在談論高級消費者,它是0.8.2或之前。 – morganw09dev

回答

3

我不知道批量消費者。但即使有一個你的主要問題仍然存在。您希望在成功轉發數據後提交偏移量。達到此目的的一種方法是通過設置屬性auto.commit.enable = false來關閉用戶的自動提交。權衡當然是,你必須關心什麼時候提交抵消。

發現消費者特性在這裏的完整文檔:https://kafka.apache.org/documentation.html#consumerconfigs

如何manualy commiting的偏離java-doc的(https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html)偷一個很好的例子:

Properties props = new Properties(); 
props.put("bootstrap.servers", "localhost:9092"); 
props.put("group.id", "test"); 
props.put("enable.auto.commit", "false"); 
props.put("auto.commit.interval.ms", "1000"); 
props.put("session.timeout.ms", "30000"); 
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
consumer.subscribe(Arrays.asList("foo", "bar")); 
final int minBatchSize = 200; 
List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); 
while (true) { 
    ConsumerRecords<String, String> records = consumer.poll(100); 
    for (ConsumerRecord<String, String> record : records) { 
     buffer.add(record); 
    } 
    if (buffer.size() >= minBatchSize) { 
     insertIntoDb(buffer); 
     consumer.commitSync(); 
     buffer.clear(); 
    } 
} 
+0

我同意你對自動提交的解釋。但是就你的代碼而言,ConsumerRecord是一個Kafka 0.9類,而他的問題使他看起來像是在詢問0.9以前的消費者。雖然他沒有明確說明。 – morganw09dev

+0

這就是上述代碼的問題。 – user2250246

+1

如果消費者在提交補償之前崩潰,則會重播消息。我沒有相當於beginTransaction()和endTransaction()的DB。 – user2250246