使用kafka-python api向主題發送一串消息。消息的一部分獲得成功發送到主題,但該方案與以下錯誤消息終止之前不是所有的人都送:KeyError:kafka.producer.record_accumulator.RecordBatch
KeyError: <kafka.producer.record_accumulator.RecordBatch object at 0x143d290>
Batch is already closed -- ignoring batch.done()
Error processing errback
Traceback (most recent call last):
File "/usr/lib/python2.6/site-packages/kafka/future.py", line 79, in _call_backs
f(value)
File "/usr/lib/python2.6/site-packages/kafka/producer/sender.py", line 185, in _failed_produce
self._complete_batch(batch, error, -1, None)
File "/usr/lib/python2.6/site-packages/kafka/producer/sender.py", line 243, in _complete_batch
self._accumulator.deallocate(batch)
File "/usr/lib/python2.6/site-packages/kafka/producer/record_accumulator.py", line 507, in deallocate
self._incomplete.remove(batch)
File "/usr/lib/python2.6/site-packages/kafka/producer/record_accumulator.py", line 587, in remove
return self._incomplete.remove(batch)
每運行我的主題實際上是獲得了不同數量的消息。問題似乎是kafka producer.send調用在程序結束前沒有完成發送。
根據卡夫卡單證producer.send是一個異步方法,它可能是根本原因 - 不是所有的異步線程完成發送進程被殺死之前:
The send() method is asynchronous. When called it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency.
有許多這種天真的解決方案(例如將batch.size
設置爲較低的數字),這可能會導致性能瓶頸。
你會如何解決這個問題而不影響性能太多?
這是我嘗試的第一件事。正如在描述中指出的那樣,生產者看起來已經派遣了異步調用來發送消息,但他們還沒有完成,可能是因爲批量大小不夠小。 – r2d2oid
這不提供問題的答案。一旦你有足夠的[聲譽](https://stackoverflow.com/help/whats-reputation),你將可以[對任何帖子發表評論](https://stackoverflow.com/help/privileges/comment);相反,[提供不需要提問者澄清的答案](https://meta.stackexchange.com/questions/214173/why-do-i-need-50-reputation-to-comment-what-can- I-DO-代替)。 - [來自評論](/ review/low-quality-posts/17956884) – demonplus
@ r2d2oid你的問題很簡單。你自己解釋它 - 生產者發送所有消息之前的程序退出。試圖通過批量處理來解決這個問題是不正確的。如果你有緩衝區中的東西,你必須等到發送完成。 [flush](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html)會阻止你的程序,直到所有的東西都會被髮送,就是這樣。如果您在發送最後一條記錄後調用'producer.flush()'後遇到此錯誤,那麼我錯了,只是不理解您的問題(kafka-python中的錯誤?)。 – Loki