2017-07-17 11 views
0

我有收到一個具有以下信息事件的話題:阿帕奇卡夫卡流互動式查詢 - 如何創建一個賣場裏的值是一個實體,而不是一個聚集

鍵 - >orderId(整數)

價值 - >{"orderId" : aaa, "productId" : xxx, "userId" : yyy, "state" : "zzz"}(JSON與訂單的全部信息)

我想實現一個交互式查詢,以通過orderId獲取完整的訂單信息。這個想法能夠從物化視圖(Kafka Streams存儲)中獲取訂單的當前狀態。

首先,我創建主題的KStream:

KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, STREAMING_TOPIC);

然後,我創建一個KTable將其分配到一家商店。問題是,顯然我只能創建商店,其價值是一個聚合,例如:stream.groupByKey().count("myStore");

我需要的商店應該有整個訂單信息,而不是聚合。這可能嗎?

回答

0

您可以閱讀主題直接作爲KTable,太:

KTable<Integer, JsonNode> stream = kStreamBuilder.table(integerSerde, jsonSerde, STREAMING_TOPIC, "store-name-for-IQ"); 

此FAQ也可能有幫助:http://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step

+0

喜馬蒂亞斯,這幾乎是我需要的,爲簡單起見,我已經省略了一步:在創建'stream'後,有一個過濾操作排除了一些命令。也就是說,我必須從「KStream」創建「KTable」,而不是直接從主題創建。從鏈接信息,我想我將不得不使用虛擬聚合權嗎? – codependent

+0

還有別的。我使用了虛擬聚合:'.groupByKey()。reduce((val1,val2) - > val2,「OrdersStore」);'問題是存在序列化錯誤。我如何將值序列化指定爲JSON? 'props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,XXXX);' – codependent

+0

修正了它使用自定義Serde:'props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,JsonSerde.class.getName());'''''''''public class JsonSerde implements Serde { private serializer serializer; 私有解串器解串器; ...''' – codependent

相關問題