2014-10-17 46 views
1

有些人可能已經注意到我試圖實現一個環形緩衝區。我希望在數據結構中有一定的安全措施,同時不會失去太多的效率。有效的邊界檢查環緩衝區

我現在的解決方案實現了兩種類型的計數器,一種是直接偏移到緩衝存儲器中的索引和一個序列號,它只是一個size_t類型的計數器。序列號被迭代器用來訪問環形緩衝區。因此,環形緩衝區必須在每個緩衝區訪問時從序列號轉換爲緩衝區索引。這通常是相當有效的:

size_t offset = seqNum - m_tailSeq; 
size_t index = (m_tailIdx + offset) % m_size; 

其中seqNum是要轉換的序列號,m_tailSeq是在緩衝器中的最舊的元素的序列號,m_tailIdx是最古老的元件在緩衝器和m_size緩衝器索引是緩衝存儲器的大小。

但是,如果我不斷向緩衝區添加足夠長的元素,序列號將會溢出。所以我必須檢查這個。當我這樣做,我的短暫的甜蜜轉換變成這個怪物:

size_type getIndex(size_type seqNum) const 
{ 
    size_type headSeq = m_tailSeq + m_numElements; 

    // sequence does not wrap around 
    if (m_tailSeq < headSeq) 
    { 
     // bounds check 
     if(m_tailSeq <= seqNum && seqNum < headSeq) { 
      size_type offset = seqNum - m_tailSeq; 
      return (m_tailIdx + offset) % m_size; 
     } else { 
      throw BaseException("RingBuffer: access out of bounds", __FILE__, __LINE__); 
     } 
    } 
    // sequence does wrap around 
    else if (headSeq < m_tailSeq) 
    { 
     //bounds check (inverted from above) 
     if(seqNum < headSeq) { 
      size_type offset = (SIZE_TYPE_MAX - m_tailSeq) + seqNum; 
      return (m_tailIdx + offset) % m_size; 
     } else if (seqNum >= m_tailSeq) { 
      size_type offset = seqNum - m_tailSeq; 
      return (m_tailIdx + offset) % m_size; 
     } else { 
      throw BaseException("RingBuffer: access out of bounds", __FILE__, __LINE__); 
     } 
    } 
    else if (isEmpty()) { 
     throw BaseException("RingBufferIterator: accessing empty buffer", __FILE__, __LINE__); 
    } 
} 

這相當於兩個整數加法,一個整數減法,三個整數比較,和一個模運算在最好的情況下上有史以來單緩衝區訪問。不用說,迭代緩衝區變得非常昂貴。但是,由於我想在高性能場景(即軟實時應用程序中的事件隊列)中使用此緩衝區,我希望此數據結構儘可能高效。

當前用例將作爲事件緩衝區。一個(或可能不止一個)系統會將事件寫入緩衝區,其他系統(多於一個)會按照自己的步調處理這些事件而不刪除它們。當緩衝區已滿時,舊的事件將被簡單覆蓋。通過這種方式,我總是記錄最近幾百次事件,不同的系統可以按照各自的更新速率覆蓋它們,並挑選與它們相關的事件。不同的系統會保持一個指向環形緩衝區的迭代器,以便他們知道上次停止的位置以及恢復的位置。當系統開始處理事件時,需要確定其迭代器是否仍然有效或是否已被覆蓋。事件可能一次以大塊處理,所以遞增和解引用應該很快。所以基本上我們在潛在的多線程上下文中尋找MPMC環形緩衝區。

我自己想出的唯一解決方案是將錯誤檢查的負擔移到緩衝區的用戶。即用戶必須首先檢查(通過某種方式)其迭代器是否進入緩衝區是有效的,確保緩衝區的一段延伸保持有效,然後在沒有進一步檢查的情況下遍歷該延伸。然而,這看起來很容易出錯,因爲我必須在程序的多個部分檢查訪問的安全性,而不僅僅是一個地方,如果我決定讓緩衝區線程安全,它會變得毛茸茸的。

我錯過了什麼嗎?這能做得更好嗎?我是否犯了一些初學者的錯誤?

+2

把你的邊界檢查下定義哪些可以啓用/禁用取決於調試模式或不。例如,只有矢量 ::在保證界限檢查。但是許多編譯器在調試模式下檢查矢量 :: operator []。 – 2014-10-17 16:07:49

+0

查看網絡協議中的序列號。在C++中定義的無符號整數溢出。 – magras 2014-10-17 16:29:23

+0

您可能不想使用例外情況。此外,分支指令可能比指令更昂貴。分工/模數也可能很昂貴。最好的建議可能是做準確的基準測試。你可能想看一看[ring buffer](https://github.com/LMAX-Exchange/disruptor)的開源實現,看看實現它們的不同方式。 – Jason 2014-10-17 20:28:33

回答

1

正如我在評論unsigned integer overflow is well defined operation中提到的那樣。在C++中實現高效的序列號是關鍵。所以我們可以簡單地減去兩個無符號整數來獲得距離。然後只是將距離向前移動到實現具有邊界檢查的索引訪問的函數。一如既往,它將工作,而所有可能的指標都低於序列號最大值的一半。

#include <array> 
#include <climits> 
#include <iostream> 

unsigned int const SEQUENCE_NUMBER_FIRST = UINT_MAX-10; 

class RingBuffer 
{ 
public: 
    void PushBack(char c) 
    { 
     GetBySeqNumber(m_tailSeq++) = c; 
     if(Size() == m_buffer.size()+1) 
      PopFront(); 
    } 
    void PopFront() 
    { 
     ++m_headSeq; 
     if(++m_offset % m_buffer.size() == 0) 
      m_offset = 0; 
    } 
    char& GetByIndex(size_t n) 
    { 
     if(n >= Size()) 
      throw std::out_of_range("Hello, world!"); 
     return m_buffer[ (n+m_offset) % m_buffer.size() ]; 
    } 
    char& GetBySeqNumber(unsigned int n) 
    { 
     // It is well defined operation in C++, 
     // but if you try to use signed integer 
     // it will become undefined behavior 
     return GetByIndex(n-m_headSeq); 
    } 
    size_t Size() const 
    { 
     return m_tailSeq - m_headSeq; 
    } 
private: 
    size_t m_offset = 0; 
    unsigned int m_headSeq = SEQUENCE_NUMBER_FIRST; 
    unsigned int m_tailSeq = SEQUENCE_NUMBER_FIRST; 
    std::array<char,26> m_buffer; 
}; 

int main() 
{ 
    // initialize 
    RingBuffer buf; 
    for(char i=0; i<26; ++i) 
     buf.PushBack('a'+i); 

    // access trough sequence numbers 
    // add or subtract one to get out of range exception 
    for(unsigned int i=0; i<buf.Size(); ++i) 
     std::cout << buf.GetBySeqNumber(SEQUENCE_NUMBER_FIRST+i); 
    std::cout << std::endl; 

    // push some more to overwrite first 10 values 
    for(char i=0; i<10; ++i) 
     buf.PushBack('0'+i); 

    // access trough indexes 
    // add or subtract one to get out of range exception 
    for(size_t i=0; i<buf.Size(); ++i) 
     std::cout << buf.GetByIndex(i); 
    std::cout << std::endl; 

    return 0; 
} 
+0

恐怕我看不出這是如何工作的。據我所知,它只在'head_'引用'buffer_ [0]'時有效。但是,環形緩衝區的想法是,數據基本上可以在數組中的任何位置開始並且只是繞過。如果你不斷添加數據,數據的「開始」就會移動。那麼你的代碼如何處理有效數據從'buffer_ [5]開始'並且一直到'buffer_ [4]'開始的情況呢? – MadMonkey 2014-10-18 15:52:38

+0

你沒有明白我的觀點。當然,你需要將適當的範圍檢查插入'GetByIndex()'。所以我提供了完整版本的環形緩衝區。 – magras 2014-10-18 17:33:43

+0

哇,我花了很長時間纔看到它,但現在我知道你的意思了!這工作出色! – MadMonkey 2014-10-18 22:30:25