這是Tomas答案的變體,因爲我需要一個可以返回結果的代理。
type ThrottleMessage<'a> =
| AddJob of (Async<'a>*AsyncReplyChannel<'a>)
| DoneJob of ('a*AsyncReplyChannel<'a>)
| Stop
/// This agent accumulates 'jobs' but limits the number which run concurrently.
type ThrottleAgent<'a>(limit) =
let agent = MailboxProcessor<ThrottleMessage<'a>>.Start(fun inbox ->
let rec loop(jobs, count) = async {
let! msg = inbox.Receive() //get next message
match msg with
| AddJob(job) ->
if count < limit then //if not at limit, we work, else loop
return! work(job::jobs, count)
else
return! loop(job::jobs, count)
| DoneJob(result, reply) ->
reply.Reply(result) //send back result to caller
return! work(jobs, count - 1) //no need to check limit here
| Stop -> return() }
and work(jobs, count) = async {
match jobs with
| [] -> return! loop(jobs, count) //if no jobs left, wait for more
| (job, reply)::jobs -> //run job, post Done when finished
async { let! result = job
inbox.Post(DoneJob(result, reply)) }
|> Async.Start
return! loop(jobs, count + 1) //job started, go back to waiting
}
loop([], 0)
)
member m.AddJob(job) = agent.PostAndAsyncReply(fun rep-> AddJob(job, rep))
member m.Stop() = agent.Post(Stop)
在我的具體情況,我只需要使用它作爲一個'一次性「地圖」,所以我加了一個靜態函數:
static member RunJobs limit jobs =
let agent = ThrottleAgent<'a>(limit)
let res = jobs |> Seq.map (fun job -> agent.AddJob(job))
|> Async.Parallel
|> Async.RunSynchronously
agent.Stop()
res
它似乎工作確定。 ..
這聽起來像是一些「隊列」丟失......我不知道如何F#或它的「異步」工作,但它應該很容易寫入使用CPS,如果這符合模型。 – 2011-06-02 20:06:51
聲音越來越像你想'MailboxProcessor'而不是直接使用'async' ... – ildjarn 2011-06-02 20:16:06
你說有些照片不可用,那麼服務器應該返回404(找不到)對不對?我的意思是它不應該是這樣的事情超時的情況。你問服務器一個資源,服務器響應這個簡單的資源。 – Ankur 2011-06-03 04:19:22