2015-02-23 38 views
1

我創建了一些多處理代碼 - 檢測問題非常簡單,並且發現了一些問題 - 隊列沒有與同步一起更新。爲什麼隊列不是多進程安全的?

# coding=utf-8 
import multiprocessing 

def do_work(input_queue, output_queue): 
    print multiprocessing.current_process().name 
    input_queue.put(1) 
    while not input_queue.empty(): 
    output_queue.put(input_queue.get() + 1) 

def main(): 
    input_queue = multiprocessing.Queue() 
    output_queue = multiprocessing.Queue() 
    for i in range(8): 
    input_queue.put(i) 

    processes = [] 
    for i in range(2): 
    process = multiprocessing.Process(name = str(i), 
             target = do_work, 
             args = (input_queue, 
               output_queue),) 
    processes.append(process) 
    process.start() 
    for process in processes: 
    process.join() 
    results = [] 
    while not output_queue.empty(): 
    results.append(output_queue.get()) 
    print len(results), results 

if __name__ == '__main__': 
    main() 

有時候結果是 - 什麼好看:

process 0 
process 1 
10 [2, 1, 3, 4, 6, 5, 8, 7, 2, 2] 

但有時結果就像值1不同的過程開始時沒有把:

process 0 
process 1 
9 [1, 2, 3, 4, 5, 6, 7, 8, 2] 

它看起來是不是問題與打印,因爲它是在主線程完成,但隊列不支持進程間鎖定。你能提出一些建議嗎?

+2

這不會使數據結構變得不安全;它是異步行爲,這是我完全希望發生的事情。 – Makoto 2015-02-23 23:55:17

+0

Makoto 10值= 10結果未知的順序,但不是9結果 - 再次看到我簡化了演示文稿。 – Chameleon 2015-02-24 00:02:01

+0

你從哪裏得到10? – Makoto 2015-02-24 00:03:53

回答

2

這裏是你的代碼略有變化:

# coding=utf-8 
import multiprocessing 

def do_work(input_queue, output_queue, lock): 
    with lock: 
    input_queue.put(1) 
    print input_queue.empty(), input_queue.qsize() 
    while not input_queue.empty(): 
     output_queue.put(input_queue.get() + 1) 

def main(): 
    input_queue = multiprocessing.Queue() 
    output_queue = multiprocessing.Queue() 
    lock = multiprocessing.Lock() 
    for i in range(8): 
    input_queue.put(i) 

    processes = [] 
    for i in range(2): 
    process = multiprocessing.Process(name = str(i), 
             target = do_work, 
             args = (input_queue, 
               output_queue, lock),) 
    processes.append(process) 
    process.start() 
    for process in processes: 
    process.join() 
    results = [] 
    while not output_queue.empty(): 
    results.append(output_queue.get()) 
    print len(results), results 

if __name__ == '__main__': 
    main() 

注意,現在整個過程是一個鎖下,所以沒有競爭條件是可能的,而且它也打印輸入隊列的大小和是否是空的或不。現在,這裏的運行的一個輸出:

False 9 
True 1 
9 [1, 2, 3, 4, 5, 6, 7, 8, 2] 

注二等過程是怎樣說,隊列爲空,但在同一時間有一個元素。原因是在文檔中:

empty()如果​​隊列爲空則返回True,否則返回False。由於 多線程/多處理語義,這是不可靠

要修復它,您可以用while input_queue.qsize() > 0替換您的條件while not input_queue.empty()。當你這樣做時,你會看到你的代碼掛起。這很有意義,因爲你首先檢查隊列的大小,然後嘗試彈出它。考慮以下情況:隊列中有一個元素,兩個線程都可以看到,並嘗試彈出。一個成功,另一個現在試圖從空隊列中彈出,並阻止。要解決這個問題,請嘗試做非阻塞彈出窗口,如果失敗則重試:

# coding=utf-8 
import multiprocessing 
import Queue 

def do_work(input_queue, output_queue): 
    input_queue.put(1) 
    while input_queue.qsize() > 0: 
    try: 
     output_queue.put(input_queue.get(False) + 1) 
    except Queue.Empty: 
     pass 

def main(): 
    input_queue = multiprocessing.Queue() 
    output_queue = multiprocessing.Queue() 
    for i in range(8): 
    input_queue.put(i) 

    processes = [] 
    for i in range(2): 
    process = multiprocessing.Process(name = str(i), 
             target = do_work, 
             args = (input_queue, 
               output_queue)) 
    processes.append(process) 
    process.start() 
    for process in processes: 
    process.join() 
    results = [] 
    while True: 
    try: 
     results.append(output_queue.get(False)) 
    except Queue.Empty: 
     break 
    print len(results), results 

if __name__ == '__main__': 
    main() 
+0

你對鎖定是正確的,但這並不能解釋爲什麼重複值出現在隊列中。 – Makoto 2015-02-24 00:15:11

+0

@Makoto,因爲他插入了三次,然後重新插入從輸入隊列到輸出隊列的所有內容。重複是非常有意義的 – Ishamael 2015-02-24 00:17:59

+0

@makato它是錯誤的代碼它會給10或9個結果 - 測試! – Chameleon 2015-02-24 00:19:37