2011-02-25 63 views
3

你好」我已經試過了的101個的Rx例子之一:爲什麼Rx Observable.Subscribe會阻止我的線程?

static IEnumerable<int> GenerateAlternatingFastAndSlowEvents() 
    { 
     int i = 0; 

     while (true) 
     { 
      if (i > 1000) 
      { 
       yield break; 
      } 
      yield return i; 
      Thread.Sleep(i++ % 10 < 5 ? 500 : 1000); 
     } 
    } 

    private static void Main() 
    { 
     var observable = GenerateAlternatingFastAndSlowEvents().ToObservable().Timestamp(); 
     var throttled = observable.Throttle(TimeSpan.FromMilliseconds(750)); 

     using (throttled.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp))) 
     { 
      Console.WriteLine("Press any key to unsubscribe"); 
      Console.ReadKey(); 
     } 

     Console.WriteLine("Press any key to exit"); 
     Console.ReadKey(); 
    } 

我不明白爲什麼行‘按任意鍵退訂’從來沒有顯示。我的理解是訂閱是異步的,你訂閱並立即返回。我錯過了什麼導致我的主線程阻塞?

回答

6

阻塞是由while (true)IEnumerable<T>.ToObservable()擴展方法默認爲CurrentThreadScheduler的可枚舉循環的組合引起的。

如果提供Scheduler.TaskPool(或pre-.NET Scheduler.ThreadPool 4)的ToObservable過載,你應該明白你希望(儘管它不會打電話給你的用戶在主線程,僅供參考)的行爲。

說了這麼多,我想你會發現你的組合Thread.SleepThrottle將按照你的預期工作。您最好創建一個使用調度程序來安排延遲的自定義觀察值。

2

我同意理查德。

.ToObservable()實施看起來是這樣的:

public static IObservable<TSource> ToObservable<TSource>(
    this IEnumerable<TSource> source) 
{ 
    if (source == null) 
    { 
     throw new ArgumentNullException("source"); 
    } 
    return source.ToObservable<TSource>(Scheduler.CurrentThread); 
} 

它調用了.ToObservable(IScheduler)超載與Scheduler.CurrentThread因爲你正在使用.Sleep(...)造成觀察到有延遲完成代碼之前可以超越.Subscribe(...)方法。想想這個代碼的行爲是什麼樣子的,如果它全部運行在一個線程上(它是這樣的)

爲了解決這個問題,你可以使用任務池或線程池調度程序,理查德建議,但我認爲你的代碼有一個更基本的問題。這就是說,你正在使用「老派」線程睡眠,而不是依靠Rx方法。

試試這個生成您觀察到的:

var observable = 
    Observable 
     .GenerateWithTime(0, i => i <= 1000, i => i + 1, 
      i => i, i => TimeSpan.FromMilliseconds(i % 10 < 5 ? 500 : 1000)) 
     .Timestamp(); 

GenerateWithTime(...)沒有一切,你GenerateAlternatingFastAndSlowEvents方法做了,但它創造的直接觀察到的,並做它用Scheduler.ThreadPool引擎蓋下,這樣你就不需要指定任何調度程序。

+0

因此使用.ToObservable創建一個在當前線程上運行的Observable,因此它運行Generate ..方法的主線程。所以訂閱不會阻塞線程,因爲它忙於執行observable。那麼,如果我使用SubscribeOn並傳遞ThreadPoolScheduler,那麼我看到消息「按任意鍵取消訂閱」是怎麼回事?如果我的線程忙於執行observable,那麼應該不會影響我訂閱的位置。 – Eldar 2011-02-26 10:05:26

+0

@Eldar - 只要您使用'SubscribeOn(Scheduler.ThreadPool)',那麼訂閱和觀察者就會在新線程上運行,這樣'Subscribe'方法會立即返回,您將看到您的消息。只有當您的可觀察計劃使用'CurrentThread'調度程序時,您的'Subscribe'方法和觀察者才能在當前線程上運行。 – Enigmativity 2011-02-28 00:31:28

相關問題