2016-09-16 82 views
3

幾次我用Spring Kafka API實現卡夫卡消費者與手動偏移管理:閱讀相同的消息從卡夫卡

@KafkaListener(topics = "some_topic") 
public void onMessage(@Payload Message message, Acknowledgment acknowledgment) { 
    if (someCondition) { 
     acknowledgment.acknowledge(); 
    } 
} 

在這裏,我想消費者承諾只有當someCondition持有偏移。否則,消費者應該睡一段時間,並再次讀取相同的消息

卡夫卡配置:

@Bean 
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig()); 
    factory.getContainerProperties().setAckMode(MANUAL); 
    return factory; 
} 

private Map<String, Object> consumerConfig() { 
    Map<String, Object> props = new HashMap<>(); 
    ... 
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); 
    ... 
    return props; 
} 

當前的配置,如果someCondition == false,消費者不提交的偏移量,但仍讀取下一個消息。如果卡夫卡acknowledgement未執行,是否有方法讓消費者重新閱讀消息?

回答

3

您可以停止並重新啓動容器,它將被重新發送。

即將發佈的1.1版本,你可以seek to the required offset,它會被重新發送。

但是如果它們已經被檢索到,你仍然會首先看到後面的消息,所以你將不得不放棄這些消息。

second milestone具有該功能,我們預計它會在下週發佈。

+0

謝謝。現在我懷疑我是否使用手動偏移管理來實現其預期目的。似乎應該使用功能在**使用者失敗的情況下重新讀取消息**,而不是在使用消息的組件失敗時使用。 – Aliaxander

+0

嗨@Gary Russel,你知道是否存在一些關於使用Spring Boot尋找的例子。我正在嘗試一些類似的東西來聽一個主題,但審查分區中的消息響應:https://stackoverflow.com/questions/48349993/how-to-listen-for-the-right-ack-message- from-kafka – jabrena

+0

尋求並不是該用例的正確解決方案;在那邊看我的回答。 –

2

正如@加里已經指出,你是在正確的方向,seek()是做到這一點。當我遇到這個問題時,我今天找不到它的代碼示例。這是任何想解決問題的人的代碼。

public class Receiver implements AcknowledgingMessageListener<Integer, String>, ConsumerSeekAware { 

    private ConsumerSeekCallback consumerSeekCallback; 


    @Override 
    public void onMessage(ConsumerRecord<Integer, String> record, Acknowledgment acknowledgment) { 

     if (/*some condition*/) { 
      //process 
      acknowledgment.acknowledge(); //send ack 
     } else { 

      consumerSeekCallback.seek("your.topic", record.partition(), record.offset()); 

     } 
    } 

    @Override 
    public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) { 
     this.consumerSeekCallback = consumerSeekCallback; 
    } 

    @Override 
    public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) { 

     // nothing is needed here for this program 
    } 

    @Override 
    public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) { 

     // nothing is needed here for this program 
    } 

}