2012-04-14 80 views
8

基本上我的消費者也是生產者。我們得到一個初始數據集並將其發送到隊列中。消費者需要一個項目,對其進行處理,從該點有3種可能性:是否可以確保唯一郵件位於rabbitmq隊列中?

  1. 數據良好,並得到把「好」的隊列存儲
  2. 數據不好,丟棄
  3. 數據不好(尚)或差(尚),所以數據被分解成更小的部分併發送回隊列進行進一步處理。

我的問題是與第3步,因爲隊列的增長速度非常快,在首個可能是一個數據被分解成隊列複製的一部分,多數民衆贊成者和消費者繼續處理,並在結束了一個無限循環。

我認爲防止這種做法是防止重複進入隊列。我不能在客戶端做到這一點,因爲在一個小時之內,我可能會有很多核心處理數十億個數據點(每個客戶端在提交之前掃描它會使我減慢太多)。我認爲這需要在服務器端完成,但正如我所提到的,數據量非常大,我不知道如何有效地確保沒有重複。

我可能會問不可能的事情,但認爲我會給它一個鏡頭。任何想法將不勝感激。

回答

2

的核心問題似乎是這樣的:

"...its possible that a piece of data is broken down into a part that's 
duplicated in the queue and the consumers continue to process it and 
end up in a infinite loop." 

您可以專注於自己的排隊項目所有你想要的獨特性,但上面的問題是,你應該集中你的努力,IMO。防止無限循環的一種方法可能是在消費者重新排列已分解項目之前,由消費者設置消息有效載荷中的「已訪問」位。

另一種選擇是讓消費者重新排隊回到一個特殊的隊列,該隊列的處理方式稍有不同,以防止無限循環。無論哪種方式,您都應該通過將其作爲應用程序策略的核心部分進行處理,而不是使用消息傳遞系統的功能來解決問題。

+0

我試圖做到這些(我認爲)。通過確保沒有過去的項目重複,我確保相同的數據不會被處理多次。我只是在一定的的RabbitMQ的FPGA實現,有沒有辦法簡單地發送消息的ID,並有RabbitMQ的丟棄重複或者我需要設置一個過濾器或東西(如果我這樣做,它是如何與RabbitMQ的工作)。 – 2012-04-14 19:30:20

+0

沒有辦法做到這一點,AFAIK。兔子並不關心你的消息內容或者你的隊列中已經存在的內容,所以應該由你的應用來處理。 – 2012-04-14 19:45:13

+0

所以,如果我的消息的ID是唯一的(我的實際數據的散列碼),我需要發送給兔子之前,將它們存儲在一個數據庫或東西,查詢針對(查看是否MSG ID已經被之前發送)?我一直在想,但它會要求客戶端在我的消息服務器等待時執行一些查詢(我正試圖查看是否可以將此工作推送到消息服務器本身) – 2012-04-14 19:53:08

8

我想,即使你能解決不發送重複的隊列中的問題,你遲早會觸及這個問題:

從RabbitMQ的文檔:「從失敗中恢復:如果一由於客戶端連接到的節點失敗,客戶端與代理斷開連接,如果客戶端是發佈客戶端,則代理可能已經接受並傳遞來自客戶端的消息,而客戶端未收到客戶端的確認信息;同樣在消費方面,客戶可能已經發出了對消息的確認,並且不知道這些確認是否將其提供給經紀人並在失敗之​​前處理髮生。總之,你還需要確保你的消費客戶端可以識別和處理重複的消息。」

基本上,它看起來像這樣,你發送到RabbitMQ的請求,RabbitMQ的一個ACK迴應,但1個原因或者你的消費者或製作者沒有收到這個ACK,Rabbitmq沒有辦法知道沒有收到這個確認,你的製片人最終會重新發送信息,從未收到過確認信。

這是在消息被作爲一種RPC的應用程序尤其是處理重複的消息的痛苦,但它看起來像使用這種通訊架構的時候,這是不可避免的。

相關問題