我有收到一個具有以下信息事件的話題:阿帕奇卡夫卡流互動式查詢 - 如何創建一個賣場裏的值是一個實體,而不是一個聚集
鍵 - >orderId
(整數)
價值 - >{"orderId" : aaa, "productId" : xxx, "userId" : yyy, "state" : "zzz"}
(JSON與訂單的全部信息)
我想實現一個交互式查詢,以通過orderId獲取完整的訂單信息。這個想法能夠從物化視圖(Kafka Streams存儲)中獲取訂單的當前狀態。
首先,我創建主題的KStream:
KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, STREAMING_TOPIC);
然後,我創建一個KTable
將其分配到一家商店。問題是,顯然我只能創建商店,其價值是一個聚合,例如:stream.groupByKey().count("myStore");
我需要的商店應該有整個訂單信息,而不是聚合。這可能嗎?
喜馬蒂亞斯,這幾乎是我需要的,爲簡單起見,我已經省略了一步:在創建'stream'後,有一個過濾操作排除了一些命令。也就是說,我必須從「KStream」創建「KTable」,而不是直接從主題創建。從鏈接信息,我想我將不得不使用虛擬聚合權嗎? – codependent
還有別的。我使用了虛擬聚合:'.groupByKey()。reduce((val1,val2) - > val2,「OrdersStore」);'問題是存在序列化錯誤。我如何將值序列化指定爲JSON? 'props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,XXXX);' – codependent
修正了它使用自定義Serde:'props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,JsonSerde.class.getName());'''''''''public class JsonSerde implements Serde { private serializer serializer; 私有解串器解串器; ...''' –
codependent