2011-08-24 93 views
1

我創建了一個調度程序類 - 這本身就是一個長時間運行的任務,可以隨時由用戶取消。該任務將輪詢數據庫以查看是否有任何需要完成的工作,並運行到X [5]個子任務。正確地創建一個任務來輪詢和派發每個子任務,並取消每個任務

據我所知 - 它工作得很好,但我對代碼有幾個問題/疑慮。或多或少 - 因爲我找不到這樣的另一個例子 - 我做對了嗎?有什麼我可以改進的?

  1. 我正在使用ConcurrentDictionary來跟蹤正在運行的子任務。該字典存儲正在處理的RequestKey以及該任務的CancellationTokenSource。

問:這是最好的方法嗎?在StartDownloadProcess(這是子任務)中,我創建CancellationTokenSource並將其添加到字典中,然後啓動任務。我已經添加了一個Continuation,它在處理完成後從Dictionary中移除這個項目,這樣它就不會在Cancel方法中調用。

  1. 在子任務中,我將取消令牌傳遞給實際執行工作的方法。然後該進程將通過定期檢查該令牌來檢查是否需要中止。它是否正確 ?

  2. 在取消方法 - 我在字典中創建一個Keys的副本,遍歷它並嘗試訪問並從字典中刪除該項目併發出取消請求。

問:這是最好的方法嗎?我是否需要等待以查看任務是否真的取消了?我可以嗎 ?
問:我應該處置CTS嗎?

  1. 我正在做一個Thread.Sleep的主要任務..好/壞?我應該使用SpinWait嗎?是否有另一種方法/更好的方法讓主要輪詢員按特定間隔睡覺並重新運行?

注:在StartDownloadProcess我使用了一段時間(真)進行循環,直至任務完成,或取消迭代直到J>時requestKey。在真正的代碼中,沒有while循環。它只是開始新的任務並運行實際的下載過程。

-

/// <summary> 
/// Primary dispatcher token source 
/// </summary> 
CancellationTokenSource primaryTokenSource; 
/// <summary> 
/// A collection of Worker Tokens which can be used to cancel worker tasks and keep track of how many 
/// there are. 
/// </summary> 
ConcurrentDictionary<int, CancellationTokenSource> workerTokens = new ConcurrentDictionary<int, CancellationTokenSource>(); 

/// <summary> 
/// Runs this instance. 
/// </summary> 
public void Run() { 
    // Only one dispatcher can be running 
    if (IsRunning) 
    return; 

    // Create a new token source 
    primaryTokenSource = new CancellationTokenSource(); 
    // Create the cancellation token to pass into the Task 
    CancellationToken token = primaryTokenSource.Token; 

    // Set flag on 
    IsRunning = true; 

    // Fire off the dispatcher 
    Task.Factory.StartNew(
    () => { 
     // Loop forever 
     while (true) { 
     // If there are more than 5 threads running, don't add a new one 
     if (workerTokens.Count < 5) { 
      // Check to see if we've been cancelled 
      if (token.IsCancellationRequested) 
      return; 

      // Check to see if there are pending requests 
      int? requestKey = null; 

      // Query database (removed) 
      requestKey = new Random().Next(1550); 

      // If we got a request, start processing it 
      if (requestKey != null) { 
      // Check to see if we've been cancelled before running the child task 
      if (token.IsCancellationRequested) 
       return; 

      // Start the child downloader task 
      StartDownloadProcess(requestKey.Value); 
      } 
     } else { 
      // Do nothing, we've exceeded our max tasks 
      Console.WriteLine("MAX TASKS RUNNING, NOT STARTING NEW"); 
     } 

     // Sleep for the alloted time 
     Thread.Sleep(Properties.Settings.Default.PollingInterval); 
    } 
    }, token) 
    // Turn running flag off 
    .ContinueWith((t) => IsRunning = false) 
    // Notify that we've finished 
    .ContinueWith(OnDispatcherStopped); 
} 

/// <summary> 
/// Starts the download process. 
/// </summary> 
/// <param name="requestKey">The request key.</param> 
private void StartDownloadProcess(int requestKey) { 
    CancellationTokenSource workerTokenSource = new CancellationTokenSource(); 
    CancellationToken token = workerTokenSource.Token; 

    // Add the token source to the queue 
    workerTokens.GetOrAdd(requestKey, workerTokenSource); 

    // Start the child downloader task 
    Task.Factory.StartNew(
    () => { 
     int j = 0; 
     while (true) { 
     if (token.IsCancellationRequested) { 
      Console.WriteLine("Sub-Task Cancelled {0}", requestKey); 
      return; 
     } 

     // Create a new downloader, pass it the RequestKey and token 
     //var downloader = new Downloader(requestKey, token); 
     //downloader.Run(); 

     // Simulate work 
     Thread.Sleep(250); 
     Console.WriteLine("SUB-Task {0} is RUNNING! - #{1}", requestKey, j); 

     // Simulate - automatically end task when j > requestkey 
     if (j++ > requestKey) { 
      Console.WriteLine("SUB TASK {0} IS ENDING!", requestKey); 
      return; 
     } 
     } 
    }, 
    token 
).ContinueWith((t) => { 
    // If we ended naturally, the cancellationtoken will need to be removed from the dictionary 
    CancellationTokenSource source = null; 
    workerTokens.TryRemove(requestKey, out source); 
    }); 
} 

/// <summary> 
/// Cancels this instance. 
/// </summary> 
public void Cancel() { 
    // Cancel the primary task first so new new child tasks are created 
    if (primaryTokenSource != null) 
    primaryTokenSource.Cancel(); 

    // Iterate over running cancellation sources and terminate them 
    foreach (var item in workerTokens.Keys.ToList()) { 
    CancellationTokenSource source = null; 
    if (workerTokens.TryRemove(item, out source)) { 
     source.Cancel(); 
    } 
    } 
} 

此外,不要在上面的例子所示..幾個事件也能使用任務中得到提升......這些事件都類似如下:

public event EventHandler DispatcherStarted; 
private void OnDispatcherStarted() { 
    EventHandler handler = DispatcherStarted; 
    if (handler != null) 
    Task.Factory.StartNew(() => handler(this, EventArgs.Empty), CancellationToken.None, TaskCreationOptions.None, taskScheduler).Wait();  
} 

在Run()方法中 - 在不同的點它會調用OnDispatcher *();提出事件,以便呼叫者可以訂閱並得到通知。這些事件創建的任務將在主線程上運行。

  • Bonus問題:我正在考慮使調度器通用並傳入檢查數據庫的「poller」對象。如果成功,創建一個子任務並傳遞它所需的參數。我遇到了一些問題,如..如何通過周圍的數據,哪些對象傳遞進來。接口/類/ FUNC < ,,,> /動作<>等我怎麼會變成一個通用的調度程序/輪詢運行一個返回參數(我在想這個一個字典),然後創建一個使用這些參數並支持取消和事件通知的子任務B?
+1

你可能想看看http://codereview.stackexchange.com –

+0

  • 子任務延續西港島線總是被執行不,我當然沒有! - 我甚至不知道那是存在的。謝謝!絕對更合適。 –

  • 回答

    1

    我趕緊看了扔的代碼,並有幾點意見:

    • IsRunning標誌不是線程安全的,多線程的使用可以REA它爲假,然後將其設置同時爲true,你會有不止一個調度線程進行!,避免你必須使用Interlocked.CompareExchange來設置它,你也需要將其標記爲voaltile。
    • 我建議不要使用休眠,也SpinWait不會有所幫助這裏,你可以使用一個Timer對象池的數據庫,並添加請求到BlockingCollection其中,分派CLAS消耗來自請求。即使父任務被取消,那麼就可以避免通過傳遞這個TaskContinuationOptions.NotOnCanceled