2017-01-22 98 views
0

我想了解IRecordProcessor的processRecords方法何時從worker調用。如果我之前對processRecords的調用尚未完成,工人將調用下一個processRecords?工作人員是否會開始從kinesis獲取新記錄,或者是否會等到當前記錄完成執行。kinesis客戶端工作邏輯

基本上我想等待很長時間,如果processRecords得到一些異常,同時保存記錄在外部數據庫,因爲數據庫已關閉或其他一些錯誤。因此,如果工作人員在更早完成處理之前沒有開始提取新記錄,那麼要確認是否會有任何問題?

回答

0

來自其他問題摘錄:

申請書(KCL的幫助下)將繼續在後臺輪詢「碎片 迭代器」,因此您會收到有關新 數據,當談到。

來源:https://stackoverflow.com/a/35582161/1622134

而且還通過「工人」你的意思是在應用程序中的「工人」線程;這是一個可運行的。

每個碎片正好由一個KCL工人處理,有且只有一個 相應的記錄處理器,所以你永遠需要多個實例 處理一個碎片。請參閱KCL源文件中的Worker.java類。

來源:https://stackoverflow.com/a/34509567/1622134

要回答你的問題,你可以在它在你的processRecords實現。在處理記錄時,當且僅當try部分成功時,才使用try-catch塊並將檢查點寫入DynamoDB。那樣;如果在寫入外部數據庫時發生錯誤,則不會丟失記錄並重新啓動。您還應該將這些記錄數據(不能插入到數據庫中)保存到其他地方以供日後處理。

也看到這樣的回答:https://stackoverflow.com/a/32517002/1622134

+0

在worker.java,它調用runProcessLoop和它調用shardConsumer.consumeShard()有它調用checkAndSubmitNextTask(),它檢查readyForNextTask與否。如果notReady它不消費新記錄。那麼工作人員如何在沒有記錄處理器處理過程的情況下檢索新記錄呢? – user1846749

+0

如果您的一方存在臨時數據庫中斷(這會阻止使用記錄);你應該停止你的Kinesis消費者應用程序,直到它被修復。或者還有第二種方法:在我的答案的最後一個鏈接中,有一行解釋了你的問題:「但是如果失敗了,記下來到另一個地方去調查失敗的原因。」 - 因此,您可以手動處理日後在數據庫中斷期間使用的記錄。 – az3