2017-09-13

I have a Kafka Streams application (version 0.11) that consumes data from several topics, joins the data, and writes the result to another topic. On startup, the Kafka Streams application logs org.apache.kafka.streams.errors.LockException.
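For context, here is a minimal sketch of the kind of topology described above, written against the 0.11 DSL (the topic names, value types, and join window are illustrative assumptions, not taken from the question):

    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.JoinWindows;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class JoinApp {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-app");        // assumed name
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // assumed address

            Serde<String> str = Serdes.String();
            KStreamBuilder builder = new KStreamBuilder(); // 0.11 DSL entry point

            KStream<String, String> left = builder.stream(str, str, "topic-a");
            KStream<String, String> right = builder.stream(str, str, "topic-b");

            // A windowed join creates local state stores; those stores live in
            // the state directory that the LockException complains about.
            left.join(right,
                    (l, r) -> l + "|" + r,
                    JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
                    str, str, str)
                .to(str, str, "output-topic");

            new KafkaStreams(builder, props).start();
        }
    }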

Kafka configuration:

5 Kafka brokers - version 0.11 
Kafka topics - 15 partitions, replication factor 3. 

A few million records are consumed/produced every hour. Whenever I take any Kafka broker down, it throws the following exception:

org.apache.kafka.streams.errors.LockException: task [4_10] Failed to lock the state directory for task 4_10 
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:99) 
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:80) 
    at org.apache.kafka.streams.processor.internals.StandbyTask.<init>(StandbyTask.java:62) 
    at org.apache.kafka.streams.processor.internals.StreamThread.createStandbyTask(StreamThread.java:1325) 
    at org.apache.kafka.streams.processor.internals.StreamThread.access$2400(StreamThread.java:73) 
    at org.apache.kafka.streams.processor.internals.StreamThread$StandbyTaskCreator.createTask(StreamThread.java:313) 
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254) 
    at org.apache.kafka.streams.processor.internals.StreamThread.addStandbyTasks(StreamThread.java:1366) 
    at org.apache.kafka.streams.processor.internals.StreamThread.access$1200(StreamThread.java:73) 
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:185) 
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265) 
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363) 
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310) 
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) 
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) 
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527) 

I have read in a few JIRA issues that cleaning up the streams might help fix the problem. But is cleaning up the streams every time we start the Kafka Streams application a proper solution, or just a workaround? Also, will the stream cleanUp delay the application startup?

Note: every time I start the Kafka Streams application, I need to call streams.cleanUp() before calling streams.start().
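For reference, that startup pattern looks like this (a minimal sketch; builder and props as in the example above):

    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.cleanUp(); // deletes this instance's local state stores; only valid while the instance is not running
    streams.start();   // state stores are then rebuilt from the changelog topics, which can slow startup
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));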

Answer

1

Seeing org.apache.kafka.streams.errors.LockException: task [4_10] Failed to lock the state directory for task 4_10 is actually expected and should resolve itself. The thread backs off in order to wait until another thread releases the lock, and retries later. Thus, you might even see this WARN message multiple times in the log, if a retry happens before the second thread has released the lock.

However, eventually the lock should be released by the first thread, and the second thread will be able to acquire it. Afterwards, Streams should just move forward. Note that this is a WARN message, not an error.
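For what it's worth, the lock in question is a file lock on the task's state directory, and sibling StreamThreads in the same JVM share that directory tree, which is how two threads can contend for it. A sketch of the related configs (paths and values are illustrative):

    // Each task locks <state.dir>/<application.id>/<task.id>/.lock, e.g.
    // /tmp/kafka-streams/join-app/4_10/.lock.
    props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams"); // default is /tmp/kafka-streams
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);               // threads sharing state.dir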

+0

But it takes a long time for this warning to resolve. Is there a configuration to reduce the number of retries? –

+0

Ideally, should I call cleanUp() every time I start my streams instance? –

+0

No, you should not! It would introduce a lot of unnecessary overhead! Compare the hints in the docs: https://docs.confluent.io/current/streams/developer-guide.html#step-2-reset-the-local-environments-of-your-application-instances –