2012-02-08 84 views
5

有時我想爲網絡活動等同時並行運行最大數量的IO操作。我掀起了一個小併發線程函數,它與https://gist.github.com/810920一起工作良好,但這不是真正的池,因爲所有IO操作必須在別人可以開始之前完成。如何創建線程池?

類型的東西我要找的會是這樣的:

runPool :: Int -> [IO a] -> IO [a] 

,應該能夠在有限和無限列表操作。

管道包看起來能夠很好地實現這一點,但我覺得可能有類似的解決方案,我只是提供了使用來自haskell平臺的mvars等的要點。

有沒有人遇到過沒有任何沉重依賴的慣用解決方案?

回答

7

你需要一個線程池,如果你想要的東西短,你可以從Control.ThreadPool獲得靈感(從控制發動機組件還提供更多的一般功能),例如threadPoolIO就是:

threadPoolIO :: Int -> (a -> IO b) -> IO (Chan a, Chan b) 
threadPoolIO nr mutator = do 
    input <- newChan 
    output <- newChan 
    forM_ [1..nr] $ 
     \_ -> forkIO (forever $ do 
      i <- readChan input 
      o <- mutator i 
      writeChan output o) 
    return (input, output) 

它使用兩個Chan與外部通信,但這通常是你想要的,它確實有助於編寫不混亂的代碼。

如果你絕對要包起來在你的類型的函數可以封裝過通信:

runPool :: Int -> [IO a] -> IO [a] 
runPool n as = do 
    (input, output) <- threadPoolIO n (id) 
    forM_ as $ writeChan input 
    sequence (repeat (length as) $ readChan output) 

這會不會讓你的操作的順序,是一個問題(很容易通過傳輸動作的索引來糾正,或者只是使用數組來存儲響應)?

注意:如果您打算在長時間運行的應用程序中創建並清理其中幾個池,那麼n個線程將永遠保持活動狀態,向threadPoolIO添加「killAll」返回的操作可以輕鬆解決此問題(if不是,考慮到Haskell中線程的重量,它可能不值得費神)。 請注意,此函數僅適用於有限列表,這是因爲IO通常是嚴格的,因此如果您真的希望您可以使用IO,則無法在整個列表生成之前開始處理IO [a]的元素懶惰IO與unsafeInterleaveIO(也許不是最好的主意),或完全改變你的模型,並使用類似管道的東西來流式傳輸你的結果。

+1

如果你不太喜歡你的runPool類型,threadPoolIO本身更健壯一些:你可以很容易地在你的程序中的多個地方重複使用它,你可以控制拆分無限列表並提供它並閱讀大塊響應等等...... – Jedai 2012-02-08 19:31:38

+0

'threadPoolIO'看起來不錯。我將查看代碼,看看它是如何實現的,因爲我對創建線程池的最佳方式以及知道哪個Hackage版本受到社區青睞表示興趣。 – 2012-02-09 06:00:57