2015-05-29 29 views
2

我此刻已經得到的是一個觸發間隔爲5000毫秒計時:創建文件皮卡過程與封閉集合

static Timer _aTimer = new System.Timers.Timer(); 

    static void Main(string[] args) 
    { 
     _aTimer.Elapsed += new ElapsedEventHandler(OnTimedEvent); 

     _aTimer.Interval = 5000; 
     _aTimer.Enabled = true; 

     Console.WriteLine("Press \'q\' to quit the sample."); 
     while (Console.Read() != 'q') ; 

    } 

在火就接着設置隊列用於處理文件:

 private static void OnTimedEvent(object source, ElapsedEventArgs e) 
    { 
     // stop the timer so we dont reprocess files we already have in the queue 
     StopTimer(); 

     // setup a list of queues 
     var lists = new List<IncomingOrderQueue>(); 
     //get the accounts in which the files we are looking in 
     var accounts = new List<string>() { "Account1", "Account2" }; 
     //loop through the accounts and set up the queue 
     foreach (var acc in accounts) 
     { 
      // create the queue 
      var tmp = new IncomingOrderQueue(); 
      // for each file in the folders add it to be processed in the queue 
      foreach (var orderFile in OrderFiles(acc)) 
      { 
       tmp.EnqueueSweep(new QueueVariables() { Account = acc, File = orderFile }); 
      } 
      // add the queue to the list of queues 
      lists.Add(tmp); 
     } 
     // for each of the queues consume all the contents of them 
     Parallel.ForEach(lists, l => l.Consume()); 

     //start the timer back up again because we have finished all the files we have in the current queue 
     StartTimer(); 
    } 

     public static void StopTimer() 
    { 
     Console.WriteLine("Stop Timer"); 
     _aTimer.Stop(); 
     _aTimer.Enabled = false; 
    } 

    public static void StartTimer() 
    { 
     Console.WriteLine("Start Timer"); 
     _aTimer.Enabled = true; 
     _aTimer.Start(); 
    } 

阻塞隊列自身:

public class IncomingOrderQueue 
{ 
    BlockingCollection<QueueVariables> _orderQ = new BlockingCollection<QueueVariables>(); 

    public void EnqueueSweep(QueueVariables incoming) 
    { 
     // add items to the queue 
     _orderQ.Add(incoming); 
    } 

    public void Consume() 
    { 
     // stop anything been adding to the queue 
     _orderQ.CompleteAdding(); 
     // consume all the objects in the blocking collection 
     Parallel.ForEach(_orderQ.GetConsumingEnumerable(), Processor.Order.Object); 
    } 

    public int QueueCount 
    { 
     get 
     { 
      return _orderQ.Count; 
     } 
    } 
} 

我有什麼工作應該如何,啓動定時器 - >停止添呃 - >觸發收集文件夾內所有文件的過程 - >處理所有文件 - >重新啓動計時器。

我不能幫助,但認爲這是一個更好的辦法做即時通訊做特別是當那些即將在帳戶創建隊列數量爲200 - 400

感謝

回答

3

我想你不需要停下來開始你的生產者和消費者。 BlockingCollection可以阻止生產者,如果它達到最大容量,並阻止消費者,如果它是空的。

我也可能會開始一個BlockingCollection,直到分析表明我需要另一個。根據你的生產者和消費者的相對速度,你可能需要調整他們的數字。如果它們是IO綁定的,則它們應該是異步的,並且可以有很多,如果它們是CPU綁定的,則可能不會超過可用處理器的數量。

我重做了你的例子,假設IO綁定生產者和消費者,希望它給你一些想法。它以10秒爲間隔關閉生產商,並可繼續進行,直到您通過CanellationToken取消生產。只有在您取消並完成生產之後,您纔可以CompleteAdding發佈被阻止的消費者。

public class QueueVariables 
{ 
    public string Account {get;set;} 
    public string File {get;set;} 
} 

public static ConcurrentQueue<string> GetACcounts() 
{ 
    return new ConcurrentQueue<string>(new [] 
     { 
     "Account1", 
     "Account2", 
     "Account3", 
     "Account4", 
     "Account5", 
     "Account6", 
     "Account7", 
     "Account8", 
     "Account9", 
     "Account10", 
     "Account11", 
     "Account12", 
    }); 
} 

public static List<string> GetFiles(string acct) 
{ 
    return new List<string> 
    { 
     "File1", 
     "File2", 
     "File3", 
     "File4", 
     "File5", 
     "File6", 
     "File7", 
     "File8", 
     "File9", 
     "File10", 
     "File11", 
     "File12", 
    }; 
} 

public static async Task StartPeriodicProducers(int numProducers, TimeSpan period, CancellationToken ct) 
{ 
    while(!ct.IsCancellationRequested) 
    { 
     var producers = StartProducers(numProducers, ct); 

     // wait for production to finish 
     await Task.WhenAll(producers.ToArray()); 

     // wait before running again 
     Console.WriteLine("***Waiting " + period); 
     await Task.Delay(period, ct); 
    } 
} 

public static List<Task> StartProducers(int numProducers, CancellationToken ct) 
{ 
    List<Task> producingTasks = new List<Task>(); 
    var accounts = GetACcounts(); 

    for (int i = 0; i < numProducers; i++) 
    { 
     producingTasks.Add(Task.Run(async() => 
     { 
      string acct; 
      while(accounts.TryDequeue(out acct) && !ct.IsCancellationRequested) 
      { 
       foreach (var file in GetFiles(acct)) 
       { 
        _orderQ.Add(new UserQuery.QueueVariables{ Account = acct, File = file }); 
        Console.WriteLine("Produced Account:{0} File:{1}", acct, file); 
        await Task.Delay(50, ct); // simulate production delay 
       } 
      } 

      Console.WriteLine("Finished producing"); 
     })); 
    } 

    return producingTasks; 
} 

public static List<Task> StartConsumers(int numConsumers) 
{ 
    List<Task> consumingTasks = new List<Task>(); 

    for (int j = 0; j < numConsumers; j++) 
    { 
     consumingTasks.Add(Task.Run(async() => 
     { 
      try 
      { 
       while(true) 
       { 
        var queueVar = _orderQ.Take(); 
        Console.WriteLine("Consumed Account:{0} File:{1}", queueVar.Account, queueVar.File); 
        await Task.Delay(200); // simulate consumption delay 
       } 
      } 
      catch(InvalidOperationException) 
      { 
       Console.WriteLine("Finished Consuming"); 
      } 
     })); 
    } 

    return consumingTasks; 
} 

private static async Task MainAsync() 
{ 
    CancellationTokenSource cts = new CancellationTokenSource(); 
    var periodicProducers = StartPeriodicProducers(2, TimeSpan.FromSeconds(10), cts.Token); 
    var consumingTasks = StartConsumers(4); 

    await Task.Delay(TimeSpan.FromSeconds(120)); 

    // stop production 
    cts.Cancel(); 

    try 
    { 
     // wait for producers to finish producing 
     await periodicProducers; 
    } 
    catch(OperationCanceledException) 
    { 
     // operation was cancelled 
    } 

    // complete adding to release blocked consumers 
    _orderQ.CompleteAdding(); 

    // wait for consumers to finish consuming 
    await Task.WhenAll(consumingTasks.ToArray()); 
} 

// maximum size 10, after that capaicity is reached the producers block 
private static BlockingCollection<QueueVariables> _orderQ = new BlockingCollection<QueueVariables>(10); 

void Main() 
{ 
    MainAsync().Wait(); 
    Console.ReadLine(); 
} 

// Define other methods and classes here 
+0

這是一個很好的解決方案,但是一旦消費者完成了,他們就完全停止。我不熟悉任務的運作方式,我怎麼能夠無限期地踢這個?謝謝 – Houlahan

+0

您可以讓它們繼續運行,直到應用程序關閉,然後調用'_orderQ.CompleteAdding()',這將允許它們終止。或者,如果你創建一個新的'BlockingCollection',你可以再次調用'StartConsumers'。 –

+0

當然感謝您的幫助! – Houlahan