交叉張貼到http://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/89b3f71d-3777-4fad-9c11-50d8dc81a4a9表觀BufferBlock.Post/Receive/ReceiveAsync種族/錯誤
我知道......我真的不使用TplDataflow其最大潛力。 ATM我只是使用BufferBlock
作爲消息傳遞的安全隊列,其中生產者和消費者以不同的速率運行。我看到了一些奇怪的行爲,讓我難以接受如何進行 。
private BufferBlock<object> messageQueue = new BufferBlock<object>();
public void Send(object message)
{
var accepted=messageQueue.Post(message);
logger.Info("Send message was called qlen = {0} accepted={1}",
messageQueue.Count,accepted);
}
public async Task<object> GetMessageAsync()
{
try
{
var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));
//despite messageQueue.Count>0 next line
//occasionally does not execute
logger.Info("message received");
//.......
}
catch(TimeoutException)
{
//do something
}
}
在上面的代碼(其是2000線的分佈式解決方案的一部分),Send
被定期調用每100ms左右。這意味着一個項目是Post
編輯到messageQueue
每秒約10次。這已驗證。然而,偶爾看起來ReceiveAsync
在超時時間內沒有完成(即Post
不會導致ReceiveAsync
完成)並且TimeoutException
在30秒後上升。在這一點上,messageQueue.Count
是在數百。這是意想不到的。在發帖速度較慢的情況下(1帖/秒)也觀察到這個問題,並且通常在1000個項目通過BufferBlock
之前發生。
所以,要解決這個問題,我使用下面的代碼,它的工作原理,但在接收時(由於上述發生的錯誤)
public async Task<object> GetMessageAsync()
{
try
{
object m;
var attempts = 0;
for (; ;)
{
try
{
m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(1));
}
catch (TimeoutException)
{
attempts++;
if (attempts >= 30) throw;
continue;
}
break;
}
logger.Info("message received");
//.......
}
catch(TimeoutException)
{
//do something
}
}
這看起來像一個競爭條件有時會導致1秒的延遲在TDF給我,但我不能深究爲什麼這不會發生在其他地方,我以類似的方式使用BufferBlock
。從ReceiveAsync
到Receive
的實驗更改沒有幫助。我沒有檢查,但我想孤立地看,上面的代碼完美地工作。這是我見過的「TPL數據流入門」tpldataflow.docx中記錄的模式。
我能做些什麼來達到這個目的?有沒有任何指標可以幫助推斷髮生了什麼?如果我無法創建可靠的測試用例,我可以提供哪些更多信息?
幫助!
我沒有看到你正在做什麼或你的期望在這裏有什麼問題。我絕對認爲你需要在MSDN論壇上保持活躍狀態。你已經獲得了@StephenToub的關注,他絕對是你想要的人。 – 2012-04-11 17:13:50
不是。從來沒有達到它的底部。我無法在一個小而獨立的例子中重現問題。因爲我只使用BufferBlock,所以我改爲使用自己的異步隊列實現。我不必更改任何其他代碼......我只是重新實現了我使用的BufferBlock接口的各個部分。現在工作,這讓我覺得有什麼不妥之處,但我無法證明它。格兒。 – spender 2012-09-18 00:44:49
@spendor非常有趣,奇怪的是,我找到BufferBlock之後取消了自己的異步併發隊列實現...現在我必須重新考慮。謝謝。 – 2012-09-19 20:34:11