2016-01-14 91 views

回答

30

此例外表明您正在以比它們可以發送更快的速度對記錄進行排隊。

當您調用send方法時,ProducerRecord將存儲在內部緩衝區中以發送給代理。一旦ProducerRecord已被緩衝,該方法立即返回,不管它是否已被髮送。

記錄被分組爲批次用於發送給代理,以減少每個消息被偷聽的傳輸並提高吞吐量。

一旦一個記錄添加一個批次,發送該批次的時間限制就會確保它在指定的時間內發送。這由Producer配置參數request.timeout.ms控制,默認值爲30秒。

如果批處理已經排隊超過超時限制,則會拋出異常。該批次中的記錄將從發送隊列中刪除。

使用配置參數增加超時限制將允許客戶端在到期之前將批次排隊更長時間。

+0

我已經回覆了您的評論,請告訴我,如果您有任何建議。 –

+0

我想知道是否將'batch.size'設置爲0(或1和標準值之間的值)是否可以更好地解決問題? –

+0

Hi @JamesThomas,「表示你正在以比他們可以發送更快的速度排隊記錄」,如果我根本不想排隊呢?我們的生產環境中會有大量流量,我們希望儘快發送數據。我們不希望看到這個過期。我們已將linger.ms設置爲默認值,但仍然出現此問題。 你說增加request.timeout.ms會增加批量範圍。 –

3

發送給代理之前控制時間的參數是linger.ms。其默認值是0(無延遲)。

+0

從原始問題可能不清楚我的情況發生了什麼例如,我試圖提供更多細節以使其更清晰。有關完整信息,請參閱下面的註釋。這是一個問題,我排隊記錄比我的上傳帶寬更快。 –

22

我在完全不同的上下文中得到了這個異常。

我已經設置了zookeeper vm,broker vm和producer/consumer vm的小型集羣。我打開了服務器(9092)和動物園管理員(2181)上的所有必需端口,然後嘗試從消費者/發佈者vm向代理髮布消息。我得到了OP提到的異常,但由於我迄今爲止只發布了一條消息(或者至少我嘗試過),所以解決方案不能增加超時或批量大小。所以我搜索了一下,發現這個郵件列表描述了我在嘗試使用消費者/生產者vm(ClosedChannelException)中的消息時遇到的類似問題:http://grokbase.com/t/kafka/users/152jsjekrm/having-trouble-with-the-simplest-remote-kafka-config 此郵件列表中的最後一篇文章實際上描述瞭如何解決該問題。

長話短說,如果你面對兩個ChannelClosedExceptionBatch Expired例外,你可能需要更改此行的server.config文件中的以下並重新啓動代理:

advertised.host.name=<broker public IP address> 

如果不是那麼它會回落到host.name屬性(它可能不會被設置),然後回退到Java類的規範主機名,最終當然不正確,因此會導致遠程節點混淆。

+4

我可以證實這位先生的回答 – x4k3p

+0

聽衆= PLAINTEXT://域名:9092端口= 9092 host.name =本地主機 advertised.host.name = domain_name.i可以在本地發送消息,但是當我讓卡夫卡服務器生效時!即時獲取此異常「org.apache.kafka.common.errors.TimeoutException:批量過期 java.util.concurrent.ExecutionException」。我在哪裏出錯 –

+0

如果其他人正在查看,配置文件位於'/config/server.properties'中用於'0.10.2.1'。 – Edd

-1

當您創建使用者設置ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG爲true時。

1

我正在使用Kafka Java客戶端版本0.11.0.0。我也開始看到沒有一致地產生大量消息的相同模式。它傳遞了很少的信息,並失敗了一些其他人。 (雖然通過和失敗的郵件都是相同的大小)。在我的情況下,每封郵件大小約60KB,這遠高於卡夫卡的默認batch.size 16kB,我的linger.ms被設置爲默認值0.此錯誤正在因爲生產者客戶端在它能夠從服務器接收到成功的響應之前超時。基本上,在我的代碼中,這個呼叫超時:kafkaProd.send(pr).get()。爲了解決這個問題,我不得不將生產者客戶端的默認request.timeout.ms增加到60000

相關問題