2017-07-10 27 views
0

我正在使用Kafka Streams v.0.10.2.0在主題之間進行簡單處理之間的流式傳輸。最近我遇到了一個問題,其中一家經紀商倒閉,卡夫卡流應用程序關閉,直到我手動重新啓動它。嘗試調試這個問題我無法從日誌究竟是什麼造成這種理解,這裏是日誌摘錄:重新啓動正在關閉的Kafka Streams應用程序無例外

INFO [StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator - Revoking previously assigned partitions [topicname-3, topicname-1, topicname-2] for group streams-group 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] partitions [[topicname-3, topicname-1, topicname-2]] revoked at the beginning of consumer rebalance. 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing a task's topology 0_1 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing a task's topology 0_2 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing a task's topology 0_3 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Flushing state stores of task 0_1 
INFO [kafka-coordinator-heartbeat-thread | streams-group] o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator 127.0.0.1:9092 dead for group streams-group 
INFO [kafka-coordinator-heartbeat-thread | streams-group] o.a.k.c.c.i.AbstractCoordinator - Discovered coordinator 127.0.0.1:9092 for group streams-group. 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Flushing state stores of task 0_2 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Flushing state stores of task 0_3 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Committing consumer offsets of task 0_1 
ERROR [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Failed while executing StreamTask 0_1 due to commit consumer offsets: 
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Updating suspended tasks to contain active tasks [[0_1, 0_2, 0_3]] 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Removing all active tasks [[0_1, 0_2, 0_3]] 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Removing all standby tasks [[]] 
ERROR [StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group streams-group failed on partition revocation 
INFO [StreamThread-1] o.a.k.c.c.i.AbstractCoordinator - (Re-)joining group streams-group 
INFO [StreamThread-1] o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator dead for group streams-group 
INFO [StreamThread-1] o.a.k.c.c.i.AbstractCoordinator - Discovered coordinator for group streams-group. 
INFO [StreamThread-1] o.a.k.c.c.i.AbstractCoordinator - (Re-)joining group streams-group 
INFO [StreamThread-1] o.a.k.s.p.i.StreamPartitionAssignor - stream-thread [StreamThread-1] Constructed client metadata ... 
INFO [StreamThread-1] o.a.k.s.p.i.StreamPartitionAssignor - stream-thread [StreamThread-1] Completed validating internal topics in partition assignor 
INFO [StreamThread-1] o.a.k.s.p.i.StreamPartitionAssignor - stream-thread [StreamThread-1] Completed validating internal topics in partition assignor 
INFO [StreamThread-1] o.a.k.s.p.i.StreamPartitionAssignor - stream-thread [StreamThread-1] Assigned tasks to clients as {...=[activeTasks: ([0_0, 0_4]) assignedTasks: ([0_0, 0_4]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 0.2], ...=[activeTasks: ([0_1, 0_2, 0_3]) assignedTasks: ([0_1, 0_2, 0_3]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 0.30000000000000004]}. 
INFO [StreamThread-1] o.a.k.c.c.i.AbstractCoordinator - Successfully joined group streams-group with generation 17 
INFO [StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [topicname-3, topicname-1, topicname-2] for group streams-group 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] New partitions [[topicname-3, topicname-1, topicname-2]] assigned at the end of consumer rebalance. 
INFO [StreamThread-1] o.a.k.s.p.i.StreamTask - task [0_1] Initializing processor nodes of the topology 
INFO [StreamThread-1] o.a.k.s.p.i.StreamTask - task [0_2] Initializing processor nodes of the topology 
INFO [StreamThread-1] o.a.k.s.p.i.StreamTask - task [0_3] Initializing processor nodes of the topology 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Shutting down 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing a task 0_1 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing a task 0_2 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing a task 0_3 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Flushing state stores of task 0_1 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Flushing state stores of task 0_2 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Flushing state stores of task 0_3 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing the state manager of task 0_1 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing the state manager of task 0_2 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing the state manager of task 0_3 
INFO [StreamThread-1] o.a.k.c.p.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Removing all active tasks [[0_1, 0_2, 0_3]] 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Removing all standby tasks [[]] 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Stream thread shutdown complete 
WARN [StreamThread-1] o.a.k.s.p.i.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING 

,首先它似乎不太可能,處理是花費很長的時間,因爲它是非常簡單,該應用程序運行了幾個月,在日誌中沒有類似的消息。

同樣從日誌中判斷kafka流成功地重新加入了組,但然後突然它只是關閉而沒有例外。我在不同的機器上運行了兩個流應用程序,並且在代理程序重新啓動的同時關閉了這兩個應用程序。

如何調試此問題?它不應該至少拋出異常嗎? 另一個問題是,雖然流線程關閉了應用程序的其餘部分工作正常,所以它不會自動重新啓動。我能以某種方式捕捉這個並重新啓動線程嗎?保留策略使消費者非常不滿意,我怎樣才能讓kafka流應用更可靠?

謝謝!

回答

1

從日誌中很難說。也許DEBUG日誌會顯示更多信息...

唯一的「黑暗中拍攝」可能是,在Initializing processor nodes of the topology期間出現錯誤。但是如果有例外,它應該在日誌中。它也可能是庫中的一個bug。

關於監控您的應用程序,你有多種選擇:

  • 可以將KafkaStreams#setUncaughtExceptionHandler()註冊以查看是否有異常泡出如果StreamThread,因此線程死亡
  • 你可以註冊一個KafkaStreams#setStateListener()看如果應用程序進入NOT_RUNNING狀態(順便說一句:在0.10.2和0.11.0中有一個與NOT_RUNNING狀態有關的已知問題 - 剛剛在trunk中得到修復:如果所有線程都死了,狀態可能仍然是RUNNING,所以您應該監視仍在手動運行的線程數)

順便說一句:我會建議升級到0.10.2.1,其中包含多個重要的錯誤修復。

相關問題