2017-04-06 90 views
3

我有一個使用來自Kafka Streams的KTable的單實例Java應用程序。直到最近,我可以使用KTable檢索所有數據,突然間一些消息似乎消失了。那裏應該有〜33k消息和唯一的密鑰。爲什麼Kafka KTable缺少條目?

當我想通過密鑰檢索消息時,我沒有收到一些消息。我使用ReadOnlyKeyValueStore檢索消息:

final ReadOnlyKeyValueStore<GenericRecord, GenericRecord> store = ((KafkaStreams)streams).store(storeName, QueryableStoreTypes.keyValueStore()); 
store.get(key); 

這些是我設置爲KafkaStreams的配置設置。

final Properties config = new Properties(); 
config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, serverId); 
config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); 
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); 
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); 
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class); 
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class); 
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); 

卡夫卡:0.10.2.0-CP1
合流:3.2.0

調查給我帶來了一些非常令人擔憂的見解。使用REST代理我手動讀取分區,發現有些偏移返回錯誤。

請求: /topics/{topic}/partitions/{partition}/messages?offset={offset}

{ 
    "error_code": 50002, 
    "message": "Kafka error: Fetch response contains an error code: 1" 
} 

沒有客戶端,無論是Java和命令行然而返回任何錯誤。他們只是跳過錯誤的缺失消息導致KTables中的數據丟失。一切都很好,沒有注意到,似乎某些消息被破壞了。

我有兩個代理,所有主題的複製因子爲2,並且完全複製。兩家經紀商分別返還。重新啓動經紀人沒有任何區別。

  • 可能是什麼原因?
  • 如何在客戶端檢測到這種情況?
+0

我不知道'StoreManager'是什麼 - 這不是Kafka Streams的一部分。你使用窗式或非窗式KTable嗎?你使用什麼版本的Kafka Streams? –

+2

@ MatthiasJ.Sax對不起,我的錯誤讓我的問題更加精確。 – Maciej

+0

感謝您的更新。這聽起來很奇怪。 「他們只是跳過錯誤的消息導致數據丟失」 - 這聽起來也非常奇怪 - AFAIK,消費者沒有內置的「跨越」消息機制。也許你應該問在Kafka用戶列表http://kafka.apache.org/contact(這可能是一個錯誤...) - 雖然Kafka Streams問題似乎並不是Kafka Streams問題,因爲Kafka Streams內部只是使用了Kafka Consumer,如果消費者表現得很奇怪,那麼Kafka Streams就無法解決這個問題。 –

回答

1

通過default Kafka Broker配置關鍵cleanup.policy設置爲delete。將其設置爲compact以保留每個密鑰的最新消息。 See compaction

刪除舊消息不會更改最小偏移量,因此嘗試檢索下面的消息會導致錯誤。錯誤非常模糊。 Kafka Streams客戶端將開始從最小偏移量讀取消息,因此沒有錯誤。唯一可見的影響是KTables中缺少數據。

當應用程序運行時,由於caches,即使從Kafka本身刪除消息,所有數據仍然可用。清理後它們會消失。