2017-10-16 58 views
2

async package的文檔鏈接描述withAsync功能:如何父異步多孩子異步操作

菌種在一個單獨的線程異步操作,並通過其異步 手柄所提供的功能。當函數返回或拋出一個異常時,在Async上調用uninterruptibleCancel。這是異步的一個有用的變體,確保Async永遠不會無意中保持運行 。

我在那個一直盯着過去2小時,一直無法弄清楚如何啓動一個監視線程,會派生多個工作線程,使得:

  • 如果監視器線程死亡,所有工作線程應該被終止,
  • 但是,如果任何工作線程死亡,則其他工作線程不應受影響。應該通知監視器,它應該能夠重新啓動工作線程。
+1

好奇:什麼是你的使用情況?這聽起來像是你試圖建立一個Erlang式的監督模型,對於這個模型你可能會更好地服務於像''pipes-concurrency''這樣的演員庫(https://hackage.haskell.org/package/pipes-併發) –

+0

@BenjaminHodgson的用例是當我的webapp啓動時產生一個工作隊列。作業隊列內部產生一個作業輪詢線程和一個作業監聽/通知線程。如果其中任何一個線程死亡,則需要重新生成它們。如果應用程序線程被終止,那麼工作隊列需要被終止。 –

回答

2

看來我們需要兩個功能:一個啓動所有的異步任務,另一個監視它們並在死亡時重新啓動它們。

第一個可以這樣寫:

withAsyncMany :: [IO t] -> ([Async t] -> IO b) -> IO b 
withAsyncMany []  f = f [] 
withAsyncMany (t:ts) f = withAsync t $ \a -> withAsyncMany ts (f . (a:)) 

如果我們使用的managed包,我們也可以寫成這樣的:

import Control.Monad.Managed (with,managed) 

withAsyncMany' :: [IO t] -> ([Async t] -> IO b) -> IO b 
withAsyncMany' = with . traverse (\t -> managed (withAsync t)) 

重啓功能將循環列表異口同聲地詢問他們的狀況,並在他們失敗時更新他們:

{-# language NumDecimals #-} 
import Control.Concurrent (threadDelay) 

resurrect :: IO t -> [Async t] -> IO() 
resurrect restartAction = go [] 
    where 
    go ts [] = do 
     threadDelay 1e6 -- wait a little before the next round of polling 
     go [] (reverse ts) 
    go past (a:pending) = do 
     status <- poll a -- has the task died, or finished? 
     case status of 
      Nothing -> go (a:past) pending 
      Just _ -> withAsync restartAction $ \a' -> go (a':past) pending 

但是我擔心很多嵌套的withAsyncs會導致某種類型的資源泄漏(因爲某些異常處理程序必須與每個withAsync一起安裝以便在父線程死亡的情況下通知孩子)的可能性。

因此,也許在這種情況下,它會更好產卵工人用普通async秒,Async S中的集合存儲到某種可變的參考和安裝在監視線程一個異常處理程序,它會遍歷容器終止每個任務。

0

這是另一個答案,它使用async而不是withAsync。主要功能是

monitor :: Int -> IO() -> IO() 
monitor count task = 
    bracket (do asyncs <- replicateM count (async task) 
       newIORef asyncs) 
      (\ref -> forever (do 
       threadDelay 1e6 
       asyncs <- readIORef ref 
       vivify task (writeIORef ref) asyncs)) 
      (\ref -> do 
       asyncs <- readIORef ref 
       mapM_ uninterruptibleCancel asyncs) 

它使用輔助vivify功能橫貫Async個列表,復興死的和寫回更新列表到IORef

vivify :: IO() -> ([Async()] -> IO()) -> [Async()] -> IO() 
vivify task write asyncs = go [] asyncs 
    where 
    go _ [] = do 
     return() 
    go past (a:pending) = do 
     status <- poll a 
     case status of 
      Nothing -> do 
       go (a:past) pending 
      Just _ -> do 
       past' <- mask_ $ do 
        a' <- async task 
        write (reverse (a':past) ++ pending) 
        return (a':past) 
       go past' pending 

我們屏蔽異步例外創建一個新的Async並將其保留在IOref之間的時間間隔,因爲否則如果異步異常到達並中止監視器線程,那麼Async將保持懸掛狀態。