2016-11-08 70 views
1

add_done_callback方法最近被添加到分佈式Future對象中,它允許您在將來完成後採取一些操作,而不管它是否成功。如果你想直接調用任何方法resultexceptiontraceback傳遞的未來對象如何在回調中獲得未來的結果?

http://distributed.readthedocs.io/en/latest/api.html?highlight=add_done_callback#distributed.client.Future.add_done_callback

回調函數將掛起。

的例外,回溯可以但是在回調訪問如下: fut._exception().result() fut._traceback().result()

嘗試,結果同樣的模式 - 即fut._result().result()引發了一個異常:

File "C:\Python\lib\site-packages\tornado\concurrent.py", line 316, in _check_done 
    raise Exception("DummyFuture does not support blocking for results") 
Exception: DummyFuture does not support blocking for results 

如果不能在回調中訪問未來的結果,能夠添加回調對我來說是有限的使用。

我錯過了什麼 - 有沒有辦法在回調中獲得未來的結果?

在ASYNCIO文檔似乎舉個例子,其中直接訪問result方法是可行的:

​​

...我不知道如何與龍捲風/分佈式的,但它將是非常有用能夠做到這一點。

from distributed import Client 


client = Client("127.0.0.1:8786") 

def f(delay): 
    from time import sleep 
    from numpy.random import randn 
    sleep(delay) 
    if randn() > 1: 
     1/0 
    return delay 

def callback(fut): 
    import logging 
    logger = logging.getLogger('distributed') 
    if fut.status == 'finished': 
     res = future._result().result() # <-------------- Doesn't work! 
     logger.info("{!r} - {!s}".format(fut, res)) 
    else: 
     logger.info("{!r} - {!s}".format(fut, fut.status)) 


args = rand(10) 
futs = client.map(f, args) 
for fut in futs: 
    fut.add_done_callback(callback) 

回答

1

當前您的回調在Tornado事件循環中被調用。如果你想獲得未來的結果,你將不得不使用Tornado API。

下面是一個小例子:

In [1]: from distributed import Client 
In [2]: client = Client() 
In [3]: def inc(x): 
    ...:  return x + 1 
    ...: 
In [4]: from tornado import gen 

In [5]: @gen.coroutine 
    ...: def callback(future): 
    ...:  result = yield future._result() 
    ...:  print(result * 10) 
    ...:  
In [6]: future = client.submit(inc, 1) 

In [7]: future.add_done_callback(callback) 

20 

然而,你的問題突出了也許這並不是用戶與add_done_callback交互最直觀的方式,因此,如果我們引入了一個破我也不會感到驚訝更改爲更高版本。

In [8]: import distributed 

In [8]: distributed.__version__ 
Out[8]: '1.14.0'