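Correct way to restart or shut down a stream using UncaughtExceptionHandler
I have a streams application with the following driver code for real-time message transformation: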
String topicName = ...
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream(topicName);
source.transform(() -> new MyTransformer()).to(...);
KafkaStreams streams = new KafkaStreams(builder, appConfig);
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
    public void uncaughtException(Thread t, Throwable e) {
        logger.error("UncaughtExceptionHandler " + e.getMessage());
        System.exit(0);
    }
});
streams.cleanUp();
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
After running for a few minutes, the application throws the exception below and then stops making progress through the stream.
[2017-02-22 14:24:35,139] ERROR [StreamThread-14] User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group TRANSFORMATION-APP failed on partition assignment (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
org.apache.kafka.streams.errors.ProcessorStateException: task [0_11] Error while creating the state manager
at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:72)
at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:89)
at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
Caused by: java.io.IOException: task [0_11] Failed to lock the state directory: /tmp/kafka-streams/TRANSFORMATION-APP/0_11
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:101)
at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
... 13 more
我試着衝了/tmp/kafka-streams/TRANSFORMATION-APP
目錄並重新啓動應用程序,但再次拋出同樣的異常。我注意到的一件事是,應用程序工作正常,直到它轉換所有積壓消息,但在處理一些新消息後拋出異常!
有時它也會拋出下面未捕獲的異常。
[ERROR] 2017-02-22 12:40:54.804 [StreamThread-29] MyTransformer - UncaughtExceptionHandler task directory [/tmp/kafka-streams/TRANSFORMATION-APP/0_24] doesn't exist and couldn't be created
[ERROR] 2017-02-22 12:42:30.148 [StreamThread-179] MyTransformer - UncaughtExceptionHandler stream-thread [StreamThread-179] Failed to rebalance
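After either of these exceptions is thrown, the application is still running but makes no progress in the stream.
What is the correct way to handle these errors? Is it possible to restart the stream programmatically without killing the application? The application runs under monit. In the worst case, I would rather terminate the application cleanly (without losing any messages) so that monit can restart it.
The input topic has 100 partitions, and I set num.stream.threads to 100 in the application config. The application runs on Kafka 0.10.1.1-cp1.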
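One way to get the "terminate cleanly so monit can restart it" behaviour is to keep the uncaught-exception handler minimal. It runs on the failing stream thread, so here it only records the error and signals the main thread, which then closes the instance and exits with a non-zero code. Calling System.exit(0) directly from the handler, as in the driver code above, may hang: System.exit waits for the shutdown hook, and the hook's streams.close() in turn waits for the stream threads, including the one blocked inside System.exit. The sketch below is stdlib-only and only models the pattern. FakeStreams is a hypothetical stand-in for KafkaStreams (its close() joining the worker thread is an assumption meant to mirror the real close()), and FailFastMain and run are illustrative names. A real application would also need a way out of the latch wait for normal shutdown.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class FailFastMain {

    /** Hypothetical stand-in for KafkaStreams: one worker thread plus close(). */
    static final class FakeStreams {
        private final Thread worker;
        volatile boolean closed = false;

        FakeStreams(Runnable work) {
            this.worker = new Thread(work, "StreamThread-1");
        }

        void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler h) {
            worker.setUncaughtExceptionHandler(h);
        }

        void start() {
            worker.start();
        }

        void close() throws InterruptedException {
            worker.join(); // assumption: like KafkaStreams.close(), waits for its threads
            closed = true;
        }
    }

    /** Waits for a stream thread to die, closes from the main thread, returns an exit code. */
    static int run(FakeStreams streams) throws InterruptedException {
        CountDownLatch failed = new CountDownLatch(1);
        AtomicReference<Throwable> cause = new AtomicReference<>();

        // Runs ON the dying stream thread: record and signal only,
        // no close() and no System.exit() here.
        streams.setUncaughtExceptionHandler((t, e) -> {
            cause.set(e);
            failed.countDown();
        });
        streams.start();

        failed.await();   // main thread blocks until a stream thread fails
        streams.close();  // safe: main is not one of the threads close() waits for
        System.err.println("stream thread failed: " + cause.get().getMessage());
        return 1;         // non-zero exit code signals failure to the supervisor
    }

    public static void main(String[] args) throws InterruptedException {
        FakeStreams streams = new FakeStreams(() -> {
            throw new IllegalStateException("Failed to rebalance");
        });
        System.exit(run(streams));
    }
}
```

In the real application the worker failure would be something like the "Failed to rebalance" error above. Whether a restart loses messages depends on which offsets were committed, which this sketch does not model.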
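Both errors concern task directories under /tmp/kafka-streams/TRANSFORMATION-APP, which is the default state location, so it may help to make the state directory explicit alongside the thread count. Below is a sketch of the settings described above as a plain java.util.Properties, using the string keys behind the StreamsConfig constants. The broker address and the state.dir path are hypothetical placeholders, not values from the question.

```java
import java.util.Properties;

public class StreamsConfigSketch {

    /** Builds the streams settings described in the question, keyed by their config names. */
    static Properties appConfig(int partitions) {
        Properties p = new Properties();
        p.put("application.id", "TRANSFORMATION-APP");  // also used as the consumer group id
        p.put("bootstrap.servers", "localhost:9092");    // hypothetical broker address
        // One stream thread per input partition, as in the question.
        p.put("num.stream.threads", Integer.toString(partitions));
        // Hypothetical path outside /tmp (the default is /tmp/kafka-streams).
        p.put("state.dir", "/var/lib/kafka-streams");
        return p;
    }

    public static void main(String[] args) {
        appConfig(100).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```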
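Thanks. I restarted the application with a single thread. It ran fine for a while but then threw the 'Failed to rebalance' exception, which was caught in the 'UncaughtExceptionHandler'. To fix this, I increased the rebalance.backoff.ms and zookeeper.session.timeout.ms parameters of the internal producer/consumer. Now it seems to be running fine! – Samy
As a follow-up, is there any best practice for shutting down a streams instance completely? In my 'UncaughtExceptionHandler' I call 'streams::close' before starting the streams again. I haven't noticed any side effects so far. – Samy
That's fine. You only need to make sure that two threads never call 'streams::close' at the same time, as that could lead to a deadlock. Otherwise, closing from within the exception handler is fine. –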
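The rule above, that two threads must never call close() at the same time, can be enforced with a compare-and-set guard: whichever of the exception handler or the shutdown hook gets there first performs the close, and the other becomes a no-op. CloseOnce is a hypothetical helper, not part of the Kafka Streams API; the Runnable stands in for streams::close.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class CloseOnce {

    private final AtomicBoolean closing = new AtomicBoolean(false);
    private final Runnable closeAction; // e.g. streams::close in the real application

    CloseOnce(Runnable closeAction) {
        this.closeAction = closeAction;
    }

    /** Runs the close action only for the first caller; returns true for that caller. */
    boolean closeIfFirst() {
        if (closing.compareAndSet(false, true)) {
            closeAction.run();
            return true;
        }
        return false; // another thread is already closing; do not call close() again
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger closes = new AtomicInteger();
        CloseOnce guard = new CloseOnce(closes::incrementAndGet);

        // Simulate the exception handler and the shutdown hook racing to close.
        Thread handler = new Thread(guard::closeIfFirst);
        Thread hook = new Thread(guard::closeIfFirst);
        handler.start();
        hook.start();
        handler.join();
        hook.join();

        System.out.println("close() ran " + closes.get() + " time(s)"); // prints 1
    }
}
```

In the driver code, both paths would share one guard, for example Runtime.getRuntime().addShutdownHook(new Thread(guard::closeIfFirst)). If in doubt about running close() on the failing stream thread itself, the handler can instead hand it to a fresh thread with new Thread(guard::closeIfFirst).start().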