爲了避免讀取KAFKA STREAMS被殺死時處理但未提交的消息,我希望獲得每個消息的偏移以及鍵和值,以便我可以將它存儲在某處並將其用於避免重新處理已處理的消息。有沒有一種方法可以獲得在kafka流中消費的每條消息的偏移量?
3
A
回答
2
是的,這是可能的。請參閱http://docs.confluent.io/current/streams/faq.html#accessing-record-metadata-such-as-topic-partition-and-offset-information的FAQ條目。
我將在下面的關鍵信息複製粘貼:
訪問記錄元數據,如主題,分區和偏移信息?
記錄元數據可通過Processor API訪問。 它也可以通過DSL間接獲得,這要感謝它的 Processor API integration。
使用Processor API,您可以通過
ProcessorContext
訪問記錄元數據。您可以在Processor#init()
存儲在你的處理器的 實例字段的上下文的引用,然後 查詢中Processor#process()
處理器方面,例如 (同爲Transformer
)。上下文自動更新以匹配 當前正在處理的記錄,這意味着方法 (如ProcessorContext#partition()
)總是返回當前的 記錄的元數據。在punctuate()
內調用處理器 上下文時,有些注意事項適用,請參閱Javadocs以瞭解詳細信息。如果使用DSL與自定義
Transformer
組合,例如, 你可以改變輸入記錄的值也包括分區 和偏移元數據,以及諸如map
或 隨後的DSL業務filter
可以再利用這些信息。
相關問題
- 1. Golang Kafka沒有消耗所有消息偏移西南
- 2. 有沒有辦法阻止特定偏移量的Kafka消費者?
- 3. 卡夫卡消費者中的控制消息偏移量
- 4. 有沒有辦法獲得dailymotion實時流的聊天消息?
- 5. Apache Kafka消費者組的偏移量如何到期?
- 6. 如何獲取Golang Kafka中的分區的消費者組偏移量10
- 7. 有沒有一種方法可以在Android中獲得沒有GPS的速度?
- 8. 有沒有一個很好的例子,用於消費通用json消息的Spring雲流kafka活頁夾
- 9. 是否有可能在Spark + KafkaRDD中獲得特定的消息偏移
- 10. 有沒有一種方法可以讓ActionListener取消?
- 11. 有沒有辦法跟蹤Akka中的每條消息?
- 12. 消費流,轉型,然後交給其他消費者(沒有州)的方法
- 13. python-kafka:消費者可以根據消息屬性跳過消息嗎?
- 14. 有沒有任何方法可以在沒有交互式消息的情況下獲得按鈕回答?
- 15. 有沒有一種方法可以在主要的FOREX消息下以編程方式避免EA交易?
- 16. 有沒有一種方法可以記錄Smalltalk上每個消息參數的類型? (如Objective-C)
- 17. kafka消費者提取API不返回正確的偏移值
- 18. 有沒有一種方法可以免費安裝.NET 3.5 SP1?
- 19. 有沒有一種方法可以識別已被取消的查詢?
- 20. Kafka Listener方法未被調用。消費者不消費。
- 21. 有沒有一種方法可以共享對克隆調解員2條WSO2消息(WSO2 4.8.1)
- 22. 有沒有一種方法來編碼PHP中的錯誤消息
- 23. 有沒有任何方法可以提升單向消息,如工作者消息或WebSocket消息?
- 24. 消費者在卡夫卡消費的消息有哪些方式?
- 25. 有沒有一種方法可以在ActionScript中獲得頂級應用程序?
- 26. 有一種方法可以在Azure隊列或服務總線中獲得「處理完成」消息事件?
- 27. 有沒有辦法確定VTable中每個特徵方法的偏移量?
- 28. ActiveMQ消息收據事件每秒只有一條消息?
- 29. 有沒有更好的方法來計算消息隊列(MSMQ)中的消息?
- 30. Kafka:消費者羣體中的消費者可以共享數據嗎?