
Confluent Kafka Connect - cannot stop rebalancing

I am using the Confluent Kafka Connect framework, version 3.0.1. I created a new group that consumes about 20 topics, and most of these topics are busy. Unfortunately, when I start the Connect framework, the system cannot stop rebalancing: all topics are rebalanced roughly every 2 minutes, and I cannot tell why. Some of the error messages are:

[2017-01-03 21:43:57,718] ERROR Commit of WorkerSinkTask{id=new-connector-0} offsets threw an unexpected exception: (org.apache.kafka.connect.runtime.WorkerSinkTask:180) 
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. 
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578) 
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519) 
     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) 
     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) 
     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) 
     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) 
     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) 
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) 
     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) 
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) 
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) 
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) 
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) 
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:404) 
     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1058) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:247) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:293) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:421) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:54) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:465) 
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:283) 
     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:212) 
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345) 
     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977) 
     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:305) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:222) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142) 
     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140) 
     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175) 
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
     at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 

I am not sure whether this has anything to do with the constant rebalancing.

I know that if a call to KafkaConsumer.poll() takes longer than the configured timeout, Kafka revokes the partitions and a rebalance is triggered, but I am confident that none of my polls take that long. Can anyone give me a clue?
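
To illustrate what I understand from the error message, here is a minimal standalone sketch (not the Connect code itself; the broker address, group id and topic name are placeholders) of a poll loop whose per-batch work grows until it exceeds the timeout, at which point commitSync() fails with the same CommitFailedException as above:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class SlowPollLoop {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
            props.put("group.id", "new-connector");             // placeholder group id
            props.put("key.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("enable.auto.commit", "false");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("some-busy-topic")); // placeholder topic
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(1000L);
                    for (ConsumerRecord<String, String> record : records) {
                        Thread.sleep(100); // stand-in for slow sink work (e.g. writing to HDFS)
                    }
                    // If the loop above takes longer than the group allows between polls,
                    // the group rebalances and this commit throws CommitFailedException.
                    consumer.commitSync();
                }
            } finally {
                consumer.close();
            }
        }
    }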

Answers


I think max.poll.records can solve this. It is nothing more than the number of records that have to be processed in each poll loop. In 0.10 there is max.poll.records, which puts an upper bound on the number of records returned by each call.

Also, as per Confluent, consumer.poll() should have a fairly high session timeout, for example 30 to 60 seconds.

You may also need to fine-tune the following (a rough sketch of these settings follows the list):

session.timeout.ms 
heartbeat.interval.ms 
max.partition.fetch.bytes 
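
As a rough illustration only (the broker address and group id are placeholders, and the numbers are just starting points, not values prescribed by this answer), these settings can be applied to a plain consumer as below; in Kafka Connect the same keys go into the worker config with a consumer. prefix (for example consumer.max.poll.records=500):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class TunedConsumerFactory {
        public static KafkaConsumer<byte[], byte[]> create() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
            props.put("group.id", "new-connector");             // placeholder group id
            props.put("key.deserializer",
                      "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                      "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("max.poll.records", "500");              // cap records returned per poll()
            props.put("session.timeout.ms", "30000");          // 30-60 s, as suggested above
            props.put("heartbeat.interval.ms", "10000");       // keep well below session.timeout.ms
            props.put("max.partition.fetch.bytes", "1048576"); // 1 MB per partition per fetch
            return new KafkaConsumer<>(props);
        }
    }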

Yes, when I spent too long writing the poll results to HDFS, a rebalance was triggered. After I optimized my code, rebalances became rare. – wuchang


Consider upgrading to 0.10.1 or above, because the consumer was enhanced in those versions to better handle longer intervals between calls to poll().

If you take more than 5 minutes to put the results into HDFS, you can increase the new max.poll.interval.ms parameter. That will keep your consumer from being kicked out of the consumer group for failing to make progress.

The 0.10.1 release notes say:

The new Java Consumer now supports heartbeating from a background thread. There is a new configuration max.poll.interval.ms which controls the maximum time between poll invocations before the consumer will proactively leave the group (5 minutes by default). The value of the configuration request.timeout.ms must always be larger than max.poll.interval.ms because this is the maximum time that a JoinGroup request can block on the server while the consumer is rebalancing, so we have changed its default value to just above 5 minutes. Finally, the default value of session.timeout.ms has been adjusted down to 10 seconds, and the default value of max.poll.records has been changed to 500.
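
As a hedged sketch of how those 0.10.1 settings fit together (the broker address, group id and concrete timeout values are illustrative only; in Kafka Connect they would again be prefixed with consumer. in the worker config), a consumer that may spend several minutes flushing to HDFS could be configured like this:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class SlowSinkConsumerFactory {
        public static KafkaConsumer<byte[], byte[]> create() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
            props.put("group.id", "new-connector");            // placeholder group id
            props.put("key.deserializer",
                      "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                      "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("max.poll.interval.ms", "600000"); // allow up to 10 minutes between polls
            props.put("request.timeout.ms", "605000");   // must stay above max.poll.interval.ms
            props.put("session.timeout.ms", "10000");    // 0.10.1 default; heartbeats run on a background thread
            props.put("max.poll.records", "500");        // 0.10.1 default cap per poll()
            return new KafkaConsumer<>(props);
        }
    }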