2015-11-04 78 views
1

我想獲得一些代碼工作,我可以使用gevent實現登錄到多線程程序。我想要做的是設置自定義日誌記錄處理程序,以將日誌事件放入隊列中,而偵聽器進程則持續監視新的日誌事件以進行適當處理。我在過去使用Multiprocessing完成了這個任務,但從未使用過Gevent。Python Gevent共享隊列(監聽器進程)

我在那裏的程序是在無限循環(監聽進程)被逮住的問題,而不是讓其他線程「做工作」 ......

理想的情況下,工作進程後完成後,我可以將任意值傳遞給偵聽器進程,告訴它打破循環,然後將所有進程連接在一起。這是我到目前爲止有:

import gevent 
from gevent.pool import Pool 
import Queue 
import random 
import time 

def listener(q): 
    while True: 
     if not q.empty(): 
      num = q.get() 
      print "The number is: %s" % num 
      if num <= 100: 
       print q.get() 
      # got passed 101, break out 
      else: 
       break 
     else: 
      continue 
def worker(pid,q): 
    if pid == 0: 
     listener(q) 
    else: 
     gevent.sleep(random.randint(0,2)*0.001) 
     num = random.randint(1,100) 
     q.put(num) 

def main(): 
    q = Queue.Queue() 
    all_threads = [] 
    all_threads = [gevent.spawn(worker, pid,q) for pid in xrange(10)] 
    gevent.wait(all_threads[1:]) 
    q.put(101) 
    gevent.joinall(all_threads) 

if __name__ == '__main__': 
    main() 

正如我所說,該計劃似乎得到掛了後第一個過程,並且不允許其他工人做自己的事情。我也嘗試完全獨立地產生偵聽器進程本身(這實際上是我寧願這樣做),但這似乎並沒有工作,所以我嘗試了這種方式。

任何幫助將不勝感激,感覺就像我可能只是失去了一些明顯的gevent後端的東西。

感謝

回答

1

的第一個問題是,你的聽衆是永不服輸如果隊列最初是空的。你產生的第一個任務是你的聽衆。當它開始時,有一個while True:,q將是空的,所以你去到else分支,它繼續,循環回到while循環的開始,然後q仍然是空的。所以你只需坐在第一個線程中不斷檢查q是空的。

這裏的關鍵是gevent不使用「原生」線程或進程。與「真實」的線程不同,後者可以隨時切換到任何時間(比如操作系統調度程序),gevent使用「greenlet」,這需要您做一些事情來「控制」其他任務。無論gevent認爲會阻止什麼,例如從網絡,磁盤讀取數據,或者使用其中一種阻止gevent操作。

一個粗略的解決方法是在pid == 9而不是0時啓動您的偵聽器。通過最後產生它,q中會有項目,它將進入主if分支。缺點是這不能解決邏輯問題,所以當隊列第一次爲空時,你會再次陷入無限循環。

更正確的解決將是把gevent.sleep()而不是continue。睡眠是一種阻塞操作,所以你的其他任務將有機會運行。沒有參數,它會立即等待,但如果它準備好運行,仍然有機會決定切換到另一個任務。這仍然不是很有效,不過,好像隊列是空的,它會花費大量的時間毫無意義檢查一遍又一遍,並要求儘快,因爲它可以再次運行。睡眠時間超過默認值0會更有效,但會延遲處理您的日誌消息。

但是,您可以利用gevent的許多類型(如Queue)可以以更多Pythonic方式使用,並使您的代碼更簡單,更容易理解以及更高效。

import gevent 
from gevent.queue import Queue 

def listener(q): 
    for msg in q: 
     print "the number is %d" % msg 

def worker(pid,q): 
    gevent.sleep(random.randint(0,2)*0.001) 
    num = random.randint(1,100) 
    q.put(num) 

def main(): 
    q = Queue() 
    listener_task = gevent.spawn(listener, q) 
    worker_tasks = [gevent.spawn(worker, pid, q) for pid in xrange(1, 10)] 
    gevent.wait(worker_tasks) 
    q.put(StopIteration) 
    gevent.join(listener_task) 

這裏,Queue可以如在for循環的迭代操作。只要有消息,它就會得到一個項目,運行循環,然後等待另一個項目。如果沒有物品,它只會阻塞並四處走動,直到下一個到達。但是,由於它阻塞了,gevent將切換到其他任務之一運行,避免示例代碼帶來的無限循環問題。

因爲此版本使用Queue作爲for循環迭代器,所以還會自動生成一個很好的標記值,我們可以將它放入隊列以使偵聽器任務退出。如果一個循環從它的迭代器得到StopIteration,它會完全退出。所以,當我們的for循環這是一個從Q進行讀取從Q得到StopIteration,它退出,然後函數退出,並催生了任務完​​成。