2017-01-02 155 views

回答

0

我不知道我是否正確地理解你的問題。但是,我認爲有多種方法(取決於你實際想要達到的目標,而不是從問題中得出)。

  1. 使用卡夫卡流DSL(卡夫卡0.10):使用卡夫卡流(Java流處理庫),你可以指定一個窗口聚集爲任何規模大小
  2. 開拓時間戳(卡夫卡0.10)的翻滾窗口:如果您要使用KafkaConsumer你可以閱讀郵件,並通過間隔檢查它們的時間戳,塊數據
  3. 系統基於時間的(所有版本卡夫卡):剛剛從卡夫卡讀取消息,並把傳遞消息在系統中的時間間隔基地。也就是說,在你處理下一個記錄之前,你需要檢查你的本地時鐘來間隔地發送消息。
+0

我的意思是說,有一個選項可以每隔1分鐘從卡夫卡提取數據(例如:在10:01從10:00到10:01讀取所有記錄,在10:02讀取所有記錄從10:01到10:02等),而不是在運行時獲取新記錄? 我想讀取數據進行處理每個給定的時間間隔,而不是將數據保存在內存中,直到處理。 – user7365161

+0

由於Kafka是基於拉的,因此沒有內置的支持。您需要使用建議的方法之一將此邏輯放入客戶端。如果我正確理解了你的評論,則可能需要結合使用方法(3)以在開始poll()之前獲取當前的日誌結束偏移量,並且僅將消費者消息轉換爲獲得的偏移量(以避免讀取添加的記錄開始消費後) –