kafka-producer-api

    0熱度

    3回答

    比方說,有用於執行任務列表[T]一批API。爲了完成這項工作,所有的任務都需要推到卡夫卡。有兩種方法做到這一點: 1)在卡夫卡 2)推動各個工作T卡夫卡 我相信方法1會更好,因爲我沒有推推列表作爲消息消息發送到單個批次調用的多個時間段。有人能告訴我這種方法是否有害嗎?

    0熱度

    1回答

    我已經設置了一個MirrorMaker集羣,它消耗來自世界各地的集羣的主題。 問題是鏡像羣集(位於歐盟)和源羣集(位於美國)之間的延遲會造成偏移滯後的大幅上升。 鏡像消耗9個主題,每個主題由24個分區組成。 鏡像設置 /opt/kafka/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config /opt/kafka/conf

    3熱度

    1回答

    我正在使用Kafka和Spark Streaming構建應用程序。輸入數據來自第三部分流媒體,併發布在kafka話題上。這段代碼顯示了流代理模塊:這是我從流的結果和方式如何我送他們到KafkaPublisher(它顯示只是一個草圖): def on_result_response(self,*args): self.kafkaPublisher.pushMessage(str(args[

    1熱度

    1回答

    我正在使用處理器來使用來自主題的字節數組serdes的字節數組數據,將它們處理爲通用記錄(基於模式I從我的HTTP GET請求獲得)並將它們發送到具有格式化avro模式註冊表的主題。 我沒有問題從HTTP GET請求中檢索架構,並根據它來映射我的數據以生成架構之後的通用記錄。然而,當我試圖把它沉到的話題,我得到一個空指針異常: org.apache.kafka.common.errors.Seri

    0熱度

    1回答

    從最後一個版本的Kafka(0.11.0.0)於2017年6月28日發佈,kafka團隊提供了新功能,以便在交付時支持。 在我下載最新版本後,我嘗試配置生產者(通過kafka-console-producer.sh腳本執行),如Producer configs中所述:我設置了enable.idempotence=true和transactional.id=0A0A。 的問題是,當我開始製作我得到一

    0熱度

    2回答

    我正在閱讀一個csv文件,並將此輸入的行給予我的Kafka Producer。現在我想讓我的卡夫卡製作人以每秒100條消息的速度製作信息。

    0熱度

    1回答

    隨着Kafka 0.11.0.0的最後一個版本,Apache團隊正在引入冪等生產者和交易。 是否有可能保證我們想要記錄的整組消息(例如100萬)只會在最後被提交? 我想說的是,如果生產者與經紀商之間的聯繫鬆動並且無法使其穩定下來,消費者就不會看到消息。可能嗎?

    0熱度

    1回答

    我面臨的一個奇怪的問題,在我的製片代碼卡夫卡 producer.flush(); logger.info("Closing producer"); producer.close(); logger.info("successfully closed producer"); 我上面的代碼片斷作爲守則的一部分,問題是,我只能看到以下輸出 Closing producer 在我的日誌中,最

    0熱度

    1回答

    Kafka保證具有相同密鑰的消息始終會轉到同一個 分區。 例如,我有消息與字符串鍵:2329.和兩個主題t1和t2。按照預期,當我執行此消息的寫入時,它將在兩個主題中進入分區1。 現在問題本身:我正在使用Kafka Streams 0.10.2.0持久狀態存儲,它會自動創建一個備份主題。現在如果這個備份主題消息的關鍵字是:2329進入另一個分區(分區0),這對我來說很陌生。 有沒有人遇到過這個問題

    3熱度

    1回答

    新的Kafka版本(0.11)只支持一次語義。 https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging 我有一個製片人設置在這樣的java卡夫卡的交易代碼。 producer.initTransactions(); try {