0

我不知道如何通過多處理工作來計算壞像素我到目前爲止沒有多處理並分析需要分析的10張圖片大約需要7分鐘...通過在Python中使用多處理來計算死像素

import random 
import time 

from multiprocessing import Process, Queue, current_process, freeze_support 
from PIL import Image, ImageDraw 

image1 = Image.open('MA_HA1_drawing_0.png') 
image2 = Image.open('MA_HA1_drawing_1.png') 
image2 = Image.open('MA_HA1_drawing_2.png') 
image3 = Image.open('MA_HA1_drawing_3.png') 
image4 = Image.open('MA_HA1_drawing_4.png') 
image5 = Image.open('MA_HA1_drawing_5.png') 
image6 = Image.open('MA_HA1_drawing_6.png') 
image7 = Image.open('MA_HA1_drawing_7.png') 
image8 = Image.open('MA_HA1_drawing_8.png') 
image9 = Image.open('MA_HA1_drawing_9.png') 

def analyze_picture(image): 
    time.sleep(0.5*random.random()) 
    counter = 0 
    for x in range(616,6446): 
     for y in range(756,3712): 
      r,g,b = image.getpixel((x,y)) 

      if r != 1 and g != 1 and b != 1: 
       counter += 1 
    return counter 

def test(): 
    NUMBER_OF_PROCESSES = 4 
    TASKS1 = [(analyze_picture(image1))] 
    TASKS2 = [(analyze_picture(image2))] 
    TASKS3 = [(analyze_picture(image2))] 
    TASKS4 = [(analyze_picture(image3))] 
    TASKS5 = [(analyze_picture(image4))] 
    TASKS6 = [(analyze_picture(image5))] 
    TASKS7 = [(analyze_picture(image6))] 
    TASKS8 = [(analyze_picture(image7))] 
    TASKS9 = [(analyze_picture(image8))] 
    TASKS10 = [(analyze_picture(image9))] 

    print TASKS1 

if __name__ == '__main__': 
    freeze_support() 
    test() 

他們給了我們一些功能來理解多處理並將其用於我們的任務,但我不理解他們,也不知道如何使用它們。

def worker(input, output): 
    for func, args in iter(input.get, 'STOP'): 
     result = calculate(func, args) 
     output.put(result) 

def calculate(func, args): 
    result = func(*args) 
    return '%s says that %s%s = %s' % \ 
     (current_process().name, func.__name__, args, result) 

def mul(a, b): 
    time.sleep(0.5*random.random()) 
    return a * b 

def plus(a, b): 
    time.sleep(0.5*random.random()) 
    return a + b 

# Create queues 
    task_queue = Queue() 
    done_queue = Queue() 

    # Submit tasks 
    for task in TASKS1: 
     task_queue.put(task) 

    # Start worker processes 
    for i in range(NUMBER_OF_PROCESSES): 
     Process(target=worker, args=(task_queue, done_queue)).start() 
     print i 

    # Get and print results 
    print 'Unordered results:' 
    for i in range(len(TASKS1)): 
     print '\t', done_queue.get() 

    # Add more tasks using `put()` 
    for task in TASKS2: 
     task_queue.put(task) 

    # Get and print some more results 
    for i in range(len(TASKS2)): 
     print '\t', done_queue.get() 

    # Tell child processes to stop 
    for i in range(NUMBER_OF_PROCESSES): 
     task_queue.put('STOP') 
     print 'process ', i, ' is stopped' 

編輯:新代碼

import random 
import time 

from multiprocessing import Process, Queue, current_process, freeze_support 
from PIL import Image, ImageDraw 


def worker(input, output): 
    for func, args in iter(input.get, 'STOP'): 
     result = calculate(func, args) 
     output.put(result) 

def calculate(func, args): 
    result = func(args) 
    return '%s says that %s%s has %s dead pixels\n' % \ 
     (current_process().name, func.__name__, args, result) 

def analyze_picture(image_name): 
    t1 = time.clock() 
    image = Image.open(image_name) 
    time.sleep(0.5*random.random()) 
    counter = 0 
    for x in range(616,6446): 
     for y in range(756,3712): 
      r,g,b = image.getpixel((x,y)) 

      if r != 1 and g != 1 and b != 1: 
       counter += 1 

    t2 = time.clock() 
    dt = t2 - t1 
    print '\tThe process takes ',dt,' seconds.\n Result:\n' 
    return counter 

def test(): 


    NUMBER_OF_PROCESSES = 4 

    TASKS1 = [(analyze_picture, image_names[i]) for i in range(10)] 

    print TASKS1 

    # Create queues 
    task_queue = Queue() 
    done_queue = Queue() 

    # Submit tasks 
    for task in TASKS1: 
     task_queue.put(task) 

    # Start worker processes 
    for i in range(NUMBER_OF_PROCESSES): 
     Process(target=worker, args=(task_queue, done_queue)).start() 
     print i 

    # Get and print results 
    print 'Unordered results:' 
    for i in range(len(TASKS1)): 
     print '\t', done_queue.get() 

    # Tell child processes to stop 
    for i in range(NUMBER_OF_PROCESSES): 
     task_queue.put('STOP') 
     print 'process ', i, ' is stopped' 

if __name__ == '__main__': 
    image_names =[('MA_HA1_drawing_'+str(i)+'.png') for i in range(10)] 
    freeze_support() 
    test() 

回答

0

背後多的想法:

  • 創建幾個工人可以分佈到不同的內核中並行執行。
  • 對於多進程這些工作人員是與他們單獨的內存空間(與線程相反)的進程。
  • 由於獨立的內存空間,它們無法通過內存進行通信(接收任務併發送結果)。因此,需要進行進程間通信的隊列。
  • 現在,任務通過隊列分配給工人。
  • 最後,收集工人通過隊列發送的結果。

如果是強制使用的發佈代碼,你可以如下做到這一點:

  1. 創建隊列
  2. 開始工作進程
  3. 提交任務
    這是非常導入提交任務工人創建。可能的是,隊列的緩衝區已滿並阻塞,直到從隊列中取出某些內容爲止,但只要沒有工作人員,隊列中也不會有任何內容 - > DEADLOCK。
    既然你想要並行執行所有的圖像,你的TASKS1(承認複數)必須是[(analyze_picture, (analyze_picture(image1),), (analyze_picture, (analyze_picture(image2),), ...]worker需要一個函數元組和tuple本身作爲一個元組)。
  4. 獲取和打印效果
  5. 告訴子進程停止

也許這就是你在問什麼。

畢竟,有提高性能(和代碼的可讀性)的另外三個方面:

  • 進程間的通信是相當昂貴的。因此,您應該儘量減少從工作人員轉移到工作人員的數據。
    就你而言,這意味着只傳遞圖像名稱而不是整個圖像。此外,這導致所有圖像的並行讀入,因爲工作人員讀取圖像。
  • 所有勞動者的東西是在multiprocessing.Pool,從而降低了對多行代碼兩個已實現的:
    pool = multiprocessing.Pool(processes=NUMBER_OF_PROCESSES) result = pool.map(analyze_picture, [image1, image2, ...])
  • 最後但並非最不重要的,由像素迭代像素是相當緩慢的。隨着NumPy(或更高級別的SciPy),你可以加速它很多。

最後,你的腳本可能看起來像以下,並將於7分鐘快得多:

import multiprocessing as mp 
import numpy as np 
from scipy import misc 

def analyze_picture(imagename): 
    image = misc.imread(imagename) # image[y, x, r/g/b] 
    return len(np.argwhere((a[756:,616:,0]!=1) & (a[756:,616:,1]!=1) & (a[756:,616:,2]!=1))) 

def main(): 
    pool = mp.Pool()     # default: number of logical cores 
    result = pool.map(analyze_picture, ("MA_HA1_drawing_{}.png".format(i) 
              for i in range(10))) 
    print(result) 

if __name__ == '__main__': 
    mp.freeze_support() 
    main() 

我不知道你的圖像看起來怎麼樣(該{r,g,b}!=1奇怪),但在reference of scipy.misc.imread你會發現適合你的圖像模式。

+0

嗨@Chickenmarkus非常感謝你的詳細答覆。我很感激它,我試過它,然後你給我的答案,我得到它的工作,但現在我有另一個問題,結果是這樣的https://pastebin.com/gsviTkLj ,但我想它像 該過程需要x秒 結果 圖像X滿足x壞點 而不是 過程耗時x秒 結果 這個過程需要x秒...... 我將編輯我的問題,用新的代碼,所以你能看到,也許是問題 –

+0

的在'analyze_picture()'中打印時間,'test()'中結果的打印是由於多處理而異步運行的。工人通常不應該印刷一些東西(因爲不確定的不同步,而且通常有很多工人)。讓worker只返回值(或字符串),而只在主進程test()中打印它。 – Chickenmarkus