2017-04-26 53 views
1

我很新手在rebus。Rebus停止從Azure服務總線獲取消息。從100條消息只有5我可以取

我已經從零開始構建了pub/sub示例。現在我正在用戶中收到消息。

我面臨的問題是我發佈了100條消息,然後突然當我啓動訂閱服務時,它僅獲得100條消息中的5條消息。

Windows服務執行5次異步任務,然後熄滅。我做錯了什麼?

我的用戶配置是這樣的:

using (var activator = new BuiltinHandlerActivator()) { 
    activator.Register(() => new TestMessageHandler()); 

    Configure.With(activator) 
     .Transport(t => t.UseAzureServiceBus(Constants.connectionString, Constants.subQueue)) 
     .Routing(r => r.TypeBased().MapAssemblyOf<TestMessage>(Constants.pubQueue)‌​) 
     .Start(); 

    activator.Bus.Subscribe<TestMessage>().Wait(120000); 
} 

和我的處理程序是這樣的:

public async Task Handle(TestMessage message) { 
    var message = string.Format("name: {1} and source name {2} {0} using the warp as a transport {0}", Environment.NewLine, message.Name , message.SourceName); 

    await Task.Run(() => Logger(message)); 
} 

private void Logger(TestMessage message) { 
    Console.WriteLine(message.ToString(false)); 
} 

從我張貼的代碼,是有什麼,我做錯了什麼?

+0

你可以嘗試在訂戶中顯示代碼嗎? – mookid8000

+0

您好,感謝您的快速回復。這是我的代碼。 – hfpg2001

+0

使用(var activator = new BuiltinHandlerActivator()) { activator.Register(()=> new TestMessageHandler()); Configure.With(活化劑) .Transport(T => t.UseAzureServiceBus(Constants.connectionString,Constants.subQueue)) .Routing(R => r.TypeBased()。MapAssemblyOf (Constants.pubQueue)) 。開始(); activator.Bus.Subscribe ().Wait(120000); } – hfpg2001

回答

0

從代碼

using (var activator = new BuiltinHandlerActivator()) { 
    (...) 
    activator.Bus.Subscribe<TestMessage>().Wait(120000); 
} 

好像你的用戶幾乎立即處置activator,從而停止總線。

這個假設很好地符合您所遇到的行爲,只處理隊列中的前幾條消息。

您應該在運行應用程序的整個持續時間內保持激活器實例(或者返回IBus,如果您願意的話),然後在關閉時處置它。

+0

謝謝,@mookid,但我認爲等待將等待,直到排隊是空的....是不是? – hfpg2001

+0

不,這不是它的工作原理...... Rebus的目的是在你的應用程序運行的整個過程中啓動和保持,只要它在運行,它將處理消息,它也是設計的因此即使在隊列中存在更多的消息(例如,如果隊列中存在更多消息),也可以隨時停止它。你想更新你的系統或其他...只是在您重新啓動時恢復消息處理 – mookid8000

相關問題