2016-11-24 122 views
0

我需要偵聽UDP套接字和10秒後或緩衝區中的100個項目應調用一些邏輯。一般來說,它工作正常,但我不知道如何正確停止偵聽套接字。正確的方法來停止偵聽UDP套接字與RxExtensions

var ip = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 1234); 
var socket = new UdpClient(ip); 

var cancellationTokenSource = new CancellationTokenSource(); 
var observable = 
    Observable 
    .FromAsync(socket.ReceiveAsync) 
    .DoWhile(() => !cancellationTokenSource.IsCancellationRequested) 
    .Buffer(TimeSpan.FromSeconds(10), 100); 

var subscribtion = observable.Subscribe(o => 
{ 
    //logic 
}); 

//simulate close method from another thread 
Task.Factory.StartNew(() => 
{ 
    Task.Delay(TimeSpan.FromSeconds(12)).Wait(); 
    cancellationTokenSource.Cancel(); 
    socket.Close(); 
    subscribtion.Dispose(); 
}); 

當我關閉可模擬插座,存在一種情況,當存在於緩存一些數據,無法處理 - 是沒有辦法避免這種情況?

當我從另一個進程發送一些消息以500ms的延遲,這將是作品像下面的例子:

  1. 20的消息將收入
  2. 一些邏輯將被調用 - 訂戶邏輯
  3. 4消息將收入
  4. 模擬close方法將被調用

當「Close方法」將被調用我需要立即處理緩衝區中的所有數據並關閉應用程序,而無需等待緩衝區超時。緩衝區延遲時間由用戶定義,所以我不想等待調用用戶邏輯,因爲它可能是一個相當長的時間。

+0

您能否提供[mcve]?我想運行一些代碼來說明你的問題。然後它可以被修復。我要說的是,解決方案將不會混合TPL和Rx。 Rx更強大,這就是你應該關注的。 – Enigmativity

+0

感謝您的關注。也許我試着重新問我的問題,並更多地關注Rx。在我的例子中,我使用了Observable with Buffer方法。它通知用戶每10秒鐘或100個緩衝區中的項目。但當有人關閉我的應用時,我有一種特殊情況。它可能發生在沒有指定的時間,所以例如,如果它發生在距離最後一個訂戶第三秒鐘通知並存在緩衝區中的一些數據時,我必須讓我的應用再持續7秒來處理來自緩衝區的所有數據。有什麼方法可以對Observable類(按需)說 - 停止觀察你的源和上次通知訂閱者。 – tom

+0

「停止觀察你的來源和上次通知訂戶」是什麼意思? – Enigmativity

回答

1

歡迎來到StackOverflow!

緩衝區方法的現有重載不支持時間,計數和門。但是有一個過載觸發緩衝區關閉,當一個序列產生一個值。所以我們只需通過合併緩衝區關閉的所有條件的觀察值來創建一個序列。

看看這個演示。

onTime將在指定的時間段後產生一個值。

onCount將在x項目通過後生成其第一個值。

onClose會立即產生一個訂閱值 - 但我們不會連接到它,直到我們決定。

  var producer = Observable.Interval(TimeSpan.FromSeconds(0.2)); 
      var source = producer.Publish().RefCount(); 

      var onClose = Observable.Return(0L).Publish(); 
      var onTime = Observable.Timer(TimeSpan.FromSeconds(2)); 
      var onCount = source.Skip(10); 

      var bufferClose = Observable.Merge(onClose, onTime, onCount); 

     var subscription = 
      source 
      .Buffer(() => bufferClose) 
      .Subscribe(list => Console.WriteLine(string.Join(",", list))); 

      Console.WriteLine("Waiting for close"); 
      Console.ReadLine(); 

      onClose.Connect(); //signal 
      subscription.Dispose(); 

      Console.WriteLine("Closed"); 

      Console.ReadLine(); 

這將產生基於數量或時間的輸出,直到回被按下,當它立即用什麼在緩衝區中的可用關閉。

+0

明天我會嘗試解決方案,但看起來很有希望。謝謝。 – tom

+0

@tom結果如何? – Asti