回答
使用額外的庫,如果你使用Parallel.ForEach
從不需要內置TPL,寫一對夫婦普查員(如下所示)。您的代碼可以是這樣的:
using (var input = new StreamReader(File.OpenRead(@"c:\path\to\my\file.txt")))
{
Parallel.ForEach(
input.ReadLines().TakeChunks(100),
new ParallelOptions() { MaxDegreeOfParallelism = 8 /* better be number of CPU cores */ },
batchOfLines => {
DoMyProcessing(batchOfLines);
});
}
這個工作,你需要IEnumerable<T>
一對夫婦的擴展方法和幾個普查員,定義如下:
public static class EnumerableExtensions
{
public static IEnumerable<string> ReadLines(this StreamReader input)
{
return new LineReadingEnumerable(input);
}
public static IEnumerable<IReadOnlyList<T>> TakeChunks<T>(this IEnumerable<T> source, int length)
{
return new ChunkingEnumerable<T>(source, length);
}
public class LineReadingEnumerable : IEnumerable<string>
{
private readonly StreamReader _input;
public LineReadingEnumerable(StreamReader input)
{
_input = input;
}
public IEnumerator<string> GetEnumerator()
{
return new LineReadingEnumerator(_input);
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
}
public class LineReadingEnumerator : IEnumerator<string>
{
private readonly StreamReader _input;
private string _current;
public LineReadingEnumerator(StreamReader input)
{
_input = input;
}
public void Dispose()
{
_input.Dispose();
}
public bool MoveNext()
{
_current = _input.ReadLine();
return (_current != null);
}
public void Reset()
{
throw new NotSupportedException();
}
public string Current
{
get { return _current; }
}
object IEnumerator.Current
{
get { return _current; }
}
}
public class ChunkingEnumerable<T> : IEnumerable<IReadOnlyList<T>>
{
private readonly IEnumerable<T> _inner;
private readonly int _length;
public ChunkingEnumerable(IEnumerable<T> inner, int length)
{
_inner = inner;
_length = length;
}
public IEnumerator<IReadOnlyList<T>> GetEnumerator()
{
return new ChunkingEnumerator<T>(_inner.GetEnumerator(), _length);
}
IEnumerator IEnumerable.GetEnumerator()
{
return this.GetEnumerator();
}
}
public class ChunkingEnumerator<T> : IEnumerator<IReadOnlyList<T>>
{
private readonly IEnumerator<T> _inner;
private readonly int _length;
private IReadOnlyList<T> _current;
private bool _endOfInner;
public ChunkingEnumerator(IEnumerator<T> inner, int length)
{
_inner = inner;
_length = length;
}
public void Dispose()
{
_inner.Dispose();
_current = null;
}
public bool MoveNext()
{
var currentBuffer = new List<T>();
while (currentBuffer.Count < _length && !_endOfInner)
{
if (!_inner.MoveNext())
{
_endOfInner = true;
break;
}
currentBuffer.Add(_inner.Current);
}
if (currentBuffer.Count > 0)
{
_current = currentBuffer;
return true;
}
_current = null;
return false;
}
public void Reset()
{
_inner.Reset();
_current = null;
_endOfInner = false;
}
public IReadOnlyList<T> Current
{
get
{
if (_current != null)
{
return _current;
}
throw new InvalidOperationException();
}
}
object IEnumerator.Current
{
get
{
return this.Current;
}
}
}
}
這裏要提到的一件重要事情是,當循環完成時'Parallel.ForEach'將會返回,在長時間運行的操作中,這可能會阻塞UI線程。一種解決這個問題的方法是在初始調用方法上分離出一項新任務。 – ColinM
除非我遺漏了某些東西,否則此分塊實現不是線程安全的。 –
它不是線程安全的@Eric J,它不一定是。 Parallel.ForEach負責處理線程安全性,同時在並行線程之間劃分枚舉(查看System.Collections.Concurrent.Partitioner及其嵌套類InternalPartitionEnumerable和InternalPartitionEnumerator)。 –
使用MoreLinq的Batch
方法,這將創建一個IEnumerable<string>
的集合,其中包含行批量大小爲100,它將爲每100行旋轉一個新任務。
這是一個基本的實現,使用Semaphore
可以在任何給定的時間只運行一定數量的任務,並且還可以看到什麼開銷File.ReadAllLines
將具有500,000,000行的性能。
public class FileProcessor
{
public async Task ProcessFile()
{
List<Task> tasks = new List<Task>();
var lines = File.ReadAllLines("File.txt").Batch(100);
foreach (IEnumerable<string> linesBatch in lines)
{
IEnumerable<string> localLinesBatch = linesBatch;
Task task = Task.Factory.StartNew(() =>
{
// Perform operation on localLinesBatch
});
tasks.Add(task);
}
await Task.WhenAll(tasks);
}
}
public static class LinqExtensions
{
public static IEnumerable<IEnumerable<TSource>> Batch<TSource>(
this IEnumerable<TSource> source, int size)
{
TSource[] bucket = null;
var count = 0;
foreach (var item in source)
{
if (bucket == null)
bucket = new TSource[size];
bucket[count++] = item;
if (count != size)
continue;
yield return bucket;
bucket = null;
count = 0;
}
if (bucket != null && count > 0)
yield return bucket.Take(count);
}
}
- 1. 批處理文件「多個變量」
- 2. Dos創建批處理文件並運行多個C++程序
- 3. 多線程文件處理和數據庫批量插入
- 4. Spring批處理 - 如何在多個線程中讀取一個大文件?
- 5. 從另一個批處理文件運行批處理文件
- 6. C#System.Diagnostics.Process如何處理多個批處理文件
- 7. 從批處理文件中批量讀取多行
- 8. 閱讀並處理大量帶多線程的文件
- 9. 批處理文件變量
- 10. MOVE批量處理文件
- 11. 如何使用批處理文件運行多個程序
- 12. 從C#執行批處理文件#
- 13. 在C#中執行批處理文件#
- 14. 在c#中執行批處理文件
- 15. Java和多行批處理文件
- 16. 批處理文件不能處理大量數據嗎?
- 17. 在c#中並行讀取和處理100個文本文件
- 18. 從一個批處理文件,子文件夾中執行多個批處理文件
- 19. 多線程批處理文件傳遞參數
- 20. spring批處理多線程文件讀取
- 21. Spring批處理多線程拋出java.lang.Thread.State
- 22. 批處理文件到多個目錄
- 23. 拖放多個文件到批處理
- 24. 批處理文件打開多個URL
- 25. 遠程執行批處理文件java
- 26. PSEXEC運行遠程批處理文件
- 27. 批處理文件執行
- 28. 運行批處理文件
- 29. 運行批處理文件
- 30. 編程批處理文件
對於批處理,看看這裏http://stackoverflow.com/a/13731823/5062791 – ColinM
http://cc.davelozinski.com/c-sharp/the-fastest-way-to-read-and-process-text-files –
編寫代碼和然後比較它們......或者更好地閱讀SO文檔如何提出一個好問題...... –