2016-11-25 121 views
-5

我有一個包含500.000.000行的文件。c#多線程處理大批量文件行100個

這些行是最多10個字符的字符串。

如何使用多線程處理此文件並以100個批次處理?

+0

對於批處理,看看這裏http://stackoverflow.com/a/13731823/5062791 – ColinM

+0

http://cc.davelozinski.com/c-sharp/the-fastest-way-to-read-and-process-text-files –

+1

編寫代碼和然後比較它們......或者更好地閱讀SO文檔如何提出一個好問題...... –

回答

1

使用額外的庫,如果你使用Parallel.ForEach從不需要內置TPL,寫一對夫婦普查員(如下所示)。您的代碼可以是這樣的:

using (var input = new StreamReader(File.OpenRead(@"c:\path\to\my\file.txt"))) 
{ 
    Parallel.ForEach(
     input.ReadLines().TakeChunks(100), 
     new ParallelOptions() { MaxDegreeOfParallelism = 8 /* better be number of CPU cores */ }, 
     batchOfLines => { 
      DoMyProcessing(batchOfLines); 
     }); 
} 

這個工作,你需要IEnumerable<T>一對夫婦的擴展方法和幾個普查員,定義如下:

public static class EnumerableExtensions 
{ 
    public static IEnumerable<string> ReadLines(this StreamReader input) 
    { 
     return new LineReadingEnumerable(input); 
    } 

    public static IEnumerable<IReadOnlyList<T>> TakeChunks<T>(this IEnumerable<T> source, int length) 
    { 
     return new ChunkingEnumerable<T>(source, length); 
    } 

    public class LineReadingEnumerable : IEnumerable<string> 
    { 
     private readonly StreamReader _input; 

     public LineReadingEnumerable(StreamReader input) 
     { 
      _input = input; 
     } 
     public IEnumerator<string> GetEnumerator() 
     { 
      return new LineReadingEnumerator(_input); 
     } 
     IEnumerator IEnumerable.GetEnumerator() 
     { 
      return GetEnumerator(); 
     } 
    } 

    public class LineReadingEnumerator : IEnumerator<string> 
    { 
     private readonly StreamReader _input; 
     private string _current; 

     public LineReadingEnumerator(StreamReader input) 
     { 
      _input = input; 
     } 
     public void Dispose() 
     { 
      _input.Dispose(); 
     } 
     public bool MoveNext() 
     { 
      _current = _input.ReadLine(); 
      return (_current != null); 
     } 
     public void Reset() 
     { 
      throw new NotSupportedException(); 
     } 
     public string Current 
     { 
      get { return _current; } 
     } 
     object IEnumerator.Current 
     { 
      get { return _current; } 
     } 
    } 

    public class ChunkingEnumerable<T> : IEnumerable<IReadOnlyList<T>> 
    { 
     private readonly IEnumerable<T> _inner; 
     private readonly int _length; 

     public ChunkingEnumerable(IEnumerable<T> inner, int length) 
     { 
      _inner = inner; 
      _length = length; 
     } 
     public IEnumerator<IReadOnlyList<T>> GetEnumerator() 
     { 
      return new ChunkingEnumerator<T>(_inner.GetEnumerator(), _length); 
     } 
     IEnumerator IEnumerable.GetEnumerator() 
     { 
      return this.GetEnumerator(); 
     } 
    } 

    public class ChunkingEnumerator<T> : IEnumerator<IReadOnlyList<T>> 
    { 
     private readonly IEnumerator<T> _inner; 
     private readonly int _length; 
     private IReadOnlyList<T> _current; 
     private bool _endOfInner; 

     public ChunkingEnumerator(IEnumerator<T> inner, int length) 
     { 
      _inner = inner; 
      _length = length; 
     } 
     public void Dispose() 
     { 
      _inner.Dispose(); 
      _current = null; 
     } 
     public bool MoveNext() 
     { 
      var currentBuffer = new List<T>(); 

      while (currentBuffer.Count < _length && !_endOfInner) 
      { 
       if (!_inner.MoveNext()) 
       { 
        _endOfInner = true; 
        break; 
       } 

       currentBuffer.Add(_inner.Current); 
      } 

      if (currentBuffer.Count > 0) 
      { 
       _current = currentBuffer; 
       return true; 
      } 

      _current = null; 
      return false; 
     } 
     public void Reset() 
     { 
      _inner.Reset(); 
      _current = null; 
      _endOfInner = false; 
     } 
     public IReadOnlyList<T> Current 
     { 
      get 
      { 
       if (_current != null) 
       { 
        return _current; 
       } 

       throw new InvalidOperationException(); 
      } 
     } 
     object IEnumerator.Current 
     { 
      get 
      { 
       return this.Current; 
      } 
     } 
    } 
} 
+0

這裏要提到的一件重要事情是,當循環完成時'Parallel.ForEach'將會返回,在長時間運行的操作中,這可能會阻塞UI線程。一種解決這個問題的方法是在初始調用方法上分離出一項新任務。 – ColinM

+0

除非我遺漏了某些東西,否則此分塊實現不是線程安全的。 –

+0

它不是線程安全的@Eric J,它不一定是。 Parallel.ForEach負責處理線程安全性,同時在並行線程之間劃分枚舉(查看System.Collections.Concurrent.Partitioner及其嵌套類InternalPartitionEnumerable和InternalPartitionEnumerator)。 –

2

使用MoreLinq的Batch方法,這將創建一個IEnumerable<string>的集合,其中包含行批量大小爲100,它將爲每100行旋轉一個新任務。

這是一個基本的實現,使用Semaphore可以在任何給定的時間只運行一定數量的任務,並且還可以看到什麼開銷File.ReadAllLines將具有500,000,000行的性能。

public class FileProcessor 
{ 
    public async Task ProcessFile() 
    { 
     List<Task> tasks = new List<Task>(); 
     var lines = File.ReadAllLines("File.txt").Batch(100); 
     foreach (IEnumerable<string> linesBatch in lines) 
     { 
      IEnumerable<string> localLinesBatch = linesBatch; 
      Task task = Task.Factory.StartNew(() => 
      { 
       // Perform operation on localLinesBatch 
      }); 
      tasks.Add(task); 
     } 

     await Task.WhenAll(tasks); 
    } 
} 

public static class LinqExtensions 
{ 
    public static IEnumerable<IEnumerable<TSource>> Batch<TSource>(
       this IEnumerable<TSource> source, int size) 
    { 
     TSource[] bucket = null; 
     var count = 0; 

     foreach (var item in source) 
     { 
      if (bucket == null) 
       bucket = new TSource[size]; 

      bucket[count++] = item; 
      if (count != size) 
       continue; 

      yield return bucket; 

      bucket = null; 
      count = 0; 
     } 

     if (bucket != null && count > 0) 
      yield return bucket.Take(count); 
    } 
}