11
乍一看,我非常喜歡Celery中的「批量」功能,因爲我需要在調用API之前對ID進行分組(否則我可能會被踢出)。芹菜鏈不能與批處理
不幸的是,當測試一點點時,批處理任務看起來與Canvas基元的其餘部分(在這種情況下爲連鎖)似乎沒有什麼關係。例如:
@a.task(base=Batches, flush_every=10, flush_interval=5)
def get_price(requests):
for request in requests:
a.backend.mark_as_done(request.id, 42, request=request)
print "filter_by_price " + str([r.args[0] for r in requests])
@a.task
def completed():
print("complete")
所以,這個簡單的工作流程:
chain(get_price.s("ID_1"), completed.si()).delay()
我看到這樣的輸出:
[2015-07-11 16:16:20,348: INFO/MainProcess] Connected to redis://localhost:6379/0
[2015-07-11 16:16:20,376: INFO/MainProcess] mingle: searching for neighbors
[2015-07-11 16:16:21,406: INFO/MainProcess] mingle: all alone
[2015-07-11 16:16:21,449: WARNING/MainProcess] [email protected] ready.
[2015-07-11 16:16:34,093: WARNING/Worker-4] filter_by_price ['ID_1']
5秒後,filter_by_price()被觸發,就像預期。問題是completed()永遠不會被調用。
有什麼想法可以在這裏發生什麼? 如果不使用批次,有什麼可以解決這個問題的體面方法?
PS:我已經設置了CELERYD_PREFETCH_MULTIPLIER=0
像docs說。
爲了記錄,我需要非常糟糕的配料,我最終只使用RabbitMQ + Pika和一個非常簡單的緩衝消息的工作模板。如果有人有興趣,我有源代碼可用,歡呼。 –