假設我有一個類型 T：
（標題：在 Reactive Extensions (Rx) 中按預定義的順序對 observable 進行排序）
/// <summary>
/// Demo item that pairs an identifier with the character it carries.
/// </summary>
class T
{
    /// <summary>Arbitrary, but unique per distinct character (a Guid in real-life code).</summary>
    public int identifier;

    /// <summary>Payload; a char is used here only to keep the demo simple (not a char in real life).</summary>
    public char character;
}
另外，我有一個預先定義好的、有序的標識符序列：
int[] identifierSequence = new int[]{
9, 3, 4, 4, 7
};
現在我需要對一個 IObservable<T> 進行排序，
它產生以下物件序列：
{identifier: 3, character: 'e'},
{identifier: 9, character: 'h'},
{identifier: 4, character: 'l'},
{identifier: 4, character: 'l'},
{identifier: 7, character: 'o'}
使排序後的 IObservable 依序產生 hello。
我不想使用 ToArray，因爲我希望每個物件一到達就能收到，而不是等到所有物件都被觀察到之後。更具體地說，我希望像下面這樣收到它們：
Input: e h l l o
Output: he l l o
用 Reactive（響應式）的方式，正確的做法是什麼？我能想到的最好辦法如下：
// Items that arrived before their turn, keyed by identifier. A queue per identifier keeps
// duplicates (e.g. the two 'l' items that share identifier 4) instead of letting one overwrite the other.
// NOTE(review): this state lives outside the observable, so it is shared by every subscription.
Dictionary<int, Queue<T>> buffer = new Dictionary<int, Queue<T>>();
// Position in identifierSequence of the next item to emit.
int curIndex = 0;
// Rx operators are lazy: keep the resulting observable so it can actually be subscribed to.
IObservable<T> orderedObservable = inputObserable.SelectMany(item =>
{
    Queue<T> sameIdentifier;
    if (!buffer.TryGetValue(item.identifier, out sameIdentifier))
    {
        sameIdentifier = new Queue<T>();
        buffer[item.identifier] = sameIdentifier;
    }
    sameIdentifier.Enqueue(item);

    // Collect, in sequence order, every buffered item whose turn has come.
    // The bounds check stops the loop once the whole sequence has been emitted
    // (previously it indexed past the end of identifierSequence).
    List<T> ready = new List<T>();
    while (curIndex < identifierSequence.Length)
    {
        Queue<T> candidates;
        if (!buffer.TryGetValue(identifierSequence[curIndex], out candidates) || candidates.Count == 0)
        {
            break;
        }
        ready.Add(candidates.Dequeue());
        curIndex++;
    }
    return ready;
});
編輯:
Schlomo 指出了我的代碼中一些非常合理的問題，因此我將他的回答標記爲正確答案。我對他的代碼做了一些修改，使其可以使用：
- 將標識符和物件的類型改爲泛型
- 使用迭代而不是遞歸，以防止在非常大的 observable 上可能發生的堆棧溢出（stack overflow）
- 將匿名類型轉換爲實際的類，以提高可讀性
- 盡可能只在字典中查找一次值並存入變量，而不是多次查找
- 修正代碼風格
這樣就得到了下面的代碼：
/// <summary>
/// Scan-based variant: reorders <paramref name="source"/> so that items come out in the order given by
/// <paramref name="identifierSequence"/>, emitting each item as soon as all of its predecessors have been emitted.
/// </summary>
/// <remarks>
/// The state (index, buffer and ready items) is an immutable snapshot that <c>Scan</c> replaces on every item.
/// Items whose turn has not come yet stay in the buffer. When several buffered items share an identifier, the most
/// recently received one is emitted first. An OnCompleted notification is produced as soon as the last position of
/// the sequence has been filled.
/// </remarks>
/// <param name="source">The observable whose items are reordered.</param>
/// <param name="identifierSequence">The identifiers, in the order the items must be emitted.</param>
/// <param name="identifierFunc">Maps an item to its identifier.</param>
/// <returns>An observable that emits the items of <paramref name="source"/> in sequence order.</returns>
public static IObservable<T> OrderByIdentifierSequence<T, TId>(this IObservable<T> source, IList<TId> identifierSequence, Func<T, TId> identifierFunc)
{
    // Starting at position 'start', moves every item whose turn has come out of 'pending' into a released list,
    // stopping at the first position whose item has not arrived yet. Iterative, so it cannot overflow the stack.
    OrderByIdentifierSequenceState<T, TId> Advance(int start, ImmutableDictionary<TId, ImmutableList<T>> pending)
    {
        List<T> released = new List<T>();
        int position;
        for (position = start; position < identifierSequence.Count; position++)
        {
            TId wanted = identifierSequence[position];
            ImmutableList<T> candidates;
            bool hasCandidate = pending.TryGetValue(wanted, out candidates) && !candidates.IsEmpty;
            if (!hasCandidate)
            {
                break;
            }
            int last = candidates.Count - 1;
            released.Add(candidates[last]);
            pending = pending.SetItem(wanted, candidates.RemoveAt(last));
        }
        return new OrderByIdentifierSequenceState<T, TId>(position, pending, released);
    }

    var seed = new OrderByIdentifierSequenceState<T, TId>(0, ImmutableDictionary<TId, ImmutableList<T>>.Empty, Enumerable.Empty<T>());

    IObservable<OrderByIdentifierSequenceState<T, TId>> states = source.Scan(seed, (previous, item) =>
    {
        // Park the new item under its identifier, then release whatever has become emittable.
        TId id = identifierFunc(item);
        ImmutableList<T> sameId;
        if (!previous.Buffer.TryGetValue(id, out sameId))
        {
            sameId = ImmutableList<T>.Empty;
        }
        return Advance(previous.Index, previous.Buffer.SetItem(id, sameId.Add(item)));
    });

    // Turn each snapshot into OnNext notifications for its released items, plus OnCompleted once the whole
    // sequence has been produced; Dematerialize converts the notifications back into a normal stream.
    return states
        .SelectMany(state =>
        {
            List<Notification<T>> notifications = new List<Notification<T>>();
            foreach (T releasedItem in state.Output)
            {
                notifications.Add(Notification.CreateOnNext(releasedItem));
            }
            if (state.Index == identifierSequence.Count)
            {
                notifications.Add(Notification.CreateOnCompleted<T>());
            }
            return notifications;
        })
        .Dematerialize();
}
/// <summary>
/// Immutable snapshot carried through <c>Scan</c> by the Scan-based <c>OrderByIdentifierSequence</c>.
/// </summary>
/// <typeparam name="T">Type of the items being reordered.</typeparam>
/// <typeparam name="TId">Type of the identifiers that define the order.</typeparam>
class OrderByIdentifierSequenceState<T, TId>
{
    public OrderByIdentifierSequenceState(int index, ImmutableDictionary<TId, ImmutableList<T>> buffer, IEnumerable<T> output)
    {
        Index = index;
        Buffer = buffer;
        Output = output;
    }

    /// <summary>Position in the identifier sequence of the item that is still awaited.</summary>
    public int Index { get; }

    /// <summary>Items that have arrived but whose turn has not come yet, grouped by identifier.</summary>
    public ImmutableDictionary<TId, ImmutableList<T>> Buffer { get; }

    /// <summary>Items that can be safely emitted.</summary>
    public IEnumerable<T> Output { get; }
}
然而，這段代碼仍然有幾個問題：
- 狀態（主要是 ImmutableDictionary）被不斷複製，這可能非常昂貴。可能的解決方案：爲每個觀察者保持一份單獨的狀態，而不是爲每個收到的項目各建立一份。
- 當源 observable 中不存在 identifierSequence 中的一個或多個元素時，會出現問題：這目前會阻塞排序後的 observable，使它永遠不會完成。可能的解決方案：使用超時；在源 observable 完成時拋出異常；在源 observable 完成時返回所有可用的項目……
- 當源 observable 包含的元素多於 identifierSequence 時，會發生內存泄漏。源 observable 中存在、但不在 identifierSequence 中的項目目前會被添加到字典中，並且在源 observable 完成之前都不會被刪除。這是一個潛在的內存泄漏。可能的解決方案：在將項目添加到字典之前，先檢查它是否在 identifierSequence 中；如果不在，就繞過這段代碼並立即輸出該項目……
我的解決方案:
/// <summary>
/// Takes the items from the source observable and returns them in the order specified in <paramref name="identifierSequence"/>.
/// Each item is emitted as soon as every item that precedes it in the sequence has been emitted.
/// If an item is missing from the source observable, the returned observable emits items up until the missing item and then
/// buffers the remaining items (no thread is blocked) until the source observable completes.
/// All buffered items are then emitted in order, skipping the missing positions. Note that this means that while a correct order
/// is guaranteed, there might be missing items in the result observable.
/// Items whose identifier is not in <paramref name="identifierSequence"/> are ignored. So are surplus items whose identifier has
/// already been received as many times as it occurs in the sequence; neither kind is ever buffered.
/// Items that share an identifier are emitted in the order in which they arrived.
/// The returned observable completes exactly once: as soon as every position of the sequence has been emitted, or when the
/// source observable completes, whichever comes first. Every subscription keeps its own independent state.
/// </summary>
/// <typeparam name="T">The type that is produced by the source observable</typeparam>
/// <typeparam name="TId">The type of the identifiers used to uniquely identify a T</typeparam>
/// <param name="source">The source observable</param>
/// <param name="identifierSequence">A list of identifiers that defines the sequence in which the source observable is to be ordered.
/// It is copied when this method is called, and must not contain null.</param>
/// <param name="identifierFunc">A function that takes a T and outputs its unique identifier. If it throws, the returned
/// observable terminates with that exception.</param>
/// <returns>An observable with the same elements as the source, but ordered by the sequence of items in identifierSequence</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="identifierSequence"/> or <paramref name="identifierFunc"/> is null.</exception>
/// <exception cref="ArgumentException"><paramref name="identifierSequence"/> contains a null identifier.</exception>
public static IObservable<T> OrderByIdentifierSequence<T, TId>(this IObservable<T> source, IList<TId> identifierSequence, Func<T, TId> identifierFunc)
{
    if (source == null)
    {
        throw new ArgumentNullException(nameof(source));
    }
    if (identifierSequence == null)
    {
        throw new ArgumentNullException(nameof(identifierSequence));
    }
    if (identifierFunc == null)
    {
        throw new ArgumentNullException(nameof(identifierFunc));
    }
    if (identifierSequence.Count == 0)
    {
        return Observable.Empty<T>();
    }

    // Snapshot the sequence so later changes to the caller's list cannot get out of sync with the counts below.
    TId[] sequence = new TId[identifierSequence.Count];
    identifierSequence.CopyTo(sequence, 0);

    // How often each identifier occurs in the sequence, i.e. how many items with that identifier can ever be emitted.
    var occurrences = new Dictionary<TId, int>();
    foreach (TId id in sequence)
    {
        if (id == null)
        {
            throw new ArgumentException("The identifier sequence must not contain null identifiers.", nameof(identifierSequence));
        }
        int count;
        occurrences.TryGetValue(id, out count);
        occurrences[id] = count + 1;
    }

    return Observable.Create<T>(observer =>
    {
        // State is created per subscription, so repeated subscriptions do not interfere with each other.
        int index = 0;                                           // position in 'sequence' of the next item to emit
        bool isStopped = false;                                  // true once OnCompleted/OnError has been sent downstream
        var buffer = new Dictionary<TId, Queue<T>>();            // items that arrived before their turn, in arrival order
        var stillNeeded = new Dictionary<TId, int>(occurrences); // how many more items per identifier may be accepted

        // Emits buffered items in sequence order, starting at 'index'.
        // skipMissing == false: stop at the first position whose item has not arrived yet (source still running).
        // skipMissing == true: skip such positions instead (source completed, so they will never arrive).
        void EmitAvailable(bool skipMissing)
        {
            for (; index < sequence.Length; index++)
            {
                Queue<T> pending;
                if (buffer.TryGetValue(sequence[index], out pending) && pending.Count > 0)
                {
                    observer.OnNext(pending.Dequeue());
                }
                else if (!skipMissing)
                {
                    return;
                }
            }
        }

        return source.Subscribe(
            item =>
            {
                if (isStopped)
                {
                    return;
                }

                TId itemIdentifier;
                try
                {
                    itemIdentifier = identifierFunc(item);
                }
                catch (Exception ex)
                {
                    // Like Rx's own Select, a throwing identifierFunc terminates the sequence with OnError
                    // instead of throwing back into the source.
                    isStopped = true;
                    observer.OnError(ex);
                    return;
                }

                // Ignore items that can never be emitted (unknown identifier, or all of its positions already
                // claimed by earlier items), so they are never buffered and cannot leak.
                int needed;
                if (itemIdentifier == null || !stillNeeded.TryGetValue(itemIdentifier, out needed) || needed == 0)
                {
                    return;
                }
                stillNeeded[itemIdentifier] = needed - 1;

                Queue<T> queue;
                if (!buffer.TryGetValue(itemIdentifier, out queue))
                {
                    queue = new Queue<T>();
                    buffer[itemIdentifier] = queue;
                }
                queue.Enqueue(item);

                EmitAvailable(skipMissing: false);
                if (index == sequence.Length)
                {
                    // Every position has been emitted: complete now (exactly once) instead of waiting for the source.
                    isStopped = true;
                    observer.OnCompleted();
                }
            },
            ex =>
            {
                if (!isStopped)
                {
                    isStopped = true;
                    observer.OnError(ex);
                }
            },
            () =>
            {
                if (isStopped)
                {
                    return;
                }
                // The source will not produce anything else: flush the buffer in order, skipping missing items.
                EmitAvailable(skipMissing: true);
                isStopped = true;
                observer.OnCompleted();
            });
    });
}
感謝您的回答，尤其是您指出的我代碼中的問題。我喜歡您解決這些問題的方式，但我無法承受不斷複製 ImmutableDictionary 帶來的性能開銷。我已經更新了原帖，加入了修改後的代碼版本：它爲每個觀察者維護一份狀態，而不是爲每個收到的項目各維護一份。 – Wouter