所以我有N個異步的,有時間戳的數據流。每個流都有一個固定的速率。我想處理所有的數據,但問題在於我必須按照儘可能接近數據的時間(它是一個實時流應用程序)處理數據。對N個數據流進行時間排序的算法
到目前爲止,我的實現是創建一個K消息的固定窗口,我使用優先級隊列按時間戳排序。然後,我按順序處理整個隊列,然後進入下一個窗口。這是可以的,但它並不理想,因爲它會產生與緩衝區大小成正比的延遲,並且如果在緩衝區處理結束後到達消息,有時也會導致丟失消息。它看起來是這樣的:
// Priority queue keeping track of the data in timestamp order.
ThreadSafeProrityQueue<Data> q;
// Fixed buffer size
int K = 10;
// The last successfully processed data timestamp
time_t lastTimestamp = -1;
// Called for each of the N data streams asyncronously
void receiveAsyncData(const Data& dat) {
q.push(dat.timestamp, dat);
if (q.size() > K) {
processQueue();
}
}
// Process all the data in the queue.
void processQueue() {
while (!q.empty()) {
const auto& data = q.top();
// If the data is too old, drop it.
if (data.timestamp < lastTimestamp) {
LOG("Dropping message. Too old.");
q.pop();
continue;
}
// Otherwise, process it.
processData(data);
lastTimestamp = data.timestamp;
q.pop();
}
}
關於數據的信息:他們一定會被自己的流中進行排序。他們的利率介於5至30赫茲之間。它們由圖像和其他數據組成。
一些爲什麼這比它看起來更難的例子。假設我有兩個流,A和B在1赫茲都在運行,我得到的數據按以下順序:
(stream, time)
(A, 2)
(B, 1.5)
(A, 3)
(B, 2.5)
(A, 4)
(B, 3.5)
(A, 5)
查看如何,如果我處理的時候,我收到他們的訂單數據,B會總是下降?這就是我想要避免的。現在在我的算法中,B每隔10幀就會丟棄一次,而我將以10幀的滯後時間處理數據。
您的應用程序是多線程的嗎? (如果沒有,爲什麼不?) – rici
是的,但接收端必須按順序處理數據。我不會指定具體的應用程序,但您可以將其想象爲像N個流式視頻源一樣以時間同步方式繪製到屏幕上的東西。 – mklingen
因此,每個來源都能保證產生有序的流? – rici