2012-08-04 108 views
11

我一直在使用Flask爲我的k8055 USB接口板提供簡單的Web API;相當標準的得分手和推杆,而Flask真的讓我的生活變得更輕鬆。如何在Python中使用Flask執行定期任務

但我希望能夠註冊乳清發生時狀態的變化。例如,如果我有一個連接到電路板的按鈕,我可以輪詢該特定端口的api。但是如果我想讓輸出直接反映輸出結果,不管有人是否在與api交談,我會有這樣的事情。

while True: 
    board.read() 
    board.digital_outputs = board.digital_inputs 
    board.read() 
    time.sleep(1) 

而且每隔一秒,輸出將被更新以匹配輸入。

Flask下有什麼辦法做這種事嗎?我之前在Twisted中做過類似的事情,但Flask對於這個特殊的應用來說太放手了,現在就放棄它了...

謝謝。

回答

11

您可以使用cron執行簡單的任務。

爲您的任務創建燒瓶視圖。

# a separate view for periodic task 
@app.route('/task') 
def task(): 
    board.read() 
    board.digital_outputs = board.digital_inputs 

然後使用cron的,下載來自該網址定期

# cron task to run each minute 
0-59 * * * * run_task.sh 

凡run_task.sh內容

wget http://localhost/task 

cron是無法運行更頻繁一次一分鐘。如果你需要更高的頻率,(比如說,分別5秒期間=每分鐘12次),你都必須做在tun_task.sh通過以下方式

# loop 12 times with a delay 
for i in 1 2 3 4 5 6 7 8 9 10 11 12 
do 
    # download url in background for not to affect delay interval much 
    wget -b http://localhost/task 
    sleep 5s 
done 
-1

不,在Flask中沒有任何支持,但是您可以使用flask-celery或只需在單獨的線程(greenlet)中運行您的函數。

+3

感謝您的建議。 我去了gevent/greenlet路線,但它似乎'主'線程不屈服於循環線程(使用gevent.sleep而不是上述時間) 至於芹菜,肯定實施消息隊列服務器是過度殺毒爲什麼這麼'簡單'(TM)? – Bolster 2012-08-05 11:54:48

1

對於我的瓶的應用程序,我考慮使用所描述的方法的cron由Pashka在他的answerschedule庫和APScheduler

我發現APScheduler很簡單,並且服務於定期任務運行目的,所以繼續使用APScheduler。

示例代碼:

from flask import Flask, jsonify 

from apscheduler.schedulers.background import BackgroundScheduler 


app = Flask(__name__) 

def test_job(): 
    print('I am working...') 

scheduler = BackgroundScheduler() 
job = scheduler.add_job(test_job, 'interval', minutes=1) 
scheduler.start() 
相關問題