我正在尋找使用IObservable
在請求響應環境中獲取響應的方法,並替換一些較老的基於回調的代碼,但我發現如果將值推入(Subject.OnNext
)到可觀察值但FirstAsync
尚未處於等待狀態,則FirstAsync
將永遠不會被提供該消息。獲取第一個IObservable事件而不阻塞想要它的線程/任務
是否有直接的方式使它工作,沒有第二個任務/線程加同步?
public async Task<ResponseMessage> Send(RequestMessage message)
{
var id = Guid.NewGuid();
var ret = Inbound.FirstAsync((x) => x.id == id).Timeout(timeout); // Never even gets invoked if response is too fast
await DoSendMessage(id, message);
return await ret; // Will sometimes miss the event/message
}
// somewhere else reading the socket in a loop
// may or may not be the thread calling Send
Inbound = subject.AsObservable();
while (cond)
{
...
subject.OnNext(message);
}
我不能把await
爲FirstAsync
只是之前我發送請求,因爲這將防止被髮送的請求。
使用TaskCompletionSource是一個選項嗎? – MistyK
會是什麼樣子?假設設置一個訂閱來設置一個tcs結果然後取消訂閱自己。然後做一個發送,然後等待tcs? –