2015-07-11 57 views
11

乍一看,我非常喜歡Celery中的「批量」功能,因爲我需要在調用API之前對ID進行分組(否則我可能會被踢出)。芹菜鏈不能與批處理

不幸的是,當測試一點點時,批處理任務看起來與Canvas基元的其餘部分(在這種情況下爲連鎖)似乎沒有什麼關係。例如:

@a.task(base=Batches, flush_every=10, flush_interval=5) 
def get_price(requests): 
    for request in requests: 
     a.backend.mark_as_done(request.id, 42, request=request) 
     print "filter_by_price " + str([r.args[0] for r in requests]) 

@a.task 
def completed(): 
    print("complete") 

所以,這個簡單的工作流程:

chain(get_price.s("ID_1"), completed.si()).delay() 

我看到這樣的輸出:

[2015-07-11 16:16:20,348: INFO/MainProcess] Connected to redis://localhost:6379/0 
[2015-07-11 16:16:20,376: INFO/MainProcess] mingle: searching for neighbors 
[2015-07-11 16:16:21,406: INFO/MainProcess] mingle: all alone 
[2015-07-11 16:16:21,449: WARNING/MainProcess] [email protected] ready. 
[2015-07-11 16:16:34,093: WARNING/Worker-4] filter_by_price ['ID_1'] 

5秒後,filter_by_price()被觸發,就像預期。問題是completed()永遠不會被調用。

有什麼想法可以在這裏發生什麼? 如果不使用批次,有什麼可以解決這個問題的體面方法?

PS:我已經設置了CELERYD_PREFETCH_MULTIPLIER=0像docs說。

+0

爲了記錄,我需要非常糟糕的配料,我最終只使用RabbitMQ + Pika和一個非常簡單的緩衝消息的工作模板。如果有人有興趣,我有源代碼可用,歡呼。 –

回答

5

看起來批處理任務的行爲與正常任務顯着不同。批量任務甚至不會發出像task_success這樣的信號。

由於您需要在get_price之後致電completed任務,您可以直接從get_price本身調用它。

@a.task(base=Batches, flush_every=10, flush_interval=5) 
def get_price(requests): 
    for request in requests: 
     # do something 
    completed.delay()