2013-03-25 78 views

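Lazy deferred list reaching maximum recursion depth

I have a large number of documents to upsert into MongoDB (possibly n > 100000). I don't want to create 100000 deferreds at once, but I also don't want to run the queries sequentially and wait on each one, because I have a connection pool to MongoDB and want to make full use of it. So I have a generator function that yields deferreds, which are consumed by a DeferredLazyList.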

def generate_update_deferreds(collection, many_docs):
    for doc in many_docs:
        d = collection.update({'_id': doc['_id']}, doc, upsert=True)
        yield d

This is the code that links the generator of deferred upserts to the DeferredLazyList:

@defer.inlineCallbacks 
def update_docs(collection, many_docs): 
    gen_deferreds = generate_update_deferreds(collection, many_docs) 
    results = yield DeferredLazyList(gen_deferreds, count=pool_size, consume_errors=True) 

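DeferredLazyList is similar to DeferredList, but instead of accepting a list of deferreds to wait on, it accepts an iterator. Deferreds are retrieved from the iterator so that only count of them are active at the same time. This batches the deferreds efficiently, because each one is created only when it is yielded.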

class DeferredLazyList(defer.Deferred):
    """
    The ``DeferredLazyList`` class is used for collecting the results of
    many deferreds. This is similar to ``DeferredList``
    (``twisted.internet.defer.DeferredList``) but works with an iterator
    yielding deferreds. This will only maintain a certain number of
    deferreds simultaneously. Once one of the deferreds finishes, another
    will be obtained from the iterator.
    """

    def __init__(self, deferreds, count=None, consume_errors=None):
        defer.Deferred.__init__(self)

        if count is None:
            count = 1

        self.__consume_errors = bool(consume_errors)

        self.__iter = enumerate(deferreds)
        self.__results = []
        for _i in xrange(count):
            # Start specified number of simultaneous deferreds.
            if not self.called:
                self.__next_save_result(None, None, None)
            else:
                break

    def __next_save_result(self, result, success, index):
        """
        Called when a deferred completes.
        """
        # Make sure we can save result at index.
        if index is not None:
            results_len = len(self.__results)
            if results_len <= index:
                self.__results += [NO_RESULT] * (index - results_len + 1)
            # Save result.
            self.__results[index] = (success, result)

        # Get next deferred.
        try:
            i, d = self.__iter.next()
            d.addCallbacks(self.__next_save_result, self.__next_save_result, callbackArgs=(True, i), errbackArgs=(False, i))

        except StopIteration:
            # Iterator is exhausted, callback self with results.
            self.callback(self.__results)

        # Pass through result.
        return result if success or not self.__consume_errors else None

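The problem is that when the deferreds are yielded from generate_update_deferreds(), their .called is already set to True, which causes DeferredLazyList to call itself recursively.

What happens is:

  1. In DeferredLazyList.__init__(), self.__next_save_result() is called count times (say 5).

  2. Each call to self.__next_save_result() consumes one deferred from self.__iter and adds itself as that deferred's callback.

  3. Because the yielded deferred already has .called set to True, d.addCallbacks(self.__next_save_result, ...) calls self.__next_save_result() immediately. This cycle continues until a RuntimeError is raised because the recursion depth limit has been reached.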
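The three steps above can be reproduced without Twisted or MongoDB. The following is a minimal sketch, not Twisted's implementation: TinyDeferred and max_nesting are hypothetical names introduced only for illustration, and TinyDeferred models just one behaviour, namely that adding a callback to an already-fired deferred runs that callback synchronously.

```python
class TinyDeferred(object):
    """Hypothetical stand-in for an already-fired Deferred (not Twisted's class)."""

    def __init__(self, result):
        self.called = True
        self.result = result

    def addCallback(self, callback):
        # The deferred has already fired, so the callback runs right now,
        # on top of the caller's stack frame.
        self.result = callback(self.result)
        return self


def max_nesting(n):
    """Chain n already-fired deferreds the way __next_save_result does and
    return the deepest level of nested next_save_result calls observed."""
    source = iter([TinyDeferred(i) for i in range(n)])
    state = {"depth": 0, "deepest": 0}

    def next_save_result(result):
        state["depth"] += 1
        state["deepest"] = max(state["deepest"], state["depth"])
        try:
            d = next(source)
            # Re-enters next_save_result before this call has returned.
            d.addCallback(next_save_result)
        except StopIteration:
            pass
        state["depth"] -= 1
        return result

    next_save_result(None)
    return state["deepest"]
```

In this model max_nesting(5) returns 6: one nested call per deferred plus the initial call. The nesting grows linearly with the number of already-fired deferreds, so roughly 100000 documents would far exceed Python's default recursion limit.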

I printed a stack trace just before the recursion limit was reached to confirm that this is the cause of the problem:

File "/home/caleb/it/Development/projects/python/amazon/bin/feeds-daemon/lib/server.py", line 937, in update_many_docs 
    results = yield DeferredLazyList(gen_deferreds, count=self.mongo_connections, consume_errors=True, return_results=True) 
File "/home/caleb/it/Development/projects/python/amazon/bin/feeds-daemon/lib/twisted.py", line 157, in __init__ 
    self.__next_save_result(None, None, None) 
File "/home/caleb/it/Development/projects/python/amazon/bin/feeds-daemon/lib/twisted.py", line 222, in __next_save_result 
    d.addCallbacks(self.__next_save_result, self.__next_save_result, callbackArgs=(True, i), errbackArgs=(False, i)) 
File "/usr/lib/python2.7/dist-packages/twisted/internet/defer.py", line 290, in addCallbacks 
    self._runCallbacks() 
File "/usr/lib/python2.7/dist-packages/twisted/internet/defer.py", line 551, in _runCallbacks 
    current.result = callback(current.result, *args, **kw) 
File "/home/caleb/it/Development/projects/python/amazon/bin/feeds-daemon/lib/twisted.py", line 222, in __next_save_result 
    d.addCallbacks(self.__next_save_result, self.__next_save_result, callbackArgs=(True, i), errbackArgs=(False, i)) 
File "/usr/lib/python2.7/dist-packages/twisted/internet/defer.py", line 290, in addCallbacks 
    self._runCallbacks() 
File "/usr/lib/python2.7/dist-packages/twisted/internet/defer.py", line 551, in _runCallbacks 
    current.result = callback(current.result, *args, **kw) 
File "/home/caleb/it/Development/projects/python/amazon/bin/feeds-daemon/lib/twisted.py", line 222, in __next_save_result 
    d.addCallbacks(self.__next_save_result, self.__next_save_result, callbackArgs=(True, i), errbackArgs=(False, i)) 
# Repeated until the RuntimeError 
exceptions.RuntimeError: maximum recursion depth exceeded 

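Any help would be greatly appreciated. By the way, I am running Python 2.7.3 with Twisted 12.1.0, and the MongoDB part is really only relevant for understanding the context.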


I wanted the result from each deferred, but cooperate() does not return those, so I added a callback to each deferred before yielding it to the CooperativeTasks:

from twisted.internet.defer import DeferredList, inlineCallbacks
from twisted.internet.task import cooperate

NO_RESULT = object()

def generate_update_deferreds(collection, many_docs, save_results):
    for i, doc in enumerate(many_docs):
        d = collection.update({'_id': doc['_id']}, doc, upsert=True)
        d.addBoth(save_result, i, save_results)  # Save result (or Failure) at index i
        yield d

def save_result(result, i, save_results):
    save_results[i] = result

@inlineCallbacks
def update_docs(collection, many_docs, count):
    # count: number of simultaneous workers, e.g. the connection pool size.
    save_results = [NO_RESULT] * len(many_docs)
    gen_deferreds = generate_update_deferreds(collection, many_docs, save_results)
    workers = [cooperate(gen_deferreds).whenDone() for _i in xrange(count)]
    yield DeferredList(workers)
    # Handle save_results...

Answer


Twisted has some tools that will help you do this more easily. For example, cooperate:

from twisted.internet.defer import DeferredList
from twisted.internet.task import cooperate

def generate_update_deferreds(collection, many_docs):
    for doc in many_docs:
        d = collection.update({'_id': doc['_id']}, doc, upsert=True)
        yield d

work = generate_update_deferreds(...)
worker_tasks = []
for i in range(count):  # count: how many updates may be in flight at once
    task = cooperate(work)
    worker_tasks.append(task)

all_done_deferred = DeferredList([task.whenDone() for task in worker_tasks])
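The key idea in the snippet above is that all the workers pull from one shared iterator, so a new update is started only when a worker has finished its previous one. Here is a rough analogue of that pattern using asyncio from the standard library (Python 3.7+) rather than Twisted, purely so it can run without extra dependencies. fake_update, worker, update_all and run are hypothetical names, and fake_update only stands in for collection.update().

```python
import asyncio


async def fake_update(doc, active, peak):
    # Hypothetical stand-in for collection.update(); it tracks how many
    # updates are in flight at once.
    active[0] += 1
    peak[0] = max(peak[0], active[0])
    await asyncio.sleep(0)  # give the other workers a chance to run
    active[0] -= 1
    return doc["_id"]


async def worker(shared_iter, results, active, peak):
    # A worker asks the shared iterator for the next document only after
    # its previous update has completed.
    for i, doc in shared_iter:
        results[i] = await fake_update(doc, active, peak)


async def update_all(docs, count):
    results = [None] * len(docs)
    active, peak = [0], [0]
    shared_iter = iter(enumerate(docs))  # one iterator shared by every worker
    await asyncio.gather(
        *[worker(shared_iter, results, active, peak) for _ in range(count)]
    )
    return results, peak[0]


def run(docs, count):
    return asyncio.run(update_all(docs, count))
```

Calling run(docs, 3) returns the per-document results in input order together with the peak number of simultaneous updates, which in this sketch never exceeds the worker count.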

Thanks, that solved the problem. – cpburnz 2013-03-26 19:58:22