2017-10-10 86 views
0

使用kafka-python api向主題發送一串消息。消息的一部分獲得成功發送到主題,但該方案與以下錯誤消息終止之前不是所有的人都送:KeyError:kafka.producer.record_accumulator.RecordBatch

KeyError: <kafka.producer.record_accumulator.RecordBatch object at 0x143d290> 
Batch is already closed -- ignoring batch.done() 
Error processing errback 
Traceback (most recent call last): 
    File "/usr/lib/python2.6/site-packages/kafka/future.py", line 79, in _call_backs 
    f(value) 
    File "/usr/lib/python2.6/site-packages/kafka/producer/sender.py", line 185, in _failed_produce 
    self._complete_batch(batch, error, -1, None) 
    File "/usr/lib/python2.6/site-packages/kafka/producer/sender.py", line 243, in _complete_batch 
    self._accumulator.deallocate(batch) 
    File "/usr/lib/python2.6/site-packages/kafka/producer/record_accumulator.py", line 507, in deallocate 
    self._incomplete.remove(batch) 
    File "/usr/lib/python2.6/site-packages/kafka/producer/record_accumulator.py", line 587, in remove 
    return self._incomplete.remove(batch) 

每運行我的主題實際上是獲得了不同數量的消息。問題似乎是kafka producer.send調用在程序結束前沒有完成發送。

根據卡夫卡單證producer.send是一個異步方法,它可能是根本原因 - 不是所有的異步線程完成發送進程被殺死之前:

The send() method is asynchronous. When called it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency.

有許多這種天真的解決方案(例如將batch.size設置爲較低的數字),這可能會導致性能瓶頸。

你會如何解決這個問題而不影響性能太多

回答

0

退出前請致電producer.flush()

+0

這是我嘗試的第一件事。正如在描述中指出的那樣,生產者看起來已經派遣了異步調用來發送消息,但他們還沒有完成,可能是因爲批量大小不夠小。 – r2d2oid

+0

這不提供問題的答案。一旦你有足夠的[聲譽](https://stackoverflow.com/help/whats-reputation),你將可以[對任何帖子發表評論](https://stackoverflow.com/help/privileges/comment);相反,[提供不需要提問者澄清的答案](https://meta.stackexchange.com/questions/214173/why-do-i-need-50-reputation-to-comment-what-c​​an- I-DO-代替)。 - [來自評論](/ review/low-quality-posts/17956884) – demonplus

+1

@ r2d2oid你的問題很簡單。你自己解釋它 - 生產者發送所有消息之前的程序退出。試圖通過批量處理來解決這個問題是不正確的。如果你有緩衝區中的東西,你必須等到發送完成。 [flush](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html)會阻止你的程序,直到所有的東西都會被髮送,就是這樣。如果您在發送最後一條記錄後調用'producer.flush()'後遇到此錯誤,那麼我錯了,只是不理解您的問題(kafka-python中的錯誤?)。 – Loki