2013-02-10 63 views
0

我很難理解主題對象。無功擴展,主題<T>

考慮下面的代碼:

 var sub = new Subject<int>(); 
     sub.Subscribe(x => Console.WriteLine(x)); //subscriber #1   
     sub.Subscribe(x => Console.WriteLine(x)); //subscriber #2   
     sub.OnNext(2); 

我創建int類型的題目,當我執行OnNext它調用其它付費用戶(#1和#2)。 我沒有得到的東西是我讀的那個主體意味着一個對象,它既是可觀察的,又是觀察者,但是這是如何解釋爲什麼當我調用OnNext時,其他用戶被調用。

我會理解,如果主題的OnNext會將其傳播給所有訂閱者=發佈給所有其他人(這是有道理的),但是當我檢查源代碼時,我看不到任何內容,請參閱下文。

有人可能從下面的代碼瞭解究竟是什麼使OnNext(2)傳播到其他訂閱? (#1,#2)?

公共密封類主題:ISubject,ISubject,IObserver,的IObservable,IDisposable的 {// 領域 私人揮發性IObserver _observer;

// Methods 
public Subject() 
{ 
    this._observer = NopObserver<T>.Instance; 
} 

public void Dispose() 
{ 
    this._observer = DisposedObserver<T>.Instance; 
} 

public void OnCompleted() 
{ 
    IObserver<T> comparand = null; 
    IObserver<T> completed = DoneObserver<T>.Completed; 
    do 
    { 
     comparand = this._observer; 
    } 
    while (((comparand != DisposedObserver<T>.Instance) && !(comparand is DoneObserver<T>)) && (Interlocked.CompareExchange<IObserver<T>>(ref this._observer, completed, comparand) != comparand)); 
    comparand.OnCompleted(); 
} 

public void OnError(Exception error) 
{ 
    if (error == null) 
    { 
     throw new ArgumentNullException("error"); 
    } 
    IObserver<T> comparand = null; 
    DoneObserver<T> observer3 = new DoneObserver<T> { 
     Exception = error 
    }; 
    DoneObserver<T> observer2 = observer3; 
    do 
    { 
     comparand = this._observer; 
    } 
    while (((comparand != DisposedObserver<T>.Instance) && !(comparand is DoneObserver<T>)) && (Interlocked.CompareExchange<IObserver<T>>(ref this._observer, observer2, comparand) != comparand)); 
    comparand.OnError(error); 
} 

public void OnNext(T value) 
{ 
    this._observer.OnNext(value); 
} 

public IDisposable Subscribe(IObserver<T> observer) 
{ 
    if (observer == null) 
    { 
     throw new ArgumentNullException("observer"); 
    } 
    IObserver<T> comparand = null; 
    IObserver<T> observer3 = null; 
    do 
    { 
     comparand = this._observer; 
     if (comparand == DisposedObserver<T>.Instance) 
     { 
      throw new ObjectDisposedException(""); 
     } 
     if (comparand == DoneObserver<T>.Completed) 
     { 
      observer.OnCompleted(); 
      return Disposable.Empty; 
     } 
     DoneObserver<T> observer4 = comparand as DoneObserver<T>; 
     if (observer4 != null) 
     { 
      observer.OnError(observer4.Exception); 
      return Disposable.Empty; 
     } 
     if (comparand == NopObserver<T>.Instance) 
     { 
      observer3 = observer; 
     } 
     else 
     { 
      Observer<T> observer5 = comparand as Observer<T>; 
      if (observer5 != null) 
      { 
       observer3 = observer5.Add(observer); 
      } 
      else 
      { 
       observer3 = new Observer<T>(new ImmutableList<IObserver<T>>(new IObserver<T>[] { comparand, observer })); 
      } 
     } 
    } 
    while (Interlocked.CompareExchange<IObserver<T>>(ref this._observer, observer3, comparand) != comparand); 
    return new Subscription<T>((Subject<T>) this, observer); 
} 

private void Unsubscribe(IObserver<T> observer) 
{ 
    IObserver<T> comparand = null; 
    IObserver<T> instance = null; 
Label_0004: 
    comparand = this._observer; 
    if ((comparand != DisposedObserver<T>.Instance) && !(comparand is DoneObserver<T>)) 
    { 
     Observer<T> observer4 = comparand as Observer<T>; 
     if (observer4 != null) 
     { 
      instance = observer4.Remove(observer); 
     } 
     else 
     { 
      if (comparand != observer) 
      { 
       return; 
      } 
      instance = NopObserver<T>.Instance; 
     } 
     if (Interlocked.CompareExchange<IObserver<T>>(ref this._observer, instance, comparand) != comparand) 
     { 
      goto Label_0004; 
     } 
    } 
} 

// Properties 
public bool HasObservers 
{ 
    get 
    { 
     return (((this._observer != NopObserver<T>.Instance) && !(this._observer is DoneObserver<T>)) && (this._observer != DisposedObserver<T>.Instance)); 
    } 
} 

// Nested Types 
private class Subscription : IDisposable 
{ 
    // Fields 
    private IObserver<T> _observer; 
    private Subject<T> _subject; 

    // Methods 
    public Subscription(Subject<T> subject, IObserver<T> observer) 
    { 
     this._subject = subject; 
     this._observer = observer; 
    } 

    public void Dispose() 
    { 
     IObserver<T> observer = Interlocked.Exchange<IObserver<T>>(ref this._observer, null); 
     if (observer != null) 
     { 
      this._subject.Unsubscribe(observer); 
      this._subject = null; 
     } 
    } 
} 

}

+0

順便說一句,在複雜這裏是[接收1.1/2.0的性能優化]的一部分(http://blogs.msdn.com/b/rxteam/archive/2012/03/12/reactive-extensions- v2-0-β-可供now.aspx)。 (§提高生產者速度)。 – 2014-09-09 20:13:03

回答

0

主題是可觀察到的,因爲你可以訂閱。你用你的例子來做(你沒有訂閱兩個訂閱者)。

主題還是一個觀察者,因爲你可以做到以下幾點:

someObservable.Subscribe(subject); 

這樣,你的主題將someObservable接收事件,並將其傳播給自己的用戶。

P.S.在你的代碼中,你自己調用OnNext()方法。但是,這正是someObservable當你與你的主題訂閱時所要做的。

+0

看到我的答案,它總結了一些。無論如何感謝Thanx。 – Gilad 2013-02-10 17:58:06

+0

對不起,評論錯誤的地方... – 2014-09-09 20:12:20

1

我知道,但困擾我的是它沒有任何意義。我深入研究代碼,發現他們內部的觀察者實現包含更多的觀察者,見下文。

如果你檢查OnNext方法,你可以看到他們遍歷所有觀察者並調用他們的OnNext方法。

現在一切都對我有意義,我理解了邏輯,但看不到它在哪裏實現。

internal class Observer<T> : IObserver<T> 
{ 
    private readonly ImmutableList<IObserver<T>> _observers; 

    public Observer(ImmutableList<IObserver<T>> observers) 
    { 
     this._observers = observers; 
    } 

    internal IObserver<T> Add(IObserver<T> observer) 
    { 
     return new Observer<T>(this._observers.Add(observer)); 
    } 

    public void OnCompleted() 
    { 
     foreach (IObserver<T> observer in this._observers.Data) 
     { 
      observer.OnCompleted(); 
     } 
    } 

    public void OnError(Exception error) 
    { 
     foreach (IObserver<T> observer in this._observers.Data) 
     { 
      observer.OnError(error); 
     } 
    } 

    public void OnNext(T value) 
    { 
     foreach (IObserver<T> observer in this._observers.Data) 
     { 
      observer.OnNext(value); 
     } 
    } 

    internal IObserver<T> Remove(IObserver<T> observer) 
    { 
     int index = Array.IndexOf<IObserver<T>>(this._observers.Data, observer); 
     if (index < 0) 
     { 
      return this; 
     } 
     if (this._observers.Data.Length == 2) 
     { 
      return this._observers.Data[1 - index]; 
     } 
     return new Observer<T>(this._observers.Remove(observer)); 
    } 
}