2013-04-29 81 views
15

我想要類似於executor.map的東西,除了迭代結果時,我想按照完成順序迭代它們,例如,首先完成的工作項目應該首先出現在迭代中,等等。這樣,如果序列中的每個工作項目尚未完成,迭代將會阻止。Python的`concurrent.futures`:按照完成順序迭代期貨

我知道如何使用隊列來實現這一點,但我想知道是否有可能使用futures框架。

(我大多采用基於線程的執行者,所以我想這適用於這些答案,但一般的答案會受到歡迎,以及。)

UPDATE:謝謝你的答案!你能解釋我如何使用as_completedexecutor.mapexecutor.map是使用期貨時最有用和最簡潔的工具,我不願意手動開始使用Future對象。

+0

你很幸運! – damzam 2013-05-02 04:40:41

回答

25

executor.map(),象內建map(),只返回在迭代的順序結果,所以很遺憾,你不能用它來確定完成的順序。 concurrent.futures.as_completed()是你要找的東西 - 這裏有一個例子:

import time 
import concurrent.futures 

times = [3, 1, 2] 

def sleeper(secs): 
    time.sleep(secs) 
    print('I slept for {} seconds'.format(secs)) 
    return secs 

# returns in the order given 
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: 
    print(list(executor.map(sleeper, times))) 

# I slept for 1 seconds 
# I slept for 2 seconds 
# I slept for 3 seconds 
# [3, 1, 2] 

# returns in the order completed 
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: 
    futs = [executor.submit(sleeper, secs) for secs in times] 
    print([fut.result() for fut in concurrent.futures.as_completed(futs)]) 

# I slept for 1 seconds 
# I slept for 2 seconds 
# I slept for 3 seconds 
# [1, 2, 3] 

當然,如果你需要使用的地圖界面,你可以創建自己的map_as_completed()函數封裝上面(也許它添加到子類Executor()),但我認爲通過executor.submit()創建期貨實例是一種更簡單/更乾淨的方式(也允許您提供無參數,kwargs)。

0

From python doc

concurrent.futures.as_completed(fs, timeout=None)¶ 

返回一個迭代 了由產生期貨作爲他們完成(完成 FS給未來的情況下(可能由不同的執行者創造 實例)或被取消)。在as_completed() 之前完成的任何期貨將首先被取消。返回的迭代器引發一個 TimeoutError,如果接下來()被調用,並且在原始調用到as_completed()的超時秒數後結果不可用 。 超時可以是一個int或float。如果沒有指定超時或無, 等待時間沒有限制。

您需要了解executor.map()executor.submit()之間的差異。第一個將一個函數映射到一個參數向量。它與map非常相似,但異步啓動任務。每次通話時,submit(func, arg)啓動一項任務。在此任務中,應用func

這是一個使用as_completed()submit()的例子,我可以在python 3上運行。0

from concurrent import futures 
import urllib.request 

URLS = ['http://www.foxnews.com/', 
     'http://www.cnn.com/', 
     'http://europe.wsj.com/', 
     'http://www.bbc.co.uk/', 
     'http://some-made-up-domain.com/'] 

def load_url(url, timeout): 
    return urllib.request.urlopen(url, timeout=timeout).read() 

def main(): 
    with futures.ThreadPoolExecutor(max_workers=5) as executor: 
     future_to_url = dict(
      (executor.submit(load_url, url, 60), url) 
      for url in URLS) 

     for future in futures.as_completed(future_to_url): 
      url = future_to_url[future] 
      try: 
       print('%r page is %d bytes' % (
          url, len(future.result()))) 
      except Exception as e: 
       print('%r generated an exception: %s' % (
          url, e)) 

if __name__ == '__main__': 
    main() 

沒有map()用在這裏,任務與submit運行和as_completed()

返回了能產生 期貨作爲他們完成FS給出的未來實例的迭代器(成品或已取消) 。