2017-04-20 71 views
2

我想用SinkTask保存數據時保證寫入順序。發生RetriableException時,Kafka Connect能否保證寫入順序?

如果我想在我的SinkTask.put()拋出RetriableException,將Kafka Connect寫入到數據源無序比分區的順序?

例如,如果在一個分區中的消息是1-2-3,如果寫入消息2,可卡夫卡連接保證郵件到達的數據源是1-2-3期間發生異常?

據我所知,卡夫卡連接異步寫入數據源。所以看起來好像數據將不按順序到達數據源。

回答

1

簡短的回答:是的,消息的發佈順序將被保留,但是你必須要處理的消息重新交付。

在您的例子,這意味着,如果SinkTask.put嘗試傳遞到您的水槽下面的批處理的消息:1,2,3和1寫入後並通過投擲RetriableException寫入2之前失敗,連接將暫停消費,並會嘗試重新呼叫轉交給SinkTask.put過程中失敗的批次。這給了我們上述的兩種效果:

一)連接將暫停消費者對這一任務/分區。這意味着在重試失敗之前不會傳送其他批消息。因此,消息順序被保留。例如。如果傳遞1,2,3失敗,RetriableException,Connect在傳遞1,2,3之前不會傳遞4,5,6。

b)連接將重試以傳遞在期間失敗的整個消息集SinkTask.put。這意味着您的接收器會在再次嘗試寫入消息2之前再次看到消息1。

+0

完美。雖然,我不太明白你的意思*「你將不得不處理重新傳送的消息」*因爲你提到Connect會自動傳送消息。這是否意味着正在寫入的數據源必須處理在發生故障時正在寫入的重複消息?例如,在你的例子中,數據源需要處理消息1再次被正確寫入? – Glide

+1

對。我以你最初的例子爲基礎。您提到_在編寫Message-2_期間發生異常,意味着msg1被Sink正確「處理」(例如它被寫入文件)。當從你的接收器中拋出一個_RetriableException_時,這意味着你有辦法重新處理批處理(你的接收器是冪等的),或者你不關心重複項。在一個文件的例子中,這意味着你有一種方法來重新附加部分用第1條消息寫入的文件,以包含現在的消息1,2和3或解析重複項。連接重新輸送整批。 –

+0

感謝您對卡夫卡的有益迴應和貢獻! – Glide

相關問題