2016-10-03

When I try to write a very large amount of data (a list with more than 300,000 records) to a memory stream using CsvHelper, it throws the exception "System.IO.IOException: Stream was too long."

The data class is rather large, with ~30 properties, so each record in the file has ~30 columns.

This is the actual writing code where the exception is thrown (by the way, this code is based on that answer from the author of the CsvHelper lib):

using (var memoryStream = new MemoryStream())
{
    using (var streamWriter = new StreamWriter(memoryStream, encoding ?? Encoding.ASCII))
    {
        var csvWriter = new CsvWriter(streamWriter, GetConfiguration(delimiter, mappingClassType, mappingActions));
        csvWriter.WriteRecords(data); // data is IEnumerable<T> and has more than 300k records

        streamWriter.Flush();
        return memoryStream.ToArray();
    }
}

Then I save the resulting byte array to a file:

File.WriteAllBytes(filePath, resultedBytesArray); 

Note that the same code works well when I write 100,000 records to the file (in that case the file size is around 1GB). By the way, my goal is to write out more than 600,000 data records.

Here is the relevant part of the stack trace for this issue:

System.IO.IOException: Stream was too long. 
at System.IO.MemoryStream.Write(Byte[] buffer, Int32 offset, Int32 count) 
at System.IO.StreamWriter.Flush(Boolean flushStream, Boolean flushEncoder) 
at System.IO.StreamWriter.Write(Char[] buffer, Int32 index, Int32 count) 
at CsvHelper.CsvWriter.NextRecord() in C:\Users\Josh\Projects\CsvHelper\src\CsvHelper\CsvWriter.cs:line 290 
at CsvHelper.CsvWriter.WriteRecords(IEnumerable records) in C:\Users\Josh\Projects\CsvHelper\src\CsvHelper\CsvWriter.cs:line 490 
at FileExport.Csv.CsvDocument.Create[T](IEnumerable`1 data, String delimiter, Encoding encoding, Type mappingClassType, IDictionary`2 mappingActions) in d:\Dev\DrugDevExport\FileExport\Csv\CsvDocument.cs:line 33 

As far as I can see, the basic way to achieve my goal and avoid the problem is to split my list of data into several parts, write them separately, and then concatenate the results. But maybe there is some reasonably obvious solution that doesn't require significant code refactoring (like increasing the default stream/buffer size, etc.)?
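(For illustration, the splitting approach would look roughly like this — `Chunk` is a hypothetical helper written for this sketch, not part of CsvHelper; each returned batch could then be written to its own stream and the outputs concatenated:)

```csharp
using System.Collections.Generic;

static class ChunkingExample
{
    // Hypothetical helper: splits a large sequence into fixed-size batches
    // so each batch can be written and flushed separately, keeping any
    // single in-memory buffer small.
    public static IEnumerable<List<T>> Chunk<T>(IEnumerable<T> source, int size)
    {
        var batch = new List<T>(size);
        foreach (var item in source)
        {
            batch.Add(item);
            if (batch.Count == size)
            {
                yield return batch;
                batch = new List<T>(size);
            }
        }
        if (batch.Count > 0)
            yield return batch; // final, possibly short, batch
    }
}
```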

Also keep in mind that I have already applied two possible solutions to prevent "out of memory" exceptions.

Thanks.


Why are you writing to a MemoryStream? Do you need to hold the entire stream in memory? You talk about files, but use a MemoryStream... Replace it with a FileStream and see what happens... – spender


Have you tried reading a limited amount of data and writing it to the stream in a loop? i.e. not all at once. You could perhaps try a chunking approach similar to this post http://stackoverflow.com/questions/2819081/memorystream-and-large-object-heap –


@PaulZahra, I mentioned that in my question: that way (splitting up the huge amount of data) would very likely work, and it already works with 100k data records, but is there any other solution that doesn't involve splitting? –

Answers


Many thanks to Spender: as he mentioned in the comment under the question, it has been fixed by replacing the MemoryStream with a FileStream and writing the data directly to the file.

In my case it was completely pointless to write the data to a MemoryStream and then copy it into a file again for no reason. Thanks to him again for opening my eyes to that fact.

My fixed code is below:

using (var fileStream = File.Create(path))
{
    using (var streamWriter = new StreamWriter(fileStream, encoding ?? Encoding.ASCII))
    {
        var csvWriter = new CsvWriter(streamWriter, GetConfiguration(delimiter, mappingClassType, mappingActions));
        csvWriter.WriteRecords(data);
    }
}

And now it works with any amount of input data.
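(Side note: even when a MemoryStream genuinely is required — say, the bytes are also sent elsewhere — `MemoryStream.WriteTo` copies the internal buffer straight to another stream, avoiding the second full copy that `ToArray()` allocates. A minimal sketch, with a made-up payload and temp-file path:)

```csharp
using System;
using System.IO;
using System.Text;

class WriteToExample
{
    static void Main()
    {
        string path = Path.Combine(Path.GetTempPath(), "writeto-example.csv");

        using (var memoryStream = new MemoryStream())
        {
            byte[] payload = Encoding.ASCII.GetBytes("a,b,c\n1,2,3\n");
            memoryStream.Write(payload, 0, payload.Length);

            using (var fileStream = File.Create(path))
            {
                // WriteTo copies the internal buffer directly to the target
                // stream, skipping the intermediate byte[] that ToArray() makes.
                memoryStream.WriteTo(fileStream);
            }
        }

        Console.WriteLine(File.ReadAllText(path)); // prints the CSV payload
    }
}
```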


You can work around this 2GB limitation by writing your own MemoryStream:

using System;
using System.IO;

class HugeMemoryStream : Stream
{
     #region Fields 

     private const int PAGE_SIZE = 1024000; 
     private const int ALLOC_STEP = 1024; 

     private byte[][] _streamBuffers; 

     private int _pageCount = 0; 
     private long _allocatedBytes = 0; 

     private long _position = 0; 
     private long _length = 0; 

     #endregion Fields 

     #region Internals 

     private int GetPageCount(long length) 
     { 
      int pageCount = (int)(length/PAGE_SIZE) + 1; 

      if ((length % PAGE_SIZE) == 0) 
       pageCount--; 

      return pageCount; 
     } 

     private void ExtendPages() 
     { 
      if (_streamBuffers == null) 
      { 
       _streamBuffers = new byte[ALLOC_STEP][]; 
      } 
      else 
      { 
       byte[][] streamBuffers = new byte[_streamBuffers.Length + ALLOC_STEP][]; 

       Array.Copy(_streamBuffers, streamBuffers, _streamBuffers.Length); 

       _streamBuffers = streamBuffers; 
      } 

      _pageCount = _streamBuffers.Length; 
     } 

     private void AllocSpaceIfNeeded(long value) 
     { 
      if (value < 0) 
       throw new InvalidOperationException("AllocSpaceIfNeeded < 0"); 

      if (value == 0) 
       return; 

      int currentPageCount = GetPageCount(_allocatedBytes); 
      int neededPageCount = GetPageCount(value); 

      while (currentPageCount < neededPageCount) 
      { 
       if (currentPageCount == _pageCount) 
        ExtendPages(); 

       _streamBuffers[currentPageCount++] = new byte[PAGE_SIZE]; 
      } 

      _allocatedBytes = (long)currentPageCount * PAGE_SIZE; 

      value = Math.Max(value, _length); 

      if (_position > (_length = value)) 
       _position = _length; 
     } 

     #endregion Internals 

     #region Stream 

     public override bool CanRead => true; 

     public override bool CanSeek => true; 

     public override bool CanWrite => true; 

     public override long Length => _length; 

     public override long Position 
     { 
      get { return _position; } 
      set 
      { 
       if (value > _length) 
        throw new InvalidOperationException("Position > Length"); 
       else if (value < 0) 
        throw new InvalidOperationException("Position < 0"); 
       else 
        _position = value; 
      } 
     } 

     public override void Flush() { } 

     public override int Read(byte[] buffer, int offset, int count) 
     { 
      int currentPage = (int)(_position/PAGE_SIZE); 
      int currentOffset = (int)(_position % PAGE_SIZE); 
      int currentLength = PAGE_SIZE - currentOffset; 

      long startPosition = _position; 

      if (startPosition + count > _length) 
       count = (int)(_length - startPosition); 

      while (count != 0 && _position < _length) 
      { 
       if (currentLength > count) 
        currentLength = count; 

       Array.Copy(_streamBuffers[currentPage++], currentOffset, buffer, offset, currentLength); 

       offset += currentLength; 
       _position += currentLength; 
       count -= currentLength; 

       currentOffset = 0; 
       currentLength = PAGE_SIZE; 
      } 

      return (int)(_position - startPosition); 
     } 

     public override long Seek(long offset, SeekOrigin origin) 
     { 
      switch (origin) 
      { 
       case SeekOrigin.Begin: 
        break; 

       case SeekOrigin.Current: 
        offset += _position; 
        break; 

       case SeekOrigin.End: 
        // Stream contract: Seek(offset, End) positions at Length + offset. 
        offset += _length; 
        break; 

       default: 
        throw new ArgumentOutOfRangeException("origin"); 
      } 

      return Position = offset; 
     } 

     public override void SetLength(long value) 
     { 
      if (value < 0) 
       throw new InvalidOperationException("SetLength < 0"); 

      if (value == 0) 
      { 
       _streamBuffers = null; 
       _allocatedBytes = _position = _length = 0; 
       _pageCount = 0; 
       return; 
      } 

      int currentPageCount = GetPageCount(_allocatedBytes); 
      int neededPageCount = GetPageCount(value); 

      // Removes unused buffers if decreasing stream length 
      while (currentPageCount > neededPageCount) 
       _streamBuffers[--currentPageCount] = null; 

      AllocSpaceIfNeeded(value); 

      if (_position > (_length = value)) 
       _position = _length; 
     } 

     public override void Write(byte[] buffer, int offset, int count) 
     { 
      int currentPage = (int)(_position/PAGE_SIZE); 
      int currentOffset = (int)(_position % PAGE_SIZE); 
      int currentLength = PAGE_SIZE - currentOffset; 

      long startPosition = _position; 

      AllocSpaceIfNeeded(_position + count); 

      while (count != 0) 
      { 
       if (currentLength > count) 
        currentLength = count; 

       Array.Copy(buffer, offset, _streamBuffers[currentPage++], currentOffset, currentLength); 

       offset += currentLength; 
       _position += currentLength; 
       count -= currentLength; 

       currentOffset = 0; 
       currentLength = PAGE_SIZE; 
      } 
     } 

     #endregion Stream 
    } 
And to test it (this uses SharpZipLib's GZipInputStream):

using ICSharpCode.SharpZipLib.GZip; 
using System; 
using System.IO; 
using System.Text; 

      // HugeMemoryStream test: write a gzipped text file into the stream, 
      // then read both back and verify the contents line by line. 

      string filename = @"gzip-filename.gz"; 

      HugeMemoryStream ms = new HugeMemoryStream(); 

      using (StreamWriter sw = new StreamWriter(ms, Encoding.UTF8, 16384, true)) 
      using (FileStream fs = new FileStream(filename, FileMode.Open, FileAccess.Read, FileShare.Read)) 
      using (GZipInputStream gzipStream = new GZipInputStream(fs)) 
      using (StreamReader sr = new StreamReader(gzipStream, Encoding.UTF8, false, 16384, true)) 
      { 
       for (string line = sr.ReadLine(); line != null; line = sr.ReadLine()) 
        sw.WriteLine(line); 
      } 

      ms.Seek(0, SeekOrigin.Begin); 

      using (StreamReader srm = new StreamReader(ms, Encoding.UTF8, false, 16384, true)) 
      using (FileStream fs = new FileStream(filename, FileMode.Open, FileAccess.Read, FileShare.Read)) 
      using (GZipInputStream gzipStream = new GZipInputStream(fs)) 
      using (StreamReader sr = new StreamReader(gzipStream, Encoding.UTF8, false, 16384, true)) 
      { 
       for (string line1 = sr.ReadLine(), line2 = srm.ReadLine(); line1 != null; line1 = sr.ReadLine(), line2 = srm.ReadLine()) 
       { 
        if (line1 != line2) 
         throw new InvalidDataException(); 
       } 
      }
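(The paging scheme the class relies on can be shown in miniature — a jagged `byte[][]` keeps every individual array small, so no single allocation approaches the 2GB single-object cap. This is an independent illustration with a tiny page size, not the class above:)

```csharp
using System;

class PagingDemo
{
    const int PageSize = 4; // tiny page so the rollover between pages is visible

    // Spill bytes across small pages, the same idea HugeMemoryStream.Write uses:
    // byte i lands at pages[i / PageSize][i % PageSize].
    public static byte[][] BuildPages(byte[] data)
    {
        byte[][] pages = new byte[(data.Length + PageSize - 1) / PageSize][];
        for (int i = 0; i < data.Length; i++)
        {
            int page = i / PageSize, offset = i % PageSize;
            if (pages[page] == null)
                pages[page] = new byte[PageSize]; // allocate pages lazily
            pages[page][offset] = data[i];
        }
        return pages;
    }

    static void Main()
    {
        byte[][] pages = BuildPages(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 });
        Console.WriteLine(pages.Length); // 3 pages for 10 bytes
        Console.WriteLine(pages[2][1]);  // byte index 9 lands at page 2, offset 1
    }
}
```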