我正在使用kafka-python來使用來自kafka隊列(kafka版本0.10.2.0)的消息。特別是我使用KafkaConsumer類型。 如果消費者停下來,經過一段時間後重新啓動,我想從最新生成的消息重新啓動,即刪除消費者關閉期間生成的所有消息。 我怎樣才能做到這一點?kafka-python在消費者重新啓動後從最後生成的消息中讀取
感謝
我正在使用kafka-python來使用來自kafka隊列(kafka版本0.10.2.0)的消息。特別是我使用KafkaConsumer類型。 如果消費者停下來,經過一段時間後重新啓動,我想從最新生成的消息重新啓動,即刪除消費者關閉期間生成的所有消息。 我怎樣才能做到這一點?kafka-python在消費者重新啓動後從最後生成的消息中讀取
感謝
你會不會seekToEnd()
到日誌的結尾。
請記住,您首先需要訂閱一個主題,然後才能尋找。另外,訂閱是懶惰的。因此,您也需要添加一個「虛擬輪詢」,然後才能查找。
consumer.subscribe(...)
consumer.poll() // dummy poll
consumer.seekToEnd()
// now enter your regular poll-loop
感謝,
它的作品!
這是我的代碼的簡化versione:
consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True)
#dummy poll
consumer.poll()
#go to end of the stream
consumer.seek_to_end()
#start iterate
for message in consumer:
print(message)
consumer.close()
The documentation指出poll()方法是與所述迭代器接口,其中我想不相容是一個我在循環結束時使用我的劇本。然而,從最初的測試來看,這段代碼看起來像正常工作。
安全使用它嗎?還是我誤解了文獻?
感謝
在回答你的問題在你的答案:
這是我的理解是,當你執行consumer.poll()
返回一本字典。所以,當我想輪詢信息時,我用循環遍歷字典。
consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True)
messages = consumer.poll()
data = []
for msg in messages:
for value in messages[msg]:
#Add just the values to the list
data.append(value[6])
我相信你在做什麼,也越來越與consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True)
迭代器,然後走在迭代器
#start iterate
for message in consumer:
print(message)
它看起來並不像你實際上得到剛剛從調查的500個結果。您可以通過將max_poll_records=5
添加到您的KafkaConsumer配置中進行確認。然後,當您運行代碼時,如果打印出超過5條消息,則可以確定您沒有使用輪詢功能。
希望有幫助!
這裏是有一個民意調查中以列表的所有郵件的快捷方法:
while True:
messages = [] # Store all messages
crs = [] # Store all consumer records
tpd = consumer.poll(timeout_ms=60000, max_records=1)
[ crs.extend(tp) for tp in tpd.values() ] # List of cr's
[ messages.extend([json.loads(cr.value)]) for cr in crs ]
print messages
注意的是,這裏消息是JSON,但一個可以跳過負載。 –