2017-05-03 42 views
1

如何構造一個異步任務,以便一次只能運行一個任務實例?如果任務在前一個實例運行時被調用一次或多次,則先前的實例應該完成,然後該任務應該再運行一次。線程安全異步非重入任務

任務調用可以來自任何線程。該任務不帶參數並且沒有結果;調用方法簽名是這樣的:Task DoItAsync()

這種按需,不可重入任務的用例包括執行後臺索引和服務器同步。

+0

我會使用線程,並簡單地終止(通過發送一個事件[!])一個正在運行的線程,並開始一個新的或可能的時候發送一個事件給我的線程,以指示他再次做一切。 – user743414

+0

在這種情況下,支持增量更新。因此,不要放棄,最好運行完成,然後再次運行以獲取任何新的更改。 –

+0

這應該也可以。您只需將您的線程指示爲「重新啓動」即可。首先你完成當前的運行,然後你的線程開始你的下一次(增量)運行。 – user743414

回答

1

這是一個包裝器,它包含要運行的動作並負責根據需要運行它,以便在完成運行完成後收到調用者通知。

/// <summary> 
/// Runs an asynchronous action such that at most one instance of the action runs at a time. 
/// If the action is invoked one or more times while a previous instance is running, 
/// the previous instance completes, and then the action runs one additional time. 
/// </summary> 
public class RepeatableActionRunner 
{ 
    enum RunState { NotRunning, RunningOnce, RunningAndWillRunAgain }; 

    readonly Func<Task> action; 
    RunState runState; 
    Task currentTask = Task.CompletedTask; 
    Task nextTask = Task.CompletedTask; 
    readonly object lockObject = new object(); 

    public RepeatableActionRunner(Func<Task> action) 
    { 
     this.action = action; 
    } 

    /// <summary> 
    /// Runs the action and returns a task that completes when the action completes. 
    /// </summary> 
    /// <remarks>This method is thread safe.</remarks> 
    public Task RunAsync() 
    { 
     lock (lockObject) { 
      switch (runState) { 
       case RunState.NotRunning: 
        return StartTaskAsync(); 
       case RunState.RunningAndWillRunAgain: 
        return nextTask; 
       default: 
        runState = RunState.RunningAndWillRunAgain; 
        return nextTask = currentTask.ContinueWith(_ => { 
         lock (lockObject) 
          return StartTaskAsync(); 
        }).Unwrap(); 
      } 
     } 
    } 

    Task StartTaskAsync() 
    { 
     runState = RunState.RunningOnce; 
     return currentTask = action().ContinueWith(_ => { 
      lock (lockObject) 
       runState = runState - 1; 
     }); 
    } 
} 
+1

唯一要注意的是,如果'DoItNowAsync'在第一個'await'之前有很長時間沒有等待代碼,你可以長時間保持鎖。你可能要考慮切換到'SemaphoreSlim'並使用'await someSemaphoreObject.WaitAsync()'而不是'lock(someLockObject)' –

+0

此外,你需要在'ContinueWith'後面加一個'.Unwrap()'來創建'nextTask'保持正確的東西。目前它返回一個'Task ',調用'.Unwrap()'將它轉換爲一個適合你的'Task',可以在'nextTask'中持有 –

+0

難道你不能使用'ActionBlock < T>'嗎?它本身涵蓋除「經常性」以外的所有要求。定期向其發佈請求的計時器也將覆蓋此問題 –

1

這是使用旗語等,所以如果我們確實需要等待鎖定變爲免費,我們等待asyncronously的Edward's original answer一個扭捏版本。

readonly SemaphoreSlim _someSemaphore = new SemaphoreSlim(1); 
Task _currentTask = Task.CompletedTask; 
Task _nextTask = Task.CompletedTask; 

public async Task DoItAsync() 
{ 
    Task taskToAwait; 
    await _someSemaphore.WaitAsync(); 
    try 
    { 
     if (!_nextTask.IsCompleted) 
     { 
      taskToAwait = _nextTask; 
     } 
     else if(_currentTask.IsCompleted) 
     { 
      taskToAwait = _currentTask = DoItNowAsync(null); 
     } 
     else 
     { 
      taskToAwait = _nextTask = _currentTask.ContinueWith(DoItNowAsync).Unwrap(); 
     } 
    } 
    finally 
    { 
     _someSemaphore.Release(); 
    } 

    await taskToAwait; 
} 

async Task DoItNowAsync(Task _) 
{ 
    // Do the work, including async operations. 
} 
+0

我不認爲這將工作出於同樣的原因,我的答案的早期版本將無法正常工作。除了Damien發現的競爭條件之外,還存在一個問題,即當_nextTask不會觸發完全重新運行時,請求重新運行該動作。 –

0

ActionBlock< T>類已經可以讓你請求發送給塊並將其以異步方式使用指定的DOP執行它們。默認的DOP是1.

它本身確保您一次只能執行一次執行,並且後續請求將排隊。要根據計劃請求執行,您可以使用計時器將請求發送到該塊。

例如:

//Block field with gratuitous timestamp 
ActionBlock<DateTime> _rebuildBlock; 
_rebuildBlock=new ActionBlock<DateTime>(async dt=>await RebuildIndex(dt)); 

//From any thread: 

_rebuildBlock.Post(DateTime.Now); 

即足夠排隊和執行請求。默認DOP爲1時,一次只允許執行一次執行。

當你有沒有更多的請求發送,例如在應用程序終止,你告訴塊完成並等待它來處理任何掛起的請求:

_rebuildBlock.Complete(); 
await _rebuildBlock.Completion; 

您可以創建一個類來抽象塊,或多個塊,例如:

class MyProcessor 
{ 
    ActionBlock<DateTime> _rebuildBlock; 

    MyProcessor() 
    { 
     _rebuildBlock=new ActionBlock<DateTime>(async dt=>await RebuildIndex(dt)); 
    } 

    public void Rebuild() 
    { 
     _rebuildBlock.Post(DateTime.Now); 
    } 

    private async Task RebuildIndex(DateTime timestamp) 
    { 
     //... 
    } 

    public Task StopAsync() 
    { 
     _rebuildBlock.Complete(); 
     return _rebuilcBlock.Completion; 
    } 
} 

的ActionBlock可以鏈接到在TPL數據流的命名空間的其它塊來創建的處理步驟,類似於Powershell的或SSIS管線的管線。

例如,流水線執行的CSV文件批量導入看起來是這樣的:

//Create the blocks 
var folderBlock=new TransformManyBlock<string,string>(folder=>Directory.EnumerateFiles(folder)); 
var csvBlock=new TransformBlock<string,DataRow>(filePath=>ParseCsv(filePath)); 
var batchBlock=new BatchBlock<DataRow>(1000); 
var dbBlock=new ActionBlock<DataRow[]>(rows=>RunSqlBulkCopy(rows)); 

//Link them 
var options=new DataflowLinkOptions{PropagateCompletion=true}; 
folderBlock.LinkTo(csvBlock,options); 
csvBlock.LinkTo(batchBlock,options); 
batchBlock.LinkTo(dbBlock,options); 

//Process 100 folders 
foreach(var path in aLotOfFolders) 
{ 
    folderBlock.Post(path); 
} 

//Finished with the folders 
folderBlock.Complete(); 
//Wait for the entire pipeline to complete 
await dbBlock.Completion; 

如果要排隊在同一時間只有一個要求,你可以創建一個管道,它包含一個BroadcastBlock和爲1的隊列長度的ActionBlock:

var execOptions = new ExecutionDataflowBlockOptions{BoundedCapacity=1}; 

var rebuildBlock=new ActionBlock<DateTime>(async dt=>await RebuildIndex(dt),execOptions); 
var broadcast=new BroadcastBlock<DateTime>(msg=>msg);  

var options=new DataflowLinkOptions{PropagateCompletion=true}; 
broadcast.LinkTo(rebuildBlock,options); 

在這之後,任何張貼到廣播塊而Rebuild執行將覆蓋任何以前的請求。

+0

我不相信他希望他們都排隊。我想他希望最大隊列大小爲1,並且一旦達到該大小,任何新的請求都會得到單個排隊請求的結果。 –

+0

Scott是正確的。另外,什麼是DOP? –

+0

@EdwardBrey程度的Paralellism。這是使用[Sql Server]時使用的常用術語(https://docs.microsoft.com/en-us/sql/t-sql/queries/hints-transact-sql-query#8f1c24b0454382764ba918fedf349ebb:0)(請參閱'MAXDOP'),並且可能Panagiotis從中挑選它。 –