如何用RX做生產者 - 消費者?如何用RX做適當的生產者 - 消費者模式
我發現this答案,它使用Java的調度限制觀察者OnNext調用concurent的數量 - 所以我可以做在C#中類似的東西,但我肯定是有另一種方式來做到這一點。在那兒?
此外,我發現奇怪的是,標準主題不等待OnNext結束,它只是立即啓動。
有什麼想法?
如何用RX做生產者 - 消費者?如何用RX做適當的生產者 - 消費者模式
我發現this答案,它使用Java的調度限制觀察者OnNext調用concurent的數量 - 所以我可以做在C#中類似的東西,但我肯定是有另一種方式來做到這一點。在那兒?
此外,我發現奇怪的是,標準主題不等待OnNext結束,它只是立即啓動。
有什麼想法?
這是關於現在的調度程序,但歷史上用於強制執行到.OnNext(...)
的串行調用的主題 - 問題是如果主題包含線程同步代碼,許多非線程情況下的性能受到影響。所以科目更加高效,但允許同時撥打.OnNext(...)
。
但是,您可以通過一些適當的調度程序或發佈調用來解決此問題。
讓我們看看一些代碼,開始使用這些方法:
public void Foo()
{
Console.WriteLine("Foo Start");
Thread.Sleep(5000);
Console.WriteLine("Foo End");
}
public void Bar()
{
Console.WriteLine("Bar Start");
Thread.Sleep(5000);
Console.WriteLine("Bar End");
}
如果我現在這樣寫:
var subject = new Subject<Unit>();
subject.Subscribe(u => Foo());
subject.Subscribe(u => Bar());
subject.OnNext(Unit.Default);
subject.OnNext(Unit.Default);
我得到如下:
Foo Start Foo End Bar Start Bar End Foo Start Foo End Bar Start Bar End
此代碼是運行在即時調度程序上 - 它必須等待每個.OnNext(...)
調用完成後再繼續。
有了這個代碼:
var subject = new Subject<Unit>();
var query = subject.ObserveOn(Scheduler.Default);
query.Subscribe(u => Foo());
query.Subscribe(u => Bar());
subject.OnNext(Unit.Default);
subject.OnNext(Unit.Default);
我現在得到:
Foo Start Bar Start Foo End Foo Start Bar End Bar Start Foo End Bar End
的調度程序現在可以自由地使用線程池,因此時間表同時調用。
現在,如果我們採取同樣的代碼,但.Publish()
它可以追溯到第一行爲查詢:
var subject = new Subject<Unit>();
var query = subject.ObserveOn(Scheduler.Default).Publish();
query.Subscribe(u => Foo());
query.Subscribe(u => Bar());
query.Connect();
subject.OnNext(Unit.Default);
subject.OnNext(Unit.Default);
最後,如果我們回到原來的代碼,但使用EventLoopScheduler
時間表,然後我們正在使用單個後臺線程,以便再次調用串口。
var loop = new EventLoopScheduler();
var subject = new Subject<Unit>();
var query = subject.ObserveOn(loop);
query.Subscribe(u => Foo());
query.Subscribe(u => Bar());
subject.OnNext(Unit.Default);
subject.OnNext(Unit.Default);
是的,但這不是完全「消費者」,它會處理每個項目一次 –