2011-03-30 45 views
3

我已經嘗試了python2.6 Ubuntu 包(__version__說0.70a1)和最新的PyPI(2.6.2.1)中包含的多處理。 在這兩種情況下,我都不知道如何正確使用imap - 它會導致整個解釋器停止響應ctrl-C(儘管如此,地圖工作正常)。 pdb顯示next()掛在條件變量wait()呼叫IMapIterator,所以沒有人喚醒我們。任何提示?提前致謝 。多處理Pool.imap破?

$ cat /tmp/go3.py 
import multiprocessing as mp 
print mp.Pool(1).map(abs, range(3)) 
print list(mp.Pool(1).imap(abs, range(3))) 

$ python /tmp/go3.py 
[0, 1, 2] 
^C^C^C^C^C^\Quit 

回答

9

首先注意這個工程:

import multiprocessing as mp 
import multiprocessing.util as util 
pool=mp.Pool(1) 
print list(pool.imap(abs, range(3))) 

不同的是,pool沒有得到最終確定後調用pool.imap()結束。

相比之下,

print(list(mp.Pool(1).imap(abs, range(3)))) 

導致Pool實例的imap通話結束後不久完成。 缺少引用會導致調用Finalizer(在Pool類中稱爲self._terminate)。這設置了一系列命令,將任務處理程序線程,結果處理程序線程,工作程序子進程等關閉。

這一切都發生得如此之快,至少在大多數運行中,任務發送到任務處理程序沒有完成。

這裏是代碼的相關位:

從/usr/lib/python2.6/multiprocessing/pool.py:

class Pool(object): 
    def __init__(self, processes=None, initializer=None, initargs=()): 
     ... 
     self._terminate = Finalize(
      self, self._terminate_pool, 
      args=(self._taskqueue, self._inqueue, self._outqueue, self._pool, 
        self._task_handler, self._result_handler, self._cache), 
      exitpriority=15 
      ) 

/usr/lib/python2.6/multiprocessing/ util.py:

class Finalize(object): 
    ''' 
    Class which supports object finalization using weakrefs 
    ''' 
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None): 
     ... 
     if obj is not None: 
      self._weakref = weakref.ref(obj, self) 

weakref.ref(obj,self)原因self()obj即將定稿被調用。

我使用調試命令util.log_to_stderr(util.SUBDEBUG)來了解事件的順序。例如:

import multiprocessing as mp 
import multiprocessing.util as util 
util.log_to_stderr(util.SUBDEBUG) 

print(list(mp.Pool(1).imap(abs, range(3)))) 

產生

[DEBUG/MainProcess] created semlock with handle 3077013504 
[DEBUG/MainProcess] created semlock with handle 3077009408 
[DEBUG/MainProcess] created semlock with handle 3077005312 
[DEBUG/MainProcess] created semlock with handle 3077001216 
[INFO/PoolWorker-1] child process calling self.run() 
[SUBDEBUG/MainProcess] finalizer calling <bound method type._terminate_pool of <class 'multiprocessing.pool.Pool'>> with args (<Queue.Queue instance at 0x9d6e62c>, <multiprocessing.queues.SimpleQueue object at 0x9cf04cc>, <multiprocessing.queues.SimpleQueue object at 0x9d6e40c>, [<Process(PoolWorker-1, started daemon)>], <Thread(Thread-1, started daemon -1217967248)>, <Thread(Thread-2, started daemon -1226359952)>, {0: <multiprocessing.pool.IMapIterator object at 0x9d6eaec>}) and kwargs {} 
[DEBUG/MainProcess] finalizing pool 
... 

和比較,與

import multiprocessing as mp 
import multiprocessing.util as util 
util.log_to_stderr(util.SUBDEBUG) 
pool=mp.Pool(1) 
print list(pool.imap(abs, range(3))) 

其產生

[DEBUG/MainProcess] created semlock with handle 3078684672 
[DEBUG/MainProcess] created semlock with handle 3078680576 
[DEBUG/MainProcess] created semlock with handle 3078676480 
[DEBUG/MainProcess] created semlock with handle 3078672384 
[INFO/PoolWorker-1] child process calling self.run() 
[DEBUG/MainProcess] doing set_length() 
[0, 1, 2] 
[INFO/MainProcess] process shutting down 
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0 
[SUBDEBUG/MainProcess] calling <Finalize object, callback=_terminate_pool, args=(<Queue.Queue instance at 0xb763e60c>, <multiprocessing.queues.SimpleQueue object at 0xb76c94ac>, <multiprocessing.queues.SimpleQueue object at 0xb763e3ec>, [<Process(PoolWorker-1, started daemon)>], <Thread(Thread-1, started daemon -1218274448)>, <Thread(Thread-2, started daemon -1226667152)>, {}), exitprority=15> 
... 
[DEBUG/MainProcess] finalizing pool