2013-04-09 65 views
1

我喜歡Rx,但我遇到了一個問題,我一直在遇到。假設我們有一個上游IObservable<Foo>N下游順序序列,其中每個序列只對滿足一些簡單謂詞(比如foo.bar == someKey)的Foos感興趣。無功擴展:Where()的問題

當然這是針對Where()操作一個簡單的工作:

IObservable<Foo> foos = ...; 
foos.Where(foo => foo.bar == "abc").Subscribe(f => A(f)); 
foos.Where(foo => foo.bar == "xyz").Subscribe(f => B(f)); 
foos.Where(foo => foo.bar == "bla").Subscribe(f => C(f)); 
... 
[many more subscriptions for different bar values] 

什麼將主要發生在這裏的是,對上游生產的每個Foo,該Where()謂詞將爲該FooN次評估。它像線性搜索一樣查找所有需要此訂戶的訂戶Foo。這一切都很好,正是我們(應該)期待在這裏使用Where()

我的問題是,在我的情況下,N可能非常大,但想要任何特定Foo的用戶子集非常小。通常,每個Foo將只有一個。這意味着我本質上是做一個緩慢的線性搜索,當我可以做一個非常有效的查找來找到這個Foo需要傳播的少數下游序列。我的應用程序運行在非常關鍵的性能環境中,我無法承受這種低效率。

我已經絞盡腦汁想要找到一些更高效的優化方法,但我只能想出一些解決方案,這些解決方案涉及存儲大量狀態(映射訂戶等),並且必須非常小心地管理併發,這首先破壞了很多使用Rx的目的。我希望以現有的運營商的角度來處理這個問題。有沒有人處理過這個問題,或知道一個好的解決方案?我很高興提供更多細節。

編輯

我想我的例子是有點過於簡單化。我沒有處理與某些已知邊界內的數值匹配的情況。 N僅用於說明目的。上面的更新示例。

+0

你會對所有可能的bar 0..N值,還是僅僅爲某些? – 2013-04-09 17:54:49

+0

最好的可能是保持你的foos排序順序。看看'List.BinarySearch',然後只是迭代調用'Subscribe'直到'foo.Bar> = N' – 2013-04-09 18:16:41

+0

對不起,我的例子太簡單了,請參閱編輯和新示例代碼。 – Tim 2013-04-09 19:14:26

回答

3

得到了戴夫·塞克斯頓了在Rx討論板Codeplex上一個很好的解決方案:

https://rx.codeplex.com/discussions/439717

有關使用的GroupByGroupByUntil發佈如何?

例如:(未經測試

IConnectableObservable<IGroupedObservable<string, Foo>> foosByBar = 
    (from foo in foos 
    group foo by foo.bar) 
    .Publish(); 

foosByBar.Where(g => g.Key == "abc").Take(1).SelectMany(g => g).Subscribe(A); 
foosByBar.Where(g => g.Key == "xyz").Take(1).SelectMany(g => g).Subscribe(B); 
foosByBar.Where(g => g.Key == "bla").Take(1).SelectMany(g => g).Subscribe(C); 

foosByBar.Connect(); 

的GroupBy使用詞典查找每鍵以找到適當的可觀察到的,其中值被推動。

發佈廣播分組,以便字典查找操作被所有觀察者共享。

/採取執行謂詞只有一次找到相應的組,然後收到該 組的每個值的廣播與興趣相同的密鑰的任何其他觀察員一起。

注意的GroupBy不重播IGroupedObservable所以你 必須設置所有訂閱的連接之前。如果您想 而使用引用次數連接,那麼也許你應該考慮 應用重播運營商的 結果的GroupBy

+0

你爲什麼不將他們在那裏找到的解決方案發布到這個答案中? – 2013-04-10 16:06:11

+0

我認爲從源頭獲取信息會更好,但無論如何CodePlex在某一天爆炸時我都會這樣做。 – Tim 2013-04-10 18:08:19

0

有些東西正在存儲狀態,現在它只是可見的存儲您通過Wheres添加的所有訂閱者。目前還不清楚你是否意識到這一點,但foos必須通知每個觀察員每條消息。所有Where所做的就是讓大多數觀察者只是檢查謂詞並返回,但是每個消息都會檢查每個謂詞。

構建一個包含作爲觀察者的處理程序的映射不會太困難,應該爲您提供所需的性能收益。只需註冊儘可能多的處理程序,然後將地圖訂閱到源觀察點即可。如果一個Dictionary沒有提供你需要的匹配語義,你將不得不提出一些其他的方案來減少查找,但總體思路是一樣的。請注意,如果它有多個它應該處理的輸入,您可以多次註冊相同的處理程序,並且您可以爲相同的輸入註冊多個處理程序。

class ObserverMap<T> : IObserver<T> 
{ 
    ObserverMap(Action<Exception> onError, Action onCompleted) 
    { 
     _onError = onError; 
     _onCompleted = onCompleted; 
     _handlers = new Dictionary<T, List<Action<T>>>(); 
    } 
    ObserverMap(Action<Exception> onError, Action onCompleted, IEqualityComparer<T> comparer) 
    { 
     _onError = onError; 
     _onCompleted = onCompleted; 
     _handlers = new Dictionary<T, List<Action<T>>>(comparer); 
    } 

    int _stopped; 
    Dictionary<T, List<Action<T>>> _handlers; 
    Action<Exception> _onError; 
    Action _onCompleted; 

    public void OnCompleted() 
    { 
     if (System.Threading.Interlocked.Exchange(ref _stopped, 1) == 0) 
     { 
      if (_onCompleted != null) _onCompleted(); 
     } 
    } 

    public void OnError(Exception error) 
    { 
     if (System.Threading.Interlocked.Exchange(ref _stopped, 1) == 0) 
     { 
      if (_onCompleted != null) _onCompleted(); 
     } 
    } 

    public void OnNext(T value) 
    { 
     if (_stopped != 0) return; 

     List<Action<T>> match; 
     if (_handlers.TryGetValue(value, out match)) 
     { 
      foreach (var handler in match) 
      { 
       handler(value); 
      } 
     } 
    } 

    public IDisposable RegisterHandler(T key, Action<T> handler) 
    { 
     if (handler == null) throw new ArgumentNullException("handler"); 

     List<Action<T>> match; 
     if (!_handlers.TryGetValue(key, out match)) 
     { 
      match = new List<Action<T>>(); 
      _handlers.Add(key, match); 
     } 
     match.Add(handler); 

     return System.Reactive.Disposables.Disposable.Create(() => match.Remove(handler)); 
    } 
} 
+0

我會建議,而不是寫一個簽名'IDisposable RegisterHandler(T鍵,動作處理程序)'用於添加訂閱者的方法,寫一個索引器屬性/方法的簽名更像'IObservable this [T key] {得到{return Observable.Create(...); }}。 – 2013-04-09 22:58:03

+0

是的,我知道狀態正在被存儲和每個oberver的通知(這是我描述線性搜索時的意思)。 Rx雖然整齊地把它弄乾淨了,我寧願自己也不必寫這種代碼。我確實有類似於你所描述的解決方案,但我希望有一種更清潔的「內置」方式。 – Tim 2013-04-09 23:21:30