2013-03-19 67 views
1

我有一個組件向一個基於Web的API提交請求,但這些請求必須被限制,以免違反API的數據限制。這意味着所有請求都必須通過一個隊列來控制它們提交的速率,但它們可以(也應該)同時執行以實現最大吞吐量。每個請求在將來的某個時刻必須將某些數據返回給調用代碼。如何使用C#任務並行庫和IProducerConsumerCollection實現通用回調?

我努力創造一個很好的模型來處理數據的返回。

使用BlockingCollection我不能僅從Schedule方法返回Task<TResult>,因爲排隊和出隊進程位於緩衝區的任一端。因此,我創建了一個RequestItem<TResult>類型,其中包含形式爲Action<Task<TResult>>的回調。

這個想法是,一旦一個項目被從隊列中拉出,回調可以與啓動的任務一起被調用,但是我已經失去了通用類型參數,並且我留下了反射和各種骯髒(如果甚至可能的話)。

例如:

public class RequestScheduler 
{ 
    private readonly BlockingCollection<IRequestItem> _queue = new BlockingCollection<IRequestItem>(); 

    public RequestScheduler() 
    { 
     this.Start(); 
    } 

    // This can't return Task<TResult>, so returns void. 
    // Instead RequestItem is generic but this poses problems when adding to the queue 
    public void Schedule<TResult>(RequestItem<TResult> request) 
    { 
     _queue.Add(request); 
    } 

    private void Start() 
    { 
     Task.Factory.StartNew(() => 
     { 
      foreach (var item in _queue.GetConsumingEnumerable()) 
      { 
       // I want to be able to use the original type parameters here 
       // is there a nice way without reflection? 
       // ProcessItem submits an HttpWebRequest 
       Task.Factory.StartNew(() => ProcessItem(item)) 
        .ContinueWith(t => { item.Callback(t); }); 
      } 
     }); 
    } 

    public void Stop() 
    { 
     _queue.CompleteAdding(); 
    } 
} 

public class RequestItem<TResult> : IRequestItem 
{ 
    public IOperation<TResult> Operation { get; set; } 
    public Action<Task<TResult>> Callback { get; set; } 
} 

我如何能繼續我的緩存請求,但返回Task<TResult>到客戶端時,請求從緩衝器拉出並提交API?

+0

有沒有什麼可以傳遞迴調的動作的任何方式,並使用lambda在點創建行動,你擁有所有的類型可用? – 2013-03-19 17:52:52

+0

你是什麼意思?動作委託不需要參數,我將如何返回數據? – MalcomTucker 2013-03-19 18:03:16

+1

您的調度程序爲什麼調用回調函數?應該在「ProcessItem」的定義中調用回調嗎?這將使整個事情變得更容易。 – Servy 2013-03-19 18:22:59

回答

1

首先,你可以返回Schedule()Task<TResult>,你只需要使用TaskCompletionSource了點。其次,爲了解決通用性問題,你可以隱藏它的全部內容(非通用)Action s。在Schedule()中,使用完全符合您需要的lambda創建一個操作。然後消費循環將執行該動作,它不需要知道里面的內容。

第三,我不明白你爲什麼要在循環的每次迭代中啓動一個新的Task。首先,這意味着你實際上不會得到任何限制。

有了這些修改,代碼看起來是這樣的:

public class RequestScheduler 
{ 
    private readonly BlockingCollection<Action> m_queue = new BlockingCollection<Action>(); 

    public RequestScheduler() 
    { 
     this.Start(); 
    } 

    private void Start() 
    { 
     Task.Factory.StartNew(() => 
     { 
      foreach (var action in m_queue.GetConsumingEnumerable()) 
      { 
       action(); 
      } 
     }, TaskCreationOptions.LongRunning); 
    } 

    public Task<TResult> Schedule<TResult>(IOperation<TResult> operation) 
    { 
     var tcs = new TaskCompletionSource<TResult>(); 

     Action action =() => 
     { 
      try 
      { 
       tcs.SetResult(ProcessItem(operation)); 
      } 
      catch (Exception e) 
      { 
       tcs.SetException(e); 
      } 
     }; 

     m_queue.Add(action); 

     return tcs.Task; 
    } 

    private T ProcessItem<T>(IOperation<T> operation) 
    { 
     // whatever 
    } 
} 
+0

非常感謝,昨天晚上我發現了TaskCompletionSource,並且只是敲了一些東西,所以我很高興我在正確的路線上。節制發生在BlockingCollection上的'foreach'中,爲了清晰起見,我已經將其省略了。 – MalcomTucker 2013-03-20 09:59:42

+0

最後一個問題 - 爲什麼使用「LongRunning」選項?我的API調用大約是70ms - 是否足夠長的時間來使用這個選項? – MalcomTucker 2013-03-20 10:00:21

+0

@MalcomTucker每個API調用需要多長時間無關緊要。重要的是整個「任務」將花費多長時間。在我看來,這可能是一個很長的時間,其中大部分可能會花費在等待'GetConsumingEnumerable()'返回另一個項目。 – svick 2013-03-20 11:16:36

相關問題