2016-09-21 89 views
3

Why do I get the errors PartitionOwnedError and ConsumerStoppedException when starting some consumers?

I use pykafka to fetch messages from a Kafka topic, then do some processing and update MongoDB. Since pymongo can only update one item at a time, I started 100 processes. But at startup, some of the processes hit the errors "PartitionOwnedError and ConsumerStoppedException". I don't know why. Thanks.

import datetime

from pykafka import KafkaClient

kafka_cfg = conf['kafka']
kafka_client = KafkaClient(kafka_cfg['broker_list'])
topic = kafka_client.topics[topic_name]

balanced_consumer = topic.get_balanced_consumer(
    consumer_group=group,
    auto_commit_enable=kafka_cfg['auto_commit_enable'],
    zookeeper_connect=kafka_cfg['zookeeper_list'],
    zookeeper_connection_timeout_ms=kafka_cfg['zookeeper_conn_timeout_ms'],
    consumer_timeout_ms=kafka_cfg['consumer_timeout_ms'],
)
while True:
    for msg in balanced_consumer:
        if msg is not None:
            try:
                value = eval(msg.value)
                id = long(value.pop("id"))
                value["when_update"] = datetime.datetime.now()
                query = {"_id": id}

                result = collection.update_one(query, {"$set": value}, True)
            except Exception, e:
                log.error("Fail to update: %s, msg: %s", e, msg.value)


Traceback (most recent call last): 
    File "dump_daily_summary.py", line 182, in <module> 
    dump_daily_summary.run() 
    File "dump_daily_summary.py", line 133, in run 
    for msg in self.balanced_consumer: 
    File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 745, in __iter__ 
    message = self.consume(block=True) 
    File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 734, in consume 
    raise ConsumerStoppedException 
pykafka.exceptions.ConsumerStoppedException 


Traceback (most recent call last): 
    File "dump_daily_summary.py", line 182, in <module> 
    dump_daily_summary.run() 
    File "dump_daily_summary.py", line 133, in run 
    for msg in self.balanced_consumer: 
    File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 745, in __iter__ 
    message = self.consume(block=True) 
    File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 726, in consume 
    self._raise_worker_exceptions() 
    File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 271, in _raise_worker_exceptions 
    raise ex 
pykafka.exceptions.PartitionOwnedError 
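As an aside to the code above: building `value` with eval on a raw message is fragile and unsafe, since it executes whatever the message contains. If the producer writes JSON, json.loads is the safer parse. A minimal sketch with a hypothetical payload (the field names simply mirror the update loop above):

```python
import json

# Hypothetical payload in the shape the update loop expects.
raw = '{"id": 42, "url": "www.baidu.com"}'

value = json.loads(raw)        # safe: parses data, never executes it
doc_id = int(value.pop("id"))  # the original Python 2 code used long()
value["when_update"] = "2016-09-21T00:00:00"  # datetime.datetime.now() in real code
print(doc_id, value)
```

In the original loop this amounts to replacing `value = eval(msg.value)` with `value = json.loads(msg.value)`, with no other changes.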

Answers

1

PartitionOwnedError: check whether some background process is consuming in the same consumer_group, or whether you started another consumer when there were no free partitions left for it.
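As a back-of-the-envelope check (plain arithmetic, not pykafka API): in a balanced consumer group each partition is owned by at most one consumer, so at most as many consumers as there are partitions can be active, and every consumer that joins or leaves forces a rebalance across the group:

```python
def partition_budget(n_partitions, n_consumers):
    """Return (active, idle) consumer counts for a balanced group.

    Each partition is owned by at most one consumer, so at most
    n_partitions consumers can be doing work; the rest sit idle.
    """
    active = min(n_partitions, n_consumers)
    idle = n_consumers - active
    return active, idle

# With 100 partitions and 100 consumers every consumer owns exactly one
# partition; a 101st consumer would own nothing, and its join still
# triggers a rebalance.
print(partition_budget(100, 100))
print(partition_budget(100, 101))
```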

ConsumerStoppedException: you can try upgrading your pykafka version (https://github.com/Parsely/pykafka/issues/574).

+0

Thanks for your answer. There are 100 partitions in the topic and no other processes in the same group. I think maybe the processes I started first balanced themselves across the 100 partitions, and each newly started process then grabs a partition and breaks the balance? – raymond

0

I ran into the same problem as you. But I was confused by other people's solutions, such as adding enough partitions for the consumers or upgrading the pykafka version; in fact, my setup already satisfied those conditions.

Here are the versions of the tools:

python 2.7.10

kafka 2.11-0.10.0.0

zookeeper 3.4.8

pykafka 2.5.0

Here is my code:

class KafkaService(object):
    def __init__(self, topic):
        self.client_hosts = get_conf("kafka_conf", "client_host", "string")
        self.topic = topic
        self.con_group = topic
        self.zk_connect = get_conf("kafka_conf", "zk_connect", "string")

    def kafka_consumer(self):
        """kafka-consumer client, using pykafka

        :return: {"id": 1, "url": "www.baidu.com", "sitename": "baidu"}
        """
        from pykafka import KafkaClient
        consumer = None
        try:
            kafka = KafkaClient(hosts=str(self.client_hosts))
            topic = kafka.topics[self.topic]

            consumer = topic.get_balanced_consumer(
                consumer_group=self.con_group,
                auto_commit_enable=True,
                zookeeper_connect=self.zk_connect,
            )
        except Exception as e:
            logger.error(str(e))

        while True:
            message = consumer.consume(block=False)
            if message:
                print "message:", message.value
                yield message.value

Both exceptions (ConsumerStoppedException and PartitionOwnedError) are raised by the function consume(block=True) in pykafka.balancedconsumer.

Of course, I suggest you read the source code of that function.

It has an argument block=True; after I changed it to block=False, the program no longer fell into the exceptions.

After that, the Kafka consumer worked fine.
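The change above turns a blocking consumer into a polling one; with block=False the loop should sleep briefly when no message is available, otherwise it spins on an empty queue. A minimal sketch of such a loop (FakeConsumer here is a stand-in with the same consume(block=...) signature, not a real pykafka consumer):

```python
import time

def poll_messages(consumer, idle_sleep=0.1, max_polls=None):
    """Generator: non-blocking consume loop with a short sleep when idle."""
    polls = 0
    while max_polls is None or polls < max_polls:
        polls += 1
        message = consumer.consume(block=False)  # returns None when nothing is ready
        if message is None:
            time.sleep(idle_sleep)  # avoid a busy-wait between polls
            continue
        yield message

# Stand-in consumer, used only for illustration.
class FakeConsumer(object):
    def __init__(self, values):
        self._values = list(values)
    def consume(self, block=True):
        return self._values.pop(0) if self._values else None

for msg in poll_messages(FakeConsumer(["a", "b"]), idle_sleep=0, max_polls=3):
    print(msg)
```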

0

This behavior is affected by a long-standing bug that was recently diagnosed and is in the process of being resolved. The workaround we use at Parse.ly is to run our consumers in an environment that automatically restarts them when they hit these errors, until all partitions are owned.
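That restart-until-owned workaround can be sketched as a small wrapper that rebuilds the consumer whenever one of these errors surfaces. Everything below is a stand-in (the exception class, the factory, the fake consumers); with pykafka you would catch pykafka.exceptions.PartitionOwnedError and ConsumerStoppedException, and the factory would call topic.get_balanced_consumer again:

```python
class PartitionOwnedError(Exception):  # stand-in for pykafka.exceptions.PartitionOwnedError
    pass

def run_with_restarts(make_consumer, handle_message, max_restarts=5,
                      retriable=(PartitionOwnedError,)):
    """Rebuild the consumer and resume whenever a retriable error is raised."""
    restarts = 0
    while True:
        consumer = make_consumer()  # fresh consumer rejoins the group
        try:
            for message in consumer:
                handle_message(message)
            return restarts  # iteration finished cleanly
        except retriable:
            restarts += 1
            if restarts > max_restarts:
                raise

# Illustration: a "consumer" that fails once during rebalance, then succeeds.
attempts = []
def make_consumer():
    attempts.append(1)
    if len(attempts) == 1:
        def failing():
            raise PartitionOwnedError()
            yield  # makes this function a generator
        return failing()
    return iter(["m1", "m2"])

seen = []
restarts_used = run_with_restarts(make_consumer, seen.append)
print(restarts_used, seen)
```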
