2012-07-27 35 views
2

對於基於Rx的變化跟蹤解決方案,我需要一個運算符,它可以以可觀察序列獲取第一個和最近一個項目。Rx:用於從Observable流獲取第一個和最近值的運算符

我怎麼會寫會產生以下大理石圖的Rx操作(注:括號只是用來陣容的項目......我不知道如何最好地在文本代表此):

 xs:---[a ]---[b ]-----[c ]-----[d ]---------| 
desired:---[a,a]---[a,b]-----[a,c]-----[a,d]---------| 

回答

5

使用相同的命名爲@Wilka你可以用下面的擴展,是有點不言自明:

public static IObservable<TResult> FirstAndLatest<T, TResult>(this IObservable<T> source, Func<T,T,TResult> func) 
{ 
    var published = source.Publish().RefCount(); 
    var first = published.Take(1);   
    return first.CombineLatest(published, func); 
} 

注意,它不一定返回Tuple,而是爲您提供在結果上傳遞選擇器函數的選項。這使它與基本的主要操作(CombineLatest)保持一致。這顯然很容易改變。

用法(如果你想的元組產生的數據流中):

Observable.Interval(TimeSpan.FromSeconds(0.1)) 
      .FirstAndLatest((a,b) => Tuple.Create(a,b)) 
      .Subscribe(Console.WriteLine); 
+3

您應該添加一個發佈操作符來共享訂閱源的副作用。上面的FirstAndLatest實現將導致兩個訂閱源爲其每個訂閱的結果,這可能會導致大量重複計算(或更糟糕的副作用,如啓動I/O和什麼)。 – 2012-07-28 22:13:35

+0

我接受這個答案,因爲這是第一個正確答案,儘管Bart的評論讓我想知道如何將Publish()整合到實現中。這不僅僅是在最後加上Publish()的問題。 – Damian 2012-07-29 00:01:37

+0

根據@BartDeSmet我添加了'Publish'(還添加了'RefCount',不知道這是否是首選方式,或者調用'Connect')。 – yamen 2012-07-29 21:20:47

1

我懷疑有這樣做的更好的方法(和我不喜歡使用這樣做),但你可以創建一個這樣

public static IObservable<Tuple<T, T>> FirstAndLatest2<T>(this IObservable<T> source) 
{ 
    return Observable.Defer(() => { 
     bool hasFirst = false; 
     T first = default(T); 

     return source 
      .Do(item => 
      { 
       if (!hasFirst) 
       { 
        hasFirst = true; 
        first = item; 
       } 
      }) 
      .Select(current => Tuple.Create(first, current)); 
    }); 
} 

操作,那麼你會用它LIK E本:

Observable.Interval(TimeSpan.FromSeconds(0.1)) 
    .FirstAndLatest() 
    .Subscribe(Console.WriteLine); 
+2

FirstAndLatest錯誤。由於缺乏懶惰,hasFirst和First狀態在所有訂閱中共享。所有的自定義操作符應該首先調用Observable.Create,除非它純粹是一個現有操作符組合的「宏」。或者,您也可以在這裏使用Observable.Defer以創建每個訂閱狀態。 – 2012-07-28 22:12:19

+0

謝謝,我錯過了。我已更新我的答案以使用Observable.Defer – Wilka 2012-07-30 08:51:43

1

試試這個:

public static IObservable<Tuple<T, T>> FirstAndLatest<T>(
    this IObservable<T> source) 
{ 
    return 
     source 
      .Take(1) 
      .Repeat() 
      .Zip(source, (x0, xn) => Tuple.Create(x0, xn)); 
} 

簡單,是吧?


或者,以共享底層源替代,試試這個:

public static IObservable<Tuple<T, T>> FirstAndLatest<T>(
    this IObservable<T> source) 
{ 
    return 
     source.Publish(
      s => 
       s.Take(1) 
       .Repeat() 
       .Zip(s, (x0, xn) => Tuple.Create(x0, xn))); 
} 

哎呦!抓住這個。它不起作用。它基本上不斷產生一對最新值。這樣發佈不起作用。原始實施是最好的。

+1

您應該添加一個發佈操作符來共享訂閱源的副作用。上面的FirstAndLatest實現將導致兩個訂閱源爲其每個訂閱的結果,這可能會導致大量重複計算(或更糟糕的副作用,如啓動I/O和什麼)。 – 2012-07-28 22:14:07

+0

@BartDeSmet - 我故意用這種方式實現它。如果源觀察值很熱,那麼我們是否不希望爲每個訂閱進行計算?不過,我會做出一個替代實施來避免這些問題。 – Enigmativity 2012-07-29 00:48:39

+0

@Enigmativity仍然認爲'Take(1).CombineLatest'比Take(1).Repeat()。Zip'更合適。 – yamen 2012-07-30 20:39:27

相關問題