2016-12-30

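Kafka Streams 0.10.1 "Failed to flush state store"

I'm trying to create a simple aggregation example in Scala with Kafka Streams 0.10.1, but I seem to fail even at a simple "count" aggregation (using the Kafka console producer). With code like this: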

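import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KeyValueMapper, TimeWindows}

val builder = new KStreamBuilder()
val inputStream: KStream[String, String] = builder.stream("inputTopic")

inputStream
  // Re-key each record by its value
  .map(new KeyValueMapper[String, String, KeyValue[String, String]] {
    override def apply(k: String, v: String): KeyValue[String, String] =
      new KeyValue[String, String](v, v)
  })
  .groupByKey()
  // 10-second tumbling windows, backed by the state store "count-test-1"
  .count(TimeWindows.of(10000L), "count-test-1")
  .toStream()
  .to("outputTopic")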

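It fails with "Failed to flush state store count-test-1"; I've included the full stack trace at the end of the post. On the other hand, if I use print() instead of to(), it works like a charm and prints the results to the console/terminal: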

[KTABLE-TOSTREAM-0000000013]: [[email protected]] , 1 
[KTABLE-TOSTREAM-0000000013]: [[email protected]] , 1 
[KTABLE-TOSTREAM-0000000013]: [[email protected]] , 2 
[KTABLE-TOSTREAM-0000000013]: [[email protected]] , 3 
[KTABLE-TOSTREAM-0000000013]: [[email protected]] , 4 

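Does anyone have any idea what could be causing this behavior? I'm using Windows 10 as the host (also running the Scala application through IntelliJ) and an Ubuntu 16.04 VM (for Kafka, in a Docker container, as well as the producer/consumer applications). However, I can confirm that the same problem occurs when running the application on the Ubuntu VM.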

Many thanks in advance for your help; any insight is appreciated :-)

Full stack trace:

2016-12-30 08:57:43 INFO StreamThread:573 - stream-thread [StreamThread-1] Committing task 2_0 
2016-12-30 08:57:43 ERROR StreamThread:582 - stream-thread [StreamThread-1] Failed to commit StreamTask 2_0 state: 
org.apache.kafka.streams.errors.ProcessorStateException: task [2_0] Failed to flush state store count-test-1 
     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:331) 
     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:275) 
     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:576) 
     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:562) 
     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:538) 
     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:456) 
     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) 
Caused by: java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String 
     at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24) 
     at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:72) 
     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72) 
     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) 
     at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42) 
     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) 
     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) 
     at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:35) 
     at org.apache.kafka.streams.state.internals.CachingWindowStore.maybeForward(CachingWindowStore.java:103) 
     at org.apache.kafka.streams.state.internals.CachingWindowStore.access$200(CachingWindowStore.java:34) 
     at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:86) 
     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117) 
     at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100) 
     at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:118) 
     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:329) 
     ... 6 more 
2016-12-30 08:57:43 INFO StreamThread:268 - stream-thread [StreamThread-1] Shutting down 
2016-12-30 08:57:43 INFO StreamThread:358 - stream-thread [StreamThread-1] Committing consumer offsets of task 0_0 
2016-12-30 08:57:43 INFO StreamThread:358 - stream-thread [StreamThread-1] Committing consumer offsets of task 1_0 
2016-12-30 08:57:43 INFO StreamThread:358 - stream-thread [StreamThread-1] Committing consumer offsets of task 2_0 
2016-12-30 08:57:43 INFO StreamThread:751 - stream-thread [StreamThread-1] Closing a task 0_0 
2016-12-30 08:57:43 INFO StreamThread:751 - stream-thread [StreamThread-1] Closing a task 1_0 
2016-12-30 08:57:43 INFO StreamThread:751 - stream-thread [StreamThread-1] Closing a task 2_0 
2016-12-30 08:57:43 INFO StreamThread:368 - stream-thread [StreamThread-1] Flushing state stores of task 0_0 
2016-12-30 08:57:43 INFO StreamThread:368 - stream-thread [StreamThread-1] Flushing state stores of task 1_0 
2016-12-30 08:57:43 INFO StreamThread:368 - stream-thread [StreamThread-1] Flushing state stores of task 2_0 
2016-12-30 08:57:43 ERROR StreamThread:330 - stream-thread [StreamThread-1] Failed while executing StreamTask 2_0 duet to flush state: 
org.apache.kafka.streams.errors.ProcessorStateException: task [2_0] Failed to flush state store count-test-1 
     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:331) 
     at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:180) 
     at org.apache.kafka.streams.processor.internals.StreamThread$4.apply(StreamThread.java:369) 
     at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:328) 
     at org.apache.kafka.streams.processor.internals.StreamThread.flushAllState(StreamThread.java:365) 
     at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:301) 
     at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:269) 
     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:252) 
Caused by: java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String 
     at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24) 
     at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:72) 
     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72) 
     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) 
     at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42) 
     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) 
     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) 
     at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:35) 
     at org.apache.kafka.streams.state.internals.CachingWindowStore.maybeForward(CachingWindowStore.java:103) 
     at org.apache.kafka.streams.state.internals.CachingWindowStore.access$200(CachingWindowStore.java:34) 
     at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:86) 
     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117) 
     at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100) 
     at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:118) 
     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:329) 
     ... 7 more 
2016-12-30 08:57:43 INFO StreamThread:347 - stream-thread [StreamThread-1] Closing the state manager of task 0_0 
2016-12-30 08:57:43 INFO StreamThread:347 - stream-thread [StreamThread-1] Closing the state manager of task 1_0 
2016-12-30 08:57:43 INFO StreamThread:347 - stream-thread [StreamThread-1] Closing the state manager of task 2_0 
2016-12-30 08:57:43 ERROR StreamThread:330 - stream-thread [StreamThread-1] Failed while executing StreamTask 2_0 duet to close state manager: 
org.apache.kafka.streams.errors.ProcessorStateException: task [2_0] Failed to close state store count-test-1 
     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:351) 
     at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:120) 
     at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:348) 
     at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:328) 
     at org.apache.kafka.streams.processor.internals.StreamThread.closeAllStateManagers(StreamThread.java:344) 
     at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:305) 
     at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:269) 
     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:252) 
Caused by: java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String 
     at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24) 
     at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:72) 
     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72) 
     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) 
     at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42) 
     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) 
     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) 
     at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:35) 
     at org.apache.kafka.streams.state.internals.CachingWindowStore.maybeForward(CachingWindowStore.java:103) 
     at org.apache.kafka.streams.state.internals.CachingWindowStore.access$200(CachingWindowStore.java:34) 
     at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:86) 
     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117) 
     at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100) 
     at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:118) 
     at org.apache.kafka.streams.state.internals.CachingWindowStore.close(CachingWindowStore.java:124) 
     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:349) 
     ... 7 more 
2016-12-30 08:57:43 INFO KafkaProducer:685 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. 
2016-12-30 08:57:43 INFO StreamThread:725 - stream-thread [StreamThread-1] Removing all active tasks [[0_0, 1_0, 2_0]] 
2016-12-30 08:57:43 INFO StreamThread:740 - stream-thread [StreamThread-1] Removing all standby tasks [[]] 
2016-12-30 08:57:43 INFO StreamThread:292 - stream-thread [StreamThread-1] Stream thread shutdown complete 
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [2_0] Failed to flush state store count-test-1 
     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:331) 
     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:275) 
     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:576) 
     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:562) 
     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:538) 
     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:456) 
     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) 
Caused by: java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String 
     at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24) 
     at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:72) 
     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72) 
     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) 
     at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42) 
     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) 
     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) 
     at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:35) 
     at org.apache.kafka.streams.state.internals.CachingWindowStore.maybeForward(CachingWindowStore.java:103) 
     at org.apache.kafka.streams.state.internals.CachingWindowStore.access$200(CachingWindowStore.java:34) 
     at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:86) 
     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117) 
     at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100) 
     at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:118) 
     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:329) 
     ... 6 more 
2016-12-30 08:57:43 INFO KafkaStreams:237 - Stopped Kafka Stream process 

Answer

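The result type of count(...) is not <String,Long> but <Windowed<String>,Long>, because you are using a windowed aggregation. Therefore, your default key serde, which is a String serde, fails: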

Caused by: java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String 
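To make the type change visible, here is a sketch of the question's topology with every intermediate type written out. It assumes the Kafka Streams 0.10.1 API (KStreamBuilder, KGroupedStream.count(Windows, String)); the object name WindowedCountTypes and the method build are hypothetical, chosen only for illustration.

```scala
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{KGroupedStream, KStream, KStreamBuilder, KTable, KeyValueMapper, TimeWindows, Windowed}

// Hypothetical helper object: same topology as in the question, with explicit types.
object WindowedCountTypes {
  def build(builder: KStreamBuilder): KStream[Windowed[String], java.lang.Long] = {
    val inputStream: KStream[String, String] = builder.stream("inputTopic")

    // Re-key by value, then group by the new key.
    val grouped: KGroupedStream[String, String] = inputStream
      .map(new KeyValueMapper[String, String, KeyValue[String, String]] {
        override def apply(k: String, v: String): KeyValue[String, String] =
          new KeyValue[String, String](v, v)
      })
      .groupByKey()

    // A windowed count yields KTable[Windowed[String], Long], not KTable[String, Long].
    val counts: KTable[Windowed[String], java.lang.Long] =
      grouped.count(TimeWindows.of(10000L), "count-test-1")

    // toStream() keeps the Windowed[String] key, so a later to("outputTopic")
    // would hand that key to the configured default key serde.
    counts.toStream()
  }
}
```

The annotated types compile only because they match what count(...) returns; declaring counts as KTable[String, java.lang.Long] instead would be rejected by the compiler, which is a quick way to spot the mismatch before it surfaces at runtime.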

You either need to specify a different key serde in to(...), or you need to add an additional map() after toStream() that converts your key type from Windowed<String> to String.
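A minimal sketch of the second option, assuming the Kafka Streams 0.10.1 API. The helper formatWindowedKey and its "<key>@<window start>" output format are hypothetical and arbitrary; any String encoding of the window would do. One further assumption: the stack trace only shows the key side failing, but the count values are java.lang.Long, so if the default value serde is also a String serde they would fail the same way. The sketch therefore passes Serdes.String() and Serdes.Long() to to(...) explicitly instead of relying on the defaults.

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KeyValueMapper, TimeWindows, Windowed}

object WindowedCountToTopic {
  // Hypothetical key format: original key plus window start in milliseconds.
  def formatWindowedKey(key: String, windowStartMs: Long): String = s"$key@$windowStartMs"

  def build(builder: KStreamBuilder): Unit = {
    val inputStream: KStream[String, String] = builder.stream("inputTopic")

    inputStream
      .map(new KeyValueMapper[String, String, KeyValue[String, String]] {
        override def apply(k: String, v: String): KeyValue[String, String] =
          new KeyValue[String, String](v, v)
      })
      .groupByKey()
      .count(TimeWindows.of(10000L), "count-test-1")
      .toStream()
      // Convert the Windowed[String] key back into a plain String key.
      .map(new KeyValueMapper[Windowed[String], java.lang.Long, KeyValue[String, java.lang.Long]] {
        override def apply(k: Windowed[String], v: java.lang.Long): KeyValue[String, java.lang.Long] =
          new KeyValue[String, java.lang.Long](formatWindowedKey(k.key(), k.window().start()), v)
      })
      // Explicit serdes: String for the new key, Long for the count.
      .to(Serdes.String(), Serdes.Long(), "outputTopic")
  }
}
```

Because the window start is folded into the key string, records for the same key in different windows stay distinguishable in outputTopic. Consumers reading the topic then need a Long deserializer for the values (for example, the console consumer's value.deserializer property set to LongDeserializer).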

print() works because it only prints each record to the console; it does not serialize the result and write it to a Kafka topic.

Thanks a lot! I should have paid more attention to what the stack trace actually says. – hun7er