6

我試圖做一個文件像對象,這意味着分配給sys.stdout/sys.stderr在測試過程中提供確定性的輸出。這並不意味着要快,只是可靠。我到目前爲止的差不多的作品,但我需要一些幫助擺脫最後幾個邊緣案例的錯誤。Python多處理:同步類文件對象

這是我目前的實施。

try: 
    from cStringIO import StringIO 
except ImportError: 
    from StringIO import StringIO 

from os import getpid 
class MultiProcessFile(object): 
    """ 
    helper for testing multiprocessing 

    multiprocessing poses a problem for doctests, since the strategy 
    of replacing sys.stdout/stderr with file-like objects then 
    inspecting the results won't work: the child processes will 
    write to the objects, but the data will not be reflected 
    in the parent doctest-ing process. 

    The solution is to create file-like objects which will interact with 
    multiprocessing in a more desirable way. 

    All processes can write to this object, but only the creator can read. 
    This allows the testing system to see a unified picture of I/O. 
    """ 
    def __init__(self): 
     # per advice at: 
     # http://docs.python.org/library/multiprocessing.html#all-platforms 
     from multiprocessing import Queue 
     self.__master = getpid() 
     self.__queue = Queue() 
     self.__buffer = StringIO() 
     self.softspace = 0 

    def buffer(self): 
     if getpid() != self.__master: 
      return 

     from Queue import Empty 
     from collections import defaultdict 
     cache = defaultdict(str) 
     while True: 
      try: 
       pid, data = self.__queue.get_nowait() 
      except Empty: 
       break 
      cache[pid] += data 
     for pid in sorted(cache): 
      self.__buffer.write('%s wrote: %r\n' % (pid, cache[pid])) 
    def write(self, data): 
     self.__queue.put((getpid(), data)) 
    def __iter__(self): 
     "getattr doesn't work for iter()" 
     self.buffer() 
     return self.__buffer 
    def getvalue(self): 
     self.buffer() 
     return self.__buffer.getvalue() 
    def flush(self): 
     "meaningless" 
     pass 

...和快速測試腳本:

#!/usr/bin/python2.6 

from multiprocessing import Process 
from mpfile import MultiProcessFile 

def printer(msg): 
    print msg 

processes = [] 
for i in range(20): 
    processes.append(Process(target=printer, args=(i,), name='printer')) 

print 'START' 
import sys 
buffer = MultiProcessFile() 
sys.stdout = buffer 

for p in processes: 
    p.start() 
for p in processes: 
    p.join() 

for i in range(20): 
    print i, 
print 

sys.stdout = sys.__stdout__ 
sys.stderr = sys.__stderr__ 
print 
print 'DONE' 
print 
buffer.buffer() 
print buffer.getvalue() 

這工作的時間完美95%,但它有三個邊的情況下的問題。我必須在快速的while循環中運行測試腳本來重現這些。

  1. 3%的時間,父進程輸出沒有完全反映。我認爲這是因爲在隊列清洗線程可以趕上之前數據正在被消耗。我沒有辦法等待線程沒有死鎖。 0%的時間裏,有一個從多進程回溯.Queue執行
  2. .01%的時間,PIDs環繞,所以按PID排序給出了錯誤的排序。

在很最壞的情況下(可能性:一個70億美元),輸出應該是這樣的:

START 

DONE 

302 wrote: '19\n' 
32731 wrote: '0 1 2 3 4 5 6 7 8 ' 
32732 wrote: '0\n' 
32734 wrote: '1\n' 
32735 wrote: '2\n' 
32736 wrote: '3\n' 
32737 wrote: '4\n' 
32738 wrote: '5\n' 
32743 wrote: '6\n' 
32744 wrote: '7\n' 
32745 wrote: '8\n' 
32749 wrote: '9\n' 
32751 wrote: '10\n' 
32752 wrote: '11\n' 
32753 wrote: '12\n' 
32754 wrote: '13\n' 
32756 wrote: '14\n' 
32757 wrote: '15\n' 
32759 wrote: '16\n' 
32760 wrote: '17\n' 
32761 wrote: '18\n' 

Exception in thread QueueFeederThread (most likely raised during interpreter shutdown): 
Traceback (most recent call last): 
    File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner 
    File "/usr/lib/python2.6/threading.py", line 484, in run 
     File "/usr/lib/python2.6/multiprocessing/queues.py", line 233, in _feed 
<type 'exceptions.TypeError'>: 'NoneType' object is not callable 

在python2.7例外略有不同:

Exception in thread QueueFeederThread (most likely raised during interpreter shutdown): 
Traceback (most recent call last): 
    File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner 
    File "/usr/lib/python2.7/threading.py", line 505, in run 
    File "/usr/lib/python2.7/multiprocessing/queues.py", line 268, in _feed 
<type 'exceptions.IOError'>: [Errno 32] Broken pipe 

我該如何擺脫這些邊緣情況?

+1

你問的實際問題是什麼?爲什麼你得到這些例外?爲什麼每個邊緣案例都在發生? – 2011-04-28 16:31:45

+0

@丹尼爾:如何擺脫這三個問題。我想我通過在介紹中加入一句話來使自己更加清楚。這有幫助嗎?。 – bukzor 2011-04-28 16:43:12

回答

8

該解決方案分爲兩部分。我已經成功運行了20萬次測試程序,沒有任何輸出變化。

最簡單的部分是使用multiprocessing.current_process()._標識對消息進行排序。這不是已發佈API的一部分,但它是每個進程的唯一確定性標識符。這解決了PID封裝並給出錯誤排序的問題。

解決方案的其他部分是使用multiprocessing.Manager()。Queue()而不是multiprocessing.Queue。這解決了上面的問題#2,因爲管理器位於單獨的進程中,因此在使用來自擁有進程的隊列時避免了一些不好的特殊情況。 #3是固定的,因爲Queue完全耗盡,並且在python開始關閉並關閉標準輸入前,饋線線程自然死亡。

+4

multiprocessing.Manager()隊列(),而不是multiprocessing.Queue擺脫了的 「<類型 'exceptions.IOError'>:[錯誤32]中斷的管道」 錯誤在python 2.7我 – 2013-11-09 00:13:17

+0

@JoshuaRichardson使用'多.Manager()。Queue()'也爲我解決了。但是我的測試花了大約7倍於'mutliprocessing.queues.Queue()'的時間。 – Bengt 2014-01-07 23:24:14

+0

@Bengt:我希望你沒有爲每個隊列做一名經理。你只需要一個。你能告訴我們一個最小的基準嗎? – bukzor 2014-01-08 03:29:20

0

我使用Python 2.7比使用Python 2.6時遇到了更少的multiprocessing錯誤。說了這樣的話,我用來避免「Exception in thread QueueFeederThread」問題的解決方案在Queue的每個過程中瞬間可能爲0.0124秒,可能爲sleep。誠然,使用sleep是不可取的或者甚至是不可靠的,但是對於我來說,觀察到規定的持續時間對於實踐來說工作得非常好。你也可以嘗試0.1s。

+2

嗜睡症從來不是一個可靠的解決方案。 – bukzor 2012-07-18 17:04:11