我正在開發一個簡單的客戶端 - 服務器應用程序在Python中。我正在使用管理器來設置共享隊列,但我無法弄清楚如何將任意對象從服務器傳遞到客戶端。我懷疑它與manager.register函數有關,但在multiprocessing documentation中沒有很好解釋。唯一的例子是使用隊列,沒有別的。如何使用遠程管理器傳遞python對象?
這裏是我的代碼:
#manager demo.py
from multiprocessing import Process, Queue, managers
from multiprocessing.managers import SyncManager
import time
class MyObject():
def __init__(self, p, f):
self.parameter = p
self.processor_function = f
class MyServer():
def __init__(self, server_info, obj):
print '=== Launching Server ... ====='
(ip, port, pw) = server_info
self.object = obj #Parameters for task processing
#Define queues
self._process_queue = Queue() #Queue of tasks to be processed
self._results_queue = Queue() #Queue of processed tasks to be stored
#Set up IS_Manager class and register server functions
class IS_Manager(managers.BaseManager): pass
IS_Manager.register('get_processQ', callable=self.get_process_queue)
IS_Manager.register('get_resultsQ', callable=self.get_results_queue)
IS_Manager.register('get_object', callable=self.get_object)
#Initialize manager and server
self.manager = IS_Manager(address=(ip, port), authkey=pw)
self.server = self.manager.get_server()
self.server_process = Process(target=self.server.serve_forever)
self.server_process.start()
def get_process_queue(self): return self._process_queue
def get_results_queue(self): return self._results_queue
def get_object(self): return self.object
def runUntilDone(self, task_list):
#Fill the initial queue
for t in task_list:
self._process_queue.put(t)
#Main loop
total_tasks = len(task_list)
while not self._results_queue.qsize()==total_tasks:
time.sleep(.5)
print self._process_queue.qsize(), '\t', self._results_queue.qsize()
if not self._results_queue.empty():
print '\t', self._results_queue.get()
#Do stuff
pass
class MyClient():
def __init__(self, server_info):
(ip, port, pw) = server_info
print '=== Launching Client ... ====='
class IS_Manager(managers.BaseManager): pass
IS_Manager.register('get_processQ')
IS_Manager.register('get_resultsQ')
IS_Manager.register('get_object')
#Set up manager, pool
print '\tConnecting to server...'
manager = IS_Manager(address=(ip, port), authkey=pw)
manager.connect()
self._process_queue = manager.get_processQ()
self._results_queue = manager.get_resultsQ()
self.object = manager.get_object()
print '\tConnected.'
def runUntilDone(self):#, parameters):
print 'Starting client main loop...'
#Main loop
while 1:
if self._process_queue.empty():
print 'I\'m bored here!'
time.sleep(.5)
else:
task = self._process_queue.get()
print task, '\t', self.object.processor_function(task, self.object.parameter)
print 'Client process is quitting. Bye!'
self._clients_queue.get()
和一個簡單的服務器...
from manager_demo import *
def myProcessor(x, parameter):
return x + parameter
if __name__ == '__main__':
my_object = MyObject(100, myProcessor)
my_task_list = range(1,20)
my_server_info = ('127.0.0.1', 8081, 'my_pw')
my_crawl_server = MyServer(my_server_info, my_object)
my_crawl_server.runUntilDone(my_task_list)
和一個簡單的客戶端...
from manager_demo import *
if __name__ == '__main__':
my_server_info = ('127.0.0.1', 8081, 'my_pw')
my_client = MyClient(my_server_info)
my_client.runUntilDone()
當我運行這個它崩潰上:
[email protected]:~/Desktop$ python client.py
=== Launching Client ... =====
Connecting to server...
Connected.
Starting client main loop...
2 Traceback (most recent call last):
File "client.py", line 5, in <module>
my_client.runUntilDone()
File "/home/erin/Desktop/manager_demo.py", line 84, in runUntilDone
print task, '\t', self.object.processor_function(task, self.object.parameter)
AttributeError: 'AutoProxy[get_object]' object has no attribute 'parameter'
爲什麼python沒有Queues或processor_function的問題,但窒息了對象參數?謝謝!