2017-07-25 15 views
1

Kafka Streams - Explain why a KTable and its associated store only get updated every 30 seconds

I have this simple KTable definition that generates a store:

KTable<String, JsonNode> table = kStreamBuilder.<String, JsonNode>table(ORDERS_TOPIC, ORDERS_STORE); 
table.print(); 

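I publish messages to ORDERS_TOPIC, but the store isn't actually updated until 30 seconds have passed. Here is the log, which contains a message about committing because the 30000ms commit interval has elapsed: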

2017-07-25 23:53:15.465 DEBUG 17540 --- [ StreamThread-1] o.a.k.c.consumer.internals.Fetcher  : Sending fetch for partitions [orders-0] to broker EXPRF026.SUMINISTRADOR:9092 (id: 0 rack: null) 
2017-07-25 23:53:15.567 INFO 17540 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread   : stream-thread [StreamThread-1] Committing all tasks because the commit interval 30000ms has elapsed 
2017-07-25 23:53:15.567 INFO 17540 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread   : stream-thread [StreamThread-1] Committing task StreamTask 0_0 
2017-07-25 23:53:15.567 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.processor.internals.StreamTask : task [0_0] Committing its state 
2017-07-25 23:53:15.567 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.p.i.ProcessorStateManager  : task [0_0] Flushing all stores registered in the state manager 
f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec 
{"uid":"string","productId":0,"orderId":"f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec","name":"OrderPlaced","state":"PENDING_PRODUCT_RESERVATION"} 
[KTABLE-SOURCE-0000000001]: f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec , ({"uid":"string","productId":0,"orderId":"f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec","name":"OrderPlaced","state":"PENDING_PRODUCT_RESERVATION"}<-null) 
2017-07-25 23:53:15.569 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.state.internals.ThreadCache  : Thread order-service-streams-16941f70-87b3-45f4-88de-309e4fd22748-StreamThread-1 cache stats on flush: #puts=1, #gets=1, #evicts=0, #flushes=1 
2017-07-25 23:53:15.576 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.p.internals.RecordCollectorImpl : task [0_0] Flushing producer 

I found that the property controlling this is commit.interval.ms:

props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10); 

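Why is it set to 30000ms by default (that sounds like a long time), and what are the implications of changing it to 10ms?

If I work with a KStream instead of a KTable...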

KStream<String, JsonNode> kStream = kStreamBuilder.stream(ORDERS_TOPIC); 
kStream.print(); 

...I see the messages right away, without having to wait those 30000ms. Why the difference?

Answer

3

It is related to memory management, in particular the KTable caches: http://docs.confluent.io/current/streams/developer-guide.html#memory-management

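The KTable is actually updated all the time. If you use "Interactive Queries" to access the underlying state store, you can see each update immediately. However, the KTable cache buffers updates to reduce downstream load, and every time a commit is triggered this cache has to be flushed downstream so that no data is lost on failure. That flush is what makes print() show output only once per commit interval. If your cache size is small, you may also see downstream records earlier, whenever a key gets evicted from the cache.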
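The behaviour described above can be illustrated with a toy model in plain Java. This is not Kafka Streams code: the class `CachedTableSketch`, its methods, and the entry-count capacity are all hypothetical (the real cache is sized in bytes). It only shows the idea that the store always holds the latest value, while downstream output appears on commit or on eviction.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model only: a hypothetical illustration, NOT how Kafka Streams is implemented. */
public class CachedTableSketch {
    // Latest value per key; updated on every write (what a store query would read).
    private final Map<String, String> store = new HashMap<>();
    // Keys written since the last commit, oldest write first, with their latest value.
    private final Map<String, String> dirty = new LinkedHashMap<>();
    // Records forwarded downstream (what a downstream print would show).
    private final List<String> downstream = new ArrayList<>();
    // Hypothetical capacity in entries; an assumption made to keep the sketch small.
    private final int maxEntries;

    public CachedTableSketch(int maxEntries) {
        this.maxEntries = maxEntries;
    }

    public void put(String key, String value) {
        store.put(key, value);          // the store itself is updated immediately
        dirty.remove(key);              // re-insert so this key counts as most recently written
        dirty.put(key, value);          // repeated writes to one key collapse to the latest value
        if (dirty.size() > maxEntries) {
            // Cache is full: evict the oldest dirty entry and forward it downstream early.
            Iterator<Map.Entry<String, String>> it = dirty.entrySet().iterator();
            Map.Entry<String, String> eldest = it.next();
            downstream.add(eldest.getKey() + "=" + eldest.getValue());
            it.remove();
        }
    }

    public String get(String key) {
        return store.get(key);
    }

    /** On commit, everything still buffered is flushed downstream. */
    public void commit() {
        for (Map.Entry<String, String> e : dirty.entrySet()) {
            downstream.add(e.getKey() + "=" + e.getValue());
        }
        dirty.clear();
    }

    public List<String> downstream() {
        return downstream;
    }

    public static void main(String[] args) {
        CachedTableSketch table = new CachedTableSketch(10);
        table.put("order-1", "PENDING");
        table.put("order-1", "RESERVED");
        System.out.println(table.get("order-1"));  // RESERVED (store is current)
        System.out.println(table.downstream());    // [] (nothing forwarded yet)
        table.commit();
        System.out.println(table.downstream());    // [order-1=RESERVED]
    }
}
```

In this model the two writes to `order-1` produce a single downstream record, which is the load reduction the cache is meant to provide.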

About the commit interval: in general, the commit interval is set to a relatively large value to reduce the commit load on the brokers.
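If lower latency matters more than commit load, the two settings discussed here can be tuned together. The following is a minimal sketch using plain `java.util.Properties` with the string keys behind `StreamsConfig.COMMIT_INTERVAL_MS_CONFIG` and `StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG`. The class name and the value of 1000 ms are illustrative assumptions, not recommendations, and required settings such as `application.id` and `bootstrap.servers` are omitted.

```java
import java.util.Properties;

/** Hypothetical helper: builds only the latency-related part of a Streams configuration. */
public class StreamsLatencyConfigSketch {

    public static Properties lowLatencyProps() {
        Properties props = new Properties();
        // Same key as StreamsConfig.COMMIT_INTERVAL_MS_CONFIG (default 30000).
        // 1000 is an arbitrary example value: shorter means more frequent commits and flushes.
        props.put("commit.interval.ms", 1000);
        // Same key as StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG.
        // 0 disables the record cache, so every update is forwarded downstream.
        props.put("cache.max.bytes.buffering", 0);
        return props;
    }

    public static void main(String[] args) {
        Properties props = lowLatencyProps();
        System.out.println("commit.interval.ms = " + props.get("commit.interval.ms"));
        System.out.println("cache.max.bytes.buffering = " + props.get("cache.max.bytes.buffering"));
    }
}
```

Either setting alone changes how soon updates show up downstream. Disabling the cache removes the deduplication of repeated updates to the same key, while a very short commit interval increases the commit load on the brokers mentioned above.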