2013-04-25 104 views
1

我有一個進程,其中我的主線程正在讀取文件並將其分解成多個部分。那些部分則需要進一步處理。我想利用任何可用的線程,以便下游處理儘可能多地利用CPU(或更多的核心)。我不想從主線程創建過多積壓,所以我需要主線程等待添加到隊列中,直到有另一個可用線程。.NET - 阻塞主線程,直到有任何可用線程

我看到像VB.NET 4.0: Looking to execute multiple threads, but wait until all threads are completed before resuming很多文章,但他們都在等待所有線程完成,而我只需要任何線程可用

這是不是我可以用Task Parallel Library解決,還是應該我手動創建線程和監視線程池?

Using Reader As New StreamReader(FileName) 
    Do 
     CurrentBlockSize = Reader.ReadBlock(CurrentBuffer, 0, BufferSize) 

     RunningBuffer &= New String(CurrentBuffer) 

     If RunningBuffer.Contains(RowDelimiter) Then 
      LineParts = RunningBuffer.Split(RowDelimiter) 

      For I As Integer = 0 To LineParts.Count - 1 
       If I < LineParts.Count - 1 Then 

        'Make synchronous call that blocks until' 
        'another thread is available to process the line' 
        AddLineToTheProcessingQueue(CurrentLine) 

       Else 
        RunningBuffer = LineParts(I) 
       End If 
      Next 
     End If 

    Loop While CurrentBlockSize = BufferSize 
End Using 
+0

這幾乎肯定是浪費精力,這種代碼幾乎總是被磁盤綁定的。一個簡單的檢查:重新啓動您的機器並運行單線程版本。如果一個內核的CPU負載不超過50%,那麼添加更多線程無法使其更快。 – 2013-04-25 18:47:56

+0

@HansPassant,下游工作將涉及處理並通過TCP/IP向數據庫發送大量數據,這可能會比磁盤讀取速度慢。這就是我想要的多線程部分 – 2013-04-25 18:59:29

+0

然後拉,不要推送數據。就像大多數程序讀取文件一樣。 – 2013-04-25 19:48:42

回答

1

將此代碼粘貼到新的控制檯應用程序中。

Imports System.Threading 

Module Module1 

    ' I just picked 6 randomly, not sure what is a good strategy for picking this number 
    ' also, not sure what is the difference between a Worker Thread and a Completion thread 
    Const MaxWorkerThreads As Integer = 6 
    Const MaxCompletionPortThreads As Integer = 6 

    Sub Main() 

     ThreadPool.SetMaxThreads(MaxWorkerThreads, MaxCompletionPortThreads) 

     Dim availableWorkerThreads As Integer 
     Dim availableCompletionPortThreads As Integer 

     For i As Integer = 0 To 100 

      ' GetAvailableThreads returns results via output parameters 
      ThreadPool.GetAvailableThreads(availableWorkerThreads, availableCompletionPortThreads) 

      Dim tries As Integer = 0 

      Do While (availableWorkerThreads = 0) 
       ' this loop does not execute if there are available threads 
       ' you may want to add a fail-safe to check "tries" in case the child threads get stuck 
       tries += 1 

       Console.WriteLine(String.Format("waiting to start item {0}, attempt {1}, available threads: {2}, {3}", i, tries, availableWorkerThreads, availableCompletionPortThreads)) 

       ' failure to call Sleep will make your program unresponsive 
       Thread.Sleep(1000) 

       ' call GetAvailableThreads again for the next test at the top of the loop 
       ThreadPool.GetAvailableThreads(availableWorkerThreads, availableCompletionPortThreads) 
      Loop 

      ' this is how you pass parameters to a thread created through QueueUserWorkItem 
      Dim parameters As Object() = {i} 
      ThreadPool.QueueUserWorkItem(AddressOf DoWork, parameters) 
      ' According to MSDN, you must Sleep after calling QueueUserWorkItem, or else the current thread will just exit 
      Thread.Sleep(500) 

     Next 

    End Sub 

    Sub DoWork(parameters As Object()) 
     Dim itemNumber = parameters(0) 
     Dim sleepLength = itemNumber * 1000 
     Console.WriteLine(String.Format("Item: {0} - sleeping for {1} miliseconds.", itemNumber, sleepLength)) 
     Thread.Sleep(sleepLength) 
     Console.WriteLine(String.Format("Item: {0} - done sleeping.", itemNumber)) 
    End Sub 

End Module 
0

我不知道到底爲什麼做你想做的事,但你可以實現通過BlockingCollectionBoundedCapacity集的數據流塊非常類似的東西。

例如,如果您將容量設置爲1並且此時您的消費者正忙,您將無法向隊列中添加第二個項目,直到其中一個消費者完成其當前工作並移除該項目從隊列中。這兩個版本都可以讓您等待,直到您可以將其他項添加到隊列中。

+0

是的,我昨天開始閱讀TPL數據流和有限容量,非常有希望。經過一些測試後我會報告。您是否擁有帶有限容量的數據流塊的示例代碼? – 2013-04-25 15:10:41

+0

@TomHalladay你可以看看[這組測試](https://github.com/mono/mono/blob/master/mcs/class/System.Threading.Tasks.DataStatus/Test/System.Threading.Tasks。 Dataflow/BoundedCapacityTest.cs)在TDF的單聲道版本中。 – svick 2013-04-25 16:05:22