2017-10-05 66 views
0

如何用RX做生產者 - 消費者?如何用RX做適當的生產者 - 消費者模式

我發現this答案,它使用Java的調度限制觀察者OnNext調用concurent的數量 - 所以我可以做在C#中類似的東西,但我肯定是有另一種方式來做到這一點。在那兒?

此外,我發現奇怪的是,標準主題不等待OnNext結束,它只是立即啓動。

有什麼想法?

回答

0

這是關於現在的調度程序,但歷史上用於強制執行到.OnNext(...)的串行調用的主題 - 問題是如果主題包含線程同步代碼,許多非線程情況下的性能受到影響。所以科目更加高效,但允許同時撥打.OnNext(...)

但是,您可以通過一些適當的調度程序或發佈調用來解決此問題。

讓我們看看一些代碼,開始使用這些方法:

public void Foo() 
{ 
    Console.WriteLine("Foo Start"); 
    Thread.Sleep(5000); 
    Console.WriteLine("Foo End"); 
} 

public void Bar() 
{ 
    Console.WriteLine("Bar Start"); 
    Thread.Sleep(5000); 
    Console.WriteLine("Bar End"); 
} 

如果我現在這樣寫:

var subject = new Subject<Unit>(); 

subject.Subscribe(u => Foo()); 
subject.Subscribe(u => Bar()); 

subject.OnNext(Unit.Default); 
subject.OnNext(Unit.Default); 

我得到如下:

 
Foo Start 
Foo End 
Bar Start 
Bar End 
Foo Start 
Foo End 
Bar Start 
Bar End 

此代碼是運行在即時調度程序上 - 它必須等待每個.OnNext(...)調用完成後再繼續。

有了這個代碼:

var subject = new Subject<Unit>(); 
var query = subject.ObserveOn(Scheduler.Default); 

query.Subscribe(u => Foo()); 
query.Subscribe(u => Bar()); 

subject.OnNext(Unit.Default); 
subject.OnNext(Unit.Default); 

我現在得到:

 
Foo Start 
Bar Start 
Foo End 
Foo Start 
Bar End 
Bar Start 
Foo End 
Bar End 

的調度程序現在可以自由地使用線程池,因此時間表同時調用。

現在,如果我們採取同樣的代碼,但.Publish()它可以追溯到第一行爲查詢:

var subject = new Subject<Unit>(); 
var query = subject.ObserveOn(Scheduler.Default).Publish(); 

query.Subscribe(u => Foo()); 
query.Subscribe(u => Bar()); 

query.Connect(); 

subject.OnNext(Unit.Default); 
subject.OnNext(Unit.Default); 

最後,如果我們回到原來的代碼,但使用EventLoopScheduler時間表,然後我們正在使用單個後臺線程,以便再次調用串口。

var loop = new EventLoopScheduler(); 
var subject = new Subject<Unit>(); 
var query = subject.ObserveOn(loop); 

query.Subscribe(u => Foo()); 
query.Subscribe(u => Bar()); 

subject.OnNext(Unit.Default); 
subject.OnNext(Unit.Default); 
+0

是的,但這不是完全「消費者」,它會處理每個項目一次 –