2010-08-04 113 views
9

我有一些線程在等待某個事件,執行一些操作,然後再次等待該事件。另一個線程會在適當時觸發事件。Python threading.Event() - 確保所有等待的線程在event.set()上醒來

我找不出一種方法來確保每個等待的線程在設置事件時只觸發一次。我目前有觸發線程設置它,睡一會兒,然後清除它。不幸的是,這會導致等待的線程多次抓取設置的事件,或者根本沒有。

我不能簡單地讓觸發線程產生響應線程來運行它們一次,因爲它們是對來自其他地方的請求的響應。

簡而言之:在Python中,如何讓一個線程設置一個事件,並確保每個等待線程在事件清除之前只對其執行一次事件?

更新:

我已經嘗試設置它使用鎖和一個隊列,但它不工作。下面是我有:

# Globals - used to synch threads 
waitingOnEvent = Queue.Queue 
MainEvent = threading.Event() 
MainEvent.clear() # Not sure this is necessary, but figured I'd be safe 
mainLock = threading.Lock() 

def waitCall(): 
    mainLock.acquire() 
    waitingOnEvent.put("waiting") 
    mainLock.release() 
    MainEvent.wait() 
    waitingOnEvent.get(False) 
    waitingOnEvent.task_done() 
    #do stuff 
    return 

def triggerCall(): 
    mainLock.acquire() 
    itemsinq = waitingOnEvent.qsize() 
    MainEvent.set() 
    waitingOnEvent.join() 
    MainEvent.clear() 
    mainLock.release() 
    return 

第一次,itemsinq正確反映了許多呼叫的方式等待,但只有前等待的線程進行調用將使它通過。從那時起,itemsinq總是1,等待的線程輪流;每次發生觸發呼叫時,下一次都會經過。

更新2 看來好像只有event.wait()的一個線程被喚醒 ,然而在queue.join()工作。這向我建議,一個等待線程喚醒,從隊列中抓取並調用task_done(),並且單個get()/ task_done()以某種方式清空隊列並允許join()。觸發線程然後完成join(),清除事件,從而阻止其他等待線程通過。但是,爲什麼只有一次get/task_done調用後,隊列纔會被註冊爲空/完成?

即使我註釋掉queue.get()和queue.task_done()並掛起觸發器,以至於無法清除事件,似乎只有一個人正在醒來。

+0

這聽起來像是一個可能的設計缺陷,可能有更好的方法去實現你想要完成的任務。你能發佈關於線程應該做什麼的更多細節嗎? – 2010-08-04 21:36:29

回答

8

你不需要一個事件,並且你不需要一個鎖和一個隊列。所有你需要的是一個隊列。

致電queue.put可以放入消息而無需等待消息發送或處理。在工作者線程中調用queue.get以等待消息到達。

import threading 
import Queue 

active_queues = [] 

class Worker(threading.Thread): 
    def __init__(self): 
     threading.Thread.__init__(self) 
     self.mailbox = Queue.Queue() 
     active_queues.append(self.mailbox) 

    def run(self): 
     while True: 
      data = self.mailbox.get() 
      if data == 'shutdown': 
       print self, 'shutting down' 
       return 
      print self, 'received a message:', data 

    def stop(self): 
     active_queues.remove(self.mailbox) 
     self.mailbox.put("shutdown") 
     self.join() 


def broadcast_event(data): 
    for q in active_queues: 
     q.put(data) 

t1 = Worker() 
t2 = Worker() 
t1.start() 
t2.start() 
broadcast_event("first event") 
broadcast_event("second event") 
broadcast_event("shutdown") 

t1.stop() 
t2.stop() 

消息不一定是字符串;他們可以是任何Python對象。

+0

這假定有一個靜態數量的等待線程在開始時都定義了,但情況並非如此。等待的線程由請求生成,因此該數字是動態的並且是未知的。該鎖旨在保持額外的等待線程添加他們自己,查看事件集,刪除他們自己,並經歷 - 特別是因爲這可以在理論上防止隊列排空。 – culhantr 2010-08-10 16:15:12

+0

@culhantr好的,我重寫了這個例子。 – 2010-08-10 16:29:55

1

我不是一個python程序員,但是如果一個事件只能被處理一次,也許你需要用適當的鎖定來切換到一個消息隊列,以便當一個線程被喚醒並接收到事件消息時它會處理它並將其從隊列中刪除,以便在其他線程喚醒並查看隊列時不在那裏。

2

我過去使用的一種解決方案是用於線程間通信的Queue類。這是線程安全的,可用於在使用多處理庫和線程庫時易於在線程之間進行通信。您可以讓子線程等待某些內容進入隊列,然後處理新條目。 Queue類也有一個get()方法,它使用了一個方便的阻塞參數。

3

如果你想要離散的原子事件,可以由每個線程按順序處理,然後做krs1 & bot403建議並使用一個隊列。 Python Queue類是同步的 - 您不必擔心鎖定使用它。

但是,如果您的需求更簡單(事件告訴您有數據可供讀取等),您可以訂閱/註冊您的線程作爲負責觸發事件的對象的觀察者。該對象將維護觀察者對象列表threading.Event。在觸發器上,它可以在列表中的所有threading.Event對象上調用set()。