2012-07-27 48 views
2

我正在開發一個系統,爲AI項目構建一個龐大的n-gram模型。
我的管道如下:
資源輸入 - >獲取數據 - >解析器 - >教練
資源輸入(基本上是必須被解析的網址)不是常數,這意味着我可以一次引入大量的資源,稍後再引入大量的資源,等等。爲了最大限度地提高員工生產力,芹菜任務的正確方法是什麼?

我的想法是將管道的每個步驟都作爲Celery任務實施,並將其部署到雲上(例如,使用Heroku的工人dynos)。但我對Celery是新手,我對如何排列這些任務以使我的工作人員100%工作並同時維護系統的完整性存在懷疑。
直接前進的方法是在前一個任務完成後立即開始排隊任務,例如,如果我得到1000個項目的資源輸入,我會安排1000個「獲取數據」任務,並且每個任務都會排隊完成後的「解析」任務等。但是這會導致一個巨大的隊列,因爲在這些任務完成之前會有更多的資源進來,而且我知道構建模型需要幾個月的時間(如果它完成了!)。

所以我不確定Celery是否可以處理所有這些問題,而不會陷入記憶問題(Heroku有其侷限性)或任何其他我現在無法想象的問題。或者,也許我應該使用更復雜的技術,例如每X分鐘計劃一大塊任務,將部分結果存儲在數據庫中等等,這可能會避免這些問題中的一些,但我不會讓我的工作人員有100%的時間。

有什麼想法?
謝謝!


編輯

的回答我的問題實際上是在公認的答案

回答

2

的意見通過爲每個任務不同的隊列,並運行一個專門的工作人員爲每個隊列可以保證您的系統將同樣利用100%的系統資源來關注每項任務。此外,您可以添加工作人員以基於任務運行時間平衡任務處理。

例如,定義任務

@celery.task 
def fetch(url): 
    # fetch url 
    return html 

@celery.task 
def parse(html): 
    pass 

和配置automatic routing

CELERY_ROUTES = { 'tasks.fetch':{ '隊列': 'fetch_queue'}, 「tasks.parse ':{' 排隊「: 'parse_queue'}}

和運行工:

$ celery worker -Q fetch_queue 

$ celery worker -Q parse_queue 

對於每種任務類型,您將擁有單獨的工作人員。

使用回調,你可以取後容易解析:

fetch.apply_async((url), link=parse.subtask()) 

附:對於提取工作者,您可以使用Eventlet pool來利用異步IO。

+0

但是這些任務不是同樣昂貴(在cpu時間)。想象一下,「訓練」任務需要兩次「取回」任務。有了這個解決方案,培訓工作人員將100%的時間工作,而取工作者只佔50%的時間。我錯了嗎? – 2012-07-29 14:37:26

+0

正如我所提到的,您可以通過爲更便宜的任務添加更多工作人員來平衡負載。在你的例子中,你可以讓2名工作人員「獲取」任務,一名工作人員獲得「培訓」。 – mher 2012-07-29 19:23:37

+0

我認爲你的意思是相反的,2名工人的最昂貴的任務。無論如何,我沒有看到您的解決方案優於我在問題中提到的簡單(sf)解決方案。此外,sf解決方案具有很高的可擴展性,因爲我會從1名可以處理所有類型任務的工作人員開始,然後在看到速度非常慢時進行擴展。我對sf解決方案的疑問是,Celery能否在幾個月內處理數千個任務的隊列(實際上,是一個無限的隊列),或者如果這會產生某種問題... 順便說一句,感謝您指點我Evenlet,它看起來非常好=) – 2012-07-29 21:02:38

相關問題