2011-10-10 29 views
1

我試圖解決以下問題:a)用戶從IObservable接收事件一段時間。然後它取消訂閱,做一些東西,然後再次訂閱。在這裏它應該開始接收來自完全取消訂閱的同一點的事件。 b)這種行爲對於多用戶模型是可取的。例如。當一個人取消訂閱時,其他人應該繼續接收事件。如何重新訂閱特定點的序列?

有沒有RX方面的建議?

在此先感謝!

+0

你點「b)」告訴我,你可能還沒有完全充分理解的Rx。除非您明確地「發佈」可觀察項或使用主題,否則觀察項不會在多個訂戶之間共享。 (您應該儘可能避免主題。)您應該將每個訂閱表單視爲一個新的獨特的可觀察操作鏈,回溯到可觀察的原始源(或多個來源)。 – Enigmativity

回答

0

這聽起來像你需要一個「pausable」流。假設一次只有一個用戶會處理這些值(而其他用戶只是等待),這個solution可能就是你需要的。

+0

這個帖子有一些問題,正如在這個SO問題中所確定的:http://stackoverflow.com/questions/7620182/pause-and-resume-subscription-on-cold-iobservable –

1

這裏有一個相當簡單的Rx方式來做你想從my answer複製到this other question。我創建了一個名爲Pausable的擴展方法,該方法需要可觀察的源代碼和第二個可觀察的布爾值,以暫停或恢復可觀察值。

public static IObservable<T> Pausable<T>(
    this IObservable<T> source, 
    IObservable<bool> pauser) 
{ 
    return Observable.Create<T>(o => 
    { 
     var paused = new SerialDisposable(); 
     var subscription = Observable.Publish(source, ps => 
     { 
      var values = new ReplaySubject<T>(); 
      Func<bool, IObservable<T>> switcher = b => 
      { 
       if (b) 
       { 
        values.Dispose(); 
        values = new ReplaySubject<T>(); 
        paused.Disposable = ps.Subscribe(values); 
        return Observable.Empty<T>(); 
       } 
       else 
       { 
        return values.Concat(ps); 
       } 
      }; 

      return pauser.StartWith(false).DistinctUntilChanged() 
       .Select(p => switcher(p)) 
       .Switch(); 
     }).Subscribe(o); 
     return new CompositeDisposable(subscription, paused); 
    }); 
} 

它可以像這樣使用:

var xs = Observable.Generate(
    0, 
    x => x < 100, 
    x => x + 1, 
    x => x, 
    x => TimeSpan.FromSeconds(0.1)); 

var bs = new Subject<bool>(); 

var pxs = xs.Pausable(bs); 

pxs.Subscribe(x => { /* Do stuff */ }); 

Thread.Sleep(500); 
bs.OnNext(true); 
Thread.Sleep(5000); 
bs.OnNext(false); 
Thread.Sleep(500); 
bs.OnNext(true); 
Thread.Sleep(5000); 
bs.OnNext(false); 
+0

謝謝,我讚賞這兩種方法通過我的問題的答案。處理IDisposable結果來管理IObservable狀態更適合我的任務,所以我在下面標記了接受的答案。您的評論和回答讓我更深入瞭解Rx,非常有幫助,再次感謝。 – borovikpe

+0

@Enigmativity它看起來像有一個競爭條件。我的測試代碼:\t \t \t 'var pxs = Observable.Interval(TimeSpan.FromMilliseconds(1))。Pausable(bs); var i = 0; pxs.Subscribe(pi => {if(i ++!= pi)throw new Exception();});' –

+0

@IlianPinzon - 我相信我解決了我的競爭條件。你能檢查嗎? – Enigmativity