2016-09-16 53 views
1

我想實現一個解決方案,使用RabbitMQ來實現像分佈式RPC一樣的使用一個請求和一個響應隊列的大量處理器,我已經實現了這樣的解決方案與Apache Apollo和我本來希望能夠將其遷移到RabbitMQ。這裏的關鍵點:RabbitMQ:使用路由來實現消息選擇

  • 每個服務器的連接請求隊列只
  • 每個服務器流程,應該是他(報頭字段)

我在執行阿波羅請求關鍵點是使用選擇器(比如標題字段值的where子句),我認爲這是通過路由和路由密鑰在RabbitMQ中實現的,但是我一定是錯的,因爲我看到工作人員接收不應該是他們的消息。

爲了複製問題,我修改了路由示例(http://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html),我有兩個使用者,我可以從定義routingKey的不同參數開始,併爲一個使用者生成消息。我看到的行爲是消費的消費似乎是隨機的(第一次消費者'約翰'的消息由消費者處理'約翰',第二次消費者處理'瑪麗')

有沒有人有指示或代碼片段在RabbitMQ中使用選擇器?

下面我爲消費者代碼:

public static void Main(String[] args) 
{ 
    var factory = new ConnectionFactory { HostName = "localhost" }; 
    using (var connection = factory.CreateConnection()) 
     using (var channel = connection.CreateModel()) 
     { 
      const String request = "request"; 
      channel.ExchangeDeclare(request, "direct"); 

      channel.QueueDeclare(request, true, false, false, null); 

      if (args.Length < 1) 
      { 
       Console.WriteLine(" Press [enter] to exit."); 
       Console.ReadLine(); 
       Environment.ExitCode = 1; 
       return; 
      } 

      var myRoutingKey = args[0]; 
      channel.QueueBind(request, request, myRoutingKey); 

      Console.WriteLine($" [*] Waiting for messages for {myRoutingKey}."); 

      var consumer = new EventingBasicConsumer(channel); 
      consumer.Received += (model, ea) => 
      { 
       var body = ea.Body; 
       var message = Encoding.UTF8.GetString(body); 
       var routingKey = ea.RoutingKey; 
       Console.WriteLine($" [x] Received '{routingKey}':'{message}'"); 
      }; 
      channel.BasicConsume(request, true, consumer); 

      Console.WriteLine(" Press [enter] to exit."); 
      Console.ReadLine(); 
     } 
} 

和製片人:

public static void Main(String[] args) 
{ 
    var factory = new ConnectionFactory { HostName = "localhost" }; 
    using (var connection = factory.CreateConnection()) 
     using (var channel = connection.CreateModel()) 
     { 
      const String request = "request"; 
      channel.ExchangeDeclare(request, "direct"); 

      channel.QueueDeclare(request, true, false, false, null); 

      var routingKey = args.Length > 0 ? args[0] : "John"; 

      const String message = "Hi"; 
      var body = Encoding.UTF8.GetBytes(message); 
      channel.BasicPublish(request, routingKey, null, body); 
      Console.WriteLine($" [x] Sent '{routingKey}':'{message}'"); 
     } 

    Console.WriteLine(" Press [enter] to exit."); 
    Console.ReadLine(); 
} 

在此先感謝。

回答

0

我可以猜測爲什麼這不適合你。關鍵是你消費者的這兩條線。

channel.QueueDeclare(request, true, false, false, null); 
channel.QueueBind(request, request, myRoutingKey); 

事實上,「請求」是您的所有消費者的隊列名稱。如果您運行該程序使用多個路由鍵設置多個綁定,最終結果是名爲「request」的隊列通過多個路由鍵(例如「John」,「Mary」)綁定到您的交換。請記住,當您執行此綁定時,綁定在RabbitMQ服務器中不是瞬態的,並且它們仍然存在。

現在回到如何解決你的問題。有多種選擇,但這裏是其中之一。首先我建議閱讀RabbitMQ Model

您可以使用具有這些線,而不是您同教程代碼:

var queueName = channel.QueueDeclare().QueueName; 
channel.QueueBind(queueName, request, myRoutingKey); 

但上面的意思是創建一個新的隊列並綁定到你想要的交換你運行你的消費計劃,每次路由密鑰。另一種方法是使用之前做過的相同代碼,但只需選擇適當的隊列名稱,而不是固定的隊列名稱。例如,可以有一個隊列每路由密鑰

var queueName = myRoutingKey ; 
channel.QueueDeclare(queueName, true, false, false, null); 
channel.QueueBind(queueName, request, myRoutingKey); 

或可替代地可以羣若干路由密鑰的成類似於教程樣品的單個隊列。

重點是你需要做的事情不能用一個隊列完成。 (除了在消費時過濾掉消息)。但是這聽起來不像是你真正的要求。你問什麼是每個消費者服務器只處理你可以在這個模型中做的相關消息。生產者只發布到那一個交換(雖然這是你想要的)。

+0

嗨阿敏,謝謝你的回答。我還發現,使用動態生成的隊列不會出現問題。無論如何,我只是在尋求一個只有一個請求和一個響應隊列的解決方案,這是因爲性能方面的問題,以及我之前與其他經紀人實施的解決方案之間的這種解決方案。認識到不同的模型後者不是問題,只需要重新設計,但我肯定需要驗證性能。 – Leon

+0

此外,通過使用動態生成的隊列,我可以釋放生產者發送的所有消息,這些消息是在路由使用者未運行時生成的。我也試圖宣佈交易所持久,但它沒有幫助。 – Leon

+0

動態生成的隊列本質上是臨時隊列,因爲你將失去我假設的隊列名稱?我不認爲動態隊列名稱將會成爲您使用一致名稱的解決方案(將它們視爲您的選擇器)。這樣做沒有性能問題。這就是RabbitMQ的設計目的。 –