2016-11-18 63 views
0

所以我試圖找出一個通用的解決方案,它將收集函數中的所有值並將它們附加到稍後可訪問的列表中。這將在concurrent.futuresthreading類型任務期間使用。這裏是一個解決方案,我已經使用全局master_list如何在不使用全局變量的情況下在多線程中收集函數返回值?

from concurrent.futures import ThreadPoolExecutor 

master_list = [] 
def return_from_multithreaded(func): 
    # master_list = [] 
    def wrapper(*args, **kwargs): 
     # nonlocal master_list 
     global master_list 
     master_list += func(*args, **kwargs) 
    return wrapper 


@return_from_multithreaded 
def f(n): 
    return [n] 


with ThreadPoolExecutor(max_workers=20) as exec: 
    exec.map(f, range(1, 100)) 

print(master_list) 

我想找到一個解決方案,不包括全局,也許可以返回註釋掉master_list存儲爲一個封閉?

回答

2

如果你不想使用全局變量,不要丟棄的map結果。 map讓你回到每個函數返回的值,你只是忽略它們。這段代碼可以通過使用map根據其預期目的進行簡單得多:

def f(n): 
    return n # No need to wrap in list 

with ThreadPoolExecutor(max_workers=20) as exec: 
    master_list = list(exec.map(f, range(1, 100))) 

print(master_list) 

如果你需要一個master_list顯示,到目前爲止(也許其他線程都在注視着它)計算出的結果,你只要把循環明確:

def f(n): 
    return n # No need to wrap in list 

master_list = [] 
with ThreadPoolExecutor(max_workers=20) as exec: 
    for result in exec.map(f, range(1, 100)): 
     master_list.append(result) 

print(master_list) 

這是Executor模型的設計目的;普通線程並不打算返回值,但Executors提供了一個返回值的通道,所以你不必自己管理它。在內部,這是使用某種形式的隊列或其他元數據來保持結果的順序,但您不需要處理這種複雜性;從你的角度來看,它相當於常規的map函數,它只是平行工作。


更新覆蓋處理異常:

map將提高在工人提出的任何異常時,結果被擊中。因此,如書面所述,如果任何任務失敗,則第一組代碼將不會存儲任何內容(list將部分構建,但在異常提升時丟棄)。第二個例子只會在拋出第一個異常之前保留結果,其餘部分將被丟棄(您必須存儲迭代器map並使用一些不太合適的代碼來避免)。如果你需要存儲所有成功的結果,忽略失敗(或者只是記錄它們以某種方式),這是最容易使用submit創建Futurelist一個對象,然後等待他們,順序或按完成的順序,包裹.result()請撥打try/except以避免丟棄良好結果。例如,存儲的結果提交的順序,你會怎麼做:

master_list = [] 
with ThreadPoolExecutor(max_workers=20) as exec: 
    futures = [exec.submit(f, i) for i in range(1, 100)] 
    exec.shutdown(False) # Optional: workers terminate as soon as all futures finish, 
          # rather than waiting for all results to be processed 
    for fut in futures: 
     try: 
      master_list.append(fut.result()) 
     except Exception: 
      ... log error here ... 

爲了更有效的代碼,你可以完成,沒有提交的順序檢索結果,使用concurrent.futures.as_completed熱切檢索結果他們完成。從上面的代碼中唯一的變化是:

for fut in futures: 

變爲:

for fut in concurrent.futures.as_completed(futures): 

其中as_completed儘快做的yield工作ING完成/取消期貨作爲他們完整的,而不是推遲,直到所有期貨提交之前完成並得到處理。

有更復雜的選項涉及使用add_done_callback,所以主線程根本不涉及顯式處理結果,但這通常是不必要的,並且經常會令人困惑,因此如果可能,最好避免使用。

+0

+1用於分享好的信息。我有一個疑問,如果通過函數引發異常,它的行爲如何?它處理了嗎? –

+0

@Moinuddin根據我的經驗,使用ThreadPoolExcecutors而不是'map'來處理錯誤,您可以使用submit來返回未來,然後在完成後調用future.result()。這會引發任何被發現的例外。 – flybonzai

+0

@flybonzai:亞爾。創建'Future'的'list',然後,如果結果順序很重要,只需循環訪問'list'並調用'result'(包含在'try' /'except'中以處理worker中出現的異常)。如果結果順序無關緊要,[使用'concurrent.futures.as_completed'](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.as_completed),這會產生'未來的對象,因爲他們完成(成功或由於例外);再次,你可以在'try' /'except'塊中調用'result'來處理錯誤。如果命令不重要,後者通常更有效率。 – ShadowRanger

2

我以前遇到過這個問題:Running multiple asynchronous function and get the returned value of each function。這是我的方法去做:

def async_call(func_list): 
    """ 
    Runs the list of function asynchronously. 

    :param func_list: Expects list of lists to be of format 
     [[func1, args1, kwargs1], [func2, args2, kwargs2], ...] 
    :return: List of output of the functions 
     [output1, output2, ...] 
    """ 
    def worker(function, f_args, f_kwargs, queue, index): 
     """ 
     Runs the function and appends the output to list, and the Exception in the case of error 
     """ 
     response = { 
      'index': index, # For tracking the index of each function in actual list. 
          # Since, this function is called asynchronously, order in 
          # queue may differ 
      'data': None, 
      'error': None 
     } 

     # Handle error in the function call 
     try: 
      response['data'] = function(*f_args, **f_kwargs) 
     except Exception as e: 
      response['error'] = e # send back the exception along with the queue 

     queue.put(response) 
    queue = Queue() 
    processes = [Process(target=worker, args=(func, args, kwargs, queue, i)) \ 
        for i, (func, args, kwargs) in enumerate(func_list)] 

    for process in processes: 
     process.start() 

    response_list = [] 
    for process in processes: 
     # Wait for process to finish 
     process.join() 

     # Get back the response from the queue 
     response = queue.get() 
     if response['error']: 
      raise response['error'] # Raise exception if the function call failed 
     response_list.append(response) 

    return [content['data'] for content in sorted(response_list, key=lambda x: x['index'])] 

採樣運行:

def my_sum(x, y): 
    return x + y 

def your_mul(x, y): 
    return x*y 

my_func_list = [[my_sum, [1], {'y': 2}], [your_mul, [], {'x':1, 'y':2}]] 

async_call(my_func_list) 
# Value returned: [3, 2] 
相關問題