2012-07-27 76 views
8

我是MassTransit的新手,我錯過了我的理解。MassTransit和事件與命令發佈

比方說,我有一個服務器場,所有節點都可以做同樣的工作。應用程序框架是CQRS的樣式。這意味着我有兩個基本類型的消息發佈:

  • 命令:必須處理:必須由服務器只有一個,任何人(先用工作插槽免費)
  • 事件的處理所有服務器

我已經建立了一個非常簡單的MassTransit原型(控制檯應用程序每隔X秒發送一次hello)。

在API中,我可以看到有一個「發佈」方法。我如何指定它是什麼類型的消息(一個對所有服務器)?

如果我看一個「處理程序」配置,我可以指定隊列uri。如果我爲所有主機指定相同的隊列,則所有主機都會收到該消息,但我無法將執行限制爲只有一臺服務器。

如果我從主機專用隊列偵聽,則只有一個服務器會處理這些消息,但我不知道如何廣播其他類型的消息。

請幫我理解我錯過了什麼。 PS:如果它在意,我的信息系統是rabbitmq。

爲了測試,我已創建一個共同的類庫這一類:

public static class ActualProgram 
{ 
    private static readonly CancellationTokenSource g_Shutdown = new CancellationTokenSource(); 

    private static readonly Random g_Random = new Random(); 

    public static void ActualMain(int delay, int instanceName) 
    { 
     Thread.Sleep(delay); 
     SetupBus(instanceName); 

     Task.Factory.StartNew(PublishRandomMessage, g_Shutdown.Token); 

     Console.WriteLine("Press enter at any time to exit"); 
     Console.ReadLine(); 
     g_Shutdown.Cancel(); 

     Bus.Shutdown(); 
    } 

    private static void PublishRandomMessage() 
    { 
     Bus.Instance.Publish(new Message 
     { 
      Id = g_Random.Next(), 
      Body = "Some message", 
      Sender = Assembly.GetEntryAssembly().GetName().Name 
     }); 

     if (!g_Shutdown.IsCancellationRequested) 
     { 
      Thread.Sleep(g_Random.Next(500, 10000)); 
      Task.Factory.StartNew(PublishRandomMessage, g_Shutdown.Token); 
     } 
    } 

    private static void SetupBus(int instanceName) 
    { 
     Bus.Initialize(sbc => 
     { 
      sbc.UseRabbitMqRouting(); 
      sbc.ReceiveFrom("rabbitmq://localhost/simple" + instanceName); 
      sbc.Subscribe(subs => 
      { 
       subs.Handler<Message>(MessageHandled); 
      }); 
     }); 
    } 

    private static void MessageHandled(Message msg) 
    { 
     ConsoleColor color = ConsoleColor.Red; 
     switch (msg.Sender) 
     { 
      case "test_app1": 
       color = ConsoleColor.Green; 
       break; 

      case "test_app2": 
       color = ConsoleColor.Blue; 
       break; 

      case "test_app3": 
       color = ConsoleColor.Yellow; 
       break; 
     } 
     Console.ForegroundColor = color; 
     Console.WriteLine(msg.ToString()); 
     Console.ResetColor(); 
    } 

    private static void MessageConsumed(Message msg) 
    { 
     Console.WriteLine(msg.ToString()); 
    } 
} 

public class Message 
{ 
    public long Id { get; set; } 

    public string Sender { get; set; } 

    public string Body { get; set; } 

    public override string ToString() 
    { 
     return string.Format("[{0}] {1} : {2}" + Environment.NewLine, Id, Sender, Body); 
    } 
} 

我也有,只是運行ActualMain方法3控制檯應用程序:

internal class Program 
{ 
    private static void Main(string[] args) 
    { 
     ActualProgram.ActualMain(0, 1); 
    } 
} 

回答

9

你想要什麼被稱爲競爭消費者(搜索所以你會發現更多的信息) 使用RabbitMQ使生活變得簡單,你所需要做的就是爲每個消費者指定相同的隊列名稱,消息將只被處理他們。 而不是每次都像你一樣產生一個唯一的隊列。

private static void SetupBus(int instanceName) 
{ 
    Bus.Initialize(sbc => 
    { 
     sbc.UseRabbitMqRouting(); 
     sbc.ReceiveFrom("rabbitmq://localhost/Commands); 
     sbc.Subscribe(subs => 
     { 
      subs.Handler<Message>(MessageHandled); 
     }); 
    }); 
} 

AFAIK,您需要爲命令處理程序有一個單獨的進程,而不是事件處理程序。所有的命令處理程序都將接收來自同一個隊列,所有的事件處理程序都將接收來自它們自己的唯一隊列。

難題的另一部分就是如何讓信息進入公共汽車。您仍然可以使用publish來執行命令,但是如果您錯誤地配置了使用者,則可能會得到多個執行消息,因爲消息將發送給所有使用者,如果要確保消息在單個隊列中結束,則可以使用Send而不是Publish。

Bus.Instance 
    .GetEndpoint(new Uri("rabbitmq://localhost/Commands")) 
    .Send(new Message 
    { 
     Id = g_Random.Next(), 
     Body = "Some message", 
     Sender = Assembly.GetEntryAssembly().GetName().Name 
    }); 
+0

謝謝。這有助於我理解差異。唯一覺得害怕我的是「你需要爲命令處理程序分配一個獨立的進程,而不是事件處理程序」。這將對全球架構產生影響,但如果沒有其他選擇,我會接受這一點。 – 2012-08-22 08:02:18

+0

對於競爭消費者,您還需要針對每個消費者單獨的總線實例。命令通常在單獨的總線實例上的原因通常是它們在消費者在多個線程上處理時同步處理。 MT不允許您在每個消息類型的基礎上指定併發性。我認爲它可能在一個過程中託管多條巴士,但我還沒有嘗試過。我使用topshelf(來自同一個人)主持每一個「服務」。它允許您在部署時很容易地選擇進程(appdomains)或單獨的進程/獨立機器 – 2012-08-23 11:37:42

+0

thx。我會看一看 – 2012-08-23 11:46:04