我需要偵聽UDP套接字和10秒後或緩衝區中的100個項目應調用一些邏輯。一般來說,它工作正常,但我不知道如何正確停止偵聽套接字。正確的方法來停止偵聽UDP套接字與RxExtensions
var ip = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 1234);
var socket = new UdpClient(ip);
var cancellationTokenSource = new CancellationTokenSource();
var observable =
Observable
.FromAsync(socket.ReceiveAsync)
.DoWhile(() => !cancellationTokenSource.IsCancellationRequested)
.Buffer(TimeSpan.FromSeconds(10), 100);
var subscribtion = observable.Subscribe(o =>
{
//logic
});
//simulate close method from another thread
Task.Factory.StartNew(() =>
{
Task.Delay(TimeSpan.FromSeconds(12)).Wait();
cancellationTokenSource.Cancel();
socket.Close();
subscribtion.Dispose();
});
當我關閉可模擬插座,存在一種情況,當存在於緩存一些數據,無法處理 - 是沒有辦法避免這種情況?
當我從另一個進程發送一些消息以500ms的延遲,這將是作品像下面的例子:
- 20的消息將收入
- 一些邏輯將被調用 - 訂戶邏輯
- 4消息將收入
- 模擬close方法將被調用
當「Close方法」將被調用我需要立即處理緩衝區中的所有數據並關閉應用程序,而無需等待緩衝區超時。緩衝區延遲時間由用戶定義,所以我不想等待調用用戶邏輯,因爲它可能是一個相當長的時間。
您能否提供[mcve]?我想運行一些代碼來說明你的問題。然後它可以被修復。我要說的是,解決方案將不會混合TPL和Rx。 Rx更強大,這就是你應該關注的。 – Enigmativity
感謝您的關注。也許我試着重新問我的問題,並更多地關注Rx。在我的例子中,我使用了Observable with Buffer方法。它通知用戶每10秒鐘或100個緩衝區中的項目。但當有人關閉我的應用時,我有一種特殊情況。它可能發生在沒有指定的時間,所以例如,如果它發生在距離最後一個訂戶第三秒鐘通知並存在緩衝區中的一些數據時,我必須讓我的應用再持續7秒來處理來自緩衝區的所有數據。有什麼方法可以對Observable類(按需)說 - 停止觀察你的源和上次通知訂閱者。 – tom
「停止觀察你的來源和上次通知訂戶」是什麼意思? – Enigmativity