我有一大堆文件插入MongoDB(可能n> 100000)。我不想一次創建100000個延遲,但我不想按順序執行和等待每個查詢,因爲我有一個到MongoDB的連接池,我想充分利用它。所以我有一個發生器函數,它會產生一個DeferredLazyList
消耗的延遲。達到最大遞歸深度的懶惰延遲列表
def generate_update_deferreds(collection, many_docs):
for doc in many_docs:
d = collection.update({'_id': doc['_id']}, doc, upsert=True)
yield d
這是連接所述延遲upserts的生成的代碼,並且DeferredLazyList
。
@defer.inlineCallbacks
def update_docs(collection, many_docs):
gen_deferreds = generate_update_deferreds(collection, many_docs)
results = yield DeferredLazyList(gen_deferreds, count=pool_size, consume_errors=True)
的DeferredLazyList
類似於DeferredList,但不是接受deferreds的列表,以等待它接受一個迭代。從迭代器中檢索延遲,同時只有count
延遲活動同時激活。這用於有效批量延期,因爲它們是在產生時創建的。
class DeferredLazyList(defer.Deferred):
"""
The ``DeferredLazyList`` class is used for collecting the results of
many deferreds. This is similar to ``DeferredList``
(``twisted.internet.defer.DeferredList``) but works with an iterator
yielding deferreds. This will only maintain a certain number of
deferreds simultaneously. Once one of the deferreds finishes, another
will be obtained from the iterator.
"""
def __init__(self, deferreds, count=None, consume_errors=None):
defer.Deferred.__init__(self)
if count is None:
count = 1
self.__consume_errors = bool(consume_errors)
self.__iter = enumerate(deferreds)
self.__results = []
for _i in xrange(count):
# Start specified number of simultaneous deferreds.
if not self.called:
self.__next_save_result(None, None, None)
else:
break
def __next_save_result(self, result, success, index):
"""
Called when a deferred completes.
"""
# Make sure we can save result at index.
if index is not None:
results_len = len(self.__results)
if results_len <= index:
self.__results += [NO_RESULT] * (index - results_len + 1)
# Save result.
self.__results[index] = (success, result)
# Get next deferred.
try:
i, d = self.__iter.next()
d.addCallbacks(self.__next_save_result, self.__next_save_result, callbackArgs=(True, i), errbackArgs=(False, i))
except StopIteration:
# Iterator is exhausted, callback self with results.
self.callback(self.__results)
# Pass through result.
return result if success or not self.__consume_errors else None
的問題是當deferreds從generate_update_deferreds()
他們.called
已被設置爲True
這是造成DeferredLazyList
遞歸調用自身產生。
發生了什麼事是:
在
DeferredLazyList.__init__()
,self.__next_save_result()
被稱爲count
時間(如5)。每次調用
self.__next_save_result()
消耗1從self.__iter
推遲,本身作爲回調添加。由於產生遞延具有
.called
設置爲True
,d.addCallbacks(self.__next_save_result, ...)
立即調用self.__next_save_result()
這個循環繼續,直到RuntimeError
上升,因爲遞歸的深度已經達到。
我打印的堆棧跟蹤的遞歸限制達成,以確認這是問題的原因之前:
File "/home/caleb/it/Development/projects/python/amazon/bin/feeds-daemon/lib/server.py", line 937, in update_many_docs
results = yield DeferredLazyList(gen_deferreds, count=self.mongo_connections, consume_errors=True, return_results=True)
File "/home/caleb/it/Development/projects/python/amazon/bin/feeds-daemon/lib/twisted.py", line 157, in __init__
self.__next_save_result(None, None, None)
File "/home/caleb/it/Development/projects/python/amazon/bin/feeds-daemon/lib/twisted.py", line 222, in __next_save_result
d.addCallbacks(self.__next_save_result, self.__next_save_result, callbackArgs=(True, i), errbackArgs=(False, i))
File "/usr/lib/python2.7/dist-packages/twisted/internet/defer.py", line 290, in addCallbacks
self._runCallbacks()
File "/usr/lib/python2.7/dist-packages/twisted/internet/defer.py", line 551, in _runCallbacks
current.result = callback(current.result, *args, **kw)
File "/home/caleb/it/Development/projects/python/amazon/bin/feeds-daemon/lib/twisted.py", line 222, in __next_save_result
d.addCallbacks(self.__next_save_result, self.__next_save_result, callbackArgs=(True, i), errbackArgs=(False, i))
File "/usr/lib/python2.7/dist-packages/twisted/internet/defer.py", line 290, in addCallbacks
self._runCallbacks()
File "/usr/lib/python2.7/dist-packages/twisted/internet/defer.py", line 551, in _runCallbacks
current.result = callback(current.result, *args, **kw)
File "/home/caleb/it/Development/projects/python/amazon/bin/feeds-daemon/lib/twisted.py", line 222, in __next_save_result
d.addCallbacks(self.__next_save_result, self.__next_save_result, callbackArgs=(True, i), errbackArgs=(False, i))
# Repeated until the RuntimeError
exceptions.RuntimeError: maximum recursion depth exceeded
任何幫助將不勝感激。順便說一句,我使用Twisted 12.1.0運行Python 2.7.3,而MongoDB的內容實際上只與理解上下文有關。
我想要的結果,從每推遲,但cooperate()
不返回這些,所以我增加了一個回調到每推遲他們屈服於CooperativeTask
年代以前:
from twisted.internet.defer import DeferredList, inlineCallbacks
from twisted.internet.task import cooperate
NO_RESULT = object()
def generate_update_deferreds(collection, many_docs, save_results):
for i, doc in enumerate(update_docs):
d = collection.update({'_id': doc['_id']}, doc, upsert=True)
d.addBoth(save_result, i, save_results) # Save result
yield d
def save_result(result, i, save_results):
save_results[i] = result
@inlineCallbacks
def update_docs(collection, many_docs):
save_results = [NO_RESULT] * len(many_docs)
gen_deferreds = generate_update_deferreds(collection, many_docs, save_results))
workers = [cooperate(gen_deferreds).whenDone() for _i in xrange(count)]
yield defer.DeferredList(workers)
# Handle save_results...
謝謝,解決了這個問題。 – cpburnz 2013-03-26 19:58:22