我想了解IRecordProcessor的processRecords方法何時從worker調用。如果我之前對processRecords的調用尚未完成,工人將調用下一個processRecords?工作人員是否會開始從kinesis獲取新記錄,或者是否會等到當前記錄完成執行。kinesis客戶端工作邏輯
基本上我想等待很長時間,如果processRecords得到一些異常,同時保存記錄在外部數據庫,因爲數據庫已關閉或其他一些錯誤。因此,如果工作人員在更早完成處理之前沒有開始提取新記錄,那麼要確認是否會有任何問題?
在worker.java,它調用runProcessLoop和它調用shardConsumer.consumeShard()有它調用checkAndSubmitNextTask(),它檢查readyForNextTask與否。如果notReady它不消費新記錄。那麼工作人員如何在沒有記錄處理器處理過程的情況下檢索新記錄呢? – user1846749
如果您的一方存在臨時數據庫中斷(這會阻止使用記錄);你應該停止你的Kinesis消費者應用程序,直到它被修復。或者還有第二種方法:在我的答案的最後一個鏈接中,有一行解釋了你的問題:「但是如果失敗了,記下來到另一個地方去調查失敗的原因。」 - 因此,您可以手動處理日後在數據庫中斷期間使用的記錄。 – az3