2017-05-04 48 views
0

我很奇怪,爲什麼我的CPU負荷是如此之低,即使我沒有得到一個高處理速度:multiprocessing.Pool縮放

import time 
from multiprocessing import Pool 
import numpy as np 
from skimage.transform import AffineTransform, SimilarityTransform, warp 

center_shift = 256/2 
tf_center = SimilarityTransform(translation=-center_shift) 
tf_uncenter = SimilarityTransform(translation=center_shift) 


def sample_gen_random_i(): 
    for i in range(10000000000000): 
     x = np.random.rand(256, 256, 4) 
     y = [0] 

     yield x, y 


def augment(sample): 
    x, y = sample 
    rotation = 2 * np.pi * np.random.random_sample() 
    translation = 5 * np.random.random_sample(), 5 * np.random.random_sample() 
    scale_factor = np.random.random_sample() * 0.2 + 0.9 
    scale = scale_factor, scale_factor 

    tf_augment = AffineTransform(scale=scale, rotation=rotation, translation=translation) 
    tf = tf_center + tf_augment + tf_uncenter 

    warped_x = warp(x, tf) 

    return warped_x, y 


def augment_parallel_sample_gen(samples): 
    p = Pool(4) 

    for sample in p.imap_unordered(augment, samples, chunksize=10): 
     yield sample 

    p.close() 
    p.join() 


def augment_sample_gen(samples): 
    for sample in samples: 
     yield augment(sample) 



# This is slow and the single cpu core has 100% load 
print('Single Thread --> Slow') 
samples = sample_gen_random_i() 
augmented = augment_sample_gen(samples) 

start = time.time() 
for i, sample in enumerate(augmented): 
    print(str(i) + '|' + str(i/(time.time() - start))[:6] + ' samples/second', end='\r') 
    if i >= 2000: 
     print(str(i) + '|' + str(i/(time.time() - start))[:6] + ' samples/second') 
     break 

# This is slow and there is only light load on the cpu cores 
print('Multithreaded --> Slow') 
samples = sample_gen_random_i() 
augmented = augment_parallel_sample_gen(samples) 

start = time.time() 
for i, sample in enumerate(augmented): 
    print(str(i) + '|' + str(i/(time.time() - start))[:6] + ' samples/second', end='\r') 
    if i >= 2000: 
     print(str(i) + '|' + str(i/(time.time() - start))[:6] + ' samples/second') 
     break 

我使用multiprocessing.Pool的IMAP,但我認爲有一些開銷。不使用增加和無多,150在與隆胸無多,我可以達到約500樣本/秒和170增強和多喜歡所以我懷疑一定有什麼毛病我的方法。代碼應該是可執行的和自我解釋的! :)

+1

[爲什麼多處理器只使用一個核心後我導入numpy的?(可能的重複http://stackoverflow.com/questions/1563977 9 /爲什麼 - 不 - 多處理 - 使用 - 只 - 一個單核心後的i-進口numpy的) – vks

回答

0

這個問題似乎是與

return warped_x, y 

傳遞圖像以將處理後和整個變換的圖像傳遞迴主過程似乎是瓶頸。如果我只給回例如第一像素

return x[0, 0, 0], y 

和移動樣本創建到子進程

def augment(y): 
    x = np.random.rand(256, 256, 4) 
    rotation = 2 * np.pi * np.random.random_sample() 
    ... 

速度將擴大近線性的內核數...

也許線程會比過程更(?)

+0

試一試,但要記住,在CPython的,在任何給定的時間只有一個線程不執行Python的字節碼。因此,如果在釋放全局解釋器鎖的C擴展中有足夠的工作正在進行,線程只能在這裏幫助。 – BlackJack