apache-kafka-streams

    0熱度

    1回答

    我正在使用Kafka Streams v.0.10.2.0在主題之間進行簡單處理之間的流式傳輸。最近我遇到了一個問題,其中一家經紀商倒閉,卡夫卡流應用程序關閉,直到我手動重新啓動它。嘗試調試這個問題我無法從日誌究竟是什麼造成這種理解,這裏是日誌摘錄: INFO [StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator - Revoking previousl

    1熱度

    2回答

    我們有多個輸入主題,其中包含不同的業務事件(頁面查看,點擊,滾動事件等)。就我所瞭解的Kafka流而言,它們都會得到一個事件時間戳,可用於KStream與其他流或表格進行連接以調整時間。 我們想要做的是:合併所有不同的事件(源自上述不同的主題)爲用戶ID(即按用戶ID分組)並將會話窗口應用於他們。 這應該儘可能在包含所有事件的流上使用groupByKey,然後aggregate/reduce(在此

    0熱度

    1回答

    Kafka保證具有相同密鑰的消息始終會轉到同一個 分區。 例如,我有消息與字符串鍵:2329.和兩個主題t1和t2。按照預期,當我執行此消息的寫入時,它將在兩個主題中進入分區1。 現在問題本身:我正在使用Kafka Streams 0.10.2.0持久狀態存儲,它會自動創建一個備份主題。現在如果這個備份主題消息的關鍵字是:2329進入另一個分區(分區0),這對我來說很陌生。 有沒有人遇到過這個問題

    0熱度

    2回答

    我們計劃運行分佈在兩臺機器上的kafka流應用程序。每個實例都將其Ktable數據存儲在自己的機器上。 我們面臨的挑戰是, 我們有100萬條記錄推送到Ktable。我們需要迭代整個Ktable(RocksDB)數據並生成報告。 假設存儲在每個實例中的500K個記錄。通過http (除非有任何流式TCP技術可用),通過單個GET從其他實例獲取所有記錄是不可能的。基本上 我們需要一次調用兩個實例數據並

    1熱度

    1回答

    我在map/foreach期間使用調用外部系統的Kafka流。 map或foreach可以使用多長時間? 是否有任何警告阻塞很長一段時間(例如小時)?

    0熱度

    1回答

    我們目前有一個數據管道設置,我們正在使用Logstash從單個Kafka主題讀取原始數據並將其寫入ElasticSearch。 本主題中的數據採用JSON格式,但每行可以屬於完全不同的業務域,因此它可能具有完全不同的模式。例如: 記錄1:「{」id「:1,」model「:」model2「,」updated「:」2017-01-1T00:00:00.000Z「,」domain「:」A「 } 記錄2:

    0熱度

    1回答

    我有收到一個具有以下信息事件的話題: 鍵 - >orderId(整數) 價值 - >{"orderId" : aaa, "productId" : xxx, "userId" : yyy, "state" : "zzz"}(JSON與訂單的全部信息) 我想實現一個交互式查詢,以通過orderId獲取完整的訂單信息。這個想法能夠從物化視圖(Kafka Streams存儲)中獲取訂單的當前狀態。 首先

    0熱度

    1回答

    我有一個用例做多連接的兩個主題聯接和線程數, 比方說,我有話題A(2個分區)和主題B(2分區)並運行KafkaStreams應用程序的單個實例。 我必須使用案例發現斷裂,左小姐和右小姐的兩個主題之間,所以我執行以下3個操作, A.join(B) B.leftJoin(A) A.leftJoin(B) 按照該文件,將有兩個任務(MAX( 2,2))將每個拓撲結構,共6級的任務,即創建, 1.

    1熱度

    1回答

    如何計算在卡夫卡發送的主題消息總數有多少,以及消費者當時消費或承諾了多少消息? 我initiatting卡夫卡連接器AS- Map<String, String> kafkaParams = new HashMap<>(); kafkaParams.put("metadata.broker.list", "localhost:9092"); Set<String> topics = Colle

    1熱度

    1回答

    我有一個生成一個商店這個簡單的KTable定義更新的原因: KTable<String, JsonNode> table = kStreamBuilder.<String, JsonNode>table(ORDERS_TOPIC, ORDERS_STORE); table.print(); 我發佈消息到ORDERS_TOPIC但商店ISN」直到每30秒真正更新一次。這是有一個關於承諾,因爲3