2017-02-22 143 views
4

我有一個具有以下驅動程序代碼的流應用程序,用於實時消息轉換。使用UncaughtExceptionHandler重新啓動或關閉流的正確方法

String topicName = ... 
KStreamBuilder builder = new KStreamBuilder(); 
KStream<String, String> source = builder.stream(topicName); 

source.transform(() -> new MyTransformer()).to(...); 

KafkaStreams streams = new KafkaStreams(builder, appConfig); 
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { 
    public void uncaughtException(Thread t, Throwable e) { 
     logger.error("UncaughtExceptionHandler " + e.getMessage()); 
     System.exit(0); 
    } 
}); 


streams.cleanUp(); 
streams.start(); 

Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); 

經過幾分鐘的執行後,應用程序拋出下面的異常,然後不通過流進展。

[2017-02-22 14:24:35,139] ERROR [StreamThread-14] User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group TRANSFORMATION-APP failed on partition assignment (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) 
org.apache.kafka.streams.errors.ProcessorStateException: task [0_11] Error while creating the state manager 
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:72) 
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:89) 
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633) 
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660) 
    at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69) 
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124) 
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228) 
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313) 
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277) 
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407) 
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) 
Caused by: java.io.IOException: task [0_11] Failed to lock the state directory: /tmp/kafka-streams/TRANSFORMATION-APP/0_11 
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:101) 
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69) 
    ... 13 more 

我試着衝了/tmp/kafka-streams/TRANSFORMATION-APP目錄並重新啓動應用程序,但再次拋出同樣的異常。我注意到的一件事是,應用程序工作正常,直到它轉換所有積壓消息,但在處理一些新消息後拋出異常!

有時它也會拋出下面未捕獲的異常。

[ERROR] 2017-02-22 12:40:54.804 [StreamThread-29] MyTransformer - UncaughtExceptionHandler task directory [/tmp/kafka-streams/TRANSFORMATION-APP/0_24] doesn't exist and couldn't be created 

[ERROR] 2017-02-22 12:42:30.148 [StreamThread-179] MyTransformer - UncaughtExceptionHandler stream-thread [StreamThread-179] Failed 
to rebalance 

拋出這些異常之一後,應用程序仍在運行,但沒有在流中進行。

處理這些錯誤的正確方法是什麼?是否有可能以編程方式重新啓動流,而不殺死應用程序?這個程序是在monit。在最壞的情況下,我寧願正確終止應用程序(沒有任何消息丟失),以便monit可以重新啓動它。

輸入主題有100個分區,我在應用程序配置中將num.stream.threads設置爲100。該應用程序在Kafka 0.10.1.1-cp1.

回答

4

Kakfa 0.10.1.x有一些關於多線程的錯誤。你可以升級到0.10.2(AK今天公佈,CP 3.2應該會很快跟進)或應用以下解決方法:

  • 使用單個線程的執行僅僅是
  • ,如果你需要更多的線程,啓動多個實例
  • 對於每個實例,配置不同的狀態目錄

在重新啓動之前,您可能還需要刪除本地狀態目錄(只有一次)以進入總體一致的應用程序狀態。

無論如何,都不會有數據丟失。即使在失敗的情況下,Kafka Streams也能保證至少處理一次語義。這也適用於你本地的商店 - 在你刪除本地狀態目錄之後,啓動時這些狀態將從底層的Kafka更新日誌主題(儘管這是一個昂貴的操作)重新創建。

UncaughtExceptionHandler只能爲您提供一種方法來找出線程死亡。它不(直接)幫助重新啓動你的應用程序。要恢復死亡線程,您需要完全關閉KafkaStreams實例並創建/啓動一個新實例。我們希望在未來爲此增加更好的支持。

+1

謝謝。我用單線程重新啓動了應用程序。它運行良好一段時間,但拋出'未能重新平衡'異常,這是'UncaughtExceptionHandler'中捕獲的異常。爲了解決這個問題,我增加了內部生產者/消費者的rebalance.backoff.ms和zookeeper.session.timeout.ms參數。現在它似乎運行良好! – Samy

+0

作爲後續工作,是否有任何最佳實踐要完全關閉流實例?在我的'UncaughtExceptionHandler'中啓動流之前,我使用'streams :: close'。我沒有後果.. – Samy

+1

沒關係。您應該只有兩個線程不能同時調用'streams :: close' - 這可能會導致死鎖。否則,可以在異常處理程序中關閉。 –

相關問題