2014-11-03 98 views
0

我想返回一個數組。python代碼後終於沒有運行

我可以將消息數組打印到控制檯,我可以看到它已填充。 但是終於出現的代碼似乎無法訪問。我究竟做錯了什麼?

def kafka_messages(topic, partition): 
    messages = [] 

    try: 
     consumer = SimpleConsumer(kafka, b"consumer-group" 
            , bytes(topic, "UTF-8") 
            , partitions=[partition]) 
     consumer.provide_partition_info() 
     consumer.seek(0, 0) 

     for message in consumer: 
      messages.append(message) # Messages has values 

    finally: 
     if kafka: 
      kafka.close() 

    print(messages) # Never even gets run 
    return messages 
+0

是什麼'kafka.close()'做什麼?它可能掛?或者「消費者」迭代器可能是無窮無盡的? – 2014-11-03 14:02:00

+0

'''consumer'''迭代器似乎是無限的 – Lee 2014-11-03 14:07:52

+0

看起來像代碼中的某處,異常被吞噬。在'finally:'之前直接添加一個'print()'語句,這樣你就可以確保循環正常終止 – 2014-11-03 14:09:23

回答

1

有此行爲的兩個可能的原因:

  1. 環路不會終止(即consumer不停止返回元件)
  2. 代碼拋出異常。

在行finally:的前面加上一個print('Loop terminated'),以確定該循環是否終止。

如果沒有,那麼您需要閱讀SimpleConsumer的文檔以瞭解如何檢查它是否有更多元素,以便終止循環。

[編輯]望着source for SimpleConsumer,似乎有一個超時(默認爲ITER_TIMEOUT_SECONDS)時,有沒有消息,但該代碼看起來很奇怪/斷:如果iter_timeout is None,則代碼會睡和環永不終止。

因此,嘗試在創建實例時將iter_timeout設置爲小值,並且應停止循環。

+0

SimpleConsumer返回元素的數量,我用它來擺脫循環 – Lee 2014-11-03 14:57:04

+0

@JohnDoe:你應該發佈你的解決方案,否則人們會想知道它是什麼。 – 2014-11-03 15:33:49

0

這裏是我做過什麼:

def kafka_messages(topic, partition): 
    messages = [] 

    try: 
     consumer = SimpleConsumer(kafka, b"consumer-group" 
            , bytes(topic, "UTF-8") 
            , partitions=[partition]) 
     consumer.provide_partition_info() 
     consumer.seek(0, 0) 
     pending = consumer.pending(partitions=[partition]) # Comes with the API being used 

     count = 1 
     for message in consumer: 
      if count == pending: 
       break # Simply break out when you have iterated through all the items 
      messages.append(message) 
      count += 1 

    finally: 
     if kafka: 
      kafka.close() 

    return messages