2010-04-09 63 views
0

我需要從外部系統檢索多個對象。外部系統支持多個同時發生的請求(即線程),但是可能使外部系統氾濫 - 因此我希望能夠異步地檢索多個對象,但我希望能夠限制同時發生的異步請求數量。即我需要檢索100個項目,但不想一次檢索超過25個項目的。當25的每個請求完成時,我想觸發另一個檢索,並且一旦它們全部完成,我想按照它們被請求的順序返回所有結果(即,在返回整個調用之前沒有結果返回結果)。這種事情有推薦的模式嗎?用於限制同時異步呼叫數的模式

會是這樣的事情是合適的(僞代碼,顯然)?

private List<externalSystemObjects> returnedObjects = new List<externalSystemObjects>; 

    public List<externalSystemObjects> GetObjects(List<string> ids) 
    { 
     int callCount = 0; 
     int maxCallCount = 25; 
     WaitHandle[] handles; 

     foreach(id in itemIds to get) 
     { 
      if(callCount < maxCallCount) 
      { 
       WaitHandle handle = executeCall(id, callback); 
       addWaitHandleToWaitArray(handle) 
      } 
     else 
     { 
      int returnedCallId = WaitHandle.WaitAny(handles); 
      removeReturnedCallFromWaitHandles(handles); 
     } 
    } 

    WaitHandle.WaitAll(handles); 

    return returnedObjects; 
    } 

    public void callback(object result) 
    { 
     returnedObjects.Add(result); 
    } 

回答

1

考慮項目的列表作爲隊列從25個線程處理出隊的任務,處理任務來處理,增加的結果,然後重複,直到隊列爲空:

class Program 
    { 
    class State 
    { 
     public EventWaitHandle Done; 
     public int runningThreads; 
     public List<string> itemsToProcess; 
     public List<string> itemsResponses; 
    } 

    static void Main(string[] args) 
    { 
     State state = new State(); 

     state.itemsResponses = new List<string>(1000); 
     state.itemsToProcess = new List<string>(1000); 
     for (int i = 0; i < 1000; ++i) 
     { 
     state.itemsToProcess.Add(String.Format("Request {0}", i)); 
     } 

     state.runningThreads = 25; 
     state.Done = new AutoResetEvent(false); 

     for (int i = 0; i < 25; ++i) 
     { 
     Thread t =new Thread(new ParameterizedThreadStart(Processing)); 
     t.Start(state); 
     } 

     state.Done.WaitOne(); 

     foreach (string s in state.itemsResponses) 
     { 
     Console.WriteLine("{0}", s); 
     } 
    } 

    private static void Processing(object param) 
    { 
     Debug.Assert(param is State); 
     State state = param as State; 

     try 
     { 
     do 
     { 
      string item = null; 
      lock (state.itemsToProcess) 
      { 
      if (state.itemsToProcess.Count > 0) 
      { 
       item = state.itemsToProcess[0]; 
       state.itemsToProcess.RemoveAt(0); 
      } 
      } 
      if (null == item) 
      { 
      break; 
      } 
      // Simulate some processing 
      Thread.Sleep(10); 
      string response = String.Format("Response for {0} on thread: {1}", item, Thread.CurrentThread.ManagedThreadId); 
      lock (state.itemsResponses) 
      { 
      state.itemsResponses.Add(response); 
      } 
     } while (true); 

     } 
     catch (Exception) 
     { 
     // ... 
     } 
     finally 
     { 
     int threadsLeft = Interlocked.Decrement(ref state.runningThreads); 
     if (0 == threadsLeft) 
     { 
      state.Done.Set(); 
     } 
     } 
    } 
    } 

你可以做同樣使用異步回調,不需要使用線程。

0

擁有一些類似隊列的結構來保存掛起的請求是一種非常常見的模式。在可能有幾個處理層的Web應用程序中,您會看到一種「漏斗」風格方法,處理變化的早期部分會有更大的隊列。隊列中可能還會有某種優先級,優先級較高的請求將被排序到隊列頂部。

您的解決方案中需要考慮的一件重要事情是,如果請求到達率高於您的處理速率(這可能是由於拒絕服務攻擊,或者只是某些部分處理過程今天異常緩慢),那麼你的隊列將不受限制地增加。您需要制定一些策略,例如在隊列深度超過某個值時立即拒絕新請求。