2011-06-02 61 views
6

我試圖從我的博客的xml備份中下載3000多張照片。我遇到的問題是,如果只有其中一張照片不再可用,則整個異步會被阻止,因爲AsyncGetResponse不會執行超時。F#中的「節流」異步下載

ildjarn幫我把一個版本AsyncGetResponse那裏沒有關於超時失敗,但採用能提供很多超時 - 似乎這只是排隊超時請求。看起來所有的WebRequests都是立即啓動的,唯一的辦法就是設置超時時間到下載全部的時間:這不是很好,因爲這意味着我已經調整了超時取決於圖像的數量。

我是否已達到香草味的限度async?我應該看看被動擴展嗎?

這有點尷尬,因爲我已經在這裏詢問了twoquestions這個特定的代碼位,我仍然沒有按照我想要的方式工作!

+0

這聽起來像是一些「隊列」丟失......我不知道如何F#或它的「異步」工作,但它應該很容易寫入使用CPS,如果這符合模型。 – 2011-06-02 20:06:51

+1

聲音越來越像你想'MailboxProcessor'而不是直接使用'async' ... – ildjarn 2011-06-02 20:16:06

+0

你說有些照片不可用,那麼服務器應該返回404(找不到)對不對?我的意思是它不應該是這樣的事情超時的情況。你問服務器一個資源,服務器響應這個簡單的資源。 – Ankur 2011-06-03 04:19:22

回答

9

我認爲必須有更好的方法來找出文件不可用比使用超時。我不完全確定,但是如果找不到文件,有沒有辦法讓它拋出異常?然後,您可以將async代碼包裝在try .. with之內,並且應該避免大部分問題。無論如何,如果你想編寫自己的「併發管理器」來並行運行一定數量的請求,並對剩餘的未決請求進行排隊,那麼F#中最簡單的選擇就是使用代理(MailboxProcessor類型)。以下對象封裝了以下行爲:

type ThrottlingAgentMessage = 
    | Completed 
    | Work of Async<unit> 

/// Represents an agent that runs operations in concurrently. When the number 
/// of concurrent operations exceeds 'limit', they are queued and processed later 
type ThrottlingAgent(limit) = 
    let agent = MailboxProcessor.Start(fun agent -> 
    /// Represents a state when the agent is blocked 
    let rec waiting() = 
     // Use 'Scan' to wait for completion of some work 
     agent.Scan(function 
     | Completed -> Some(working (limit - 1)) 
     | _ -> None) 
    /// Represents a state when the agent is working 
    and working count = async { 
     while true do 
     // Receive any message 
     let! msg = agent.Receive() 
     match msg with 
     | Completed -> 
      // Decrement the counter of work items 
      return! working (count - 1) 
     | Work work -> 
      // Start the work item & continue in blocked/working state 
      async { try do! work 
        finally agent.Post(Completed) } 
      |> Async.Start 
      if count < limit then return! working (count + 1) 
      else return! waiting() } 
    working 0)  

    /// Queue the specified asynchronous workflow for processing 
    member x.DoWork(work) = agent.Post(Work work) 
+0

我正準備發佈這個鏈接給你的博客。 http://tomasp.net/blog/parallel-extra-blockingagent.aspx – gradbot 2011-06-02 20:29:55

+0

@gradbot - 我在想那個代理,但發現這個(也許?)更合適。應該有一些帶有可重用代理的「設計模式」書:-)。 – 2011-06-02 21:09:20

+0

謝謝你。 (有些請求返回時沒有找到,但我有一對(3000的),這只是從來沒有回覆...) – Benjol 2011-06-02 22:34:53

6

沒有什麼是件容易的事。 :)

我認爲你遇到的問題是問題領域內在的問題(與僅僅是異步編程模型的問題相反,儘管他們確實有些交互)。

假設你想下載3000張照片。首先,在你的.NET過程中,有一些像System.Net.ConnectionLimit或者我忘記名字的東西,例如限制您的.NET進程可以同時運行的同時HTTP連接的數量(並且默認值只是'2',我認爲)。所以你可以找到控制並將其設置爲更高的數字,這將有所幫助。

但是接下來,您的機器和互聯網連接具有有限的帶寬。因此,即使您可以嘗試同時啓動3000個HTTP連接,基於帶寬管道限制,每個連接也會變得更慢。所以這也會與超時相互作用。 (這甚至沒有考慮服務器上有什麼限制/限制,也許如果你發送了3000個請求,它會認爲你是DoS攻擊和黑名單IP)。一個好的解決方案需要一些智能節流和流量控制,以便管理底層系統資源的使用方式。

正如其他答案一樣,F#代理(MailboxProcessors)是編寫這種節流/流量控制邏輯的良好編程模型。

(即使所有這些,如果大多數圖片文件都是1MB,但其中有1GB文件混合在那裏,那麼這個文件可能會超時。)

無論如何,這不是問題的答案,只是指出問題域本身有多少固有的複雜性。 (也許這也暗示了爲什麼UI'下載管理器'如此受歡迎。)

+0

沒有免費的午餐:) – Benjol 2011-06-08 06:40:33

4

這是Tomas答案的變體,因爲我需要一個可以返回結果的代理。

type ThrottleMessage<'a> = 
    | AddJob of (Async<'a>*AsyncReplyChannel<'a>) 
    | DoneJob of ('a*AsyncReplyChannel<'a>) 
    | Stop 

/// This agent accumulates 'jobs' but limits the number which run concurrently. 
type ThrottleAgent<'a>(limit) =  
    let agent = MailboxProcessor<ThrottleMessage<'a>>.Start(fun inbox -> 
     let rec loop(jobs, count) = async { 
      let! msg = inbox.Receive() //get next message 
      match msg with 
      | AddJob(job) -> 
       if count < limit then //if not at limit, we work, else loop 
        return! work(job::jobs, count) 
       else 
        return! loop(job::jobs, count) 
      | DoneJob(result, reply) -> 
       reply.Reply(result)   //send back result to caller 
       return! work(jobs, count - 1) //no need to check limit here 
      | Stop -> return() } 
     and work(jobs, count) = async { 
      match jobs with 
      | [] -> return! loop(jobs, count) //if no jobs left, wait for more 
      | (job, reply)::jobs ->   //run job, post Done when finished 
       async { let! result = job 
         inbox.Post(DoneJob(result, reply)) } 
       |> Async.Start 
       return! loop(jobs, count + 1) //job started, go back to waiting 
     } 
     loop([], 0) 
    ) 
    member m.AddJob(job) = agent.PostAndAsyncReply(fun rep-> AddJob(job, rep)) 
    member m.Stop() = agent.Post(Stop) 

在我的具體情況,我只需要使用它作爲一個'一次性「地圖」,所以我加了一個靜態函數:

static member RunJobs limit jobs = 
     let agent = ThrottleAgent<'a>(limit) 
     let res = jobs |> Seq.map (fun job -> agent.AddJob(job)) 
         |> Async.Parallel 
         |> Async.RunSynchronously 
     agent.Stop() 
     res 

似乎工作確定。 ..

+0

我建議儘可能避免使用'RunSynchronously',並讓'RunJobs'本身異步。 – Tarmil 2015-06-04 07:42:54

0

下面是一個開箱即用解決方案:

FSharpx.Control offers an Async.ParallelWithThrottle function。我不確定這是否是it uses SemaphoreSlim的最佳實施方案。但易用性非常好,而且由於我的應用程序不需要頂級性能,所以對我來說它已經足夠好用了。雖然如果有人知道如何讓它變得更好,那麼它就是一個圖書館,但是讓圖書館成爲最優秀的開發人員總是一件好事,所以我們其他人可以使用可以工作的代碼並完成我們的工作!