2010-08-03 71 views
92

有沒有辦法,例如每n秒打印一次Hello World!?例如,程序會通過我有的任何代碼,然後一旦它已經5秒鐘(與time.sleep())它會執行該代碼。我會用它來更新文件,但不打印Hello World。每n秒運行一段代碼

例如:

startrepeat("print('Hello World')", .01) # Repeats print('Hello World') ever .01 seconds 

for i in range(5): 
    print(i) 

>> Hello World! 
>> 0 
>> 1 
>> 2 
>> Hello World! 
>> 3 
>> Hello World! 
>> 4 
+6

相關:什麼是Python中重複執行的函數,每x秒的最佳方式(http://stackoverflow.com/q/474528/4279) – jfs 2015-07-16 20:10:58

回答

205
import threading 

def printit(): 
    threading.Timer(5.0, printit).start() 
    print "Hello, World!" 

printit() 

# continue with the rest of your code 
+3

這也不起作用 - 它只會打印hello world,然後做任何我之後放置的代碼,而不會重複它自己。 – 2010-08-03 05:32:09

+0

你可以創建一個方法SetAlarm,它接受所有需要的參數來執行這個函數,這個方法會設置threading.Timer並且返回,我的意思就是這樣.......... timer = threading.Timer(你可以使用mx DateTime來計算時間間隔使用DateTime.RelativeDateTime(天=間隔), timer.start() – shahjapan 2010-08-03 05:42:34

+2

@Zonda,oops,忘了開始線程 - 編輯現在修復,抱歉。 – 2010-08-03 05:43:52

17
def update(): 
    import time 
    while True: 
     print 'Hello World!' 
     time.sleep(5) 

這將作爲一個功能運行。 while True:使它永遠運行。如果您需要,您可以隨時將其從功能中取出。

+1

不工作;它只是永遠運行,我不能做任何其他事情。 – 2010-08-03 05:16:03

+0

你想同時做什麼其他事情? – 2010-08-03 05:19:24

+12

那麼這只是在一個循環運行。你沒有在問題中指明你會在此期間做其他事情,所以我認爲這就是你需要的。 – avacariu 2010-08-03 05:54:14

1

你可以開始一個單獨的線程,其唯一的職責是計數5秒,更新文件,重複。你不希望這個單獨的線程干擾你的主線程。

79

我的卑微就這個題目,亞歷克斯·馬爾泰利的答案的推廣,用的start()和stop()控制:

from threading import Timer 

class RepeatedTimer(object): 
    def __init__(self, interval, function, *args, **kwargs): 
     self._timer  = None 
     self.interval = interval 
     self.function = function 
     self.args  = args 
     self.kwargs  = kwargs 
     self.is_running = False 
     self.start() 

    def _run(self): 
     self.is_running = False 
     self.start() 
     self.function(*self.args, **self.kwargs) 

    def start(self): 
     if not self.is_running: 
      self._timer = Timer(self.interval, self._run) 
      self._timer.start() 
      self.is_running = True 

    def stop(self): 
     self._timer.cancel() 
     self.is_running = False 

用法:

from time import sleep 

def hello(name): 
    print "Hello %s!" % name 

print "starting..." 
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start() 
try: 
    sleep(5) # your long-running job goes here... 
finally: 
    rt.stop() # better in a try/finally block to make sure the program ends! 

特點:

  • 只有標準庫,沒有外部依賴dencies
  • start()stop()是安全的,多次調用,即使定時器已經開始/停止
  • 要調用的函數可以有位置和命名參數
  • 您可以隨時更改interval,這將是下一個後生效跑。 argskwargs甚至function也一樣!
+0

不錯!使用它。 – Wapiti 2015-08-20 18:27:09

+0

我需要的是'sleep()'函數。謝謝。 – 2016-07-02 00:20:27

+0

@JossieCalderon:'sleep()'函數位於Python標準庫的'time'模塊中,除了'import'之外,不需要任何額外的代碼來使用它。但請注意,這是一次性阻塞呼叫,而不是OP所要求的重複多線程計時器。基本上,'sleep()'只是一個暫停,而不是一個計時器。 – MestreLion 2016-07-11 22:08:03

23

保存自己精神分裂症發作,並使用先進的Python調度: http://pythonhosted.org/APScheduler

的代碼是如此簡單:

from apscheduler.scheduler import Scheduler 

sched = Scheduler() 
sched.start() 

def some_job(): 
    print "Every 10 seconds" 

sched.add_interval_job(some_job, seconds = 10) 

.... 
sched.shutdown() 
+2

首先,子模塊被稱爲「調度程序」,帶有「s」。那裏沒有課程調度程序。也許BackgroundScheduler?無論如何,這個答案是不完整的,不起作用。 – 2014-08-26 09:04:12

+2

這已經有一段時間了,我想我會粘貼網絡用戶手冊中的代碼。上面的代碼現在已經得到糾正(仍然沒有測試過,但是它來自我自己的工作代碼,我經常使用它)。 PS:也許我們正在看不同的版本/模塊?我確定我的行是「從apscheduler.scheduler導入調度程序」與大寫S,而不是複數。 – 2014-08-30 08:03:15

+2

@MadsSjjern:我看到[2.1分支有'apscheduler.scheduler'(no's')模塊](https://bitbucket.org/agronholm/apscheduler/src/11c5c1e7b8c33205293168e044d90b7f2c84a21b/apscheduler/?at=2.1)。當前分支3沒有。 – jfs 2014-09-18 07:43:14

4

這裏是不創建一個新的線程版本每n秒:

from threading import Event, Thread 

def call_repeatedly(interval, func, *args): 
    stopped = Event() 
    def loop(): 
     while not stopped.wait(interval): # the first call is in `interval` secs 
      func(*args) 
    Thread(target=loop).start()  
    return stopped.set 

該事件用於停止th Ë重複:

cancel_future_calls = call_repeatedly(5, print, "Hello, World") 
# do something else here... 
cancel_future_calls() # stop future calls 

Improve current implementation of a setInterval python

+0

值得注意的是,雖然這確實允許別的東西在等待'func'再次運行時運行,但此解決方案不會說明'func'運行需要多長時間。所以這個解決方案在*運行之間等待'n'秒*,而不是每運行'n'秒。上面的其他解決方案至少更接近每'秒'運行,儘管此解決方案也有其優勢。 – Mike 2018-02-13 20:38:58

+0

@Mike:是的。如果你按照鏈接;你會發現它被明確提及並提出了相應的解決方案。這取決於具體情況哪種解決方案更可取。 – jfs 2018-02-13 21:04:24

8

下面是一個簡單的例子與APScheduler 3.00+兼容:

# note that there are many other schedulers available 
from apscheduler.schedulers.background import BackgroundScheduler 

sched = BackgroundScheduler() 

def some_job(): 
    print('Every 10 seconds') 

# seconds can be replaced with minutes, hours, or days 
sched.add_job(some_job, 'interval', seconds=10) 
sched.start() 

... 

sched.shutdown() 

或者,你可以使用下面的。與許多替代方案不同,此計時器將精確地執行所需的代碼(無論代碼執行所花費的時間),每隔012秒鐘執行一次所需的代碼。所以如果你無法承受任何漂移,這是一個很好的選擇。

import time 
from threading import Event, Thread 

class RepeatedTimer: 

    """Repeat `function` every `interval` seconds.""" 

    def __init__(self, interval, function, *args, **kwargs): 
     self.interval = interval 
     self.function = function 
     self.args = args 
     self.kwargs = kwargs 
     self.start = time.time() 
     self.event = Event() 
     self.thread = Thread(target=self._target) 
     self.thread.start() 

    def _target(self): 
     while not self.event.wait(self._time): 
      self.function(*self.args, **self.kwargs) 

    @property 
    def _time(self): 
     return self.interval - ((time.time() - self.start) % self.interval) 

    def stop(self): 
     self.event.set() 
     self.thread.join() 


# start timer 
timer = RepeatedTimer(10, print, 'Hello world') 

# stop timer 
timer.stop()