看來我們需要兩個功能:一個啓動所有的異步任務,另一個監視它們並在死亡時重新啓動它們。
第一個可以這樣寫:
withAsyncMany :: [IO t] -> ([Async t] -> IO b) -> IO b
withAsyncMany [] f = f []
withAsyncMany (t:ts) f = withAsync t $ \a -> withAsyncMany ts (f . (a:))
如果我們使用的managed包,我們也可以寫成這樣的:
import Control.Monad.Managed (with,managed)
withAsyncMany' :: [IO t] -> ([Async t] -> IO b) -> IO b
withAsyncMany' = with . traverse (\t -> managed (withAsync t))
重啓功能將循環列表異口同聲地詢問他們的狀況,並在他們失敗時更新他們:
{-# language NumDecimals #-}
import Control.Concurrent (threadDelay)
resurrect :: IO t -> [Async t] -> IO()
resurrect restartAction = go []
where
go ts [] = do
threadDelay 1e6 -- wait a little before the next round of polling
go [] (reverse ts)
go past (a:pending) = do
status <- poll a -- has the task died, or finished?
case status of
Nothing -> go (a:past) pending
Just _ -> withAsync restartAction $ \a' -> go (a':past) pending
但是我擔心很多嵌套的withAsyncs
會導致某種類型的資源泄漏(因爲某些異常處理程序必須與每個withAsync
一起安裝以便在父線程死亡的情況下通知孩子)的可能性。
因此,也許在這種情況下,它會更好產卵工人用普通async
秒,Async
S中的集合存儲到某種可變的參考和安裝在監視線程一個異常處理程序,它會遍歷容器終止每個任務。
好奇:什麼是你的使用情況?這聽起來像是你試圖建立一個Erlang式的監督模型,對於這個模型你可能會更好地服務於像''pipes-concurrency''這樣的演員庫(https://hackage.haskell.org/package/pipes-併發) –
@BenjaminHodgson的用例是當我的webapp啓動時產生一個工作隊列。作業隊列內部產生一個作業輪詢線程和一個作業監聽/通知線程。如果其中任何一個線程死亡,則需要重新生成它們。如果應用程序線程被終止,那麼工作隊列需要被終止。 –