2

我的要求是類似Multiple producers, single consumer 除非我需要它在Python生產者/消費者多生產者和單個消費者書面文件的Python

我已經創建了一個派生5個併發進程對應用程序(我的利用多重庫)這5個過程獨立地以字典格式生成輸出。

早些時候我打印輸出到控制檯,但現在想輸出到一個文件。

我正在尋找一種模式,我的所有5個生產者都寫入支持併發寫入的共享隊列。

而單個消費者進程也可以訪問此隊列並從中消耗數據,並且可以在生產者完成任務時等待沒有數據寫入和終止。

感謝Anuj

回答

1

,因爲你已經在使用多進程,所有你需要的是Queue類

和樣品(從隊列文檔修改)

from multiprocessing import Process, Queue 

def child(q, url): 
    result = my_process(url) 
    q.put(result) 

if __name__ == '__main__': 
    q = Queue() 
    urls = [...] 
    children = [] 
    for url in urls: 
     p = Process(target=child, args=(q,url)) 
     p.start() 
     children.append(p) 
    for p in children: 
     p.join() 
     print q.get() #or write to file (might not be the answer from this child) 

編輯: 對於每個孩子可多選將最後一個for循環替換爲:

while 0 != multiprocessing.active_children(): 
    print q.get() 
+0

不應該這個childs.append(child)是childs.append(p) –

+0

請讓我知道當我從隊列中獲取對象時的行爲讓我們考慮當隊列很大時的情況,我想連續輪詢結果隊列,當它爲空時等待它。 –

+0

@anuj singh - 編輯,謝謝 –

1

我實現了在Python這種模式在一個主管進程生成一堆的進程,然後消耗從所有這些日誌消息,並寫入這些日誌信息到一個日誌文件。

基本上,我用execve來描述每個進程的stderr被連接到PTY的進程。然後我的主管打開了所有的主人PTY,並用select從循環中讀取他們。 PTY的行是由tty行規定的行緩衝的,你可以在它們上使用readline來進行non = blocking讀取。我相信我也使用了PTY的fcntl來設置os.O_NONBLOCK。

工程很好。唯一的麻煩是,當你從選擇輪詢返回時,你需要每pty讀多條行,否則你可能會失去輸出(假設你有一些收穫子進程並重新啓動)。通過閱讀每個PTY上可用的所有行,您還可以避免跟蹤與其他消息交錯。

如果您確實需要發送對象而不是文本行,那麼您最好使用真正的pub-sub消息系統,如AMQP或ZeroMQ。 AMQP是一個比你需要的更大的錘子,所以如果你希望構建許多類似的應用程序,只檢查一下。否則,請嘗試更簡單的0MQ http://www.zeromq.org/intro:read-the-manual,它只是一個使套接字更易於使用的消息傳遞庫。

+0

您也可以在合併過程中執行一些行緩衝,insertin根據需要將g橫幅放入日誌輸出中,這樣可以保持輸出順暢。 –

+0

感謝Micheal,在我的情況下,我沒有真正的日誌,我的每個進程都會被賦予一個URL並使用selenium webdriver API來瀏覽URL,一旦完成,我將以dict格式從DOM中獲取某些數據。每個進程都會從輸入url生成一個字典,並將其放入隊列中,插入順序並不重要。在寫入時,單個使用者將嘗試從隊列和塊中獲取5個對象(如果不可用)並將寫入文件。如果你也可以包含一些代碼,你的回答很有幫助。 –