2016-09-29 71 views
2

我正在嘗試在MassTransit v3中使用C#和RabbitMQ實現只發布總線,其中總線沒有使用者。這個概念是消息將被髮布和排隊,然後一個單獨的微服務將消耗隊列中的消息。看看this SO answer,必須指定接收端點,以便消息實際排隊。但是,這似乎是contradict the common gotchas in the MassTransit docs,其中指出If you need to only send or publish messages, don’t create any receive endpoints使用C#和RabbitMQ在MassTransit v3中實現只發布總線

下面是一些示例代碼:

public class Program 
    { 
     static void Main(string[] args) 
     { 
      var bus = BusConfigurator.ConfigureBus(); 

      bus.Start(); 

      bus.Publish<IItemToQueue>(new ItemToQueue { Text = "Hello World" }).Wait(); 

      Console.ReadKey(); 

      bus.Stop(); 
     } 
    } 

    public static class BusConfigurator 
    { 
     public static IBusControl ConfigureBus() 
     { 
      var bus = Bus.Factory.CreateUsingRabbitMq(cfg => 
      { 
       var host = cfg.Host(new Uri("rabbitmq://localhost/"), hst => 
       { 
        hst.Username("guest"); 
        hst.Password("guest"); 
       }); 

       cfg.ReceiveEndpoint(host, "queuename", e => 
       { 
        e.Consumer<MyConsumer>(); 
       }); 
      }); 

      return bus; 
     } 
    } 

    public interface IItemToQueue 
    { 
     string Text { get; set; } 
    } 

    public class ItemToQueue : IItemToQueue 
    { 
     public string Text { get; set; } 
    } 

    public class MyConsumer : IConsumer<IItemToQueue> 
    { 
     public async Task Consume(ConsumeContext<IItemToQueue> context) 
     { 
      await Console.Out.WriteLineAsync(context.Message.Text); 
     } 
    } 

在此示例中,我收到了RabbitMQ的隊列中的消息符合市場預期,這是由MyConsumer它寫的Hello World控制檯消耗,消息隨後從隊列中刪除。

然而,當我從上面除去下面的代碼並重新運行樣品:

cfg.ReceiveEndpoint(host, RabbitMqConstants.ValidationQueue, e => 
{ 
    e.Consumer<MyConsumer>(); 
}); 

臨時隊列創建(具有生成的名稱)和消息似乎從未被放置到臨時隊列。當總線停止時,該隊列將被刪除。

我遇到的問題是指定了ReceiveEndpoint,消息將從發佈程序中的隊列中消耗並被刪除(意味着消費者微服務不會處理排隊的項目)。沒有指定RecieveEndpoint,就會使用一個臨時隊列(並且消費者微服務不知道這個臨時隊列的名稱),這個消息似乎永遠不會排隊,並且當總線停止時隊列被刪除,如果不是該程序失敗了。

an example of a send only bus in the MassTransit docs但它是非常基本的,所以我想知道如果有人有任何建議?

+0

雖然您不需要發佈者/發件人的接收端點,但確實需要它們在某個地方,否則就沒有綁定到消息交換的隊列,而且這些消息也不會路由到任何地方 - 從而消失。 –

回答

1

接收端點應位於您的服務中,與發佈應用程序分開。這樣,該服務將具有接收端點並在應用程序發佈它們時使用這些消息。

如果您在應用程序中有接收端點,則應用程序將使用消息,因爲它具有在接收端點中指定的相同隊列名稱。

你需要做的是創建另一個服務,使用相同的配置(包括接收端點) - 並將接收端點從應用程序中取出。此時,該服務將擁有接收端點並消耗隊列中的消息。即使服務停止,郵件仍會繼續傳送到隊列中,一旦服務啓動,它們將開始消費。

+0

這只是一個讓RabbitMQ成立的問題嗎?我處於類似的情況,我需要保證接收方隊列在應用程序開始發送消息時立即接收消息,而不管該服務在當時是否已啓動並正在運行,只要它們正在保存並最終得到處理。如果一旦運行服務設置了滿足此要求的事情,那沒問題。這是文檔可以使用某些示例的一個區域。假設這有效,我會很高興寫出來。 –

+0

在GitHub中有一個問題,實質上是一個不啓動,但在RabbitMQ中創建拓撲。自己接線的問題是,類型系統中細微的細微差別可能無法正確手動設置,從而導致啓動時出現錯誤。但是,如果要發送到特定隊列,則會有查詢字符串參數供發件人將交換機綁定到隊列。它是bind或bindQueue = true。 –

+0

謝謝。你能提供問題鏈接嗎?今天早上我終於可以做些事情了。一旦我和一位消費者開始合作,我就可以將其刪除,並將消息保留在隨後的運行中。我處於開發/探索模式,顯然不是一個生產計劃:)你提出了一些關於我一直在考慮的類型和版本信息的問題,但是這是另一個帖子的主題,我玩了更多 –