2013-04-04 184 views
5

這不是很重要,只是一個愚蠢的實驗。我想創建自己的消息傳遞。 我想有一個隊列字典,其中每個鍵是進程的PID。 因爲我想讓流程(由Process()創建)交換將消息插入他們想要發送給它的進程的隊列中(知道它的pid)。 這是一個愚蠢的代碼:python字典之間的進程隊列

from multiprocessing import Process, Manager, Queue 
from os import getpid 
from time import sleep 

def begin(dic, manager, parentQ): 
    parentQ.put(getpid()) 
    dic[getpid()] = manager.Queue() 
    dic[getpid()].put("Something...") 

if __name__== '__main__': 
    manager = Manager() 
    dic = manager.dict() 
    parentQ = Queue() 

    p = Process(target = begin, args=(dic, manager, parentQ)) 
    p.start() 
    son = parentQ.get() 
    print son 
    sleep(2) 
    print dic[son].get() 

dic[getpid()] = manager.Queue(),這工作正常。但是,當我執行 dic[son].put()/get()我得到這個消息:

Process Process-2: 
Traceback (most recent call last): 
    File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap 
    self.run() 
    File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run 
    self._target(*self._args, **self._kwargs) 
    File "mps.py", line 8, in begin 
    dic[getpid()].put("Something...") 
    File "<string>", line 2, in __getitem__ 
    File "/usr/lib/python2.7/multiprocessing/managers.py", line 773, in _callmethod 
    raise convert_to_error(kind, result) 
RemoteError: 
--------------------------------------------------------------------------- 
Unserializable message: ('#RETURN', <Queue.Queue instance at 0x8a92d0c>) 
--------------------------------------------------------------------------- 

你知道什麼是應該做的正確方法?

回答

1

我相信你的代碼失敗了,因爲隊列不可序列化,就像回溯說的那樣。 multiprocessing.Manager()對象可以爲你創建一個共享的字典,就像你在這裏完成的那樣,但是存儲在字典中的值仍然需要可序列化(或picklable in Pythonese)。如果沒有問題與不具有訪問對方的隊列的子過程,那麼這應該爲你工作:

from multiprocessing import Process, Manager, Queue 
from os import getpid 

number_of_subprocesses_i_want = 5 

def begin(myQ): 
    myQ.put("Something sentimental from your friend, PID {0}".format(getpid())) 
    return 

if __name__== '__main__': 
    queue_dic = {} 
    queue_manager = Manager() 

    process_list = [] 

    for i in xrange(number_of_subprocesses_i_want): 
     child_queue = queue_manager.Queue() 

     p = Process(target = begin, args=(child_queue,)) 
     p.start() 
     queue_dic[p.pid] = child_queue 
     process_list.append(p) 

    for p in process_list: 
     print(queue_dic[p.pid].get()) 
     p.join() 

這給你留下一本字典的鍵是子進程,和值是其各自的隊列,可以從主進程中使用。

我不認爲你的原始目標是可以通過隊列實現的,因爲你希望子進程使用的隊列在創建時必須傳遞給進程,所以當你啓動更多的進程時,你沒有辦法給出現有進程訪問新隊列。

一種可能的方式有進程間通信將讓每個人都共享同一個隊列回傳遞消息給你用某種頭的,比如在一個元組捆綁的主要過程:

(destination_pid, sender_pid, message) 

..並且主要讀取destination_pid並將(sender_pid,消息)指向該子進程的隊列。當然,這意味着當需要與新進程通信時,您需要一種通知現有進程的方法。