2012-04-04 53 views
2

我試圖把握與NDB介紹異步操作,我想用@ndb.tasklet到異步我的一些工作。如何重寫一個異步NDB方法,寫自己的tasklet

的簡單的例子就是在重寫get_or_insert_async

被string_id代這是一個正確的方式來的東西呢?這裏可以改進什麼?

@classmethod 
@ndb.tasklet 
def get_or_insert_async(cls, *args): 
    id = cls.make_string_id(*args) 
    model = yield super(MyModel, cls).get_or_insert_async(id) 
    raise ndb.Return(model) 

另一個例子會在扇形的循環中做某事。它是否正確?

@classmethod 
@ndb.tasklet 
def do_stuff(cls, some_collection): 

    @ndb.tasklet 
    def internal_tasklet(data): 
     do_some_long_taking_stuff(data) 
     id = make_stuff_needed_for_id(data) 
     model = yield cls.get_or_insert_async(id) 
     model.long_processing(data) 
     yield model.put_async() 
     raise ndb.Return(None) 

    for data in some_collection: 
     # will it parallelise internal_tasklet execution? 
     yield internal_tasklet(data) 

    raise ndb.Return(None) 

編輯:

如所理解的整個概念,yields在這裏提供一個Future對象其然後在平行(如果可能)收集並執行異步。我對麼?

尼克的提示後,(是你的意思?):

@classmethod 
@ndb.tasklet 
def do_stuff(cls, some_collection): 

    @ndb.tasklet 
    def internal_tasklet(data): 
     do_some_long_taking_stuff(data) 
     id = make_stuff_needed_for_id(data) 
     model = yield cls.get_or_insert_async(id) 
     model.long_processing(data) 
     raise ndb.Return(model)    # change here 

    models = [] 
    for data in some_collection: 
     # will it parallelise internal_tasklet execution? 
     m = yield internal_tasklet(data)  # change here 
     models.appedn(m)      # change here 

    keys = yield ndb.put_multi_async(models) # change here 
    raise ndb.Return(keys)      # change here 

編輯:

新修訂版本...

@classmethod 
@ndb.tasklet 
def do_stuff(cls, some_collection): 

    @ndb.tasklet 
    def internal_tasklet(data): 
     do_some_long_taking_stuff(data) 
     id = make_stuff_needed_for_id(data) 
     model = yield cls.get_or_insert_async(id) 
     model.long_processing(data) 
     raise ndb.Return(model)     

    futures = [] 
    for data in some_collection: 
     # tasklets won't run in parallel but while 
     # one is waiting on a yield (and RPC underneath) 
     # the other will advance it's execution 
     # up to a next yield or return 
     fut = internal_tasklet(data))   # change here 
     futures.append(fut)     # change here 

    Future.wait_all(futures)     # change here 

    models = [fut.get_result() for fut in futures] 
    keys = yield ndb.put_multi_async(models) # change here 
    raise ndb.Return(keys)      # change here 

回答

1

你並不需要使用微進程,如果所有你想要做的就是調用異步的東西用不同的參數 - 只返回被包裝的函數的返回值,就像這樣:

def get_or_insert_async(cls, *args): 
    id = cls.make_string_id(*args) 
    return super(MyModel, cls).get_or_insert_async(id) 

我是持謹慎態度的原因很多,但:你改變的意義內置的功能,這通常是一個壞主意,你改變了簽名(位置參數,但沒有關鍵字參數) ,而且您不會將其他參數傳遞給原始函數。

對於你的第二個例子中,產生的東西一次一個將迫使NDB等待他們完成 - 「成品率」與「等待」的代名詞。相反,對集合中的每個元素執行tasklet函數,然後同時等待所有元素(通過調用列表中的yield)。

+0

不可能將其在'get_or_insert_async'同步,只有底層調用原來'get_or_insert_async'真的異步'make_string_id'電話嗎? – 2012-04-05 08:03:44

+0

你可以在你的答案中重寫第二個例子嗎?我不確定女巫的產量是否下降,因此循環不會等待每個元素和所有(或大部分)內部並行處理的'internal_tasklet'執行。 – 2012-04-05 08:07:37

+0

@WooYek是的,但make_string_id在你的代碼片段中是同步的。如果他們沒有製作RPC,那麼異步執行它們就毫無意義 - 只有當時間用於RPC時,tasklet纔有用。關於第二個例子 - 問題是在循環中調用yield的反模式。這樣做等待每個單獨的任務完成後再進入下一個任務。任何時候你有一個帶有yield的循環,你應該調用沒有yield的函數,彙總一個結果列表,並在該列表中調用yield。 – 2012-04-05 08:46:40