2017-07-28 68 views
0

我有一個基於微服務的應用程序,它從Kafka主題讀取消息。當服務關閉時,如果在主題上有任何消息正在寫入,我希望消費者在下次啓動並運行時閱讀這些消息。但是當服務停止時,我錯過了所有的消息。如何讓消費者讀取服務關閉時未讀取的消息?與Kafka的春天雲流 - 重新啓動消費者後郵件未被讀取

當我的微服務啓動並且任何消息正在返回到主題時,我正在收到所有消息。

我application.properties:

spring.cloud.stream.bindings.input.destination=test 
spring.cloud.stream.bindings.input.consumer.headerMode=raw 
spring.cloud.stream.bindings.input.consumer.startOffset=latest 
spring.cloud.stream.bindings.input.consumer.resetOffsets=true 
spring.cloud.stream.bindings.input.consumer.instanceCount=3 
spring.cloud.stream.bindings.input.consumer.autoCommitOffset=false 

//這是我的微服務的根目錄

@EnableBinding(Sink.class) 
public class Consumer { 
    @ServiceActivator(inputChannel = Sink.INPUT) 
    public void consoleSink(Object payload){ 
     logger.info("Type: "+ payload.getClass() + " which is byte array"); 
     logger.info("Payload: " + new String((byte[])payload)); 
    } } 

我明白任何線索來解決這個問題,在我的消費者的代碼。

回答

0

設置下面的屬性幫助我解決了我的問題。

spring.cloud.stream.bindings.input.destination=test 
spring.cloud.stream.bindings.input.consumer.headerMode=raw 
spring.cloud.stream.bindings.input.consumer.startOffset=latest 
spring.cloud.stream.bindings.input.consumer.resetOffsets=true 
spring.cloud.stream.bindings.input.consumer.instanceCount=3 
spring.cloud.stream.bindings.input.consumer.autoCommitOffset=false 
spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false 
spring.cloud.stream.kafka.binder.autoCreateTopics=false 
spring.cloud.stream.bindings.input.group=testGroup50 
spring.cloud.stream.bindings.input.partitioned=false 

感謝,

BR