
Fast queue of read-only numpy arrays

I have a multiprocessing job where I queue read-only numpy arrays, as part of a producer-consumer pipeline.

Currently they are being pickled, because that is the default behaviour of multiprocessing.Queue, and it slows down performance.

Is there any pythonic way to pass references to shared memory instead of pickling the arrays?

Unfortunately the arrays are generated after the consumers are started, and there is no easy way around that. (So a global-variable approach would be ugly...)

[Note that in the following code we are not expecting h(x0) and h(x1) to be computed in parallel. Instead, we see h(x0) and g(h(x1)) computed in parallel (like pipelining in a CPU).]

from multiprocessing import Process, Queue
import numpy as np

class __EndToken(object):
    pass

def parallel_pipeline(buffer_size=50):
    def parallel_pipeline_with_args(f):
        def consumer(xs, q):
            for x in xs:
                q.put(x)
            q.put(__EndToken())

        def parallel_generator(f_xs):
            q = Queue(buffer_size)
            consumer_process = Process(target=consumer, args=(f_xs, q))
            consumer_process.start()
            while True:
                x = q.get()
                if isinstance(x, __EndToken):
                    break
                yield x

        def f_wrapper(xs):
            return parallel_generator(f(xs))

        return f_wrapper
    return parallel_pipeline_with_args


@parallel_pipeline(3)
def f(xs):
    for x in xs:
        yield x + 1.0

@parallel_pipeline(3)
def g(xs):
    for x in xs:
        yield x * 3

@parallel_pipeline(3)
def h(xs):
    for x in xs:
        yield x * x

def xs():
    for i in range(1000):
        yield np.random.uniform(0, 1, (500, 2000))

if __name__ == "__main__":
    rs = f(g(h(xs())))
    for r in rs:
        print r
Can you share some code? –

Well, not the actual code. I'll mock up something similar. –

As long as it's a [minimal, complete, and verifiable example](http://stackoverflow.com/help/mcve)... –

Answers

Share memory between threads or processes

Use threading instead of multiprocessing

Since you are using numpy, you can take advantage of the fact that the global interpreter lock is released during numpy computations. This means you can do parallel processing with standard threads and shared memory, instead of multiprocessing and inter-process communication. Here is a version of your code, tweaked to use threading.Thread and Queue.Queue instead of multiprocessing.Process and multiprocessing.Queue. It passes a numpy ndarray through a queue without pickling it. On my computer this runs about 3 times faster than your code. (However, it is only about 20% faster than the serial version of your code. I have suggested some other approaches further down.)

from threading import Thread
from Queue import Queue
import numpy as np

class __EndToken(object):
    pass

def parallel_pipeline(buffer_size=50):
    def parallel_pipeline_with_args(f):
        def consumer(xs, q):
            for x in xs:
                q.put(x)
            q.put(__EndToken())

        def parallel_generator(f_xs):
            q = Queue(buffer_size)
            consumer_process = Thread(target=consumer, args=(f_xs, q))
            consumer_process.start()
            while True:
                x = q.get()
                if isinstance(x, __EndToken):
                    break
                yield x

        def f_wrapper(xs):
            return parallel_generator(f(xs))

        return f_wrapper
    return parallel_pipeline_with_args

@parallel_pipeline(3)
def f(xs):
    for x in xs:
        yield x + 1.0

@parallel_pipeline(3)
def g(xs):
    for x in xs:
        yield x * 3

@parallel_pipeline(3)
def h(xs):
    for x in xs:
        yield x * x

def xs():
    for i in range(1000):
        yield np.random.uniform(0, 1, (500, 2000))

rs = f(g(h(xs())))
%time print sum(r.sum() for r in rs)  # 12.2s
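
A quick way to convince yourself that Queue.Queue hands over a reference to the very same ndarray, rather than a pickled copy (a minimal check added for illustration, not part of the original answer):

# minimal check (added for illustration): Queue.Queue stores object
# references, so the array that comes out is the very same object
from Queue import Queue
import numpy as np

q = Queue()
a = np.zeros((2, 2))
q.put(a)
b = q.get()
print(b is a)  # True: same object, no copy and no pickling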

Store numpy arrays in shared memory

Another option, close to what you requested, would be to keep using the multiprocessing package, but pass data between processes using arrays stored in shared memory. The code below creates a new ArrayQueue class to do that. The ArrayQueue object should be created before spawning subprocesses. It creates and manages a pool of numpy arrays backed by shared memory. When a result array is pushed onto the queue, ArrayQueue copies the data from that array into an existing shared-memory array, then passes the id of the shared-memory array through the queue. This is much faster than sending the whole array through the queue, since it avoids pickling the arrays. It has similar performance to the threaded version above (about 10% slower), and may scale better if the global interpreter lock is an issue (i.e. you run a lot of python code inside the functions).

from multiprocessing import Process, Queue, Array
import numpy as np

class ArrayQueue(object):
    def __init__(self, template, maxsize=0):
        if type(template) is not np.ndarray:
            raise ValueError('ArrayQueue(template, maxsize) must use a numpy.ndarray as the template.')
        if maxsize == 0:
            # this queue cannot be infinite, because it will be backed by real objects
            raise ValueError('ArrayQueue(template, maxsize) must use a finite value for maxsize.')

        # find the size and data type for the arrays
        # note: every ndarray put on the queue must be this size
        self.dtype = template.dtype
        self.shape = template.shape
        self.byte_count = len(template.data)

        # make a pool of numpy arrays, each backed by shared memory,
        # and create a queue to keep track of which ones are free
        self.array_pool = [None] * maxsize
        self.free_arrays = Queue(maxsize)
        for i in range(maxsize):
            buf = Array('c', self.byte_count, lock=False)
            self.array_pool[i] = np.frombuffer(buf, dtype=self.dtype).reshape(self.shape)
            self.free_arrays.put(i)

        self.q = Queue(maxsize)

    def put(self, item, *args, **kwargs):
        if type(item) is np.ndarray:
            if item.dtype == self.dtype and item.shape == self.shape and len(item.data) == self.byte_count:
                # get the ID of an available shared-memory array
                id = self.free_arrays.get()
                # copy item to the shared-memory array
                self.array_pool[id][:] = item
                # put the array's id (not the whole array) onto the queue
                new_item = id
            else:
                raise ValueError(
                    'ndarray does not match type or shape of template used to initialize ArrayQueue'
                )
        else:
            # not an ndarray;
            # put the original item on the queue (as a tuple, so we know it's not an ID)
            new_item = (item,)
        self.q.put(new_item, *args, **kwargs)

    def get(self, *args, **kwargs):
        item = self.q.get(*args, **kwargs)
        if type(item) is tuple:
            # unpack the original item
            return item[0]
        else:
            # item is the id of a shared-memory array
            # copy the array
            arr = self.array_pool[item].copy()
            # put the shared-memory array back into the pool
            self.free_arrays.put(item)
            return arr

class __EndToken(object):
    pass

def parallel_pipeline(buffer_size=50):
    def parallel_pipeline_with_args(f):
        def consumer(xs, q):
            for x in xs:
                q.put(x)
            q.put(__EndToken())

        def parallel_generator(f_xs):
            q = ArrayQueue(template=np.zeros((500, 2000)), maxsize=buffer_size)
            consumer_process = Process(target=consumer, args=(f_xs, q))
            consumer_process.start()
            while True:
                x = q.get()
                if isinstance(x, __EndToken):
                    break
                yield x

        def f_wrapper(xs):
            return parallel_generator(f(xs))

        return f_wrapper
    return parallel_pipeline_with_args


@parallel_pipeline(3)
def f(xs):
    for x in xs:
        yield x + 1.0

@parallel_pipeline(3)
def g(xs):
    for x in xs:
        yield x * 3

@parallel_pipeline(3)
def h(xs):
    for x in xs:
        yield x * x

def xs():
    for i in range(1000):
        yield np.random.uniform(0, 1, (500, 2000))

print "multiprocessing with shared-memory arrays:"
%time print sum(r.sum() for r in f(g(h(xs()))))  # 13.5s
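
As a standalone sanity check (a sketch added for illustration, not from the original answer), a put/get round-trip through the ArrayQueue class above looks like this:

import numpy as np

# hypothetical quick test of the ArrayQueue class defined above:
# a put/get round-trip should return an equal copy of the input array
aq = ArrayQueue(template=np.zeros((4, 3)), maxsize=2)

a = np.random.uniform(0, 1, (4, 3))
aq.put(a)                    # copies a into a pooled shared-memory array
b = aq.get()                 # copies it back out and frees the pooled slot
print(np.array_equal(a, b))  # True

aq.put('done')               # non-ndarray items pass through wrapped in a tuple
print(aq.get())              # 'done'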

Parallelize processing of samples instead of functions

The parallel code above is only about 20% faster than a single-threaded version (12.2s vs. 14.8s for the serial version shown below). That is because each function runs in a single thread or process, and most of the work is done by xs(). The execution time for the example above is nearly the same as if you just ran %time print sum(1 for x in xs()).

If your real project has many more intermediate functions and/or they are more complex than the ones you showed, then the workload may spread better among the processors and this may not be a problem. However, if your workload really does resemble the code you provided, then you may want to refactor your code to allocate one sample to each thread instead of one function to each thread. That would look like the code below (both threading and multiprocessing versions are shown):

import multiprocessing
import threading, Queue
import numpy as np

def f(x):
    return x + 1.0

def g(x):
    return x * 3

def h(x):
    return x * x

def final(i):
    return f(g(h(x(i))))

def final_sum(i):
    return f(g(h(x(i)))).sum()

def x(i):
    # produce sample number i
    return np.random.uniform(0, 1, (500, 2000))

def rs_serial(func, n):
    for i in range(n):
        yield func(i)

def rs_parallel_threaded(func, n):
    todo = range(n)
    q = Queue.Queue(2 * n_workers)

    def worker():
        while True:
            try:
                # the global interpreter lock ensures only one thread does this at a time
                i = todo.pop()
                q.put(func(i))
            except IndexError:
                # none left to do
                q.put(None)
                break

    threads = []
    for j in range(n_workers):
        t = threading.Thread(target=worker)
        t.daemon = False
        threads.append(t)  # in case it's needed later
        t.start()

    # drain the queue until every worker has reported completion with a None,
    # so no results are dropped when the first worker finishes early
    finished = 0
    while finished < n_workers:
        x = q.get()
        if x is None:
            finished += 1
        else:
            yield x

def rs_parallel_mp(func, n):
    pool = multiprocessing.Pool(n_workers)
    return pool.imap_unordered(func, range(n))

n_workers = 4
n_samples = 1000

print "serial:"  # 14.8s
%time print sum(r.sum() for r in rs_serial(final, n_samples))
print "threaded:"  # 10.1s
%time print sum(r.sum() for r in rs_parallel_threaded(final, n_samples))

print "mp return arrays:"  # 19.6s
%time print sum(r.sum() for r in rs_parallel_mp(final, n_samples))
print "mp return results:"  # 8.4s
%time print sum(r_sum for r_sum in rs_parallel_mp(final_sum, n_samples))

The threaded version of this code is only slightly faster than the first example I gave, and only about 30% faster than the serial version. That is not as much of a speedup as I would have expected; maybe Python is still getting partly bogged down by the GIL?

The multiprocessing version works significantly faster than the original multiprocessing code, primarily because all the functions get chained together in a single process, rather than queueing (and pickling) intermediate results. However, it is still slower than the serial version, because all the result arrays have to be pickled (in the worker process) and unpickled (in the main process) before being returned by imap_unordered. However, if you can arrange it so that your pipeline returns aggregate results instead of the complete arrays, then you can avoid the pickling overhead, and the multiprocessing version is fastest: about 43% faster than the serial version.

OK, now, for the sake of completeness, here is a version of the second example that uses multiprocessing with your original generator functions instead of the finer-grained functions shown above. It uses some tricks to spread the samples among multiple processes, which may make it unsuitable for many workflows. But using generators does seem to be slightly faster than using the finer-grained functions, and this method can get you up to a 54% speedup vs. the serial version shown above. However, that is only available if you do not need to return the full arrays from the worker functions.

import multiprocessing, itertools, math
import numpy as np

def f(xs):
    for x in xs:
        yield x + 1.0

def g(xs):
    for x in xs:
        yield x * 3

def h(xs):
    for x in xs:
        yield x * x

def xs():
    for i in range(1000):
        yield np.random.uniform(0, 1, (500, 2000))

def final():
    return f(g(h(xs())))

def final_sum():
    for x in f(g(h(xs()))):
        yield x.sum()

def get_chunk(args):
    """Retrieve n values (n=args[1]) from a generator function (f=args[0]) and return them as a list.
    This runs in a worker process and does all the computation."""
    return list(itertools.islice(args[0](), args[1]))

def parallelize(gen_func, max_items, n_workers=4, chunk_size=50):
    """Pull up to max_items items from several copies of gen_func, in small groups in parallel processes.
    chunk_size should be big enough to improve efficiency (one copy of gen_func will be run for each chunk)
    but small enough to avoid exhausting memory (each worker will keep chunk_size items in memory)."""

    pool = multiprocessing.Pool(n_workers)

    # how many chunks will be needed to yield at least max_items items?
    n_chunks = int(math.ceil(float(max_items) / float(chunk_size)))

    # generate a suitable series of arguments for get_chunk()
    args_list = itertools.repeat((gen_func, chunk_size), n_chunks)

    # chunk_gen will yield a series of chunks (lists of results) from the generator function,
    # totaling n_chunks * chunk_size items (which is >= max_items)
    chunk_gen = pool.imap_unordered(get_chunk, args_list)

    # parallel_gen flattens the chunks, and yields individual items
    parallel_gen = itertools.chain.from_iterable(chunk_gen)

    # limit the output to max_items items
    return itertools.islice(parallel_gen, max_items)


# in this case, the parallel version is slower than a single process, probably
# due to overhead of gathering numpy arrays in imap_unordered (via pickle?)
print "serial, return arrays:"  # 15.3s
%time print sum(r.sum() for r in final())
print "parallel, return arrays:"  # 24.2s
%time print sum(r.sum() for r in parallelize(final, max_items=1000))


# in this case, the parallel version is more than twice as fast as the single-thread version
print "serial, return result:"  # 15.1s
%time print sum(r for r in final_sum())
print "parallel, return result:"  # 6.8s
%time print sum(r for r in parallelize(final_sum, max_items=1000))

The first sentence alone is super useful stuff that I didn't know. +1 –

Your example doesn't seem to run on my computer, although that may have to do with the fact that I'm running Windows (issues pickling anything not in the __main__ namespace (anything decorated))... would something like this help? (You would have to put package/unpack inside each of f(), g(), and h(); a sketch of that is shown after the code below.)

Note* I'm not sure this would actually be any faster... just a stab at what others have suggested..

from multiprocessing import Process, freeze_support
from multiprocessing.sharedctypes import Value, Array
import numpy as np

def package(arr):
    shape = Array('i', arr.shape, lock=False)

    if arr.dtype == float:
        ctype = Value('c', b'd')  # 'd' for double, 'f' for single
    if arr.dtype == int:
        ctype = Value('c', b'i')  # if statements could be avoided if data is always the same
    data = Array(ctype.value, arr.reshape(-1), lock=False)

    return data, shape

def unpack(data, shape):
    return np.array(data[:]).reshape(shape[:])

# test
def f(args):
    print(unpack(*args))

if __name__ == '__main__':
    freeze_support()

    a = np.array([1, 2, 3, 4, 5])
    a_packed = package(a)
    print('array has been packaged')

    p = Process(target=f, args=(a_packed,))
    print('passing to parallel process')
    p.start()

    print('joining to parent process')
    p.join()
    print('finished')
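
To make that concrete, here is a rough sketch (added for illustration, untested) of how one stage from the question, such as f(), might apply unpack()/package() so that only the packed shared-ctypes tuples travel through the queue:

# hypothetical: stage f from the question, reworked so each value pulled
# from the queue is unpacked into an ndarray and each result is re-packed
def f(xs):
    for packed in xs:
        x = unpack(*packed)     # rebuild the ndarray from shared ctypes
        yield package(x + 1.0)  # pack the result before it is queued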
Check out the pathos-multiprocessing project, which avoids the standard multiprocessing reliance on pickling. This should allow you to get around both the inefficiencies of pickling, and give you access to common memory for read-only shared resources. Note that while pathos is nearing deployment in a full pip package, in the interim I'd recommend installing with pip install git+https://github.com/uqfoundation/pathos
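
For instance, a minimal sketch (added for illustration; assumes pathos is installed, and collapses f, g, and h into one per-sample function) of distributing the per-sample work with pathos's ProcessingPool, which serializes with dill instead of pickle:

from pathos.multiprocessing import ProcessingPool
import numpy as np

def sample_sum(i):
    # generate sample i and apply h, g, f in one step, returning only the sum
    x = np.random.uniform(0, 1, (500, 2000))
    return ((x * x) * 3 + 1.0).sum()

pool = ProcessingPool(4)
# uimap yields results as they finish, like imap_unordered
print(sum(pool.uimap(sample_sum, range(100))))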