apache-kafka-streams

    1熱度

    1回答

    我目前使用kafka流來整理窗口內的相關事件。如果所有相關事件都沒有到達窗口內,卡夫卡流中是否有方法可以處理已過期的事件。這將有助於處理/通知下游應用程序所有相關事件未到達整理。感謝您的迴應。 以下是在示例 例-1: - 的GroupID:G1 - 事件到來:E1,10am; E2 10:01 am和E3 10:02 am - 窗口:不活動持續時間爲5分鐘的會話窗口。 - 結果:所有事件均已成功整

    3熱度

    1回答

    在探索如何單元測試卡夫卡流我碰到ProcessorTopologyTestDriver來了,可惜的是這個類似乎已經變得與版本0.10.1.0(KAFKA-4408) 斷是否有變通適用於KTable問題? 我看到了「Mocked Streams」項目,但首先它使用版本0.10.2.0,而我在0.10.1.1上,第二個是Scala,而我的測試是Java/Groovy。 任何幫助這裏如何單元測試流而不必

    0熱度

    1回答

    Kafka州商店Rock DB是容錯的,從更改日誌中可以如何恢復那些不起作用的數據?

    3熱度

    1回答

    我有一個使用來自Kafka Streams的KTable的單實例Java應用程序。直到最近,我可以使用KTable檢索所有數據,突然間一些消息似乎消失了。那裏應該有〜33k消息和唯一的密鑰。 當我想通過密鑰檢索消息時,我沒有收到一些消息。我使用ReadOnlyKeyValueStore檢索消息: final ReadOnlyKeyValueStore<GenericRecord, GenericR

    0熱度

    1回答

    因此,假設圈點時間爲X分鐘/秒 { props.put("group.max.session.timeout.ms", X*2); props.put("session.timeout.ms", x); props.put("request.timeout.ms", X*2); } 是自主設置會話超時卡夫卡流低水平處理器API正確的方法是什麼?

    1熱度

    1回答

    我一直在與卡夫卡消費者和生產者api工作了一段時間,並想嘗試我的手在流api。我在網上查了很多參考資料,但我無法弄清楚這一件簡單的事情。 如何製作只將消息發送到輸出主題的KStream。 拿他們在github上的這個最基本的例子來回答: https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/i

    3熱度

    1回答

    我有一個卡夫卡Streams應用程序從消費國和生產的卡夫卡集羣3名經紀人和3.除了消費者偏移主題(50個分區)複製的因素,所有其他議題都只有一個分區。 當經紀人嘗試一個優選的複製品的選舉中,流應用(其在一個完全不同的實例比經紀人運行)失敗,錯誤: Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] exce

    1熱度

    1回答

    如果有兩個Kafka流從相同的Kafka主題消耗,那麼在容錯情況下,如果一個Kafka流發生故障,則Kafka狀態存儲將在第二個Kafka流中複製。如果第一次回到卡夫卡第二流的國家商店將會發生什麼。

    5熱度

    1回答

    有沒有辦法用Kafka Stream手動提交? 通常使用的KafkaConsumer,我這樣做如下: while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records){ /

    1熱度

    1回答

    當我運行Kafka Streams應用程序的多個實例時,只有第一個實例正在正確接收消息。但是,如果我啓動新的實例,它們不會收到任何消息。 有沒有解決這個問題的建議? 這裏是我的卡夫卡流媒體應用 package test.kafkastream; import java.util.Properties; import org.apache.kafka.clients.consumer.Con