2014-11-21 77 views
8

我遇到問題時發現了一些似乎對我來說會比較簡單的事情。使用Celery初始化一個帶參數的工作人員

我使用芹菜3.1與Python 3,我想用參數初始化我的工人,以便他們可以使用這些細節進行設置。

具體而言:這些工作人員將使用需要使用認證憑證與第三方API進行交互的任務。在消費任何任務之前,工作人員必須將認證詳細信息傳遞給API服務器(認證詳細信息在第一次認證請求後存儲在cookie中)。

我想通過CLI啓動這些登錄憑證給工作人員。然後,我希望工作人員使用它們進行身份驗證並存儲會話以供將來的任務使用(理想情況下,這將存儲在可以從任務訪問的屬性中)。

芹菜可能嗎?

作爲一個便箋,我已經考慮將requests.session對象(來自Python requests庫)作爲任務參數傳遞,但這需要序列化,看起來像是皺眉。

回答

16

我建議使用抽象任務基類並緩存requests.session

從芹菜文檔:

任務不是實例爲每個請求,但被註冊在任務註冊表作爲一個全球性的實例。

這意味着__init__構造函數只會在每個進程中調用一次,並且該任務類在語義上更接近Actor。

這對緩存資源也很有用......

@app.task(base=APITask, bind=True) 
def call_api(self, url): 
    # self will refer to the task instance (because we're using bind=True) 
    self.session.get(url) 

您也可以使用app.task裝飾作爲將在設置一個額外的參數通過API認證選項:當您創建,這將使API請求的任務

import requests 
from celery import Task 

class APITask(Task): 
    """API requests task class.""" 

    abstract = True 

    # the cached requests.session object 
    _session = None 

    def __init__(self): 
     # since this class is instantiated once, use this method 
     # to initialize and cache resources like a requests.session 
     # or use a property like the example below which will create 
     # a requests.session only the first time it's accessed 

    @property 
    def session(self): 
     if self._session is None: 
      # store the session object for the first time 
      session = requests.Session() 
      session.auth = ('user', 'pass') 

      self._session = session 

     return self._session 

現在任務__dict__,例如:

# pass a custom auth argument 
@app.task(base=APITask, bind=True, auth=('user', 'pass')) 
def call_api(self, url): 
    pass 

,使基類使用通過了uthentication選項:

class APITask(Task): 
    """API requests task class.""" 

    abstract = True 

    # the cached requests.session object 
    _session = None 

    # the API authentication 
    auth =() 

    @property 
    def session(self): 
     if self._session is None: 
      # store the session object for the first time 
      session = requests.Session() 
      # use the authentication that was passed to the task 
      session.auth = self.auth 

      self._session = session 

     return self._session 

你可以閱讀更多的芹菜文檔網站:

現在回到你原來的問題這是傳遞額外的參數給來自命令行的工作人員:

有關於這在芹菜文檔Adding new command-line options一個部分,這裏是傳遞一個用戶名和密碼在命令行工作人員的例子:

$ celery worker -A appname --username user --password pass 

代碼:

from celery import bootsteps 
from celery.bin import Option 


app.user_options['worker'].add(
    Option('--username', dest='api_username', default=None, help='API username.') 
) 

app.user_options['worker'].add(
    Option('--password', dest='api_password', default=None, help='API password.') 
) 


class CustomArgs(bootsteps.Step): 

    def __init__(self, worker, api_username, api_password, **options): 
     # store the api authentication 
     APITask.auth = (api_username, api_password) 


app.steps['worker'].add(CustomArgs) 
+0

非常好,我很難破譯文檔中的所有內容。謝謝你把它鋪好。 – 2014-11-24 22:39:51

+0

對不起,再次挖掘一次,你能否澄清如何將命令行參數從Boostep傳遞給任務初始化(以便我可以使用命令提供的用戶名和密碼初始化任務會話對象-線)。目標是不以純文本存儲我的API憑據。 – 2014-11-28 06:08:32

+0

@JoshuaGilman對於延遲抱歉,我用一個例子更新了答案。 – Pierre 2014-12-01 12:20:02

0

我想你可以調用你使用命令行參數編寫的腳本。像下面這樣:

my_script.py username password 

裏面你的腳本,你可以有你的主要功能封裝在一個@celery.task@app.task裝飾。

import sys 

from celery import Celery 

cel = Celery() # put whatever config info you need in here 

@celery.task 
def main(): 
    username, password = sys.argv[1], sys.argv[2] 

這樣的事情應該讓你開始。請務必查看Python的argparse以獲得更復雜的參數解析。

+0

謝謝,但您無法通過調用python腳本來啓動工作進程。你必須像這樣調用芹菜:'celery -A proj worker -l info' – 2014-11-22 00:18:38

+0

我們在這裏必須有一個非常奇怪的設置,然後......因爲這看起來就像它的工作。我必須研究我們正在做的更多。 – 2014-11-24 14:22:56