2011-03-23 50 views
2

我正在嘗試使用CCR迭代器作爲需要並行處理大量數據饋送的任務的解決方案,其中需要按順序處理來自每個饋送的數據。沒有一個飼料是相互依賴的,所以按順序的處理可以平行進行。什麼是使用CCR按順序處理事件的有效方法?

下面是一個快速和骯髒的模型與一個整數飼料,它只需將整數端口以約1.5K /秒的速度推入端口,然後使用CCR迭代器將它們拉出以保持按順序處理保證。

class Program 
{ 
    static Dispatcher dispatcher = new Dispatcher(); 
    static DispatcherQueue dispatcherQueue = 
     new DispatcherQueue("DefaultDispatcherQueue", dispatcher); 
    static Port<int> intPort = new Port<int>(); 

    static void Main(string[] args) 
    { 
     Arbiter.Activate(
      dispatcherQueue, 
      Arbiter.FromIteratorHandler(new IteratorHandler(ProcessInts))); 

     int counter = 0; 
     Timer t = new Timer((x) => 
      { for(int i = 0; i < 1500; ++i) intPort.Post(counter++);} 
       , null, 0, 1000); 

     Console.ReadKey(); 
    } 

    public static IEnumerator<ITask> ProcessInts() 
    { 
     while (true) 
     { 
      yield return intPort.Receive(); 
      int currentValue; 
      if((currentValue = intPort) % 1000 == 0) 
      { 
       Console.WriteLine("{0}, Current Items In Queue:{1}", 
        currentValue, intPort.ItemCount); 
      } 
     } 
    } 
} 

讓我感到驚訝的是,CCR無法跟上Corei7盒子,隊列大小不斷增長。在另一個測量從Post()到Receive()的負載或〜100 Post/sec的延遲的測試中,每個批次中第一個Post()和Receive()之間的延遲約爲1ms。

我的樣機有什麼問題嗎?如果是這樣,使用CCR做這件事更好的方法是什麼?

+0

你能詳細說明CCR是什麼嗎? – 2011-03-23 00:51:20

+0

CCR是Communication and Coordination Runtime,一個專爲異步工作流設計的庫。它是Microsoft Robotics Studio的一部分,現在免費提供。考慮到最初的700美元限制了潛在的觀衆,以及TPL的思想推動,除了某些行業之外,它已被調控到陰影。儘管如此,CCR工作流程是新的異步框架的一部分,CCR Devs表示將在1月份「下個月儘快」發佈一項重大聲明。檢查CCR標籤以獲取更多信息。 – Chuu 2011-03-23 01:32:24

+0

提及「大公告」在哪裏? – spender 2011-04-06 22:35:20

回答

1

是的,我同意,這確實看起來很奇怪。您的代碼似乎最初可以順利執行,但在數千個項目之後,處理器使用率會上升到性能真正低下的地步。這讓我感到不安,並在框架中提出了一個問題。在玩完你的代碼之後,我無法確定爲什麼會出現這種情況。我建議把這個問題帶到Microsoft Robotics Forums,看看你是否可以讓George Chrysanthakopoulos(或者其他CCR大腦之一)告訴你問題是什麼。然而,我可以推測,你的代碼是非常低效的。

您正在處理來自端口的「彈出」項目的方式效率非常低。本質上,迭代器在Port中每次有消息時都會被喚醒,並且它只處理一條消息(儘管端口中可能會有幾百個消息),然後在控制傳回框架時掛起yield 。在接收器產生另一個「喚醒」迭代器的時候,許多消息已經填充了端口。從分派器中拉出一個線程來處理只有一個項目(當許多堆積在同時)幾乎肯定不是獲得良好吞吐量的最佳方式。

我修改了你的代碼,在yield之後,我們檢查Port是否有任何進一步的消息排隊並處理它們,從而在我們回到框架之前完全清空端口。我也重構你的代碼有些使用CcrServiceBase這簡化了一些你在做任務的語法:

internal class Test:CcrServiceBase 
{ 
    private readonly Port<int> intPort = new Port<int>(); 
    private Timer timer; 
    public Test() : base(new DispatcherQueue("DefaultDispatcherQueue", 
              new Dispatcher(0, 
                  "dispatcher"))) 
    { 

    } 

    public void StartTest() { 
     SpawnIterator(ProcessInts); 
     var counter = 0; 
     timer = new Timer(x => 
          { 
           for (var i = 0; i < 1500; ++i) 
            intPort.Post(counter++); 
          } 
          , 
          null, 
          0, 
          1000); 
    } 

    public IEnumerator<ITask> ProcessInts() 
    { 
     while (true) 
     { 
      yield return intPort.Receive(); 
      int currentValue = intPort; 
      ReportCurrent(currentValue); 
      while(intPort.Test(out currentValue)) 
      { 
       ReportCurrent(currentValue); 
      } 
     } 
    } 

    private void ReportCurrent(int currentValue) 
    { 
     if (currentValue % 1000 == 0) 
     { 
      Console.WriteLine("{0}, Current Items In Queue:{1}", 
           currentValue, 
           intPort.ItemCount); 
     } 
    } 
} 

或者,你可以破除迭代器完全,因爲它不是真的很好用的例如(雖然我不能完全肯定這有處理的順序產生怎樣的影響):由數量級

internal class Test : CcrServiceBase 
{ 
    private readonly Port<int> intPort = new Port<int>(); 
    private Timer timer; 

    public Test() : base(new DispatcherQueue("DefaultDispatcherQueue", 
              new Dispatcher(0, 
                  "dispatcher"))) 
    { 

    } 

    public void StartTest() 
    { 
     Activate(
      Arbiter.Receive(true, 
          intPort, 
          i => 
          { 
           ReportCurrent(i); 
           int currentValue; 
           while (intPort.Test(out currentValue)) 
           { 
            ReportCurrent(currentValue); 
           } 
          })); 
     var counter = 0; 
     timer = new Timer(x => 
          { 
           for (var i = 0; i < 500000; ++i) 
           { 
            intPort.Post(counter++); 
           } 
          } 
          , 
          null, 
          0, 
          1000); 
    } 



    private void ReportCurrent(int currentValue) 
    { 
     if (currentValue % 1000000 == 0) 
     { 
      Console.WriteLine("{0}, Current Items In Queue:{1}", 
           currentValue, 
           intPort.ItemCount); 
     } 
    } 
} 

兩個例子顯著增加吞吐量。希望這可以幫助。

+0

這個週末終於有時間再試驗一些,這些解決方案絕對是一個巨大的改進。儘管我對原始解決方案的性能衰退非常感興趣,但由於不瞭解它爲什麼會發生,意味着我可能會再次遇到它,但看起來官方CCR論壇對於該討論來說是更好的地方。 – Chuu 2011-04-16 23:42:00

相關問題