2017-07-14 60 views
1

我正在尋找使用IObservable在請求響應環境中獲取響應的方法,並替換一些較老的基於回調的代碼,但我發現如果將值推入(Subject.OnNext)到可觀察值但FirstAsync尚未處於等待狀態,則FirstAsync將永遠不會被提供該消息。獲取第一個IObservable事件而不阻塞想要它的線程/任務

是否有直接的方式使它工作,沒有第二個任務/線程加同步?

public async Task<ResponseMessage> Send(RequestMessage message) 
{ 
    var id = Guid.NewGuid(); 
    var ret = Inbound.FirstAsync((x) => x.id == id).Timeout(timeout); // Never even gets invoked if response is too fast 
    await DoSendMessage(id, message); 
    return await ret; // Will sometimes miss the event/message 
} 

// somewhere else reading the socket in a loop 
// may or may not be the thread calling Send 
Inbound = subject.AsObservable(); 
while (cond) 
{ 
    ... 
    subject.OnNext(message); 
} 

我不能把awaitFirstAsync只是之前我發送請求,因爲這將防止被髮送的請求。

+0

使用TaskCompletionSource是一個選項嗎? – MistyK

+0

會是什麼樣子?假設設置一個訂閱來設置一個tcs結果然後取消訂閱自己。然後做一個發送,然後等待tcs? –

回答

1

await將訂閱observable。你可以通過調用ToTask獨立於await訂閱:

public async Task<ResponseMessage> Send(RequestMessage message) 
{ 
    var id = Guid.NewGuid(); 
    var ret = Inbound.FirstAsync((x) => x.id == id).Timeout(timeout).ToTask(); 
    await DoSendMessage(id, message); 
    return await ret; 
} 
+0

推測這是https://msdn.microsoft.com/en-us/library/system.reactive.threading.tasks.taskobservableextensions.totask(v=vs.103).aspx?說明似乎並沒有真正解釋這一點。但我是否明白,Where,FirstAsync等鏈條就像是「等待」,ToTask,await(或其他什麼東西)使其處於活動狀態並開始觀察事件? –

+0

是的。和其他的LINQ結構一樣,Rx和它的運算符一起構建一個「定義」,並且它只在訂閱時執行(在這種情況下,「等待」可觀察或者轉換爲「任務」)。 –

1

我接過定睛一看,有一個很簡單的解決方案,您的問題將只是轉換成熱的冷觀察到。將Subject替換爲ReplaySubject。這裏是文章:http://www.introtorx.com/content/v1.0.10621.0/14_HotAndColdObservables.html

這裏的解釋是:

重播擴展方法允許你利用現有的可觀測 序列,並給它「重播」語義每ReplaySubject。作爲 提醒,ReplaySubject將緩存所有值,以便任何晚期的訂購者都能獲得所有的值。