2013-05-31 27 views
1

我想使用Java實現各種發佈者/訂閱者模式,並且當前用完了想法。發佈者/訂閱者模式的並行實現

有1個發佈者和N個訂戶,發佈者發佈對象,那麼每個訂閱者需要按照正確的順序處理每個對象一次和一次。發佈者和每個訂閱者在他們自己的線程中運行。

在我最初的實現中,每個訂閱者都有自己的阻塞隊列,發佈者將對象放入每個訂閱者的隊列中。這可以正常工作,但如果用戶的任何隊列已滿,發佈者將被阻止。這會導致性能下降,因爲每個用戶在處理對象時需要不同的時間。

然後在另一個實現中,發佈者將對象保存在它自己的隊列中。與對象一起,AtomicInteger計數器與其中的訂戶數量相關聯。每個用戶隨後偷看隊列並減少計數器,並在計數器達到零時將其從隊列中刪除。

以這種方式,發佈者沒有阻塞,但是現在訂閱者需要等待對方處理對象,在下一個對象被窺視之前從隊列中刪除對象。

有沒有更好的方法來做到這一點?我認爲這應該是一個相當普遍的模式。

+0

我想知道如果阿卡,其異步消息傳遞設施,可以幫助在這種情況下? – Wudong

回答

0

您的「許多隊列」實施是要走的路。我不認爲你需要關注一個阻止製片人的完整隊列,因爲完成時間不會受到影響。假設您有三個消費者,兩個消費者以每秒1次的速度消耗,第三個消費者以每5秒1的速率消耗,同時生產者以每兩秒鐘1的速率消耗。最後,第三個隊列將被填滿,所以製片人會阻止它,並且也會停止將項目放入第一個和第二個隊列中。有解決方法,但他們不會改變這樣的事實,第三個消費者將總是成爲瓶頸。如果您生產/消費100件商品,那麼由於第三名消費者(5秒100件商品),這將至少需要500秒,即使第一位和第二位消費者在200秒後完成,情況也會如此(因爲你已經做了一些巧妙的事情來讓生產者在第三個隊列滿了之後繼續填充他們的隊列)或者他們在500秒之後完成(因爲生產者在第三個隊列中被阻塞)。

+0

雖然它取決於用例 - 也許subsumer 3可以默默放棄或放棄物品。也許重要的指標是發佈和消費之間的平均時間。但我同意這從純粹的吞吐量角度來看似乎是最好的方法。 – selig

+0

感謝您的答案。但是,我的問題是,從長遠來看,所有的消費者都會在相同的時間內處理所有的對象。然而,每個消費者花費不同的時間處理單個對象。不應該有一個消費者阻止所有其他人的工作。 – Wudong

+0

@Wudong在這種情況下,您需要使用無限容量的阻塞隊列,即「LinkedBlockingQueues」 - 如果您的隊列中有一個隊列已滿,則無法繼續生成製作者。 (您可以將多餘的項目存儲在其他地方,但這只是使用無限容量的阻塞隊列的更復雜版本。) –

0

肯定

每個用戶都有自己的阻塞隊列,出版商將對象放入每個用戶的queue.`

的,這是要走的路。 您可以使用線程方式將其放入隊列...因此,如果一個隊列滿了,發佈者不會等待。例如,

s1 s2 s3是用戶,addToQueue是每個用戶中增加到相應隊列中的方法。 addQueue方法是等到隊列非空..所以致電addQueue將是一個阻止呼叫ideally synchronised code ...

然後在出版商,你可以做同樣的事情到下面的代碼

注:代碼可能不處於工作狀態,因爲它是..但應該給你的想法。

List<subscriber> slist;// Assume its initialised 
public void publish(final String message){ 

    for (final subscriber s: slist){ 


      Thread t=new Thread(new Runnable(){ 
      public void run(){ 
       s.addToQueue(message); 
      } 
      }); 

     t.start(); 
    } 

} 
+0

這是一個有趣的想法,但它會執行嗎?消息發送的每個線程似乎有點太多。 – Wudong

+0

@Wudong我從來沒有嘗試過......你可以嘗試讓我們知道。但我認爲它應該做的訣竅..也注意到:線程將盡快消息隊列中添加隊列..所以只要排隊插入不充分,將不會有很多線程等待.... –

0

有1個出版商和N訂戶,發佈者發佈對象然後每個用戶需要以正確的順序一次且僅一次處理的對象的每個。發佈者和每個訂閱者在他們自己的線程中運行。

我會改變這種架構。我最初考慮每個用戶的隊列,但我不喜歡這種機制。例如,如果第一個用戶需要更長的時間才能運行,那麼所有的工作都將在該隊列中結束,並且您只能完成1個工作。

由於您必須按順序運行訂閱服務器,因此我需要一個線程池來運行通過所有訂戶的每條消息。對用戶的呼叫需要是可重入的,這可能是不可能的。

所以,你將有10個線程(比方說),並從發佈的隊列中每個人取出一個游泳池,並做類似如下:

public void run() { 
    while (!shutdown && !Thread.currentThread().isInterrupted()) { 
     Article article = publisherQueue.take(); 
     for (Subscriber subscriber : subscriberList) { 
      subscriber.process(article); 
     } 
    } 
} 
+0

我該怎麼保證每個訂閱者在此實現中處理「文章」的順序?如果每篇文章都是由一個線程處理的,那麼它們很可能會按照它們生成的不同順序進行處理。 – Wudong

+0

哇。我沒有意識到你的帖子必須以相同的順序通過所有訂閱者。訂戶的訂單是否完全重要,還是僅僅是文章的最終訂單? – Gray

+0

這只是「物品」事宜的順序,訂戶的順序無關緊要。 – Wudong