我創建了一個調度程序類 - 這本身就是一個長時間運行的任務,可以隨時由用戶取消。該任務將輪詢數據庫以查看是否有任何需要完成的工作,並運行到X [5]個子任務。正確地創建一個任務來輪詢和派發每個子任務,並取消每個任務
據我所知 - 它工作得很好,但我對代碼有幾個問題/疑慮。或多或少 - 因爲我找不到這樣的另一個例子 - 我做對了嗎?有什麼我可以改進的?
- 我正在使用ConcurrentDictionary來跟蹤正在運行的子任務。該字典存儲正在處理的RequestKey以及該任務的CancellationTokenSource。
問:這是最好的方法嗎?在StartDownloadProcess(這是子任務)中,我創建CancellationTokenSource並將其添加到字典中,然後啓動任務。我已經添加了一個Continuation,它在處理完成後從Dictionary中移除這個項目,這樣它就不會在Cancel方法中調用。
在子任務中,我將取消令牌傳遞給實際執行工作的方法。然後該進程將通過定期檢查該令牌來檢查是否需要中止。它是否正確 ?
在取消方法 - 我在字典中創建一個Keys的副本,遍歷它並嘗試訪問並從字典中刪除該項目併發出取消請求。
問:這是最好的方法嗎?我是否需要等待以查看任務是否真的取消了?我可以嗎 ?
問:我應該處置CTS嗎?
- 我正在做一個Thread.Sleep的主要任務..好/壞?我應該使用SpinWait嗎?是否有另一種方法/更好的方法讓主要輪詢員按特定間隔睡覺並重新運行?
注:在StartDownloadProcess我使用了一段時間(真)進行循環,直至任務完成,或取消迭代直到J>時requestKey。在真正的代碼中,沒有while循環。它只是開始新的任務並運行實際的下載過程。
-
/// <summary>
/// Primary dispatcher token source
/// </summary>
CancellationTokenSource primaryTokenSource;
/// <summary>
/// A collection of Worker Tokens which can be used to cancel worker tasks and keep track of how many
/// there are.
/// </summary>
ConcurrentDictionary<int, CancellationTokenSource> workerTokens = new ConcurrentDictionary<int, CancellationTokenSource>();
/// <summary>
/// Runs this instance.
/// </summary>
public void Run() {
// Only one dispatcher can be running
if (IsRunning)
return;
// Create a new token source
primaryTokenSource = new CancellationTokenSource();
// Create the cancellation token to pass into the Task
CancellationToken token = primaryTokenSource.Token;
// Set flag on
IsRunning = true;
// Fire off the dispatcher
Task.Factory.StartNew(
() => {
// Loop forever
while (true) {
// If there are more than 5 threads running, don't add a new one
if (workerTokens.Count < 5) {
// Check to see if we've been cancelled
if (token.IsCancellationRequested)
return;
// Check to see if there are pending requests
int? requestKey = null;
// Query database (removed)
requestKey = new Random().Next(1550);
// If we got a request, start processing it
if (requestKey != null) {
// Check to see if we've been cancelled before running the child task
if (token.IsCancellationRequested)
return;
// Start the child downloader task
StartDownloadProcess(requestKey.Value);
}
} else {
// Do nothing, we've exceeded our max tasks
Console.WriteLine("MAX TASKS RUNNING, NOT STARTING NEW");
}
// Sleep for the alloted time
Thread.Sleep(Properties.Settings.Default.PollingInterval);
}
}, token)
// Turn running flag off
.ContinueWith((t) => IsRunning = false)
// Notify that we've finished
.ContinueWith(OnDispatcherStopped);
}
/// <summary>
/// Starts the download process.
/// </summary>
/// <param name="requestKey">The request key.</param>
private void StartDownloadProcess(int requestKey) {
CancellationTokenSource workerTokenSource = new CancellationTokenSource();
CancellationToken token = workerTokenSource.Token;
// Add the token source to the queue
workerTokens.GetOrAdd(requestKey, workerTokenSource);
// Start the child downloader task
Task.Factory.StartNew(
() => {
int j = 0;
while (true) {
if (token.IsCancellationRequested) {
Console.WriteLine("Sub-Task Cancelled {0}", requestKey);
return;
}
// Create a new downloader, pass it the RequestKey and token
//var downloader = new Downloader(requestKey, token);
//downloader.Run();
// Simulate work
Thread.Sleep(250);
Console.WriteLine("SUB-Task {0} is RUNNING! - #{1}", requestKey, j);
// Simulate - automatically end task when j > requestkey
if (j++ > requestKey) {
Console.WriteLine("SUB TASK {0} IS ENDING!", requestKey);
return;
}
}
},
token
).ContinueWith((t) => {
// If we ended naturally, the cancellationtoken will need to be removed from the dictionary
CancellationTokenSource source = null;
workerTokens.TryRemove(requestKey, out source);
});
}
/// <summary>
/// Cancels this instance.
/// </summary>
public void Cancel() {
// Cancel the primary task first so new new child tasks are created
if (primaryTokenSource != null)
primaryTokenSource.Cancel();
// Iterate over running cancellation sources and terminate them
foreach (var item in workerTokens.Keys.ToList()) {
CancellationTokenSource source = null;
if (workerTokens.TryRemove(item, out source)) {
source.Cancel();
}
}
}
此外,不要在上面的例子所示..幾個事件也能使用任務中得到提升......這些事件都類似如下:
public event EventHandler DispatcherStarted;
private void OnDispatcherStarted() {
EventHandler handler = DispatcherStarted;
if (handler != null)
Task.Factory.StartNew(() => handler(this, EventArgs.Empty), CancellationToken.None, TaskCreationOptions.None, taskScheduler).Wait();
}
在Run()方法中 - 在不同的點它會調用OnDispatcher *();提出事件,以便呼叫者可以訂閱並得到通知。這些事件創建的任務將在主線程上運行。
- Bonus問題:我正在考慮使調度器通用並傳入檢查數據庫的「poller」對象。如果成功,創建一個子任務並傳遞它所需的參數。我遇到了一些問題,如..如何通過周圍的數據,哪些對象傳遞進來。接口/類/ FUNC < ,,,> /動作<>等我怎麼會變成一個通用的調度程序/輪詢運行一個返回參數(我在想這個一個字典),然後創建一個使用這些參數並支持取消和事件通知的子任務B?
你可能想看看http://codereview.stackexchange.com –