2017-03-05 83 views
1

我想分割下面的代碼,以允許在python中進行多處理,它對我來說真的變成了一項令人沮喪的任務 - 我是多處理新手,已閱讀文檔和儘可能多的示例發現但仍然沒有找到一種解決方案,一次可以在所有cpu內核上運行。Python中迭代多重處理

我想將迭代器拆分成四分之一,並讓它平行計算測試。

我單線程例如:

import itertools as it 
import numpy as np 

wmod = np.array([[0,1,2],[3,4,5],[6,7,3]]) 
pmod = np.array([[0,1,2],[3,4,5],[6,7,3]]) 

plines1 = it.product(wmod[0],wmod[1],wmod[2]) 
plines2 = it.product(pmod[0],pmod[1],pmod[2]) 

check = .915 
result = [] 

for count, (A,B) in enumerate(zip(plines1,plines2)): 
    pass 

    test = (sum(B)+10)/(sum(A)+12) 
    if test > check: 
     result = np.append(result,[A,B]) 
print('results: ',result) 

我意識到這是一對3×3矩陣的一個非常小的例子,但我想將它應用到一對較大的矩陣,並採取有關一小時計算。我很欣賞給出的任何建議。

+0

嗯,一方面,我會將'result = np.append(result,[A,B])'從循環內部移出。你爲什麼在這裏使用'numpy'數組而不是'list'?像這樣追加對於數組和列表來說效率非常低。奇怪的是你也使用'result = []'... –

+0

爲了擴展性和效率,我決定使用numpy。正如我所說的,3x3矩陣僅用於此示例。而for循環是一個迭代,它不會保留數據,除非我以某種方式檢索它。 –

+0

是的,但'numpy'不會讓你的代碼更具可伸縮性。像這樣使用'numpy'會產生相反的效果。 –

回答

0

我會建議使用隊列來轉儲您的iterables。類似的東西:

import multiprocessing as mp 
import numpy as np 
import itertools as it 


def worker(in_queue, out_queue): 
    check = 0.915 
    for a in iter(in_queue.get, 'STOP'): 
     A = a[0] 
     B = a[1] 
     test = (sum(B)+10)/(sum(A)+12) 
     if test > check: 
      out_queue.put([A,B]) 
     else: 
      out_queue.put('') 

if __name__ == "__main__": 
    wmod = np.array([[0,1,2],[3,4,5],[6,7,3]]) 
    pmod = np.array([[0,1,2],[3,4,5],[6,7,3]]) 

    plines1 = it.product(wmod[0],wmod[1],wmod[2]) 
    plines2 = it.product(pmod[0],pmod[1],pmod[2]) 

    # determine length of your iterator 
    counts = 26 

    # setup iterator 
    it = zip(plines1,plines2) 

    in_queue = mp.Queue() 
    out_queue = mp.Queue() 

    # setup workers 
    numProc = 2 
    process = [mp.Process(target=worker, 
          args=(in_queue, out_queue), daemon=True) for x in range(numProc)] 

    # run processes 
    for p in process: 
     p.start() 

    results = [] 
    control = True 

    # fill queue and get data 
    # code fills the queue until a new element is available in the output 
    # fill blocks if no slot is available in the in_queue 
    for idx in range(counts): 
     while out_queue.empty() and control: 
      # fill the queue 
      try: 
       in_queue.put(next(it), block=True) 
      except StopIteration: 
       # signals for processes stop 
       for p in process: 
        print('stopping') 
        in_queue.put('STOP') 
       control = False 
       break 
     results.append(out_queue.get(timeout=10)) 

    # wait for processes to finish 
    for p in process: 
     p.join() 

    print(results) 

    print('finished') 

但是,你將不得不首先確定你的任務列表將是多久。

+0

我試圖理解所有代碼,然後將其實施到我的項目中,但是當我嘗試運行示例時,出現錯誤,指出對象int不可迭代。 –

+0

你能指出他在抱怨哪一行嗎?也許這是導致問題的'工作人員'功能。嘗試在你的'test'中加入一個簡單的打印命令。 – RaJa

+0

我已經通過一個工作示例或測試函數更正了我的答案。它在我的python 3.5上運行時沒有錯誤。 – RaJa