0

我正在制定一個過程(比如生產者)需要發送單向消息給可變數量的過程(比如消費者)的需求。事件驅動的發佈訂閱模型

發佈 - 訂閱模式似乎很好,因爲消費者將訂閱來自制作者的消息。我嘗試使用ZeroMQ來實現這一點。

但是,我有幾個問題是:

  1. 消費者不得不爲信息連續輪詢。當有新消息時,我會通知消費者。

  2. 生產者隊列有可能被填滿。我希望生產者能夠根據某些條件從隊列中刪除消息(例如,刪除超過5秒的消息,或刪除已被讀取5次的消息)。

  3. 由於消費者正在輪詢並且消息未從隊列中刪除,因此消費者會看到重複的消息,直到有新消息出現爲止。我希望每消息消息只收到一次消息。

我知道我可能會使用錯誤的模型(發佈 - 訂閱可能不適合)。我曾考慮過使用請求回覆,但這並不奏效,因爲製片人不想跟蹤消費者的數量。

任何人都可以提出一個很好的選擇嗎?

+0

我唯一的建議就是包裹的pub/sub輪詢在一個特殊的偵聽器線程提供通知應用程序的主環(或其他)也忽略重複郵件並儘快卸載收件箱。 – 2012-03-26 17:37:39

+0

我建議您查看OMG的DDS中間件標準。已經有一個很好的開源實現,它被稱爲OpenDDS(請參閱http://www.opendds.org)。它具有豐富的服務質量設置,可讓您控制其行爲。即將到來的3.1版本的OpenDDS將非常好。 – 2012-03-27 12:04:51

+0

我對你關於ZMQ pub/sub的3個問題感到困惑。這一切聽起來像你沒有真正調查過ZMQ做什麼? 1)總是進行某種形式的投票,無論你的客戶是否明確地進行投票,或者你將其附加到某個處理程序,並且庫在底層進行2)ZMQ有一個「高水位」,它將啓動如果消息沒有從緩衝區中排出,則在特定點之後丟棄消息3)消息僅向每個訂戶傳遞一次。我錯過了什麼嗎? – jdi 2012-03-28 01:24:32

回答

0

您是否需要多個生產者?如果不是,您可以使用PUSH/PULL而不是PUB/SUB。

通過PUSH/PULL,您可以擁有儘可能多的消費者(他們是模型的PULL一側)。寫入PUSH端點的所有消息在所有連接的使用者之間以循環方式分佈。這也確保了兩個消費者不會收到相同的消息。

正如你所描述的,讓消費者作爲SUB端點,如果兩個或多個消費者訂閱了相同的「前綴」,則最終可以向多個消費者傳遞相同的消息(假設這將成爲模型中的問題) 」。

假設「前綴」是你傳遞給sock.setsockopt(ZMQ_SUBSCRIBE, "prefix", ...);

+0

我認爲,即使他想要多個生產者,他仍然可以在中間使用這種方法。多個生產者會推到設備上,多個消費者會從中消費。 – jdi 2012-03-28 01:31:31

0

嘗試使用JMS提供者或AMQP提供商的字符串。這些有你正在尋找與主題的一些東西:

  1. 推送通知給訂閱者。

  2. 消息上的生存時間屬性,允許消息被刪除或放入死信隊列中,如果消息在TTL內沒有消耗。

  3. 一次性通知 - 取決於您的配置。

需要注意的是一次性的消息確實在網絡failuer這可能會導致無論是丟失的消息或重複的消息的情況下,邊界條件......你選擇。

提供者使用的條款。 RabbitMQ很受AMQP的歡迎。對於JMS,有許多專有產品或開源實現。

1

我建議去生產者和消費者之間的經紀人推拉模型。

  1. 經紀應該被通知任何新的消息。
  2. 消費者會聽經紀人通知(請表來跟蹤成功/ failure.So複製將重試過程中應避免)
  3. 一旦#2完成,然後消費者可以拉動生產的(源)中的數據併發送ACK經紀人的成功/失敗

希望這有助於

0

DDS(數據分發服務)中間件支持你想正是實現和更容易。

直接回答你的問題:

  1. DDS支持偵聽器機制,你的用戶並不需要連續輪詢。

  2. DDS具有良好的QoS設置以防止發佈者隊列被填滿。您可以使用歷史QoS來說「只保留隊列中最新的10個樣本」,或者您可以使用壽命QoS來說「只保留最近10秒內發佈的樣本」。

  3. 此外,您可以使用DDS偵聽器機制,並且每個新樣本僅通知一次。無需投票。

目前有兩個開源實現。