2016-08-18 75 views
3

我對下面的代碼示例以及人們的想法有點好奇。 這個想法是從NetworkStream中讀取(〜20 msg/s),而不是在main中工作,將事情傳遞給MainboxProcessor,以便在完成時處理並返回綁定。通過Rx從MailboxProcessor返回結果是一個好主意嗎?

通常的方法是使用PostAndReply,但我想綁定到ListView或C#中的其他控件。必須對LastN物品進行魔術並過濾。 另外,Rx有一些錯誤處理。

下面的示例觀察2..10中的數字並返回「hello X」。在它停止像EOF。將它設爲ToEnumerable是因爲其他線程在其他之前完成,但它也適用於Subscribe。

讓我困擾:

  1. 通過主題(OBJ)的遞歸左右。我沒有看到有大約3-4個問題。好主意?
  2. 主題的生命期。

open System 
open System.Threading 
open System.Reactive.Subjects 
open System.Reactive.Linq // NuGet, take System.Reactive.Core also. 
open System.Reactive.Concurrency 

type SerializedLogger() = 

    let _letters = new Subject<string>() 
    // create the mailbox processor 
    let agent = MailboxProcessor.Start(fun inbox -> 

     // the message processing function 
     let rec messageLoop (letters:Subject<string>) = async{ 

      // read a message 
      let! msg = inbox.Receive() 

      printfn "mailbox: %d in Thread: %d" msg Thread.CurrentThread.ManagedThreadId 
      do! Async.Sleep 100 
      // write it to the log  
      match msg with 
      | 8 -> letters.OnCompleted() // like EOF. 
      | x -> letters.OnNext(sprintf "hello %d" x) 

      // loop to top 
      return! messageLoop letters 
      } 

     // start the loop 
     messageLoop _letters 
     ) 

    // public interface 
    member this.Log msg = agent.Post msg 
    member this.Getletters() = _letters.AsObservable() 

/// Print line with prefix 1. 
let myPrint1 x = printfn "onNext - %s, Thread: %d" x Thread.CurrentThread.ManagedThreadId 

// Actions 
let onNext = new Action<string>(myPrint1) 
let onCompleted = new Action(fun _ -> printfn "Complete") 

[<EntryPoint>] 
let main argv = 
    async{ 
    printfn "Main is on: %d" Thread.CurrentThread.ManagedThreadId 

    // test 
    let logger = SerializedLogger() 
    logger.Log 1 // ignored? 

    let xObs = logger 
       .Getletters() //.Where(fun x -> x <> "hello 5") 
       .SubscribeOn(Scheduler.CurrentThread) 
       .ObserveOn(Scheduler.CurrentThread) 
       .ToEnumerable() // this 
       //.Subscribe(onNext, onCompleted) // or with Dispose() 

    [2..10] |> Seq.iter (logger.Log) 

    xObs |> Seq.iter myPrint1 

    while true 
     do 
     printfn "waiting" 
     System.Threading.Thread.Sleep(1000) 

    return 0 
    } |> Async.RunSynchronously // return an integer exit code 

回答

3

我也做過類似的事情,但使用普通的F#Event類型,而不是Subject。它基本上可以讓你創建IObservable並觸發它的訂閱 - 就像你使用更復雜的Subject。基於事件的版本是:

type SerializedLogger() = 
    let letterProduced = new Event<string>() 
    let lettersEnded = new Event<unit>() 
    let agent = MailboxProcessor.Start(fun inbox -> 
    let rec messageLoop (letters:Subject<string>) = async { 
     // Some code omitted 
     match msg with 
     | 8 -> lettersEnded.Trigger() 
     | x -> letterProduced.Trigger(sprintf "hello %d" x) 
     // ... 

member this.Log msg = agent.Post msg 
member this.LetterProduced = letterProduced.Publish 
member this.LettersEnded = lettersEnded.Publish 

重要的區別是:

  • Event不能觸發OnCompleted,所以我改爲暴露於兩個獨立的事件。這很不幸!鑑於Subject與所有其他方面的事件非常相似,這可能是使用主題而不是普通事件的一個很好的理由。

  • 使用Event的好方面是它是一個標準的F#類型,因此您不需要代理中的任何外部依賴關係。

  • 我注意到您的評論注意到第一個電話Log被忽略。這是因爲您只有在發生此次通話後才訂閱事件處理程序。我認爲你可以在這裏使用ReplaySubject variation on the Subject idea--它在你訂閱它時重播所有的事件,所以之前發生的事件不會丟失(但是需要緩存)。

綜上所述,筆者認爲使用Subject可能是一個好主意 - 它本質上是相同的模式,使用Event(我認爲這是暴露從代理的通知相當標準的方式),但它可以讓你觸發OnCompleted 。由於緩存成本的原因,我可能不會使用ReplaySubject - 您必須在觸發任何事件之前確保訂閱。

相關問題