2016-07-26 86 views
3

我有這一小段代碼,模擬使用大對象(那巨大的byte[])的流量。對於序列中的每個項目,調用一個異步方法來獲得一些結果。問題?事實上,它會拋出OutOfMemoryException無效擴展SelectMany與大對象

代碼LINQPad(C#程序)兼容:

void Main() 
{ 
    var selectMany = Enumerable.Range(1, 100) 
        .Select(i => new LargeObject(i)) 
        .ToObservable() 
        .SelectMany(o => Observable.FromAsync(() => DoSomethingAsync(o))); 

    selectMany 
     .Subscribe(r => Console.WriteLine(r)); 
} 


private static async Task<int> DoSomethingAsync(LargeObject lo) 
{ 
    await Task.Delay(10000); 
    return lo.Id; 
} 

internal class LargeObject 
{ 
    public int Id { get; } 

    public LargeObject(int id) 
    { 
     this.Id = id; 
    } 

    public byte[] Data { get; } = new byte[10000000]; 
} 

似乎它創建的同時所有對象。我該如何正確地做到這一點?

其基本思想是調用DoSomethingAsync以獲得每個對象的一些結果,所以這就是爲什麼我使用SelectMany。爲了簡化,我只介紹了一個Task.Delay,但在現實生活中它是一個可以同時處理一些項目的服務,所以我想引入一些併發機制來獲得它的優勢。

請注意,從理論上講,處理少量項目的時間不應該填滿內存。實際上,我們只需要每個「大對象」來獲取DoSomethingAsync方法的結果。在那之後,大對象不再被使用。

+0

我不知道你的問題是否與你的測試代碼(Enumerable.Range'急切地創建所有大對象),或者你在生產中看到這個問題?無論哪種方式,如果某個序列創建了許多LargeObjects,並且它們仍在使用,那麼不能被GC'ed,那麼您會得到一個OOM異常。 –

回答

4

我覺得我是repeating myself。與您最後一個問題和我的最後一個答案類似,您需要做的是限制要併發創建的bigObjects™的數量。

爲此,您需要將對象創建和處理合併到一個線程池中。現在的問題是,我們使用異步方法來允許線程在我們的異步方法運行時執行其他操作。由於緩慢的網絡調用是異步的,因此您的(快速)對象創建代碼將不斷創建大型對象。

取而代之,我們可以使用Rx通過將對象創建與異步調用相結合來保持併發觀察對象的數量,並使用.Merge(maxConcurrent)來限制併發性。

作爲獎勵,我們還可以設置查詢執行的最短時間。只需要Zip以最少的延遲。

static void Main() 
{ 
    var selectMany = Enumerable.Range(1, 100) 
         .ToObservable() 
         .Select(i => Observable.Defer(() => Observable.Return(new LargeObject(i))) 
          .SelectMany(o => Observable.FromAsync(() => DoSomethingAsync(o))) 
          .Zip(Observable.Timer(TimeSpan.FromMilliseconds(400)), (el, _) => el) 
         ).Merge(4); 

    selectMany 
     .Subscribe(r => Console.WriteLine(r)); 

    Console.ReadLine(); 
} 


private static async Task<int> DoSomethingAsync(LargeObject lo) 
{ 
    await Task.Delay(10000); 
    return lo.Id; 
} 

internal class LargeObject 
{ 
    public int Id { get; } 

    public LargeObject(int id) 
    { 
     this.Id = id; 
     Console.WriteLine(id + "!"); 
    } 

    public byte[] Data { get; } = new byte[10000000]; 
} 
0

您可以引入一個時間間隔延遲是這樣的:

var source = Enumerable.Range(1, 100) 
    .ToObservable() 
    .Zip(Observable.Interval(TimeSpan.FromSeconds(1)), (i, ts) => i) 
    .Select(i => new LargeObject(i)) 
    .SelectMany(o => Observable.FromAsync(() => DoSomethingAsync(o))); 

所以不要扯所有100個整數一次,立即將它們轉換爲LargeObject則呼籲所有100 DoSomethingAsync的,它淌的整數出來一個接一個地隔開一秒鐘。


這就是TPL + Rx解決方案的樣子。不用說,它比單獨的Rx或TPL單獨不那麼優雅。不過,我不認爲這個問題是非常適合的Rx:

void Main() 
{ 
    var source = Observable.Range(1, 100); 

    const int MaxParallelism = 5; 
    var transformBlock = new TransformBlock<int, int>(async i => await DoSomethingAsync(new LargeObject(i)), 
     new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = MaxParallelism }); 
    source.Subscribe(transformBlock.AsObserver()); 
    var selectMany = transformBlock.AsObservable(); 

    selectMany 
     .Subscribe(r => Console.WriteLine(r)); 
} 
+0

我可以理解這可能在實踐中有效,但選擇1秒延遲是任意的,並且仍然可以允許內存不足錯誤或者它可以顯着減慢計算。這不是一個可靠的解決方案。 – Enigmativity

+0

編輯添加TPL答案。 Rx不會在這裏發光。 – Shlomo

2

看來,它創建在同一時間的所有對象。

是的,因爲您一次都創建它們。

如果我簡化你的代碼,我可以告訴你爲什麼:

void Main() 
{ 
    var selectMany = 
     Enumerable 
      .Range(1, 5) 
      .Do(x => Console.WriteLine($"{x}!")) 
      .ToObservable() 
      .SelectMany(i => Observable.FromAsync(() => DoSomethingAsync(i))); 

    selectMany 
     .Subscribe(r => Console.WriteLine(r)); 
} 

private static async Task<int> DoSomethingAsync(int i) 
{ 
    await Task.Delay(1); 
    return i; 
} 

運行該生產:

 
1! 
2! 
3! 
4! 
5! 
4 
3 
5 
2 
1 

因爲Observable.FromAsync的你是允許源之前,任何運行至結束結果返回。換句話說,您正在快速構建所有大型對象,但要慢慢處理它們。

您應該允許Rx同步運行,但在默認調度程序上,以便您的主線程不被阻塞。代碼將運行,沒有任何內存問題,您的程序將在主線程上保持響應。

下面是這個代碼:我試圖測試其他選項

var selectMany = 
    Observable 
     .Range(1, 100, Scheduler.Default) 
     .Select(i => new LargeObject(i)) 
     .Select(o => DoSomethingAsync(o)) 
     .Select(t => t.Result); 

(我已經有效地代替Enumerable.Range(1, 100).ToObservable()Observable.Range(1, 100)爲也將有一些問題,幫助)

,但如此遠遠地允許DoSomethingAsync以異步方式運行的任何內容都會出現內存不足錯誤。

+0

感謝您的回答,@Enigmativity,但我想我錯過了一些東西。我正在調用的異步方法是一個可以同時處理項目的遠程服務。在處理另一個物品之前等待一個物品處理是不理想的。你認爲我可以同時處理多個項目(3個或4個)以獲得併發優勢而不會遇到內存問題嗎? – SuperJMN

+1

@SuperJMN,如果您嘗試創建比您可以分配更多的'LargeObject',則會收到OOM異常。這與Rx有關。如果它們需要獨立於正在執行的DoSomethingAsync被創建,那麼你有麻煩了。在我看來,你實際上想要堅持到隊列中並突破Rx。 –

+1

@SuperJMN - 我嘗試了一堆東西讓它扼殺處理,但它不工作。一次處理一個對象時效率不高,但它具有高效的內存。這取決於你要達到的效率。對於計算效率來說,Shlomo的回答更加糟糕。如果我想到什麼,我會讓你知道。 – Enigmativity

0

ConcatMap支持開箱即用。我知道這個運算符在.net中不可用,但是你可以使用Concat運算符來完成相同的操作,這個運算符會延遲訂閱每個內部源,直到前一個完成。

+0

您可以發佈ConcatMap操作符的代碼和一些示例嗎?謝謝! – SuperJMN

+0

http://blog.danlew.net/2015/06/22/loading-data-from-multiple-sources-with-rxjava/ –

+0

http://reactivex.io/documentation/operators/concat.html –