2012-04-09 64 views
16

交叉張貼到http://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/89b3f71d-3777-4fad-9c11-50d8dc81a4a9表觀BufferBlock.Post/Receive/ReceiveAsync種族/錯誤

我知道......我真的不使用TplDataflow其最大潛力。 ATM我只是使用BufferBlock作爲消息傳遞的安全隊列,其中生產者和消費者以不同的速率運行。我看到了一些奇怪的行爲,讓我難以接受如何進行 。

private BufferBlock<object> messageQueue = new BufferBlock<object>(); 

public void Send(object message) 
{ 
    var accepted=messageQueue.Post(message); 
    logger.Info("Send message was called qlen = {0} accepted={1}", 
    messageQueue.Count,accepted); 
} 

public async Task<object> GetMessageAsync() 
{ 
    try 
    { 
     var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30)); 
     //despite messageQueue.Count>0 next line 
     //occasionally does not execute 
     logger.Info("message received"); 
     //....... 
    } 
    catch(TimeoutException) 
    { 
     //do something 
    } 
} 

在上面的代碼(其是2000線的分佈式解決方案的一部分),Send被定期調用每100ms左右。這意味着一個項目是Post編輯到messageQueue每秒約10次。這已驗證。然而,偶爾看起來ReceiveAsync在超時時間內沒有完成(即Post不會導致ReceiveAsync完成)並且TimeoutException在30秒後上升。在這一點上,messageQueue.Count是在數百。這是意想不到的。在發帖速度較慢的情況下(1帖/秒)也觀察到這個問題,並且通常在1000個項目通過BufferBlock之前發生。

所以,要解決這個問題,我使用下面的代碼,它的工作原理,但在接收時(由於上述發生的錯誤)

public async Task<object> GetMessageAsync() 
    { 
     try 
     { 
      object m; 
      var attempts = 0; 
      for (; ;) 
      { 
       try 
       { 
        m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(1)); 
       } 
       catch (TimeoutException) 
       { 
        attempts++; 
        if (attempts >= 30) throw; 
        continue; 
       } 
       break; 

      } 

      logger.Info("message received"); 
      //....... 
     } 
     catch(TimeoutException) 
     { 
      //do something 
     } 
    } 

這看起來像一個競爭條件有時會導致1秒的延遲在TDF給我,但我不能深究爲什麼這不會發生在其他地方,我以類似的方式使用BufferBlock。從ReceiveAsyncReceive的實驗更改沒有幫助。我沒有檢查,但我想孤立地看,上面的代碼完美地工作。這是我見過的「TPL數據流入門」tpldataflow.docx中記錄的模式。

我能做些什麼來達到這個目的?有沒有任何指標可以幫助推斷髮生了什麼?如果我無法創建可靠的測試用例,我可以提供哪些更多信息?

幫助!

+1

我沒有看到你正在做什麼或你的期望在這裏有什麼問題。我絕對認爲你需要在MSDN論壇上保持活躍狀態​​。你已經獲得了@StephenToub的關注,他絕對是你想要的人。 – 2012-04-11 17:13:50

+0

不是。從來沒有達到它的底部。我無法在一個小而獨立的例子中重現問題。因爲我只使用BufferBlock,所以我改爲使用自己的異步隊列實現。我不必更改任何其他代碼......我只是重新實現了我使用的BufferBlock接口的各個部分。現在工作,這讓我覺得有什麼不妥之處,但我無法證明它。格兒。 – spender 2012-09-18 00:44:49

+0

@spendor非常有趣,奇怪的是,我找到BufferBlock之後取消了自己的異步併發隊列實現...現在我必須重新考慮。謝謝。 – 2012-09-19 20:34:11

回答

1

斯蒂芬似乎認爲以下是解

變種米=等待messageQueue.ReceiveAsync();代替

變種米=等待messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));

你能確認或否定這一點嗎?

+0

這並沒有奏效。我選擇哪種ReceiveAsync超載並不重要,結果是一樣的。看到我的評論上面我的決議。 – spender 2012-09-18 00:46:31