2017-04-13 215 views
0

我剛剛學會了python多處理。我想製作一個模型來模擬在網絡中發送和接收消息的過程。有向圖描述了兩個節點之間的關係,而一個字典描述了兩個節點之間的通信。該字典的值的數據類型是隊列。但是,我遇到了一些錯誤:python在多處理中共享dict()中共享隊列()

from concurrent.futures import ProcessPoolExecutor 
from multiprocessing import Manager 

PoolGroup=[('R1','R2','R3'),('N1','N2','N3'),('E1','E2','E3')] 
PoolElement=['R1','R2','R3','N1','N2','N3','E1','E2','E3'] 
graph={'R1':['N1','N2','N3'], 
    'R2':['N1','N2','N3'], 
    'R3':['N1','N2','N3'], 
    'N1':['E1','E2','E3'], 
    'N2':['E1','E2','E3'], 
    'N3':['E1','E2','E3'], 
    'E1':[], 
    'E2':[], 
    'E3':[]} 

def addSigal(target,information): 
    AllQueue[target].put(information) 
    print("Succeed in sending msg to "+target) 
    print(target+' now has ',AllQueue[target].qsize(),' signals') 


def pool1function(name,information): 
    targetlist=list(graph[name]) 
    print(name+" send information to "+str(targetlist))    
    with ProcessPoolExecutor() as pool1: 
     pool1.map(addSigal,targetlist,[information]*3) 


if __name__=='__main__': 
    m=Manager() 
    AllQueue=m.dict() 
    AllQueue.update({PE:m.Queue() for PE in PoolElement}) 
    with ProcessPoolExecutor() as pool:  
     pool.map(pool1function,PoolGroup[0],[1,2,3]) 

不幸的是,結果剛剛出現:

R1 send information to ['N1', 'N2', 'N3'] 
R2 send information to ['N1', 'N2', 'N3'] 
R3 send information to ['N1', 'N2', 'N3'] 

這意味着信息不被髮送到相應的節點。所以,我檢查AllQueue並發現了一些奇怪的:當我打印AllQueue [「R1」],它表明:

RemoteError: 
--------------------------------------------------------------------------- 
Unserializable message: ('#RETURN', <queue.Queue object at 0x10edd8dd8>) 
--------------------------------------------------------------------------- 

我還沒有把或AllQueue [「R1」]獲得元素,有什麼問題呢?

+0

我不知道'AllQueue'定義。另請編輯您的問題以刪除冗餘元素:是否需要所有進口? – quamrana

+0

我試圖把AllQueue的定義從主要的,或使其成爲全局變量,結果是一樣的...... – Coneain

+0

當你使用'ProcessPoolExecutor'或'multiprocessing.Pool'時,AllQueue似乎超出了範圍。您需要通過'map'調用將'AllQueue'作爲參數傳遞。 – quamrana

回答

0

這是通過字典中的任務的例子:

from concurrent.futures import ProcessPoolExecutor 
from multiprocessing import Manager 


def addSigal(target, information, q): 
    print(target,information,q) 
    q[target]=(information) 
    print("Succeed in sending msg to "+target) 
    print(target+' now has ',q[target]) 


if __name__=='__main__': 
    m = Manager() 
    AllQueue = m.dict() 
    AllQueue.update({'A':0,'B':1}) 
    with ProcessPoolExecutor() as pool: 
     pool.map(addSigal,'AB', [1, 2],[AllQueue,AllQueue]) 
    print(AllQueue) 
+0

是的,你是對的。這適用於一對一,但是多對一的情況如何?一個人收到一個數字序列而不是一個數字,字典的價值只是一個數字,恐怕這還不夠... – Coneain

+0

嗯,我認爲它是由你來工作。我已經證明,主要問題是不同的進程有自己的變量副本,解決方法是發送任何全局通過參數。你只需要繼續這樣做。 – quamrana

+0

好的,就是這一點!我知道了。謝謝! – Coneain