3

(Python 3.4,Linux)。跨進程共享多處理同步原語

我有一個主進程「P」,它分叉了8個進程('C1'到'C8')。我想創建multiprocessing.Barrier,以確保所有8個子進程在某個點處同步。

一切工作正常,如果我定義同步在父進程中原始的,所以,當我用叉子叉子進程是正確繼承:

import multiprocessing as mp 
barrier = mp.Barrier(8) 

def f(): 
    # do something 
    barrier.wait() 
    # do more stuff 

def main(): 
    for i in range(8): 
    p = mp.Process(target = f) 
    p.start() 

if __name__ == '__main__': 
    main() 

但在我的情況,我不知道所需的詳細信息創建Barrier對象,直到子進程啓動後(我不知道我想作爲參數action傳遞的參數)。因此,我想在其中一個子進程中創建Barrier,但我不知道如何使其可用於其他子進程。下面將當然不是工作,因爲在孩子過程中的8個Barrier對象是完全相互獨立的:

import multiprocessing as mp 

def f(): 
    global barrier 
    # do something 
    barrier = mp.Barrier(8) 
    barrier.wait() 
    # do more stuff 

def main(): 
    for i in range(8): 
    p = mp.Process(target = f) 
    p.start() 

if __name__ == '__main__': 
    main() 

我想在其中一個子進程來創建barrier並用它傳遞給他人multiprocessing.Queue(或者如果Queue不接受Barrier對象,則使用multiprocessing.Manager().Barrier)。但是,即使這樣做,我也不知道如何確保只有一個進程實際上是同步原語的(7個副本)到隊列中,而其他的將只有get他們。 (當然,我可以在父進程中創建一個又一個同步原語來完成這個任務,但是我可能會重構我的代碼,畢竟在父進程中創建原始的Barrier。)

+0

*「但是,我可能會重構我的代碼,在父進程中創建原始Barrier。」*這可能是最好的選擇。試圖將父母的'Manager'傳遞給子進程將無法正常工作(當您嘗試使用它的'Proxy'對象時,您會遇到奇怪的異常)。我認爲你唯一的選擇是讓孩子告訴父母使用什麼'action',然後讓父母創建'Barrier',然後將父母的Manager()。Barrier'傳遞給孩子。但是,這需要將「mp.Queue」和「mp.Lock」傳遞給所有的孩子...... – dano 2015-04-02 22:36:33

+0

......這很複雜。重構代碼可能更好,因此您可以在創建子項之前創建屏障。 – dano 2015-04-02 22:37:21

+0

其實......有一種方法可以通過將孩子連接到遠程'管理器'來實現。但它仍然可能最終比它的價值更復雜。 – dano 2015-04-02 22:42:15

回答

1

下面是一個示例通過在一個孩子中創建一個multiprocessing.managers.BaseManager,然後從所有其他孩子連接到該經理,可以做到這一點。請注意,它需要將家長的multiprocessing.Lock傳遞給所有孩子以達到同步的目的,您提到的是您希望避免的。不過,我不確定還有其他選擇。

import multiprocessing as mp 
from multiprocessing.managers import BaseManager 

class MyManager(BaseManager): 
    pass 

def f(lock): 
    # do something 
    with lock: 
     try: 
      MyManager.register('get_barrier') 
      m = MyManager(address=('localhost', 5555), authkey=b'akey') 
      m.connect() 
      b = m.get_barrier() 
      print("Got the barrier from the manager") 
     except OSError as e: 
      # We are the first. Create the manager, register 
      # a mp.Barrier instance with it, and start it up. 
      print("Creating the manager...") 
      b = mp.Barrier(8) 
      MyManager.register('get_barrier', callable=lambda:b) 
      m = MyManager(address=('localhost', 5555), authkey=b'akey') 
      m.start() 
    b.wait() 
    print("Done!") 
    # do more stuff 

def main(): 
    lock = mp.Lock() 
    for i in range(8): 
     p = mp.Process(target=f, args=(lock,)) 
     p.start() 

if __name__ == '__main__': 
    main() 

輸出:

Creating the manager... 
Got the barrier from the manager 
Got the barrier from the manager 
Got the barrier from the manager 
Got the barrier from the manager 
Got the barrier from the manager 
Got the barrier from the manager 
Got the barrier from the manager 
Done! 
Done! 
Done! 
Done! 
Done! 
Done! 
Done! 
Done! 
+0

Thx!我想@BiRico解決方案對於我的具體情況更簡單。您的答案OTOH提供了使用'Manager'將同步對象從一個子進程發送到另一個進程的一般解決方案。我想甚至有可能擺脫'lock'。例如,我可以爲每個新創建的進程傳遞一個額外的標識參數('0',...,'7')。接收'0'的進程可以進行創建,其他進程可以循環嘗試/除非嘗試連接到'Manager'。 – max 2017-05-07 01:29:50

1

是否可以簡單地捕獲進程的ID和manaully打電話給你的行動只是其中之一?像這樣?

import multiprocessing as mp 
barrier = mp.Barrier(8) 

def f(): 
    # create action 
    def action(): 
     print("action was run on process {}.".format(id)) 

    # do something 
    print("Hello from process {}.".format(id)) 
    id = barrier.wait() 
    if id == 0: 
     action() 
    barrier.wait() 

    # Do more stuff 

def main(): 
    for i in range(8): 
    p = mp.Process(target = f) 
    p.start() 

if __name__ == '__main__': 
    main() 
+0

對不起,延遲!我只想要執行的「動作」在執行其中一個子進程時變得已知。您的方法取代了將'Barrier'實例從一個孩子傳遞給所有其他孩子的問題,並且帶來了(更容易)傳遞'action'函數的問題。對於簡單的功能,它將毫不費力地工作。對於更復雜的情況,我可以嘗試進行適當的重構:https://stevenengelhardt.com/2013/01/16/python-multiprocessing-module-and-closures/; http://stackoverflow.com/questions/1816958/cant-pickle-type-in​​stancemethod-when-using-pythons-multiprocessing-pool-ma – max 2017-05-07 01:14:22