我剛剛學會了python多處理。我想製作一個模型來模擬在網絡中發送和接收消息的過程。有向圖描述了兩個節點之間的關係,而一個字典描述了兩個節點之間的通信。該字典的值的數據類型是隊列。但是,我遇到了一些錯誤:python在多處理中共享dict()中共享隊列()
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager
PoolGroup=[('R1','R2','R3'),('N1','N2','N3'),('E1','E2','E3')]
PoolElement=['R1','R2','R3','N1','N2','N3','E1','E2','E3']
graph={'R1':['N1','N2','N3'],
'R2':['N1','N2','N3'],
'R3':['N1','N2','N3'],
'N1':['E1','E2','E3'],
'N2':['E1','E2','E3'],
'N3':['E1','E2','E3'],
'E1':[],
'E2':[],
'E3':[]}
def addSigal(target,information):
AllQueue[target].put(information)
print("Succeed in sending msg to "+target)
print(target+' now has ',AllQueue[target].qsize(),' signals')
def pool1function(name,information):
targetlist=list(graph[name])
print(name+" send information to "+str(targetlist))
with ProcessPoolExecutor() as pool1:
pool1.map(addSigal,targetlist,[information]*3)
if __name__=='__main__':
m=Manager()
AllQueue=m.dict()
AllQueue.update({PE:m.Queue() for PE in PoolElement})
with ProcessPoolExecutor() as pool:
pool.map(pool1function,PoolGroup[0],[1,2,3])
不幸的是,結果剛剛出現:
R1 send information to ['N1', 'N2', 'N3']
R2 send information to ['N1', 'N2', 'N3']
R3 send information to ['N1', 'N2', 'N3']
這意味着信息不被髮送到相應的節點。所以,我檢查AllQueue並發現了一些奇怪的:當我打印AllQueue [「R1」],它表明:
RemoteError:
---------------------------------------------------------------------------
Unserializable message: ('#RETURN', <queue.Queue object at 0x10edd8dd8>)
---------------------------------------------------------------------------
我還沒有把或AllQueue [「R1」]獲得元素,有什麼問題呢?
我不知道'AllQueue'定義。另請編輯您的問題以刪除冗餘元素:是否需要所有進口? – quamrana
我試圖把AllQueue的定義從主要的,或使其成爲全局變量,結果是一樣的...... – Coneain
當你使用'ProcessPoolExecutor'或'multiprocessing.Pool'時,AllQueue似乎超出了範圍。您需要通過'map'調用將'AllQueue'作爲參數傳遞。 – quamrana