如何構造一個異步任務,以便一次只能運行一個任務實例?如果任務在前一個實例運行時被調用一次或多次,則先前的實例應該完成,然後該任務應該再運行一次。線程安全異步非重入任務
任務調用可以來自任何線程。該任務不帶參數並且沒有結果;調用方法簽名是這樣的:Task DoItAsync()
這種按需,不可重入任務的用例包括執行後臺索引和服務器同步。
如何構造一個異步任務,以便一次只能運行一個任務實例?如果任務在前一個實例運行時被調用一次或多次,則先前的實例應該完成,然後該任務應該再運行一次。線程安全異步非重入任務
任務調用可以來自任何線程。該任務不帶參數並且沒有結果;調用方法簽名是這樣的:Task DoItAsync()
這種按需,不可重入任務的用例包括執行後臺索引和服務器同步。
這是一個包裝器,它包含要運行的動作並負責根據需要運行它,以便在完成運行完成後收到調用者通知。
/// <summary>
/// Runs an asynchronous action such that at most one instance of the action runs at a time.
/// If the action is invoked one or more times while a previous instance is running,
/// the previous instance completes, and then the action runs one additional time.
/// </summary>
public class RepeatableActionRunner
{
enum RunState { NotRunning, RunningOnce, RunningAndWillRunAgain };
readonly Func<Task> action;
RunState runState;
Task currentTask = Task.CompletedTask;
Task nextTask = Task.CompletedTask;
readonly object lockObject = new object();
public RepeatableActionRunner(Func<Task> action)
{
this.action = action;
}
/// <summary>
/// Runs the action and returns a task that completes when the action completes.
/// </summary>
/// <remarks>This method is thread safe.</remarks>
public Task RunAsync()
{
lock (lockObject) {
switch (runState) {
case RunState.NotRunning:
return StartTaskAsync();
case RunState.RunningAndWillRunAgain:
return nextTask;
default:
runState = RunState.RunningAndWillRunAgain;
return nextTask = currentTask.ContinueWith(_ => {
lock (lockObject)
return StartTaskAsync();
}).Unwrap();
}
}
}
Task StartTaskAsync()
{
runState = RunState.RunningOnce;
return currentTask = action().ContinueWith(_ => {
lock (lockObject)
runState = runState - 1;
});
}
}
唯一要注意的是,如果'DoItNowAsync'在第一個'await'之前有很長時間沒有等待代碼,你可以長時間保持鎖。你可能要考慮切換到'SemaphoreSlim'並使用'await someSemaphoreObject.WaitAsync()'而不是'lock(someLockObject)' –
此外,你需要在'ContinueWith'後面加一個'.Unwrap()'來創建'nextTask'保持正確的東西。目前它返回一個'Task
難道你不能使用'ActionBlock < T>'嗎?它本身涵蓋除「經常性」以外的所有要求。定期向其發佈請求的計時器也將覆蓋此問題 –
這是使用旗語等,所以如果我們確實需要等待鎖定變爲免費,我們等待asyncronously的Edward's original answer一個扭捏版本。
readonly SemaphoreSlim _someSemaphore = new SemaphoreSlim(1);
Task _currentTask = Task.CompletedTask;
Task _nextTask = Task.CompletedTask;
public async Task DoItAsync()
{
Task taskToAwait;
await _someSemaphore.WaitAsync();
try
{
if (!_nextTask.IsCompleted)
{
taskToAwait = _nextTask;
}
else if(_currentTask.IsCompleted)
{
taskToAwait = _currentTask = DoItNowAsync(null);
}
else
{
taskToAwait = _nextTask = _currentTask.ContinueWith(DoItNowAsync).Unwrap();
}
}
finally
{
_someSemaphore.Release();
}
await taskToAwait;
}
async Task DoItNowAsync(Task _)
{
// Do the work, including async operations.
}
我不認爲這將工作出於同樣的原因,我的答案的早期版本將無法正常工作。除了Damien發現的競爭條件之外,還存在一個問題,即當_nextTask不會觸發完全重新運行時,請求重新運行該動作。 –
的ActionBlock< T>類已經可以讓你請求發送給塊並將其以異步方式使用指定的DOP執行它們。默認的DOP是1.
它本身確保您一次只能執行一次執行,並且後續請求將排隊。要根據計劃請求執行,您可以使用計時器將請求發送到該塊。
例如:
//Block field with gratuitous timestamp
ActionBlock<DateTime> _rebuildBlock;
_rebuildBlock=new ActionBlock<DateTime>(async dt=>await RebuildIndex(dt));
//From any thread:
_rebuildBlock.Post(DateTime.Now);
即足夠排隊和執行請求。默認DOP爲1時,一次只允許執行一次執行。
當你有沒有更多的請求發送,例如在應用程序終止,你告訴塊完成並等待它來處理任何掛起的請求:
_rebuildBlock.Complete();
await _rebuildBlock.Completion;
您可以創建一個類來抽象塊,或多個塊,例如:
class MyProcessor
{
ActionBlock<DateTime> _rebuildBlock;
MyProcessor()
{
_rebuildBlock=new ActionBlock<DateTime>(async dt=>await RebuildIndex(dt));
}
public void Rebuild()
{
_rebuildBlock.Post(DateTime.Now);
}
private async Task RebuildIndex(DateTime timestamp)
{
//...
}
public Task StopAsync()
{
_rebuildBlock.Complete();
return _rebuilcBlock.Completion;
}
}
的ActionBlock可以鏈接到在TPL數據流的命名空間的其它塊來創建的處理步驟,類似於Powershell的或SSIS管線的管線。
例如,流水線執行的CSV文件批量導入看起來是這樣的:
//Create the blocks
var folderBlock=new TransformManyBlock<string,string>(folder=>Directory.EnumerateFiles(folder));
var csvBlock=new TransformBlock<string,DataRow>(filePath=>ParseCsv(filePath));
var batchBlock=new BatchBlock<DataRow>(1000);
var dbBlock=new ActionBlock<DataRow[]>(rows=>RunSqlBulkCopy(rows));
//Link them
var options=new DataflowLinkOptions{PropagateCompletion=true};
folderBlock.LinkTo(csvBlock,options);
csvBlock.LinkTo(batchBlock,options);
batchBlock.LinkTo(dbBlock,options);
//Process 100 folders
foreach(var path in aLotOfFolders)
{
folderBlock.Post(path);
}
//Finished with the folders
folderBlock.Complete();
//Wait for the entire pipeline to complete
await dbBlock.Completion;
如果要排隊在同一時間只有一個要求,你可以創建一個管道,它包含一個BroadcastBlock和爲1的隊列長度的ActionBlock:
var execOptions = new ExecutionDataflowBlockOptions{BoundedCapacity=1};
var rebuildBlock=new ActionBlock<DateTime>(async dt=>await RebuildIndex(dt),execOptions);
var broadcast=new BroadcastBlock<DateTime>(msg=>msg);
var options=new DataflowLinkOptions{PropagateCompletion=true};
broadcast.LinkTo(rebuildBlock,options);
在這之後,任何張貼到廣播塊而Rebuild
執行將覆蓋任何以前的請求。
我不相信他希望他們都排隊。我想他希望最大隊列大小爲1,並且一旦達到該大小,任何新的請求都會得到單個排隊請求的結果。 –
Scott是正確的。另外,什麼是DOP? –
@EdwardBrey程度的Paralellism。這是使用[Sql Server]時使用的常用術語(https://docs.microsoft.com/en-us/sql/t-sql/queries/hints-transact-sql-query#8f1c24b0454382764ba918fedf349ebb:0)(請參閱'MAXDOP'),並且可能Panagiotis從中挑選它。 –
我會使用線程,並簡單地終止(通過發送一個事件[!])一個正在運行的線程,並開始一個新的或可能的時候發送一個事件給我的線程,以指示他再次做一切。 – user743414
在這種情況下,支持增量更新。因此,不要放棄,最好運行完成,然後再次運行以獲取任何新的更改。 –
這應該也可以。您只需將您的線程指示爲「重新啓動」即可。首先你完成當前的運行,然後你的線程開始你的下一次(增量)運行。 – user743414