2017-07-29 104 views
1

下面是一個簡單的.net核心1.1控制檯應用程序。使用-r參數調用它,它讀取rabbitmq隊列中的所有消息,用任何其他參數調用它,並將每個參數作爲消息排隊。.NET Core 1.1接收RabbitMQ消息

下面是問題所在,我可以將消息排入隊列,但所有嘗試讀取消息都不會導致消息被讀取。顯然,我沒有正確使用隊列,並希望得到一些指導。

謝謝!

using System; 
using System.Collections.Generic; 
using System.Text; 
using Newtonsoft.Json; 
using RabbitMQ.Client; 
using RabbitMQ.Client.Events; 

namespace RabbitMqDemo 
{ 
    class Program 
    { 
     static void Main(string[] args) 
     { 
      var client = new MessagingClient(); 
      if (args.Length == 1 && args[0].ToLower() == "-r") 
      { 
       Console.WriteLine("Reading Messages from Queue."); 
       var messages = client.ReceiveMessages(); 
       Console.WriteLine($"Read {messages.Length} message(s) from queue."); 
       foreach(var msg in messages) 
        Console.WriteLine(msg); 
      } 
      else 
      { 
       foreach (var msg in args) 
       { 
        client.SendMessage(msg); 
       } 
       Console.WriteLine($"Enqueued {args.Length} Message."); 
      } 
     } 
    } 

    internal class MessagingClient 
    { 
     private readonly ConnectionFactory connectionFactory; 
     private string ExchangeName => "defaultExchange"; 
     private string RoutingKey => ""; 
     private string QueueName => "Demo"; 

     private string HostName => "localhost"; 


     public MessagingClient() 
     { 
      this.connectionFactory = new ConnectionFactory {HostName = this.HostName}; 
     } 

     public void SendMessage(string message) 
     { 
      using (var connection = this.connectionFactory.CreateConnection()) 
      { 
       using (var channel = connection.CreateModel()) 
       { 
        this.QueueDeclare(channel, this.QueueName); 
        var properties = this.SetMessageProperties(channel, message); 

        string messageJson = JsonConvert.SerializeObject(message); 
        var body = Encoding.UTF8.GetBytes(messageJson); 

        channel.BasicPublish(exchange: this.ExchangeName, routingKey: this.RoutingKey, basicProperties: properties, body: body); 
       } 
      } 
     } 

     public string[] ReceiveMessages() 
     { 
      var messages = new List<string>(); 
      using (var connection = this.connectionFactory.CreateConnection()) 
      { 
       using (var channel = connection.CreateModel()) 
       { 
        this.QueueDeclare(channel, this.QueueName); 
        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); 


        var consumer = new EventingBasicConsumer(channel); 
        consumer.Received += (model, ea) => 
        {       
         string bodystring = Encoding.UTF8.GetString(ea.Body); 
         messages.Add(bodystring); 

         // ReSharper disable once AccessToDisposedClosure 
         channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); 
        }; 
        channel.BasicConsume(queue: this.QueueName, autoAck: false, consumer: consumer); 
       } 
      } 
      return messages.ToArray(); 
     } 

     private void QueueDeclare(IModel channel, string queueName) 
     { 
      channel.ExchangeDeclare(ExchangeName, type: ExchangeType.Direct, 
       durable: true, 
       autoDelete: false, 
       arguments: null); 

      var queueDeclared = channel.QueueDeclare(queue: queueName, 
       durable: true, 
       exclusive: false, 
       autoDelete: false, 
       arguments: null); 

      channel.QueueBind(queueName, ExchangeName, RoutingKey);    
     } 

     private IBasicProperties SetMessageProperties(IModel channel, object message) 
     { 
      var properties = channel.CreateBasicProperties(); 
      properties.ContentType = "application/json"; 
      properties.Persistent = true; 
      return properties; 
     } 

    } 
} 

回答

1
  • 首先,使用管理用戶界面,以確保您的交換和隊列設置正確的消息以及已發佈到它。
  • 第二,ReceiveMessages()因此您的閱讀器可能會立即返回一個空數組,然後事件有機會觸發。當消費者收到來自RabbitMQ的消息時,您沒有代碼需要等待。請注意0​​如何使用Console.ReadLine()。在您的示例中,可以使用同步對象(ManualResetEvent)來防止ReceiveMessages()返回,直到讀取特定的消息計數爲止。
+0

男人讓我感到愚蠢。當然,它不會阻止正在執行的線程。我添加了一些阻止邏輯,它像一個冠軍。謝謝盧克! – cdarrigo

+0

很高興我能幫到你 –