2017-02-16 88 views
0

我有一些可觀察到的順序執行所有觀察員後完成操作,例如:可觀察到的序列

var period = TimeSpan.FromSeconds(0.5); 
var observable = Observable 
    .Interval(period) 
    .Publish() 
    .RefCount(); 

我想在後臺線程上執行一些艱苦的計算該序列的元素,並進行一些最後的動作當所有的計算完成時。所以我想要這樣的事情:

observable.ObserveOn(Scheduler.Default).Subscribe(i => ComplexComputation1(i)); 
observable.ObserveOn(Scheduler.Default).Subscribe(i => ComplexComputation2(i)); 
// next observer must be called only after ComplexComputation1/2 complete on input i 
observable.Subscribe(i => FinalAction(i)); 

我可以在Rx做這個嗎?或者,這可能違反了反應式編程的一些原則,我應該在這種情況下采用另一種方法?

回答

2

在反應模式中計算有序的序列是非常危險的。

你可以做的一件事就是在複雜的計算完成後發出一個事件。然後,您可以讓一位消費觀察員在收到前面步驟完成的消息後執行他的計算。


另一個可能的解決方案是創建一個具體的序列塊,定期發射。這降低了解決方案的並行性。

observable.ObserveOn(Scheduler.Default).Subscribe(i => 
{  
    ComplexComputation1(i)); 
    ComplexComputation2(i)); 
    FinalAction(i); 
} 
+0

我想過這種方法。但實際上,我不知道有多少觀察者會處理每個元素。這個解決方案能應用於這種情況嗎? –

+0

我添加了序列塊方法作爲可能的解決方案。 –

1

爲了驗證這一點,我創建了下面的方法來幫助說明事件的順序:

public void ComplexComputation1(long i) 
{ 
    Console.WriteLine("Begin ComplexComputation1"); 
    Thread.Sleep(100); 
    Console.WriteLine("End ComplexComputation1"); 
} 

public void ComplexComputation2(long i) 
{ 
    Console.WriteLine("Begin ComplexComputation2"); 
    Thread.Sleep(100); 
    Console.WriteLine("End ComplexComputation2"); 
} 

public void FinalAction(long i) 
{ 
    Console.WriteLine("Begin FinalAction"); 
    Thread.Sleep(100); 
    Console.WriteLine("End FinalAction"); 
} 

你原來的代碼跑這樣的:

 
Begin FinalAction 
Begin ComplexComputation1 
Begin ComplexComputation2 
End ComplexComputation2 
End FinalAction 
End ComplexComputation1 
Begin FinalAction 
Begin ComplexComputation1 
Begin ComplexComputation2 
End FinalAction 
End ComplexComputation2 
End ComplexComputation1 
Begin FinalAction 
Begin ComplexComputation1 
Begin ComplexComputation2 
End ComplexComputation2 
End ComplexComputation1 
End FinalAction 
... 

這很容易執行代碼在單個後臺線程上依次運行。只需使用EventLoopScheduler即可。

var els = new EventLoopScheduler(); 

observable.ObserveOn(els).Subscribe(i => ComplexComputation1(i)); 
observable.ObserveOn(els).Subscribe(i => ComplexComputation2(i)); 
// next observer must be called only after ComplexComputation1/2 complete on input i 
observable.ObserveOn(els).Subscribe(i => FinalAction(i)); 

這給:

 
Begin ComplexComputation1 
End ComplexComputation1 
Begin ComplexComputation2 
End ComplexComputation2 
Begin FinalAction 
End FinalAction 
Begin ComplexComputation1 
End ComplexComputation1 
Begin ComplexComputation2 
End ComplexComputation2 
Begin FinalAction 
End FinalAction 
Begin ComplexComputation1 
End ComplexComputation1 
Begin ComplexComputation2 
End ComplexComputation2 
Begin FinalAction 
End FinalAction 

但只要你介紹Scheduler.Default這不起作用。

的更多或更少的簡單的選擇是要做到這一點:

var cc1s = observable.ObserveOn(Scheduler.Default).Select(i => { ComplexComputation1(i); return Unit.Default; }); 
var cc2s = observable.ObserveOn(Scheduler.Default).Select(i => { ComplexComputation2(i); return Unit.Default; }); 

observable.Zip(cc1s.Zip(cc2s, (cc1, cc2) => Unit.Default), (i, cc) => i).Subscribe(i => FinalAction(i)); 

按預期工作。

你得到一個不錯的順序是這樣的:

 
Begin ComplexComputation1 
Begin ComplexComputation2 
End ComplexComputation1 
End ComplexComputation2 
Begin FinalAction 
End FinalAction 
Begin ComplexComputation2 
Begin ComplexComputation1 
End ComplexComputation2 
End ComplexComputation1 
Begin FinalAction 
End FinalAction 
Begin ComplexComputation1 
Begin ComplexComputation2 
End ComplexComputation2 
End ComplexComputation1 
Begin FinalAction 
End FinalAction 
0

這似乎像組成一個簡單的例子嵌套觀察到被夷爲平地(的SelectMany /合併/的毗連)和Zip

在這裏,我已經採取假定Long Running方法的自由返回Task。 但是,如果他們不,那麼慢阻塞同步方法可以用Observable.Start(()=>ComplexComputation1(x))包裝。

void Main() 
{ 
    var period = TimeSpan.FromSeconds(0.5); 
    var observable = Observable 
     .Interval(period) 
     .Publish() 
     .RefCount(); 

    var a = observable.Select(i => ComplexComputation1(i).ToObservable()) 
       .Concat(); 
    var b = observable.Select(i => ComplexComputation2(i).ToObservable()) 
       .Concat(); 

    a.Zip(b, Tuple.Create) 
     .Subscribe(pair => FinalAction(pair.Item1, pair.Item2)); 
} 

// Define other methods and classes here 
Random rnd = new Random(); 
private async Task<long> ComplexComputation1(long i) 
{ 
    await Task.Delay(rnd.Next(50, 1000)); 
    return i; 
} 
private async Task<long> ComplexComputation2(long i) 
{ 
    await Task.Delay(rnd.Next(50, 1000)); 
    return i; 
} 

private void FinalAction(long a, long b) 
{ 

}