2011-04-18 106 views
6
 
from twisted.internet import reactor 
from twisted.internet import threads 
from twisted.internet import defer 
import time 

def worker(arg): 
    print 'Hello world' 
    time.sleep(10) 
    return 1 

def run(): 
    print 'Starting workers' 
    l = [] 
    for x in range(2): 
     l.append(threads.deferToThread(worker, x)) 
    return defer.DeferredList(l) 

def res(results): 
    print results 
    reactor.stop() 

d = run() 
d.addCallback(res) 
reactor.run() 

如何通過超時阻止工作?如何添加超時扭曲延遲

回答

5

線程不能被打斷,除非他們與你合作。 time.sleep(10)不會合作,所以我不認爲你可以打斷這個工人。如果你有另一種具有幾個獨立的階段,或者在一個循環中對一些任務操作工人的,那麼你可以做這樣的事情:

def worker(stop, jobs): 
    for j in jobs: 
     if stop: 
      break 
     j.do() 

stop = [] 
d = deferToThread(worker) 

# This will make the list eval to true and break out of the loop. 
stop.append(None) 

這不是扭曲具體的,無論是。這就是線程在Python中的工作方式。

3

儘管可能無法中斷線程,但可以通過cancel函數來停止延遲,我認爲該函數可用於Twisted 10.1.0和更高版本。

我已經使用了下面的類來使Deferreds回調一個特定的函數,如果Deferred在一段時間後還沒有被觸發。對於與OP中主題相同的問題可能會有用。

編輯:正如下面的評論建議,最好不要從defer.Deferred繼承。因此,我更改了代碼以使用實現相同效果的包裝器。超時前

class DeferredWrapperWithTimeout(object): 
    ''' 
    Holds a deferred that allows a specified function to be called-back 
    if the deferred does not fire before some specified timeout. 
    ''' 
    def __init__(self, canceller=None): 
     self._def = defer.Deferred(canceller) 

    def _finish(self, r, t): 
     ''' 
     Function to be called (internally) after the Deferred 
     has fired, in order to cancel the timeout. 
     ''' 
     if ((t!=None) and (t.active())): 
      t.cancel() 
     return r 

    def getDeferred(self): 
     return self._def 

    def addTimeoutCallback(self, reactr, timeout, 
          callUponTimeout, *args, **kw): 
     ''' 
     The function 'callUponTimeout' (with optional args or keywords) 
     will be called after 'timeout' seconds, unless the Deferred fires. 
     ''' 

     def timeoutCallback(): 
      self._def.cancel() 
      callUponTimeout(*args, **kw) 
     toc = reactr.callLater(timeout, timeoutCallback) 
     return self._def.addCallback(self._finish, toc) 

回調:

from twisted.internet import reactor 

from DeferredWithTimeout import * 

dw = DeferredWrapperWithTimeout() 
d = dw.getDeferred() 

def testCallback(x=None): 
    print "called" 

def testTimeout(x=None): 
    print "timedout" 

d.addCallback(testCallback) 
dw.addTimeoutCallback(reactor, 20, testTimeout, "to") 
reactor.callLater(2, d.callback, "cb") 
reactor.run() 

打印 「叫」,別無其他。回調之前

超時:

from twisted.internet import reactor 

from DeferredWithTimeout import * 

dw = DeferredWrapperWithTimeout() 
d = dw.getDeferred() 

def testCallback(x=None): 
    print "called" 

def testTimeout(x=None): 
    print "timedout" 

d.addCallback(testCallback) 
dw.addTimeoutCallback(reactor, 20, testTimeout, "to") 
reactor.run() 

打印 「已逾時」 20秒後,並沒有別的。

+2

你真的不應該繼承'延遲'。實現這個功能作爲一個單獨的幫手,而不是一個子類。 http://pyvideo.org/video/1684/the-end-of-object-inheritance-the-beginning-of – 2013-09-26 11:33:21

+0

除了關於不繼承事物的常見警告之外,「延遲」是一個*特別*壞事子類,因爲它的行爲假定它自己的實現非常具體,並且不會很好地反應某些方法被覆蓋。 – Glyph 2013-09-26 19:15:02

+0

感謝您觀看該視頻的鏈接!它完全改變了我設計代碼的方式。 – Corey 2015-04-18 00:43:32

0

嗯,我的回答是不是線程,但有人說,你可以實現暫停功能作爲一個單獨的幫手:

from twisted.internet import defer 

def add_watchdog(deferred, timeout=0.05): 

    def callback(value): 
     if not watchdog.called: 
      watchdog.cancel() 
     return value 

    deferred.addBoth(callback) 

    from twisted.internet import reactor 
    watchdog = reactor.callLater(timeout, defer.timeout, deferred) 

d = defer.Deferred() 
add_watchdog(d) 

然後你就可以捕獲defer.TimeoutError遞延的errback可如果你需要。

+0

嗯,好像取消缺失 – 2015-09-15 17:22:12

+0

@ CarlD'Halluin,謹慎地闡述,或建議編輯? – TCAllen07 2016-01-04 00:52:38

0

我們這樣做使用裝飾。這種方法的優點是延遲在達到超時時被取消。這應該成爲扭曲庫的一部分imho

from twisted.internet import defer, reactor 

def timeout(secs): 
    """Decorator to add timeout to Deferred calls""" 
    def wrap(func): 
     @defer.inlineCallbacks 
     def _timeout(*args, **kwargs): 
      raw_d = func(*args, **kwargs) 
      if not isinstance(raw_d, defer.Deferred): 
       defer.returnValue(raw_d) 

      timeout_d = defer.Deferred() 
      times_up = reactor.callLater(secs, timeout_d.callback, None) 

      try: 
       raw_result, timeout_result = yield defer.DeferredList(
        [raw_d, timeout_d], fireOnOneCallback=True, fireOnOneErrback=True, 
        consumeErrors=True) 
      except defer.FirstError as e: # Only raw_d should raise an exception 
       assert e.index == 0 
       times_up.cancel() 
       e.subFailure.raiseException() 
      else: # timeout 
       if timeout_d.called: 
        raw_d.cancel() 
        raise Exception("%s secs have expired" % secs) 

      # no timeout 
      times_up.cancel() 
      defer.returnValue(raw_result) 
     return _timeout 
return wrap