2016-04-28 82 views
1

MassTransit的新手,仍然在玩一些教程項目。我將會有一個服務可能會運行20分鐘,並且在完成時我需要做一些事情。因爲它可能需要很長時間,我不想跟隨請求/響應模式並等待響應,並阻止線程。我認爲我的另一種選擇是創建另一個隊列,供消費者在完成作業時發佈。我看過這篇文章:MassTransit3 how to make request from consumer,但我不知道如何實現這一點。我的項目,再從this教程,看起來是這樣的:MassTransit:在消費者中創建回叫隊列

出版商:

static void Main(string[] args) 
{ 
    var bus = Bus.Factory.CreateUsingRabbitMq(x => 
     x.Host(new Uri("rabbitmq://localhost/"), h => {})); 
    var busHandle = bus.Start(); 
    var text = ""' 
    Console.WriteLine("Publisher"); 
    while(text != "quit") 
    { 
     Console.Write("Enter a message: "); 
     text = Console.ReadLine(); 

     var message = new SomethingHappenedMessage() 
     { 
      What = text, 
      When = DateTime.Now 
     } 
     bus.Publish(message); 
    } 
    busHandle.Stop(); 
} 

認購人:

static void Main(string[] args) 
{ 
    var bus = Bus.Factory.CreateUsingRabbitMq(x => 
    { 
     var host = x.Host(new Uri("rabbitmq://localhost"), h => {}); 
     x.ReceiveEndpoint(host, "MtPubSubExample_TestSubscriber", e => 
      e.Consumer<SomethingHappenedConsumer>()); 
    }); 
    Console.WriteLine("Subscriber"); 
    var busHandle = bus.Start(); 
    Console.ReadKey(); 
    busHandle.Stop(); 
} 

消費者:

class SomethingHappenedConsumer : IConsumer<ISomethingHappened> 
{ 
    public Task Consume(ConsumeContext<ISomethingHappened> context) 
    { 
     Console.Write("TXT: " + context.Message.What); 
     Console.Write(" SENT: " + context.Message.When); 
     Console.Write(" PROCESSED: " + DateTime.Now); 
     Console.WriteLine(" (" + System.Threading.Thread.CurrentThread.ManagedThreadId + ")"); 
     return Task.FromResult(0); 
    } 
} 

我將如何去創造一個消費者中的回調隊列?

回答

1

在您的消費者中,只需Bus.Publish(new ResponseMessage());(或任何您稱之爲您的回覆)並讓您的發佈者爲該消息類型註冊消費者。您的發佈者似乎沒有綁定到隊列,只需組成隊列名稱並將其綁定到隊列即可。

1

再次感謝@Travis尋求幫助。只是想展示最終的代碼,我最終將爲任何人在未來。消息看起來很有趣,但它正確回覆給發佈者。

出版商:

static void Main(string[] args) 
{ 
    var bus = Bus.Factory.CreateUsingRabbitMq(x => 
    { 
     var host = x.Host(new Uri("rabbitmq://localhost/"), h => { }); 
     x.ReceiveEndpoint(host, "MtPubSubExample_TestPublisher", e => 
      e.Consumer<ResponseConsumer>()); 
    }); 
    var busHandle = bus.Start(); 
    var text = ""; 
    Console.WriteLine("Publisher"); 
    while(text != "quit") 
    { 
     Console.Write("Enter a message: "); 
     text = Console.ReadLine(); 

     var message = new SomethingHappenedMessage() 
     { 
      What = text, 
      When = DateTime.Now 
     }; 
     bus.Publish(message); 
    } 

    busHandle.Stop(); 
} 

響應消費者:

class ResponseConsumer : IConsumer<IResponse> 
{ 
    public Task Consume(ConsumeContext<IResponse> context) 
    { 
     Console.WriteLine("RESPONSE MESSAGE: " + context.Message.Message); 
     return Task.FromResult(0); 
    } 
} 

認購人:

static void Main(string[] args) 
{ 
    var bus = Bus.Factory.CreateUsingRabbitMq(x => 
    { 
     var host = x.Host(new Uri("rabbitmq://localhost/"), h => { }); 
     x.ReceiveEndpoint(host, "MtPubSubExample_TestSubscriber", e => 
      e.Consumer<SomethingHappenedConsumer>()); 
    }); 
    Console.WriteLine("Subscriber"); 
    var busHandle = bus.Start(); 
    Console.ReadKey(); 
    busHandle.Stop(); 
} 

用戶消費:

class SomethingHappenedConsumer : IConsumer<ISomethingHappened> 
{ 
    private IBusControl bus = Bus.Factory.CreateUsingRabbitMq(x => 
     x.Host(new Uri("rabbitmq://localhost/"), h => { })); 

    public Task Consume(ConsumeContext<ISomethingHappened> context) 
    { 
     var now = DateTime.Now; 
     Console.Write("TXT: " + context.Message.What); 
     Console.Write(" SENT: " + context.Message.When); 
     Console.Write(" PROCESSED: " + now); 
     Console.WriteLine(" (" + System.Threading.Thread.CurrentThread.ManagedThreadId + ")"); 

     var response = new ResponseMessage() 
     { 
      Message = "The request was processed at " + now 
     }; 

     bus.Publish(response); 
     return Task.FromResult(0); 
    } 
}