2016-11-07 27 views
3

TL; DRPython的芹菜增加併發對工人

是否有可能增加或在運行芹菜工人減少併發無重新啓動呢?

我用芹菜4.0.0與RabbitMQ的作爲經紀人在Ubuntu 14.10

我USECASE

我經常面臨的一個大隊列的任務,其中大部分主要執行的基於HTTP請求並做一些小處理。我讓工作人員在相當強大的機器上運行,並希望最大限度地利用資源。這在大多數情況下都不是問題,除非處理大量的HTTP請求,這可能會超時或需要很長時間才能響應等。處理這些問題時,我想暫時增加--concurrency - 參數,而不必重新啓動worker。

目前我正在運行芹菜--concurrency 150,但這隻會讓服務器瓶頸(CPU)的利用率降低約10%。我想一種解決方案是在那段時間內產生另一個150個併發工作者,並在稍後將其殺死,但這可能會增加複雜性。如果可能的話,我想堅持1個工人/機器。

+1

如何對自動縮放選項:http://docs.celeryproject.org/en/latest/userguide/workers.html#autoscaling –

+0

原則上我喜歡使用自動縮放的想法比我想出的更好。然而,這些文檔並不清楚如何進行子類化和添加自己的邏輯。 請注意,當我試圖超越169個進程時,芹菜在任何地方都會拋出錯誤,但我認爲這個事實可能是GitHub的一個特例。 – MoorzTech

回答

2

這可能是可能的使用內置的autoscaling(感謝菲利普Tzou)的子類。不幸的是,自動縮放功能的文檔記錄很差。

但是,在做了一些更多的挖掘之後,我遇到了celery.app.control,這些(除其他外)允許通過RabbitMQ將消息發送給工作人員進行縮放。這裏有一個如何去這一個小例子:

import os, time 
from celery import Celery 
from celery.app.control import Control 

app = Celery() 
controller = Control(app) 

while True: 
    n=5 # the numer of processes to add/remove 
    upper_load_threshold = 6 
    lower_load_threshold = 4 
    if os.getloadavg()[0] <= lower_load_threshold: # we're looking at the 5 min load avg here 
     controller.pool_grow(n) 
    elif os.getloadavg()[0] >= upper_load_threshold: 
     controller.pool_shrink(n) 
    time.sleep(10)