2011-03-15 53 views
1

我正在開發一個簡單的客戶端 - 服務器應用程序在Python中。我正在使用管理器來設置共享隊列,但我無法弄清楚如何將任意對象從服務器傳遞到客戶端。我懷疑它與manager.register函數有關,但在multiprocessing documentation中沒有很好解釋。唯一的例子是使用隊列,沒有別的。如何使用遠程管理器傳遞python對象?

這裏是我的代碼:

#manager demo.py 
from multiprocessing import Process, Queue, managers 
from multiprocessing.managers import SyncManager 
import time 

class MyObject(): 
    def __init__(self, p, f): 
     self.parameter = p 
     self.processor_function = f 

class MyServer(): 
    def __init__(self, server_info, obj): 
     print '=== Launching Server ... =====' 
     (ip, port, pw) = server_info 
     self.object = obj  #Parameters for task processing 

     #Define queues 
     self._process_queue = Queue()  #Queue of tasks to be processed 
     self._results_queue = Queue()  #Queue of processed tasks to be stored 

     #Set up IS_Manager class and register server functions 
     class IS_Manager(managers.BaseManager): pass 
     IS_Manager.register('get_processQ', callable=self.get_process_queue) 
     IS_Manager.register('get_resultsQ', callable=self.get_results_queue) 
     IS_Manager.register('get_object', callable=self.get_object) 

     #Initialize manager and server 
     self.manager = IS_Manager(address=(ip, port), authkey=pw) 
     self.server = self.manager.get_server() 

     self.server_process = Process(target=self.server.serve_forever) 
     self.server_process.start() 

    def get_process_queue(self): return self._process_queue 
    def get_results_queue(self): return self._results_queue 
    def get_object(self): return self.object 

    def runUntilDone(self, task_list): 
     #Fill the initial queue 
     for t in task_list: 
      self._process_queue.put(t) 

     #Main loop 
     total_tasks = len(task_list) 
     while not self._results_queue.qsize()==total_tasks: 
      time.sleep(.5) 
      print self._process_queue.qsize(), '\t', self._results_queue.qsize() 
      if not self._results_queue.empty(): 
       print '\t', self._results_queue.get() 
      #Do stuff 
      pass 

class MyClient(): 
    def __init__(self, server_info): 
     (ip, port, pw) = server_info 
     print '=== Launching Client ... =====' 

     class IS_Manager(managers.BaseManager): pass 

     IS_Manager.register('get_processQ') 
     IS_Manager.register('get_resultsQ') 
     IS_Manager.register('get_object') 

     #Set up manager, pool 
     print '\tConnecting to server...' 
     manager = IS_Manager(address=(ip, port), authkey=pw) 
     manager.connect() 

     self._process_queue = manager.get_processQ() 
     self._results_queue = manager.get_resultsQ() 
     self.object = manager.get_object() 

     print '\tConnected.' 

    def runUntilDone(self):#, parameters): 
     print 'Starting client main loop...' 

     #Main loop 
     while 1: 
      if self._process_queue.empty(): 
       print 'I\'m bored here!' 
       time.sleep(.5) 
      else: 
       task = self._process_queue.get() 
       print task, '\t', self.object.processor_function(task, self.object.parameter) 

     print 'Client process is quitting. Bye!' 
     self._clients_queue.get() 

和一個簡單的服務器...

from manager_demo import * 

def myProcessor(x, parameter): 
    return x + parameter 

if __name__ == '__main__': 
    my_object = MyObject(100, myProcessor) 
    my_task_list = range(1,20) 
    my_server_info = ('127.0.0.1', 8081, 'my_pw') 

    my_crawl_server = MyServer(my_server_info, my_object) 
    my_crawl_server.runUntilDone(my_task_list) 

和一個簡單的客戶端...

from manager_demo import * 
if __name__ == '__main__': 
    my_server_info = ('127.0.0.1', 8081, 'my_pw') 
    my_client = MyClient(my_server_info) 
    my_client.runUntilDone() 

當我運行這個它崩潰上:

[email protected]:~/Desktop$ python client.py 
=== Launching Client ... ===== 
    Connecting to server... 
    Connected. 
Starting client main loop... 
2 Traceback (most recent call last): 
    File "client.py", line 5, in <module> 
    my_client.runUntilDone() 
    File "/home/erin/Desktop/manager_demo.py", line 84, in runUntilDone 
    print task, '\t', self.object.processor_function(task, self.object.parameter) 
AttributeError: 'AutoProxy[get_object]' object has no attribute 'parameter' 

爲什麼python沒有Queues或processor_function的問題,但窒息了對象參數?謝謝!

回答

2

您遇到此問題,因爲您的MyObject()類中的parameter屬性不可調用。

documentation指出,_exposed_用於指定代表此typeid的方法名稱序列。 在未指定暴露列表的情況下,共享對象的所有「公共方法」都將可訪問。 (這裏的「公共方法」是指具有一個__ 呼叫 __()方法的屬性,其名稱開頭不是「_」。)

所以,你需要手動曝光parameter屬性上MyObject,據推測,作爲一種方法,通過改變你的MyObject()

class MyObject(): 
    def __init__(self, p, f): 
     self._parameter = p 
     self.processor_function = f 

    def parameter(self): 
     return self._parameter 

此外,您還需要將任務更改爲:

self.object.processor_function(task, self.object.parameter()) 

HTH。