2014-10-16 80 views
0

我已經實現了一個Python套接字服務器。它將來自多臺攝像機的圖像數據發送到客戶端。我請求處理程序類的樣子:在Python中多次並行運行類方法

class RequestHandler(SocketServer.BaseRequestHandler): 
    def handle(self): 
     while True: 
      data = self.request.recv(1024) 
      if data.endswith('0000000050'): # client requests data 

       for camera_id, camera_path in _video_devices.iteritems(): 
        message = self.create_image_transfer_message(camera_id, camera_path) 
        self.request.sendto(message, self.client_address) 

    def create_image_transfer_message(self, camera_id, camera_path): 
     # somecode ... 

我不得不堅持,因爲客戶端的服務器socket。它可以工作,但問題在於它可以順序工作,所以上傳的照相機圖像之間存在很大的延遲。我想創建傳輸消息並行通話之間的一個小的延遲。

我試圖用pool classmultiprocessing

import multiprocessing 

class RequestHandler(SocketServer.BaseRequestHandler): 
    def handle(self): 

    ... 

    pool = multiprocessing.Pool(processes=4) 
    messages = [pool.apply(self.create_image_transfer_message, args=(camera_id, camera_path)) for camId, camPath in _video_devices.iteritems()] 

但這拋出:

PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed 

我想知道如果有另一種方式來創建一個並行的傳輸消息定義的通話間延遲?

編輯:

我使用來自多個相機的數據創建響應消息。問題是,如果我運行圖像抓取程序太靠近彼此,我會得到圖像僞像,因爲USB總線過載。我發現,以0.2秒的時間順序調用圖像抓取將解決問題。攝像頭在圖像抓取功能運行的整個過程中都不會發送數據,因此延遲的並行校準會生成良好的圖像,並且兩者之間只有很小的延遲。

+0

你已經遇到了Python的multiproces唱衰弱。 Python進程通過酸洗對象並通過管道來回發送(幕後)。 'pickle'對它可以處理的內容有限制(https://docs.python.org/2/library/pickle.html#what-c​​an-be-pickled-and-unpickled)。在你的情況下,你有一個函數或類在模塊級別沒有定義的地方,它正在扼殺pickler。 – 2014-10-17 13:52:09

回答

1

我認爲你已經走上了正確的道路,沒有必要丟掉你的工作。

這裏有一個答案how to use a class method with multiprocessing我通過谷歌找到搜索「multiprocessing class method

from multiprocessing import Pool 
import time 

pool = Pool(processes=2) 

def unwrap_self_f(arg, **kwarg): 
    return C.create_image_transfer_message(*arg, **kwarg) 

class RequestHandler(SocketServer.BaseRequestHandler): 

    @classmethod 
    def create_image_transfer_message(cls, camera_id, camera_path): 
     # your logic goes here 

    def handle(self): 
     while True: 
      data = self.request.recv(1024) 
      if not data.endswith('0000000050'): # client requests data 
       continue 

      pool.map(unwrap_self_f, 
       (
        (camera_id, camera_path) 
        for camera_id, camera_path in _video_devices.iteritems() 
       ) 
      ) 

注後,如果您想從工人返回值,那麼你就需要探索using a shared resource在這裏看到這樣的回答 - How can I recover the return value of a function passed to multiprocessing.Process?

+0

非常感謝您的提示,我會很樂意檢查出來。我唯一擔心的是,如果這個「解決方法」不是針對「好人」的話。我想知道是否有另一種方法,比將計時器放入方法中。 – karlitos 2014-10-16 16:58:46

+0

如果您不使用該類,只需將該方法移出課程。有一個快速的解決方法。如果你正在使用該類,那麼看看這個現有的答案 - http://stackoverflow.com/a/1816969/6084 – mjallday 2014-10-16 17:16:15

+0

我仍然不確定的時間。我想要實現的是在指定的時間間隔內運行帶有不同參數的類方法。我嘗試了你的代碼,並使用不同的設置(特別是process =參數)進行播放,但我無法獲得令人滿意的結果。該方法立即被稱爲處理時間,然後代碼等待,直到處理完成。我想有這樣的事情:調用一個方法,等待xx時間,調用一個方法而不必等待前一個結果... – karlitos 2014-10-16 20:41:15

0

此代碼爲我做的伎倆:

class RequestHandler(SocketServer.BaseRequestHandler): 
    def handle(self): 
     while True: 
      data = self.request.recv(1024) 
      if data.endswith('0000000050'): # client requests data 

       process_manager = multiprocessing.Manager() 
       messaging_queue = process_manager.Queue() 
       jobs = [] 

       for camId, camPath in _video_devices.iteritems(): 
        p = multiprocessing.Process(target=self.create_image_transfer_message, 
               args=(camera_id, camera_path, messaging_queue)) 
        jobs.append(p) 
        p.start() 
        time.sleep(0.3) 

       # wait for all processes to finish 
       for p in jobs: 
        p.join() 

       while not messaging_queue.empty(): 
        self.request.sendto(messaging_queue.get(), self.client_address)