1
比方說,我有一個數據服務類,它將數據逐批地批量提取給用戶。如何避免在下次完成之前調用onComplete?
public class DataService {
public IObservable<IList<T>> QuerySegmentedObservable<T>(string tableName) where T : TableEntity, new(){
return Observable.Create<IList<T>>(async (observer, token) =>{
TableContinuationToken continuationToken = null;
do{
var currentSegment = CallData();
observer.OnNext(currentSegment.Results);
continuationToken = currentSegment.ContinuationToken;
} while (continuationToken != null);
observer.OnCompleted();
}
}
}
我正在訂閱這個可觀察的如下。
public async Task<bool> MyMethod()
{
var tcs = new TaskCompletionSource<bool>();
var observable = _dataService.QuerySegmentedObservable<TSource>(_sourceTableName);
var dataCount = 0;
_databaseService.OpenConnection();
observable.Subscribe(async data =>
{
await _databaseService.DoSomething(data);
dataCount += data.Count;
Console.WriteLine($"Processing - {dataCount}");
},
err =>
{
Console.WriteLine($"Error - {err.Message}");
tcs.SetResult(false);
},
() =>
{
_databaseService.CloseConnection();
Console.WriteLine($"Finished");
tcs.SetResult(true);
}
);
return await tcs.Task;
}
問題是OnComplete()在最後的OnNext()完成之前調用。因此,在完成我在Subscribe()中完成的任務之前,我最終關閉了連接。
有什麼方法可以解決它嗎?謝謝。
嘗試從'onNext'塊中刪除異步操作。 – redent84
'_databaseService'的類型是什麼? – Enigmativity
@En這只是一個將數據插入數據庫的類,或者做一些可能需要幾秒鐘才能達到40秒的類。 –