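Kafka Streams application shuts down without an exception after a broker restart

I'm using Kafka Streams v0.10.2.0 to stream between topics with some simple processing in between. Recently I hit a problem where one of the brokers went down and the Kafka Streams application shut down and stayed down until I restarted it manually. While trying to debug this, I couldn't work out from the logs what exactly caused it. Here is the log excerpt: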
INFO [StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator - Revoking previously assigned partitions [topicname-3, topicname-1, topicname-2] for group streams-group
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] partitions [[topicname-3, topicname-1, topicname-2]] revoked at the beginning of consumer rebalance.
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing a task's topology 0_1
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing a task's topology 0_2
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing a task's topology 0_3
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Flushing state stores of task 0_1
INFO [kafka-coordinator-heartbeat-thread | streams-group] o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator 127.0.0.1:9092 dead for group streams-group
INFO [kafka-coordinator-heartbeat-thread | streams-group] o.a.k.c.c.i.AbstractCoordinator - Discovered coordinator 127.0.0.1:9092 for group streams-group.
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Flushing state stores of task 0_2
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Flushing state stores of task 0_3
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Committing consumer offsets of task 0_1
ERROR [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Failed while executing StreamTask 0_1 due to commit consumer offsets:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Updating suspended tasks to contain active tasks [[0_1, 0_2, 0_3]]
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Removing all active tasks [[0_1, 0_2, 0_3]]
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Removing all standby tasks [[]]
ERROR [StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group streams-group failed on partition revocation
INFO [StreamThread-1] o.a.k.c.c.i.AbstractCoordinator - (Re-)joining group streams-group
INFO [StreamThread-1] o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator dead for group streams-group
INFO [StreamThread-1] o.a.k.c.c.i.AbstractCoordinator - Discovered coordinator for group streams-group.
INFO [StreamThread-1] o.a.k.c.c.i.AbstractCoordinator - (Re-)joining group streams-group
INFO [StreamThread-1] o.a.k.s.p.i.StreamPartitionAssignor - stream-thread [StreamThread-1] Constructed client metadata ...
INFO [StreamThread-1] o.a.k.s.p.i.StreamPartitionAssignor - stream-thread [StreamThread-1] Completed validating internal topics in partition assignor
INFO [StreamThread-1] o.a.k.s.p.i.StreamPartitionAssignor - stream-thread [StreamThread-1] Completed validating internal topics in partition assignor
INFO [StreamThread-1] o.a.k.s.p.i.StreamPartitionAssignor - stream-thread [StreamThread-1] Assigned tasks to clients as {...=[activeTasks: ([0_0, 0_4]) assignedTasks: ([0_0, 0_4]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 0.2], ...=[activeTasks: ([0_1, 0_2, 0_3]) assignedTasks: ([0_1, 0_2, 0_3]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 0.30000000000000004]}.
INFO [StreamThread-1] o.a.k.c.c.i.AbstractCoordinator - Successfully joined group streams-group with generation 17
INFO [StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [topicname-3, topicname-1, topicname-2] for group streams-group
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] New partitions [[topicname-3, topicname-1, topicname-2]] assigned at the end of consumer rebalance.
INFO [StreamThread-1] o.a.k.s.p.i.StreamTask - task [0_1] Initializing processor nodes of the topology
INFO [StreamThread-1] o.a.k.s.p.i.StreamTask - task [0_2] Initializing processor nodes of the topology
INFO [StreamThread-1] o.a.k.s.p.i.StreamTask - task [0_3] Initializing processor nodes of the topology
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Shutting down
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing a task 0_1
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing a task 0_2
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing a task 0_3
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Flushing state stores of task 0_1
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Flushing state stores of task 0_2
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Flushing state stores of task 0_3
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing the state manager of task 0_1
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing the state manager of task 0_2
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing the state manager of task 0_3
INFO [StreamThread-1] o.a.k.c.p.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Removing all active tasks [[0_1, 0_2, 0_3]]
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Removing all standby tasks [[]]
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Stream thread shutdown complete
WARN [StreamThread-1] o.a.k.s.p.i.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
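First, it seems unlikely that processing is taking too long: the processing is very simple, and the application had been running for months without any similar messages in the logs.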
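For reference, these are the consumer settings that the CommitFailedException message points at. This is only a sketch of how such overrides could be collected in a `Properties` object; the class name `PollTuningSketch` is hypothetical and the values are placeholders, not my actual configuration.

```java
import java.util.Properties;

public class PollTuningSketch {
    /** Collects the consumer overrides named in the CommitFailedException message. */
    public static Properties pollOverrides() {
        Properties props = new Properties();
        // Placeholder values for illustration only, not taken from a real setup.
        props.put("max.poll.interval.ms", "600000"); // maximum allowed gap between poll() calls
        props.put("max.poll.records", "100");        // upper bound on records returned per poll()
        props.put("session.timeout.ms", "30000");    // heartbeat-based liveness timeout
        return props;
    }

    public static void main(String[] args) {
        // Print the overrides so they can be compared against the running config.
        pollOverrides().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Given how simple the processing is, I would not expect to need any of these, which is part of why the error message confuses me.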
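Also, judging from the logs, Kafka Streams successfully rejoined the group, but then it suddenly just shut down without any exception. I have two streams applications running on different machines, and both of them shut down at the same time, when the broker was restarted.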
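How can I debug this? Shouldn't it at least throw an exception? Another problem is that although the stream thread shut down, the rest of the application kept working fine, so nothing restarted it automatically. Can I somehow catch this and restart the thread? The retention policy makes the consumers very unhappy, so how can I make the Kafka Streams application more reliable?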
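To make the restart question concrete, this is roughly the kind of watchdog I have in mind. It is a plain-JDK sketch, not Kafka Streams code: `ThreadWatchdog` and its methods are hypothetical names, and the worker here is a stand-in for whatever would rebuild and start the streams instance. It polls thread liveness instead of relying on an exception handler, because in my case no exception surfaced.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical watchdog: replaces a worker thread whenever it is found dead. */
public class ThreadWatchdog implements AutoCloseable {
    private final Supplier<Thread> workerFactory; // builds a fresh, unstarted worker
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger restarts = new AtomicInteger();
    private volatile Thread current;

    public ThreadWatchdog(Supplier<Thread> workerFactory) {
        this.workerFactory = workerFactory;
    }

    /** Starts the first worker and checks on it every periodMillis. */
    public void start(long periodMillis) {
        current = workerFactory.get();
        current.start();
        scheduler.scheduleWithFixedDelay(this::checkAndRestart,
                periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    // A Thread cannot be started twice, so a dead worker is replaced, not reused.
    private void checkAndRestart() {
        if (!current.isAlive()) {
            restarts.incrementAndGet();
            current = workerFactory.get();
            current.start();
        }
    }

    public int restartCount() {
        return restarts.get();
    }

    /** Stops the periodic checks; the current worker is left to finish on its own. */
    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in worker that exits silently after 50 ms, like my stream thread did.
        Supplier<Thread> factory = () -> new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "worker");
        try (ThreadWatchdog watchdog = new ThreadWatchdog(factory)) {
            watchdog.start(20);
            Thread.sleep(500);
            System.out.println("restarts: " + watchdog.restartCount());
        }
    }
}
```

I don't know whether something like this is a reasonable approach for a streams application, or whether the library already offers a hook for it.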
Thanks!