2016-09-17 69 views
2

我有這種場景,我從數千個來源接收事件。每個來源都發送有關其當前狀態的信息。雖然我想處理所有事件,但首先處理每個源的最新事件更爲重要,以便當前視圖處於最新狀態。所以我想使用ConcurrentHashMap,每個源的標識符作爲關鍵字,並且使用LIFO隊列(堆棧)作爲值。然後,我將遍歷Map的鍵,並從每個源的堆棧中彈出一個項目。從多個生產者密鑰公平排隊隊列

我擔心的是,當我遍歷鍵並從每個鍵的隊列中取出項目時,生產者可能會在隊列中發佈新事件,從而潛在地創建併發問題。生產者也可以在地圖上添加新的鍵,並且遍歷MapentrySet似乎是微弱一致的。這不是一個大問題,因爲新項目將在後續迭代中處理。理想情況下,我也可以在entrySet的流上使用一些並行處理來加速此過程。

我想知道是否有一個更清潔的方法。事實上,我可以使用LIFO BlockingDequeue並且首先處理最新的事件,但是這種方法的問題在於有一個來源可能發送比其他來源更多的事件的風險,因此可能比其他來源處理更多的事件。

有沒有其他的數據結構,我可以看看提供這種行爲?基本上我正在尋找的是一種優先處理來自每個來源的事件的方式,同時給消費者處理每個來源的公平機會。

回答

0

您是否想過LIFO隊列的FIFO隊列?每個源添加到它的LIFO隊列中,並且處理您從FIFO隊列中取出第一個LIFO隊列,處理一個事件,然後將其放回到FIFO隊列中。這樣你也應該沒有新來源的問題,因爲他們的LIFO隊列將被簡單地添加到FIFO隊列中。

爲了將事件添加到正確的LIFO隊列中,您可以維護一個額外的HashMap,該HashMap知道每個源的隊列,並且如果發生新的源不在Map中,您必須將其LIFO隊列添加到FIFO隊列。

+0

這很有趣。然而,問題是我們需要能夠將新事件添加到堆棧(LIFO隊列),因爲它們是針對特定源進行的,這意味着我們需要在隊列中找到相應的堆棧。我想我可以通過額外的'ConcurrentHashMap '將源的鍵映射到源的LIFO隊列來解決這個問題。 – jbx

+0

是的,如果源不知道把事件放在哪裏,那麼HashMap應該這樣做。 – Vampire

+0

是的,所以我期望構建的數據結構看起來像一個外部的隊列,只是添加和刪除,但內部它正在做相應優先級的邏輯。我會嘗試將你的建議與hashmap結合起來。 – jbx

0

我建議構建自己的結構來管理它,因爲它爲您的用例增加了靈活性(和速度)。

我會用循環隊列來存儲每個LIFO隊列(堆棧)。循環隊列是將元素添加到尾部的隊列,並且從頭部讀取(但不刪除)隊列。一旦頭=尾巴,你重新開始。

您可以使用一個簡單的數組構建自己的隊列。管理圍繞操作的同步並不難,例如向陣列添加更多隊列 - 並在需要時擴展它。我相信給隊列添加隊列並不是你經常做的事情。

這很容易管理,您可以擴展您的循環隊列以計算訪問條目的頻率,並限制訪問其條目的頻率(通過添加/刪除使用者線程,甚至使它們等待一點在從一個條目管理的堆棧消耗之前)。

在使用多線程從循環隊列中讀取元素時,通過在從堆棧消耗之前調用「註冊」操作,您甚至可以避免線程鎖定:每個線程都有它的ID,當它「註冊」存儲在給定的隊列條目處。在註冊之前和從堆棧彈出之前,線程會執行「讀取註冊ID」操作,並且返回的ID必須與其自己的ID匹配。這意味着只有「擁有」給定隊列條目的線程才能從該堆棧彈出。如果註冊/確認註冊過程失敗,則意味着另一個線程正在消耗該條目,因此當前線程將轉到下一個可用條目。

我在過去曾經使用過這種策略,並且像一個魅力一樣縮放。我希望這對你有意義。