2012-02-10 104 views
2

背景:響應異步SQL調用,因爲他們完成

我有一個服務,分批查詢歷史結果分貝。 批次基於開始時間和結束時間。所有批次之間的數據是相互排斥的,因此對於給定的一批批次,它們可以按任意順序運行。有些批次可能需要更多時間才能執行。如果有任何批次失敗,則整個過程中止/停止並記錄錯誤。將數據返回給客戶端時,需要合併來自所有批次的數據。

問題:

儘快將數據返回給客戶端。

初步解決方案:

起初我是爲了同步執行批次。這沒有利用數據互斥的事實。初次使用後,發現此加載方法耗時過長。經過一些調試後,我發現導致速度慢的主要原因是sql查詢的執行時間。因此,下一個解決方案是嘗試使用BeginExecuteReader()EndExecuteReader()異步執行每個批次。在異步調用所有批次後,該服務將在一個while循環中等待並每隔300ms輪詢一次以檢查是否有任何查詢已完成。如果是,那麼它會被讀取。

int batchCount = 0, resultCount = 0; 
List<AsyncDataResult> asyncCalls = new List<AsyncDataResult>(); 

while (true) 
{ 
    if (asyncCalls.Count == 0) 
    { 
     break; 
    } 

    if ((asyncCalls[resultCount]).AsyncResult.IsCompleted) 
    { 
     ReadAsyncResultsFromDb(asyncCalls[resultCount]); 
     asyncCalls.RemoveAt(resultCount); 
    } 

    resultCount++; 
    if (resultCount >= asyncCalls.Count) 
    { 
     resultCount = 0; 
     Thread.Sleep(300); 
    } 
} 

上述方法降低了裝載時間對於大數據集,但對於非常小的數據集(但在許多批次),輪詢被實際添加到裝載延遲。

問:

  1. 如何異步執行的SQL,但不會做投票?
  2. 開始讀取每批完成後?

更新: 對不起,但我忘了添加我需要返回的部分在調用異步調用相同的方法。原因是我需要填充的數據集作爲參數傳入此方法。從開始閱讀器使用IAsyncCallback將需要我改變整個班級。我希望我不會那樣做。

+0

哪.net框架4? – 2012-02-10 21:19:07

+0

是的。它的.net 4.0 – EndlessSpace 2012-02-10 21:23:51

回答

1

沒有足夠的信息來提出一個方法,你應該去,但defintely一個你可能,這將是任務並行庫和Task<T>

荷載的樂趣。但是,不要因此而生氣,你可能很容易就會因爲你的超級多線程工作比同步批處理慢而結束。

如果說T,連接到一個數據庫,拋出一個查詢,返回一個datareader,並說批量中有8個查詢。

您設置了任務0到7,啓動計時器超時,將其全部關閉。 完成後,您可以讓讀者保留並根據任務ID設置您的標誌位。當它達到255時產生一個OnBatchComplete事件,複製你的讀者並將它們傳遞給組合任務。超時首先熄滅,並採取相應措施。如果任務出錯,是否返回一些合適的原因,向調用者冒泡,可能會導致任何仍在運行的查詢。

不知道你的組合過程是如何工作的,但是如果它可以這樣組織,所以說一旦查詢完成,你可以做一箇中間過程,或者如果它是按照某種邏輯順序的話,一旦所有的查詢準備閱讀,你可以開始閱讀到簡單的類,然後拋出每一個在合併任務....

這是不公平我不明白的樂趣這樣的東西....

2

爲什麼民調積極?您產生的每個異步操作都返回一個具有WaitHandle的IAsyncResult。使用WaitAny()並讓系統通知您:

/// <summary> 
/// Do something useful with a completed query 
/// </summary> 
/// <param name="result"></param> 
/// <returns> 
/// true if the query failed; 
/// false if the query was successful 
/// </returns> 
private static bool DoSomethingUseful(IAsyncResult result) 
{ 
    throw new NotImplementedException() ; 
} 

static void Main(string[] args) 
{ 
    List<IAsyncResult> batch   = SpawnBatch() ; 
    bool    errorOccurred = ProcessCompletedBatches(batch , DoSomethingUseful) ; 

    if (errorOccurred) 
    { 
     CancelPending(batch) ; 
    } 

    return ; 

} 

public static bool ProcessCompletedBatches(List<IAsyncResult> pending , Func<IAsyncResult,bool> resultHandler) 
{ 
    bool errorOccurred = false ; 

    while (! errorOccurred && pending.Count > 0) 
    { 
     WaitHandle[] inFlight = pending.Select(x => x.AsyncWaitHandle).ToArray() ; 

     int offset = WaitHandle.WaitAny(inFlight) ; 

     IAsyncResult result = pending[offset] ; 
     pending.RemoveAt(offset) ; 

     errorOccurred = resultHandler(result) ; 

    } 

    return errorOccurred ; 

}