2014-11-03 81 views
1

對於我的生活,我一直無法獲得與RabbitMQ與臨時replyto隊列工作的RPC。以下是來自this test的簡單示例。我在輸出窗口中看到一堆異常,並且dlq填滿了,但消息從未被確認。無法將RabbitMQ RPC與ServiceStack分佈式服務結合使用。

namespace ConsoleApplication4 
{ 
    class Program 
    { 
     public static IMessageService CreateMqServer(int retryCount = 1) 
     { 
      return new RabbitMqServer { RetryCount = retryCount }; 
     } 

     static void Main(string[] args) 
     { 

      using (var mqServer = CreateMqServer()) 
      { 
       mqServer.RegisterHandler<HelloIntro>(m => 
        new HelloIntroResponse { Result = "Hello, {0}!".Fmt(m.GetBody().Name) }); 
       mqServer.Start(); 
      } 

      Console.WriteLine("ConsoleAppplication4"); 
      Console.ReadKey(); 
     } 
    } 
} 



namespace ConsoleApplication5 
{ 
    class Program 
    { 
     public static IMessageService CreateMqServer(int retryCount = 1) 
     { 
      return new RabbitMqServer { RetryCount = retryCount }; 
     } 

     static void Main(string[] args) 
     { 
      using (var mqServer = CreateMqServer()) 
      { 
       using (var mqClient = mqServer.CreateMessageQueueClient()) 
       { 
        var replyToMq = mqClient.GetTempQueueName(); 
        mqClient.Publish(new Message<HelloIntro>(new HelloIntro { Name = "World" }) 
        { 
         ReplyTo = replyToMq 
        }); 

        IMessage<HelloIntroResponse> responseMsg = mqClient.Get<HelloIntroResponse>(replyToMq); 
        mqClient.Ack(responseMsg); 

       } 
      } 

      Console.WriteLine("ConsoleAppplication5"); 
      Console.ReadKey(); 
     } 
    } 
} 

首先例外

RabbitMQ.Client.Exceptions.OperationInterruptedException occurred 
    _HResult=-2146233088 
    _message=The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=405, text="RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'mq:tmp:10dd20804ee546d6bf5a3512f66143ec' in vhost '/'", classId=50, methodId=20, cause= 
    HResult=-2146233088 
    IsTransient=false 
    Message=The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=405, text="RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'mq:tmp:10dd20804ee546d6bf5a3512f66143ec' in vhost '/'", classId=50, methodId=20, cause= 
    Source=RabbitMQ.Client 
    StackTrace: 
     at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply() 
     at RabbitMQ.Client.Impl.ModelBase.ModelRpc(MethodBase method, ContentHeaderBase header, Byte[] body) 
     at RabbitMQ.Client.Framing.Impl.v0_9_1.Model._Private_QueueBind(String queue, String exchange, String routingKey, Boolean nowait, IDictionary`2 arguments) 
     at RabbitMQ.Client.Impl.ModelBase.QueueBind(String queue, String exchange, String routingKey, IDictionary`2 arguments) 
     at RabbitMQ.Client.Impl.ModelBase.QueueBind(String queue, String exchange, String routingKey) 
     at ServiceStack.RabbitMq.RabbitMqExtensions.RegisterQueue(IModel channel, String queueName) 
     at ServiceStack.RabbitMq.RabbitMqExtensions.RegisterQueueByName(IModel channel, String queueName) 
     at ServiceStack.RabbitMq.RabbitMqProducer.PublishMessage(String exchange, String routingKey, IBasicProperties basicProperties, Byte[] body) 
    InnerException: 

其次是這個

System.Threading.ThreadInterruptedException occurred 
    _HResult=-2146233063 
    _message=Thread was interrupted from a waiting state. 
    HResult=-2146233063 
    IsTransient=true 
    Message=Thread was interrupted from a waiting state. 
    Source=mscorlib 
    StackTrace: 
     at System.Threading.Monitor.ObjWait(Boolean exitContext, Int32 millisecondsTimeout, Object obj) 
     at System.Threading.Monitor.Wait(Object obj, Int32 millisecondsTimeout, Boolean exitContext) 
    InnerException: 

然後重複了多次,並掛起。這個特別的post似乎表明他們能夠通過ServerStack和RabbitMQ RPC獲得某種成功,但在開始更改我的代碼之前,我想知道我的代碼無法工作的原因。

謝謝 斯蒂芬

+1

如果聲明獨佔隊列,則不能從其他AMQP通道訪問這些隊列。 – 2014-11-10 21:51:27

+0

此問題已解決,現在一切正常 – 2014-11-10 22:03:23

回答

1

重新指定不再執行排隊隊列in this commit時出現問題。

還有一個new RabbitMqTest project展示了一個簡單的客戶端/服務器示例,通過2個獨立的控制檯應用程序進行通信。

此更改可從v4.0.34 +即now on MyGet獲取。

ServiceStack.RabbitMqRabbitMq.Client NuGet依賴關係也已升級到v3.4.0。

2

當你的客戶端調用GetTempQueueName(),它創建了一個獨特的隊列,這不能從另一個連接(即服務器)進行訪問。

所以我創造了我自己的簡單的MQ客戶端不使用servicestack的MQ客戶端,只取決於的RabbitMQ的.NET庫:

public class MqClient : IDisposable 
    { 
     ConnectionFactory factory = new ConnectionFactory() 
     { 
      HostName = "192.168.97.201", 
      UserName = "guest", 
      Password = "guest", 
      //VirtualHost = "test", 
      Port = AmqpTcpEndpoint.UseDefaultPort, 
     }; 

     private IConnection connection; 
     private string exchangeName; 

     public MqClient(string defaultExchange) 
     { 
      this.exchangeName = defaultExchange; 
      this.connection = factory.CreateConnection(); 
     } 

     public TResponse RpcCall<TResponse>(IReturn<TResponse> reqDto, string exchange = null) 
     { 
      using (var channel = connection.CreateModel()) 
      { 
       string inq_queue_name = string.Format("mq:{0}.inq", reqDto.GetType().Name); 

       string responseQueueName = channel.QueueDeclare("",false,false,true,null).QueueName; 
       //string responseQueueName = channel.QueueDeclare().QueueName; 

       var props = channel.CreateBasicProperties(); 
       props.ReplyTo = responseQueueName; 

       var message = ServiceStack.Text.JsonSerializer.SerializeToString(reqDto); 

       channel.BasicPublish(exchange ?? this.exchangeName, inq_queue_name, props, UTF8Encoding.UTF8.GetBytes(message)); 

       var consumer = new QueueingBasicConsumer(channel); 
       channel.BasicConsume(responseQueueName, true, consumer); 


       var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); 
       //channel.BasicAck(ea.DeliveryTag, false); 

       string response = UTF8Encoding.UTF8.GetString(ea.Body); 
       string responseType = ea.BasicProperties.Type; 
       Console.WriteLine(" [x] New Message of Type '{1}' Received:{2}{0}", response, responseType, Environment.NewLine); 

       return ServiceStack.Text.JsonSerializer.DeserializeFromString<TResponse>(response); 

      } 
     } 

     ~MqClient() 
     { 
      this.Dispose(); 
     } 

     public void Dispose() 
     { 
      if (connection != null) 
      { 
       this.connection.Dispose(); 
       this.connection = null; 
      } 
     } 

    } 

可以使用這樣的:

using (var mqClient = new MqClient("mx.servicestack")) 
{ 
    var pingResponse = mqClient.RpcCall<PingResponse>(new Ping { }); 
} 

重要:您必須使用servicestack版本4.0.32+。

相關問題