2011-08-18 75 views
1

我有類似以下的情況:這種情況下推薦的F#模式是什麼?

let mutable stopped = false 

let runAsync() = async { 
    while not stopped do 
     let! item = fetchItemToProcessAsync 
     match item with 
     | Some job -> job |> runJobAsync |> Async.Start 
     | None -> do! Async.Sleep(1000) 
} 

let run() = Async.Start runAsync 
let stop() = 
    stopped <- true 

現在,當停止方法被調用時,我不得不停止閱讀從DB進一步的項目,並等待當前開始到完成返回前的那些從這個功能。

完成此操作的最佳方法是什麼?我想用一個計數器,(與聯鎖API)組成,當計數器達到0

如果沒有做到這一點的另一種方式,我將不勝感激指導從停止方法返回。我有一種感覺,我可以在這裏使用代理,但我不確定是否有任何可用的方法來與代理完成此操作,或者如果我仍然必須編寫自定義邏輯以確定作業已完成執行。

回答

1

看看actor-based patterns and MailboxProcessor

基本上你能想象,作爲一個異步隊列。如果您使用的運行列表(開始Async.StartChildAsync.StartAsTask)作爲MailboxProcessor裏面的參數爲你的循環,你可以從容地處理通過等待或的CancellationToken)停產

下面是一個簡單示例中,我放在一起:


type Commands = 
    | RunJob of Async 
    | JobDone of int 
    | Quit of AsyncReplyChannel 

type JobRunner() = 
    let processor = 
     MailboxProcessor.Start (fun inbox -> 
      let rec loop (nextId, jobs) = async { 
       let! cmd = inbox.Receive() 
       match cmd with 
       | Quit cb -> 
        if not (Map.isEmpty jobs) 
        then async { 
          do! Async.Sleep 100 
          inbox.Post (Quit cb)} 
         |> Async.Start 
         return! loop (nextId, jobs) 
        else 
         cb.Reply() 
         return() 
       | JobDone id -> 
        return! loop (nextId, jobs |> Map.remove id) 
       | RunJob job -> 
        let runJob i = async { 
         do! job 
         inbox.Post (JobDone i) 
        } 
        let! child = Async.StartChild (runJob nextId) 
        return! loop (nextId+1, jobs |> Map.add nextId child) 
      } 
      loop (0, Map.empty)) 
    member jr.PostJob(job) = processor.Post (RunJob job) 
    member jr.Quit() = processor.PostAndReply(fun cb -> Quit cb) 

let postWaitJob (jobRunner : JobRunner) time = 
    let job = async { 
     do! Async.Sleep time 
     printfn "sleept for %d ms" time } 
    jobRunner.PostJob job 

let testRun() = 
    let jr = new JobRunner() 
    printfn "starting jobs..." 
    [10..-1..1] |> List.iter (fun i -> postWaitJob jr (i*1000)) 
    printfn "sending quit" 
    jr.Quit() 
    printfn "done!" 

嗯...有與編輯器的一些問題在這裏:它只是殺死了大量的代碼,當我用管道回運營商...哎呀

簡短的解釋:你可以看到我總是提供內部循環與下一個免費的作業ID和一個Map-> AsyncChild作業。 (你當然可以實現其他/更好的解決方案 - 該地圖是不是在這個例子中neccessary,但你可以用命令「Cancell JobNr」或任何這樣的程度) 作業完成消息僅用於internaly刪除從這個職位剛剛退出地圖 檢查,如果該映射爲空 - 如果沒有額外的工作需要和Mailboxprocessor退出(返程()) - 如果它不是空的一個新的異步孩子開始,只是等待100毫秒,然後重新發送退出 - 消息 RunJob相當簡單 - 它只是將給定的作業連同JobDone的後臺鏈接到MessabeBoxProcessor中,並使用更新後的值(nextId是一個,並且新的Job映射到舊的nextId)返回recursivley循環。

+0

感謝您的答覆 - 我下去相同的路徑了。不過,我有兩個問題,看你的例子:1)當我們開始用'Async.StartChild異步:異步<'u>'然後不,我們需要有另一個呼叫「從異步<'u>檢索'U(如讓! x =孩子或者做!孩子)'? 2)鑑於這些工作可以在不同的時間完成,我們不需要爲Map函數使用某種同步嗎? –

+1

嗨 - 是的,如果你想resutl你會淨另一個讓!爲孩子 - 但我不關心答案(沒有),你不必。第二:不,因爲我從來沒有訪問過相同的地圖,但總是生成新的地圖(地圖是不可變的)我不必考慮很多的協調性 - 除了那些只有當前繼續/運行循環的線程才能訪問它(這就是爲什麼我發佈JobDone消息到處理器而不是更改全球地圖/字典或其他) – Carsten

+0

我建議你在那裏放一些「printf」並且用代碼來玩 - 總是幫助我在這些情況下(如果你運行testRun函數(你可以把它全部粘貼到F#交互中),你可以看到例如在任何任務完成之前調用quit,但函數只會在所有10個任務完成時返回) – Carsten

1

看看這個snippet上fssnip.net。這是您可以使用的通用作業處理器。

+0

幾乎與我的解決方案相同 - 但我喜歡分成「退出」階段。 – Carsten