2017-04-23 47 views

How do I read out a camera in a separate process on OSX?

I'm reading out a camera on OSX, which works fine with this simple script:

import cv2

camera = cv2.VideoCapture(0)

while True:
    try:
        (grabbed, frame) = camera.read()  # grab the current frame
        frame = cv2.resize(frame, (640, 480))  # resize the frame
        cv2.imshow("Frame", frame)  # show the frame on our screen
        cv2.waitKey(1)  # Display it at least one ms before going to the next frame
    except KeyboardInterrupt:
        # cleanup the camera and close any open windows
        camera.release()
        cv2.destroyAllWindows()
        print("\n\nBye bye\n")
        break

I now want to read the video in a separate process, for which I have a script that is a lot longer and which correctly reads out the video in a separate process on Linux:

import numpy as np
import time
import ctypes
import argparse

from multiprocessing import Array, Value, Process
import cv2


class VideoCapture:
    """
    Class that handles video capture from a device or video file
    """
    def __init__(self, device=0, delay=0.):
        """
        :param device: device index or video filename
        :param delay: delay between frame captures in seconds (float allowed)
        """
        self._cap = cv2.VideoCapture(device)
        self._delay = delay

    def _proper_frame(self, delay=None):
        """
        :param delay: delay between frame captures (in seconds)
        :return: frame
        """
        snapshot = None
        correct_img = False
        fail_counter = -1
        while not correct_img:
            # Capture the frame
            correct_img, snapshot = self._cap.read()
            fail_counter += 1
            # Raise exception if there's no output from the device
            if fail_counter > 10:
                raise Exception("Capture: exceeded number of tries to capture the frame.")
            # Delay before we get a new frame
            time.sleep(delay)
        return snapshot

    def get_size(self):
        """
        :return: size of the captured image
        """
        return (int(self._cap.get(int(cv2.CAP_PROP_FRAME_HEIGHT))),
                int(self._cap.get(int(cv2.CAP_PROP_FRAME_WIDTH))), 3)

    def get_stream_function(self):
        """
        Returns a stream_function object
        """

        def stream_function(image, finished):
            """
            Keeps capturing frames until finished = 1
            :param image: shared numpy array for multiprocessing (see multiprocessing.Array)
            :param finished: synchronized wrapper for int (see multiprocessing.Value)
            :return: nothing
            """
            # Incorrect input array
            if image.shape != self.get_size():
                raise Exception("Capture: improper size of the input image")
            print("Capture: start streaming")
            # Capture frames until the finished flag is set to True
            while not finished.value:
                image[:, :, :] = self._proper_frame(self._delay)
            # Release the device
            self.release()

        return stream_function

    def release(self):
        self._cap.release()


def main():
    # Add program arguments
    parser = argparse.ArgumentParser(
        description='Captures the video from the webcamera and \nwrites it into the output file with predefined fps.',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-output', dest="output", default="output.avi", help='name of the output video file')
    parser.add_argument('-log', dest="log", default="frames.log", help='name of the log file')
    parser.add_argument('-fps', dest="fps", default=25., help='frames per second value')

    # Read the arguments if any
    result = parser.parse_args()
    fps = float(result.fps)
    output = result.output
    log = result.log

    # Initialize the VideoCapture object and auxiliary objects
    cap = VideoCapture()
    shape = cap.get_size()
    stream = cap.get_stream_function()

    # Define shared variables (which are synchronised, so race conditions are excluded)
    shared_array_base = Array(ctypes.c_uint8, shape[0] * shape[1] * shape[2])
    frame = np.ctypeslib.as_array(shared_array_base.get_obj())
    frame = frame.reshape(shape[0], shape[1], shape[2])
    finished = Value('i', 0)

    # Start the process which runs in parallel
    video_process = Process(target=stream, args=(frame, finished))
    video_process.start()  # Launch the capture process

    # Sleep for some time to allow the video capture to start working first
    time.sleep(2)

    # Termination function
    def terminate():
        print("Main: termination")
        finished.value = True
        # Wait for all processes to finish
        time.sleep(1)
        # Terminate working processes
        video_process.terminate()

    # Capturing works until a keyboard interrupt is received.
    while True:
        try:
            # Display the resulting frame
            cv2.imshow('frame', frame)
            cv2.waitKey(1)  # Display it at least one ms before going to the next frame
            time.sleep(0.1)

        except KeyboardInterrupt:
            cv2.destroyAllWindows()
            terminate()
            break


if __name__ == '__main__':
    main()

This works fine on Linux, but on OSX I'm having trouble, since it can't seem to do a .read() on the cv2.VideoCapture(device) object that was created (stored in the variable self._cap).

After some searching I found this SO answer, which suggests using Billiard, a replacement for Python's multiprocessing that supposedly has some very useful improvements. So at the top of the file I simply added the import after my previous multiprocessing imports (effectively overriding multiprocessing.Process):

from billiard import Process, forking_enable 

and just before the instantiation of the video_process variable I used forking_enable as follows:

forking_enable(0)  # Supposedly this is all I need for billiard to do its magic
video_process = Process(target=stream, args=(frame, finished)) 

So with this version (here on pastebin) I then ran the file again, which gave me this error:

pickle.PicklingError: Can't pickle <type 'function'>: it's not found as __main__.stream_function

Searching for that error led me to an SO question with a long list of answers, one of which suggested using the dill serialization lib to solve this. That library should however be used with the Pathos multiprocessing fork. So I simply tried changing my multiprocessing import line from

from multiprocessing import Array, Value, Process 

to

from pathos.multiprocessing import Array, Value, Process

But none of Array, Value or Process seem to exist in the pathos.multiprocessing package.

From this point on I'm completely lost. I'm searching for things I barely have enough knowledge about, and I don't even know in which direction I should search or debug anymore.

So is there a brighter soul than me who can help me capture video in a separate process? All tips are welcome!

Try 'mp4v' as your fourcc. –

@MarkSetchell - But in the multiprocessing code I'm not even trying to write the video, because I can't even read it out from the webcam. The problem is the reading, not the writing. I'll also remove the writing from the initial script so as not to confuse people. Do you have any idea what could be wrong with reading the webcam in the multiprocessing code? – kramer65

I'm the pathos and dill author. You might want to try multiprocess, which is the library underneath pathos, but with exactly the same interface as multiprocessing. There you will find the Array, Value and Process objects. –

Answers


The main challenge with multiprocessing is understanding the memory model when the memory address spaces are separate.

Python makes things even more confusing, as it abstracts away many of these aspects, hiding several mechanisms behind a few innocent-looking APIs.

When you write logic like this:

# Initialize VideoCapture object and auxiliary objects
cap = VideoCapture()
shape = cap.get_size()
stream = cap.get_stream_function()

...

# Start processes which run in parallel
video_process = Process(target=stream, args=(frame, finished))
video_process.start()  # Launch capture process

You are passing to the Process a stream_function which refers to internal components of the VideoCapture class (self.get_size) but, more importantly, which is not available as a top level function.

The child process will not be able to re-construct the required object, as what it receives is just the name of a function. It tries to look that name up in the main module, hence the message:

pickle.PicklingError: Can't pickle <type 'function'>: it's not found as __main__.stream_function

The child process is trying to resolve the function in the main module as __main__.stream_function, and the lookup fails.
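This failure is easy to reproduce without any camera code. A minimal sketch (the make_stream_function name is just for illustration, mirroring get_stream_function from the question):

```python
import pickle


def top_level():
    pass


def make_stream_function():
    # Defined in a local scope, like stream_function in the question:
    # pickle stores functions by qualified name, so a fresh child
    # process cannot locate this one in any module.
    def stream_function():
        pass
    return stream_function


def picklable(obj):
    """Return True if obj survives pickle serialization."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False


print(picklable(top_level))               # True: found as a module attribute
print(picklable(make_stream_function()))  # False: local object, not resolvable
```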

My first suggestion would be to change your logic so that you pass to the child process the method that returns stream_function.

video_process = Process(target=cap.get_stream_function, args=(...)) 

Nevertheless, you might still run into issues, as you are sharing state between the two processes.

What I usually suggest when people approach the multiprocessing paradigm in Python is to think of processes as if they were running on separate machines. In that case it becomes obvious that your architecture is problematic.

I would recommend you separate the responsibilities of the two processes, making sure that one process (the child) deals with the entire capturing of the video, while the other (the parent or a third process) deals with processing the frames.

This paradigm is known as the Producer and Consumer Problem, and it fits your system very well: the video capturing process would be the producer and the other one the consumer. A simple multiprocessing.Pipe or multiprocessing.Queue would make sure the frames are delivered from the producer to the consumer as soon as they are produced.

Here is an example in pseudo-code, as I don't know the video capturing APIs. The point is to handle the entire video capturing logic within the producer process, abstracting it away from the consumer. The only thing the consumer needs to know is that it receives a frame object via a pipe.

import multiprocessing
from multiprocessing import Process


def capture_video(writer):
    """This runs in the producer process."""
    # The VideoCapture class wraps the video acquisition logic
    cap = VideoCapture()

    while True:
        frame = cap.get_next_frame()  # the method returns the next frame
        writer.send(frame)  # send the new frame to the consumer process


def main():
    reader, writer = multiprocessing.Pipe(False)

    # producer process
    video_process = Process(target=capture_video, args=[writer])
    video_process.start()  # Launch capture process

    while True:
        try:
            frame = reader.recv()  # receive the next frame from the producer
            process_frame(frame)
        except KeyboardInterrupt:
            video_process.terminate()
            break

Note how there is no shared state between the processes (no need to share any array). The communication goes through the pipe and is unidirectional, which makes the logic very simple. As said above, this logic would also work across different machines: you would just need to replace the pipe with a socket.

You might want a cleaner way to terminate the producer process. I would suggest you use a multiprocessing.Event: just set it from the parent in the KeyboardInterrupt block, and check its status in the child on every iteration (while not event.is_set()).
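A minimal sketch of that shutdown pattern (the capture loop here is a stand-in for the real camera code, not the question's actual logic):

```python
import time
from multiprocessing import Event, Process


def capture_loop(stop_event):
    # Producer: keep "capturing" until the parent asks us to stop.
    frames = 0
    while not stop_event.is_set():
        frames += 1          # stand-in for grabbing and sending a frame
        time.sleep(0.01)
    print("producer exiting cleanly after %d frames" % frames)


if __name__ == '__main__':
    stop_event = Event()
    producer = Process(target=capture_loop, args=(stop_event,))
    producer.start()
    time.sleep(0.1)          # the parent does its own work here
    stop_event.set()         # cooperative shutdown instead of terminate()
    producer.join(timeout=5)
```

Unlike terminate(), this lets the child run its cleanup (releasing the device) before exiting.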

I edited the answer a bit, hoping it sounds clearer now. – noxdafox

Thanks for your answer. But why does this work perfectly on Linux then? – kramer65

[Apple fork manpage](https://developer.apple.com/legacy/library/documentation/Darwin/Reference/ManPages/man2/fork.2.html), take a look at the CAVEATS section. There are core differences in how operating systems implement process creation and management. Therefore, if portability is an important aspect of the application, it is critical to stick to known design patterns and keep things as simple as possible. – noxdafox


Your first problem is that you cannot access the camera in a forked process. Several issues arise when external libraries are used with fork, as the fork operation does not clean up all the file descriptors opened by the parent process, which leads to strange behavior. Libraries are often more robust to this kind of issue on Linux, but it is not a good idea to share an IO object such as cv2.VideoCapture between 2 processes.

When you use billiard.forking_enable and set it to False, you are asking the library not to use fork but the spawn or forkserver methods to start the new process. These methods are cleaner, as they close all the file descriptors, but they are also slower at starting new processes; that should not be an issue in your case. If you are using python3.4+, you can get the same behavior with multiprocessing.set_start_method('forkserver').
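A rough sketch of what switching the start method looks like; this uses mp.get_context rather than the global set_start_method so it composes with other code, and the child and queue names are just for illustration:

```python
import multiprocessing as mp


def report(queue):
    # Under "spawn" this runs in a freshly started interpreter that
    # re-imports the module; no file descriptors are inherited from
    # the parent, which is what avoids the shared-camera problem.
    queue.put(mp.current_process().name)


if __name__ == '__main__':
    ctx = mp.get_context('spawn')   # or 'forkserver' on python3.4+
    queue = ctx.Queue()
    child = ctx.Process(target=report, args=(queue,), name='clean-child')
    child.start()
    print(queue.get())              # -> clean-child
    child.join()
```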

When you use one of these methods, the target function and its arguments need to be serialized to be passed to the child process. Serialization is done with pickle by default, which, as you mentioned, has several flaws: for instance it cannot serialize locally defined objects, nor cv2.VideoCapture. But you can simplify your program so that all the arguments of your Process are picklable. Here is a tentative solution to your problem:

import numpy as np
import time
import ctypes

from multiprocessing import set_start_method
from multiprocessing import Process, Array, Value
import cv2


class VideoCapture:
    """
    Class that handles video capture from a device or video file
    """
    def __init__(self, device=0, delay=0.):
        """
        :param device: device index or video filename
        :param delay: delay between frame captures in seconds (float allowed)
        """
        self._delay = delay
        self._device = device
        self._cap = cv2.VideoCapture(device)
        assert self._cap.isOpened()

    def __getstate__(self):
        self._cap.release()
        return (self._delay, self._device)

    def __setstate__(self, state):
        self._delay, self._device = state
        self._cap = cv2.VideoCapture(self._device)
        assert self._cap.grab(), "The child could not grab the video capture"

    def _proper_frame(self, delay=None):
        """
        :param delay: delay between frame captures (in seconds)
        :return: frame
        """
        snapshot = None
        correct_img = False
        fail_counter = -1
        while not correct_img:
            # Capture the frame
            correct_img, snapshot = self._cap.read()
            fail_counter += 1
            # Raise exception if there's no output from the device
            if fail_counter > 10:
                raise Exception("Capture: exceeded number of tries to capture "
                                "the frame.")
            # Delay before we get a new frame
            time.sleep(delay)
        return snapshot

    def get_size(self):
        """
        :return: size of the captured image
        """
        return (int(self._cap.get(int(cv2.CAP_PROP_FRAME_HEIGHT))),
                int(self._cap.get(int(cv2.CAP_PROP_FRAME_WIDTH))), 3)

    def release(self):
        self._cap.release()


def stream(capturer, image, finished):
    """
    Keeps capturing frames until finished = 1
    :param capturer: VideoCapture object, re-created in the child via pickling
    :param image: shared Array for multiprocessing
    :param finished: synchronized wrapper for int
    :return: nothing
    """
    shape = capturer.get_size()

    # Re-create the numpy wrapper around the shared buffer
    frame = np.ctypeslib.as_array(image.get_obj())
    frame = frame.reshape(shape[0], shape[1], shape[2])

    # Incorrect input array
    if frame.shape != capturer.get_size():
        raise Exception("Capture: improper size of the input image")
    print("Capture: start streaming")
    # Capture frames until the finished flag is set to True
    while not finished.value:
        frame[:, :, :] = capturer._proper_frame(capturer._delay)

    # Release the device
    capturer.release()


def main():

    # Initialize the VideoCapture object and auxiliary objects
    cap = VideoCapture()
    shape = cap.get_size()

    # Define shared variables
    shared_array_base = Array(ctypes.c_uint8, shape[0] * shape[1] * shape[2])
    frame = np.ctypeslib.as_array(shared_array_base.get_obj())
    frame = frame.reshape(shape[0], shape[1], shape[2])
    finished = Value('i', 0)

    # Start the process which runs in parallel
    video_process = Process(target=stream,
                            args=(cap, shared_array_base, finished))
    video_process.start()  # Launch the capture process

    # Sleep for some time to allow the video capture to start working first
    time.sleep(2)

    # Termination function
    def terminate():
        print("Main: termination")
        finished.value = True
        # Wait for all processes to finish
        time.sleep(1)
        # Join the worker process
        video_process.join()

    # Capturing works until a keyboard interrupt is received.
    while True:
        try:
            # Display the resulting frame
            cv2.imshow('frame', frame)
            # Display it at least one ms before going to the next frame
            time.sleep(0.1)
            cv2.waitKey(1)

        except KeyboardInterrupt:
            cv2.destroyAllWindows()
            terminate()
            break


if __name__ == '__main__':
    set_start_method("spawn")
    main()

I cannot test this on a Mac at the moment, so it might not work out of the box, but there should not be any multiprocessing-related errors. Some notes:

  • I instantiate the cv2.VideoCapture object in the new child and grab the camera there, as only one process should read from the camera.
  • Maybe the fork problem in your first program was only due to the shared cv2.VideoCapture, and re-creating it in the stream function would solve your issue.
  • You cannot pass the numpy wrapper to the child, as it will not share the mp.Array buffer (this is really weird and it took me a while to figure out). You need to pass the Array explicitly and re-create a numpy wrapper.
  • I assumed you were running your code with python3.4+, so I did not use billiard, but using forking_enabled(False) instead of set_start_method should give a similar result.
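The note about re-creating the numpy wrapper inside the child can be illustrated with a tiny sketch (a 4x4 buffer standing in for a frame; the names are illustrative only):

```python
import ctypes

import numpy as np
from multiprocessing import Array, Process

H, W = 4, 4  # tiny stand-in for a frame


def writer(shared_arr):
    # Re-create the numpy view inside the child. Passing a ready-made
    # numpy wrapper instead would not share the mp.Array buffer.
    frame = np.ctypeslib.as_array(shared_arr.get_obj()).reshape(H, W)
    frame[:, :] = 7


if __name__ == '__main__':
    shared = Array(ctypes.c_uint8, H * W)
    child = Process(target=writer, args=(shared,))
    child.start()
    child.join()
    # The parent re-wraps the same buffer and sees the child's writes.
    view = np.ctypeslib.as_array(shared.get_obj()).reshape(H, W)
    print(view.sum())  # 112 (= 7 * 16) if the buffer was really shared
```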

Let me know if this works!

Hi Thomas, thank you for your answer. I'm actually using Python 2.7, so I tried adapting the code by using forking_enable(0), but then I get an error saying RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance. Any idea how to solve that? – kramer65

That is really weird, because this code does share the SynchronizedArray through inheritance (you pass it via the Process args). You could try set_start_method from billiard, using 'spawn'. If that does not work, you can try the library [loky](https://github.com/tomMoral/loky), which backports 'spawn' to python2.7 (disclaimer: I am the maintainer of this library). –

After some more thought: this is normal. There is some weird behavior due to forking_enable. You should get better results with set_start_method (which exists in billiard), as it will use an implementation of Array compatible with your Process. This comes down to context management: when you use the vanilla import for Array, you do not control which implementation you get. You can also create a context with ctx = get_context('spawn') and then use ctx.Array, ctx.Value and ctx.Process to make sure you have compatible objects. –