2017-07-04 111 views
10

我正在研究一個Django應用程序。我有一個API端點,如果請求,必須執行必須重複幾次(直到某個條件成立)的函數。我如何處理它現在的問題是 -Django - 每x秒運行一個函數

def shut_down(request): 
    # Do some stuff 
    while True: 
    result = some_fn() 
    if result: 
     break 
    time.sleep(2) 

    return True 

雖然我知道這是一個可怕的辦法,我不應該阻止2秒,我不能想出如何繞過它。
這個工程,說了4秒鐘後。但是我想要讓循環在後臺運行,並且在some_fn返回True時停止。 (也可以肯定some_fn會返回True)

編輯 -
閱讀Oz123的迴應給了我一個似乎工作的想法。以下是我所做的 -

def shut_down(params): 
    # Do some stuff 
    # Offload the blocking job to a new thread 

    t = threading.Thread(target=some_fn, args=(id,), kwargs={}) 
    t.setDaemon(True) 
    t.start() 

    return True 

def some_fn(id): 
    while True: 
     # Do the job, get result in res 
     # If the job is done, return. Or sleep the thread for 2 seconds before trying again. 

     if res: 
      return 
     else: 
      time.sleep(2) 

這對我來說沒有工作。這很簡單,但我不知道多線程與Django結合的效率。
如果有人可以指出這個缺陷,批評是讚賞。

+1

您可以使用芹菜爲此。你可以在這裏找到手冊:https://realpython.com/blog/python/asynchronous-tasks-with-django-and-celery/#periodic-tasks – neverwalkaloner

+0

@neverwalkaloner聽起來像我所需要的。將嘗試使用它,謝謝。 :) – Zeokav

+1

正如@neverwalkaloner提到的那樣,您可以使用芹菜,您可以按計劃設置定期任務,查看文檔非常靈活http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html看看它的crontab模塊。 –

回答

5

對於許多小型項目,芹菜是一種超必殺。對於那些可以使用schedule的項目,它非常易於使用。

有了這個庫,你可以做任何功能執行定期任務:

import schedule 
import time 

def job(): 
    print("I'm working...") 

schedule.every(10).minutes.do(job) 
schedule.every().hour.do(job) 
schedule.every().day.at("10:30").do(job) 
schedule.every().monday.do(job) 
schedule.every().wednesday.at("13:15").do(job) 

while True: 
    schedule.run_pending() 
    time.sleep(1) 

的例子在阻塞的方式運行,但如果在常見問題看,你會發現,你還可以在運行任務並行線程,這樣你不會阻止,並刪除一旦不需要了任務:從日程安排進口計劃

def run_continuously(self, interval=1): 
    """Continuously run, while executing pending jobs at each elapsed 
    time interval. 
    @return cease_continuous_run: threading.Event which can be set to 
    cease continuous run. 
    Please note that it is *intended behavior that run_continuously() 
    does not run missed jobs*. For example, if you've registered a job 
    that should run every minute and you set a continuous run interval 
    of one hour then your job won't be run 60 times at each interval but 
    only once. 
    """ 

    cease_continuous_run = threading.Event() 

    class ScheduleThread(threading.Thread): 

     @classmethod 
     def run(cls): 
      while not cease_continuous_run.is_set(): 
       self.run_pending() 
       time.sleep(interval) 

    continuous_thread = ScheduleThread() 
    continuous_thread.setDaemon(True) 
    continuous_thread.start() 
    return cease_continuous_run 


Scheduler.run_continuously = run_continuously 

這裏 是一類方法的用法的例子:

def foo(self): 
     ... 
     if some_condition(): 
      return schedule.CancelJob # a job can dequeue it 

    # can be put in __enter__ or __init__ 
    self._job_stop = self.scheduler.run_continuously() 

    logger.debug("doing foo"...) 
    self.foo() # call foo 
    self.scheduler.every(5).seconds.do(
     self.foo) # schedule foo for running every 5 seconds 

    ... 
    # later on foo is not needed any more: 
    self._job_stop.set() 

    ... 

    def __exit__(self, exec_type, exc_value, traceback): 
     # if the jobs are not stop, you can stop them 
     self._job_stop.set() 
+0

事實上,我在尋找避免芹菜(現在)的方法,因爲我只需要在我的項目的一個或兩個地方安排時間。我剛剛遇到了這樣的哈哈!如果我有任何問題,我會嘗試實施並回復。感謝您的答覆! :) – Zeokav

+0

我在下面發佈了一個答案。如果你能告訴我這是否可能會導致問題,我會很感激。 – Zeokav

+0

有趣的約定 - 你爲什麼不創建一個繼承自'Scheduler'的類繼承'run_continuously'方法的原因?這會讓事情變得更加直接明瞭。 – Tim

1

我建議你使用芹菜的task管理。你可以參考this來設置這個應用程序(包,如果你是從javaScript背景)。

一旦設定,就可以改變代碼:

@app.task 
def check_shut_down(): 
    if not some_fun(): 
     # add task that'll run again after 2 secs 
     check_shut_down.delay((), countdown=3) 
    else: 
     # task completed; do something to notify yourself 
     return True