2017-07-06 93 views
1

所以我有N個異步的,有時間戳的數據流。每個流都有一個固定的速率。我想處理所有的數據,但問題在於我必須按照儘可能接近數據的時間(它是一個實時流應用程序)處理數據。對N個數據流進行時間排序的算法

到目前爲止,我的實現是創建一個K消息的固定窗口,我使用優先級隊列按時間戳排序。然後,我按順序處理整個隊列,然後進入下一個窗口。這是可以的,但它並不理想,因爲它會產生與緩衝區大小成正比的延遲,並且如果在緩衝區處理結束後到達消息,有時也會導致丟失消息。它看起來是這樣的:

// Priority queue keeping track of the data in timestamp order. 
ThreadSafeProrityQueue<Data> q; 
// Fixed buffer size 
int K = 10; 
// The last successfully processed data timestamp 
time_t lastTimestamp = -1; 

// Called for each of the N data streams asyncronously 
void receiveAsyncData(const Data& dat) { 
    q.push(dat.timestamp, dat); 
    if (q.size() > K) { 
     processQueue(); 
    } 
} 

// Process all the data in the queue. 
void processQueue() { 
    while (!q.empty()) { 
     const auto& data = q.top(); 
     // If the data is too old, drop it. 
     if (data.timestamp < lastTimestamp) { 
      LOG("Dropping message. Too old."); 
      q.pop(); 
      continue; 
     } 
     // Otherwise, process it. 
     processData(data); 
     lastTimestamp = data.timestamp; 
     q.pop(); 
    } 
} 

關於數據的信息:他們一定會被自己的流中進行排序。他們的利率介於5至30赫茲之間。它們由圖像和其他數據組成。

一些爲什麼這比它看起來更難的例子。假設我有兩個流,A和B在1赫茲都在運行,我得到的數據按以下順序:

(stream, time) 
(A, 2) 
(B, 1.5) 
(A, 3) 
(B, 2.5) 
(A, 4) 
(B, 3.5) 
(A, 5) 

查看如何,如果我處理的時候,我收到他們的訂單數據,B會總是下降?這就是我想要避免的。現在在我的算法中,B每隔10幀就會丟棄一次,而我將以10幀的滯後時間處理數據。

+0

您的應用程序是多線程的嗎? (如果沒有,爲什麼不?) – rici

+0

是的,但接收端必須按順序處理數據。我不會指定具體的應用程序,但您可以將其想象爲像N個流式視頻源一樣以時間同步方式繪製到屏幕上的東西。 – mklingen

+0

因此,每個來源都能保證產生有序的流? – rici

回答

0

我會建議一個生產者/消費者結構。讓每個流都將數據放入隊列中,並有一個單獨的線程讀取隊列。那就是:

// your asynchronous update: 
void receiveAsyncData(const Data& dat) { 
    q.push(dat.timestamp, dat); 
} 

// separate thread that processes the queue 
void processQueue() 
{ 
    while (!stopRequested) 
    { 
     data = q.pop(); 
     if (data.timestamp >= lastTimestamp) 
     { 
      processData(data); 
      lastTimestamp = data.timestamp; 
     } 
    } 
} 

這可以防止您在處理批次時在當前實施中看到的「滯後」。

processQueue功能在一個單獨的,持久的線程上運行。 stopRequested是程序在關閉時設置的標誌 - 強制線程退出。有些人會爲此使用volatile標誌。我更喜歡使用類似手動重置事件的東西。

爲了使這項工作,你需要一個優先級隊列實現,允許併發更新,或者你需要一個同步鎖來包裝你的隊列中。特別是,當隊列爲空時,您要確保q.pop()等待下一個項目。或者,當隊列爲空時,您永遠不會撥打q.pop()。我不知道你的ThreadSafePriorityQueue的具體細節,所以我不能確切地說你會怎麼寫。

時間戳檢查仍然是必要的,因爲稍後的項目可能會在先前的項目之前被處理。例如:

  1. 從數據流1收到的事件,但線程在可以添加到隊列之前被換出。
  2. 從數據流2接收到的事件,並將其添加到隊列中。
  3. 來自數據流2的事件由processQueue函數從隊列中移除。
  4. 上述步驟1的線程獲取另一個時間片並將項添加到隊列中。

這是不尋常的,只是罕見。時差通常會在幾微秒左右。

如果您經常無法獲得更新,那麼您可以引入人爲延遲。例如,在更新後的問題中,您將顯示消息以500毫秒的順序發送。假設500毫秒是您想要支持的最大容差。也就是說,如果一條消息晚了500多毫秒,它就會被丟棄。

當您將事物添加到優先級隊列時,您所做的是將500毫秒添加到時間戳。那就是:

q.push(AddMs(dat.timestamp, 500), dat); 

而且在處理事物的循環,你不離隊的時間戳之前的東西。喜歡的東西:

while (true) 
{ 
    if (q.peek().timestamp <= currentTime) 
    { 
     data = q.pop(); 
     if (data.timestamp >= lastTimestamp) 
     { 
      processData(data); 
      lastTimestamp = data.timestamp; 
     } 
    } 
} 

這引入了所有項目的處理的500毫秒的延遲,但會阻止下降落在500毫秒的閾值內的「遲到」的更新。您必須平衡您對「實時」更新的渴望與防止更新丟失的願望。

+0

對不起,但這正是我寫的。這會一直放棄不同步的消息。將用一個爲什麼比看起來更難的例子來修改我的問題。 – mklingen

+0

@mklingen:查看我的更新。 –

+0

人工延遲是完全正確的方式,並解決我的問題。 – mklingen

0

總會有一個滯後,而這個滯後將取決於您願意等待最慢的「固定速率」流的時間。

建議:

  1. 保持緩衝器
  2. 保持布爾標誌的陣列與所述含義:「如果位置IX是真實的,在緩衝器中有至少一個樣品源自流IX」
  3. 排序/過程,只要你擁有所有標誌設置爲true

不是完整的證明(每個緩衝器進行排序,而是從一個緩衝到另一個你可能有時間戳反轉),但也許不夠好?

利用「滿意」標誌的計數來觸發處理(在步驟3)可以用於使滯後更小,但是存在更多的緩衝器間時間戳反轉的風險。在極端情況下,只接受一個滿意的標記就意味着「一收到它就立即推送一個框架,時間戳分類將被詛咒」。
我提到這個以支持我的觀點,即延遲/時間戳倒數平衡是您的問題所固有的 - 除了絕對相等的幀速率,將會有完美的解決方案,其中一方不會被犧牲。由於「解決方案」將是一種平衡行爲,因此任何解決方案都需要收集/使用額外的信息來幫助做出決定(例如「標誌陣列」)。如果我的建議對你的案例聽起來很愚蠢(很可能,你選擇分享的細節不是太多),開始考慮哪些指標與你的目標「體驗質量」水平相關,並使用額外的數據結構幫助收集/處理/使用這些指標。

+0

我認爲這是最接近正確的答案。將嘗試它,讓你知道它是如何去。 – mklingen

相關問題