2017-04-05 205 views

回答

0

你會不會seekToEnd()到日誌的結尾。

請記住,您首先需要訂閱一個主題,然後才能尋找。另外,訂閱是懶惰的。因此,您也需要添加一個「虛擬輪詢」,然後才能查找。

consumer.subscribe(...) 
consumer.poll() // dummy poll 
consumer.seekToEnd() 

// now enter your regular poll-loop 
1

感謝,

它的作品!

這是我的代碼的簡化versione:

consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True) 
#dummy poll 
consumer.poll() 
#go to end of the stream 
consumer.seek_to_end() 
#start iterate 
for message in consumer: 
    print(message) 

consumer.close() 

The documentation指出poll()方法是與所述迭代器接口,其中我想不相容是一個我在循環結束時使用我的劇本。然而,從最初的測試來看,這段代碼看起來像正常工作。

安全使用它嗎?還是我誤解了文獻?

感謝

0

在回答你的問題在你的答案:

這是我的理解是,當你執行consumer.poll()返回一本字典。所以,當我想輪詢信息時,我用循環遍歷字典。

consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True) 
messages = consumer.poll() 
data = [] 
for msg in messages: 
    for value in messages[msg]: 
     #Add just the values to the list 
     data.append(value[6]) 

我相信你在做什麼,也越來越與consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True)迭代器,然後走在迭代器

#start iterate 
for message in consumer: 
    print(message) 

它看起來並不像你實際上得到剛剛從調查的500個結果。您可以通過將max_poll_records=5添加到您的KafkaConsumer配置中進行確認。然後,當您運行代碼時,如果打印出超過5條消息,則可以確定您沒有使用輪詢功能。

希望有幫助!

0

這裏是有一個民意調查中以列表的所有郵件的快捷方法:

while True: 
    messages = [] # Store all messages 
    crs = [] # Store all consumer records 
    tpd = consumer.poll(timeout_ms=60000, max_records=1) 
    [ crs.extend(tp) for tp in tpd.values() ] # List of cr's 
    [ messages.extend([json.loads(cr.value)]) for cr in crs ] 
    print messages 
+0

注意的是,這裏消息是JSON,但一個可以跳過負載。 –

相關問題