我有一個生成一個商店這個簡單的KTable定義更新的原因:卡夫卡流 - 解釋爲什麼KTable及其相關商店只能獲得每30秒
KTable<String, JsonNode> table = kStreamBuilder.<String, JsonNode>table(ORDERS_TOPIC, ORDERS_STORE);
table.print();
我發佈消息到ORDERS_TOPIC但商店ISN」直到每30秒真正更新一次。這是有一個關於承諾,因爲30000ms時間已過消息日誌:
2017-07-25 23:53:15.465 DEBUG 17540 --- [ StreamThread-1] o.a.k.c.consumer.internals.Fetcher : Sending fetch for partitions [orders-0] to broker EXPRF026.SUMINISTRADOR:9092 (id: 0 rack: null)
2017-07-25 23:53:15.567 INFO 17540 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing all tasks because the commit interval 30000ms has elapsed
2017-07-25 23:53:15.567 INFO 17540 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 0_0
2017-07-25 23:53:15.567 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.processor.internals.StreamTask : task [0_0] Committing its state
2017-07-25 23:53:15.567 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.p.i.ProcessorStateManager : task [0_0] Flushing all stores registered in the state manager
f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec
{"uid":"string","productId":0,"orderId":"f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec","name":"OrderPlaced","state":"PENDING_PRODUCT_RESERVATION"}
[KTABLE-SOURCE-0000000001]: f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec , ({"uid":"string","productId":0,"orderId":"f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec","name":"OrderPlaced","state":"PENDING_PRODUCT_RESERVATION"}<-null)
2017-07-25 23:53:15.569 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.state.internals.ThreadCache : Thread order-service-streams-16941f70-87b3-45f4-88de-309e4fd22748-StreamThread-1 cache stats on flush: #puts=1, #gets=1, #evicts=0, #flushes=1
2017-07-25 23:53:15.576 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.p.internals.RecordCollectorImpl : task [0_0] Flushing producer
我發現,控制該屬性是commit.interval.ms
:
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
爲什麼設置爲30000ms默認情況下(聽起來很長時間),將其改爲10ms有什麼意義?
如果不是一個KTable我有KStream工作...
KStream<String, JsonNode> kStream = kStreamBuilder.stream(ORDERS_TOPIC);
kStream.print();
...我能看見消息向右走,無需等待那些30000ms,爲什麼不同?