25

The more I read, the more confused I become... I would have thought it trivial to find a formally correct MPSC queue implemented in C++. Does a multi-producer single-consumer lock-free queue exist for C++?

Every time I find another stab at it, further research seems to reveal problems such as ABA or other subtle race conditions.

Many people mention the need for garbage collection. That is something I want to avoid.

Is there an accepted, correct open-source implementation out there?


My advice would be to look for alternatives. Lock-free queues are not that great for performance. Nothing can be fast while multiple threads are writing to the same cache line. Using a separate SPSC queue per producer is much faster. The downside is that you lose the ordering of items. If you need an ordered queue and want to save yourself a lot of headaches, just use a spinlock. In practice it is almost as good as lock-free, and hundreds of times faster than e.g. Win32 critical sections. – Timo 2012-01-19 03:06:03
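
(A minimal sketch of the test-and-set spinlock Timo suggests, in standard C++11; the class name is illustrative and there is no backoff tuning:)

#include <atomic>

// Minimal test-and-set spinlock - an illustration of the "just use a
// spinlock" advice above, not a tuned implementation.
class spinlock {
    std::atomic_flag flag_ = ATOMIC_FLAG_INIT;
public:
    void lock()   { while (flag_.test_and_set(std::memory_order_acquire)) { /* spin */ } }
    void unlock() { flag_.clear(std::memory_order_release); }
};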


Have you considered using a ring buffer instead of a queue? There are certain performance advantages - fixed memory locations, slots allocated once, very simple counters pointing at tail/head, no locking needed, predictable cache-friendly memory access patterns, etc. – bronekk 2012-01-19 12:16:43
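
(A minimal sketch of the ring buffer bronekk describes, specialized to one producer and one consumer; the names and the power-of-two capacity are choices made here, not anything from the thread:)

#include <atomic>
#include <cstddef>

// Bounded single-producer/single-consumer ring buffer: fixed storage, two
// monotonically increasing counters, no locks. N must be a power of two so
// the index wrap is a cheap mask.
template <typename T, size_t N>
class spsc_ring {
    static_assert((N & (N - 1)) == 0, "N must be a power of 2");
    T buf_[N];
    std::atomic<size_t> head_{0};  // only the producer writes head_
    std::atomic<size_t> tail_{0};  // only the consumer writes tail_
public:
    bool push(const T& v) {        // producer thread
        size_t h = head_.load(std::memory_order_relaxed);
        if (h - tail_.load(std::memory_order_acquire) == N)
            return false;          // full
        buf_[h & (N - 1)] = v;
        head_.store(h + 1, std::memory_order_release);
        return true;
    }
    bool pop(T& v) {               // consumer thread
        size_t t = tail_.load(std::memory_order_relaxed);
        if (t == head_.load(std::memory_order_acquire))
            return false;          // empty
        v = buf_[t & (N - 1)];
        tail_.store(t + 1, std::memory_order_release);
        return true;
    }
};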


@bronekk could you please elaborate? Do you have a working example? – 2012-01-19 17:51:40

Answers

10

You might want to check out the Disruptor; it is available in C++ here: http://lmax-exchange.github.io/disruptor/

You can also find an explanation of how it works here on stackoverflow. Basically it is a circular buffer with no locking, optimized for passing FIFO messages between threads in fixed-size slots.

Here are two implementations I found useful:
Lock-free Multi-producer Multi-consumer Queue on Ring Buffer @ NatSys Lab. Blog
Yet another implementation of a lock-free circular array queue @ CodeProject

Note: the code below is incorrect; I include it only as an example of how tricky these things can be.

If you do not like the complexity of the Google version, here is something similar from me - it is simpler, but I leave it as an exercise for the reader to make it work (it is part of a larger project and not portable as-is). The whole idea is to maintain a circular buffer for data and a small set of counters to identify the slots being written/written and being read/read. Since each counter is in its own cache line, and (normally) each is atomically updated only once in the lifetime of a message, they can all be read without any synchronization. There is one potential contention point between writing threads in post_done; it is required for the FIFO guarantee. The counters (head_, wrtn_, rdng_, tail_) were chosen to ensure correctness and FIFO, so dropping FIFO would also require changing the counters (and that may be difficult to do without sacrificing correctness). It is possible to slightly improve performance in the single-consumer scenario, but I would not bother - it would have to be undone if other use cases with multiple readers are found.

On my machine the latency looks like the following (percentile on the left, the average within that percentile on the right; units are microseconds, measured via RDTSC):

total=1000000 samples, avg=0.24us 
    50%=0.214us, avg=0.093us 
    90%=0.23us, avg=0.151us 
    99%=0.322us, avg=0.159us 
    99.9%=15.566us, avg=0.173us 

These results are for a single polling consumer, i.e. a worker thread calling wheel.read() in a tight loop and checking for empty slots (see the example at the bottom). Waiting consumers (with much lower CPU utilization) would wait on an event (one of the acquire... functions), which adds about 1-2us to the average latency because of the context switch. Since there is very little contention on reads, consumers scale very well with the number of worker threads; for example, 3 threads on my machine:

total=1500000 samples, avg=0.07us 
    50%=0us, avg=0us 
    90%=0.155us, avg=0.016us 
    99%=0.361us, avg=0.038us 
    99.9%=8.723us, avg=0.044us 
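
(The measurement harness is not shown; below is a rough sketch of how RDTSC ticks might be turned into microseconds. The calibration constant is an assumption and must be measured on the actual machine.)

#include <intrin.h>  // __rdtsc on MSVC; GCC/Clang expose it via <x86intrin.h>

// Rough sketch: the producer stamps the message with __rdtsc(), the consumer
// computes the delta on read. ticks_per_us is an assumed value for a 3 GHz
// invariant TSC and must be calibrated in practice.
static const double ticks_per_us = 3000.0;

inline double elapsed_us(unsigned long long start_ticks)
{
    return double(__rdtsc() - start_ticks) / ticks_per_us;
}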

Patches would be welcome :)

// Copyright (c) 2011-2012, Bronislaw (Bronek) Kozicki 
// 
// Distributed under the Boost Software License, Version 1.0. (See accompanying 
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 

#pragma once 

#include <core/api.hxx> 
#include <core/wheel/exception.hxx> 

#include <boost/noncopyable.hpp> 
#include <boost/type_traits.hpp> 
#include <boost/lexical_cast.hpp> 
#include <typeinfo> 

namespace core { namespace wheel 
{ 
    struct bad_size : core::exception 
    { 
    template<typename T> explicit bad_size(const T&, size_t m) 
     : core::exception(std::string("Slot capacity exceeded, sizeof(") 
        + typeid(T).name() 
        + ") = " 
        + boost::lexical_cast<std::string>(sizeof(T)) 
        + ", capacity = " 
        + boost::lexical_cast<std::string>(m) 
       ) 
    {} 
    };   

    // inspired by Disruptor 
    template <typename Header> 
    class wheel : boost::noncopyable 
    { 
    __declspec(align(64)) 
    struct slot_detail 
    { 
     // slot write: (memory barrier in wheel) > post_done > (memory barrier in wheel) 
     // slot read: (memory barrier in wheel) > read_done > (memory barrier in wheel) 

     // done writing or reading, must update wrtn_ or tail_ in wheel, as appropriate 
     template <bool Writing> 
     void done(wheel* w) 
     { 
     if (Writing) 
      w->post_done(sequence); 
     else 
      w->read_done(); 
     } 

     // cache line for sequence number and header 
     long long sequence; 
     Header header; 

     // there is no such thing as data type with variable size, but we need it to avoid thrashing 
     // cache - so we invent one. The memory is reserved in runtime and we simply go beyond last element. 
     // This is well into UB territory! Using template parameter for this is not good, since it 
     // results in this small implementation detail leaking to all possible user interfaces. 
     __declspec(align(8)) 
     char data[8]; 
    }; 

    // use this as a storage space for slot_detail, to guarantee 64 byte alignment 
    __declspec(align(64)) 
    struct slot_block { long long padding[8]; }; 

    public: 
    // wrap slot data to outside world 
    template <bool Writable> 
    class slot 
    { 
     template<typename> friend class wheel; 

     slot& operator=(const slot&); // moveable but non-assignable 

     // may only be constructed by wheel 
     slot(slot_detail* impl, wheel<Header>* w, size_t c) 
     : slot_(impl) , wheel_(w) , capacity_(c) 
     {} 

    public: 
     slot(slot&& s) 
     : slot_(s.slot_) , wheel_(s.wheel_) , capacity_(s.capacity_) 
     { 
     s.slot_ = NULL; 
     } 

     ~slot() 
     { 
     if (slot_) 
     { 
      slot_->done<Writable>(wheel_); 
     } 
     } 

     // slot accessors - use Header to store information on what type is actually stored in data 
     bool empty() const   { return !slot_; } 
     long long sequence() const { return slot_->sequence; } 
     Header& header()   { return slot_->header; } 
     char* data()    { return slot_->data; } 

     template <typename T> T& cast() 
     { 
     static_assert(boost::is_pod<T>::value, "Data type must be POD"); 
     if (sizeof(T) > capacity_) 
      throw bad_size(T(), capacity_); 
     if (empty()) 
      throw no_data(); 
     return *((T*) data()); 
     } 

    private: 
     slot_detail* slot_; 
     wheel<Header>* wheel_; 
     const size_t capacity_; 
    }; 

    private: 
    // dynamic size of slot, with extra capacity, expressed in 64 byte blocks 
    static size_t sizeof_slot(size_t s) 
    { 
     size_t m = sizeof(slot_detail); 
     // add capacity less 8 bytes already within sizeof(slot_detail) 
     m += max(8, s) - 8; 
     // round up to 64 bytes, i.e. alignment of slot_detail 
     size_t r = m & ~(unsigned int)63; 
     if (r < m) 
     r += 64; 
     r /= 64; 
     return r; 
    } 

    // calculate actual slot capacity back from number of 64 byte blocks 
    static size_t slot_capacity(size_t s) 
    { 
     return s*64 - sizeof(slot_detail) + 8; 
    } 

    // round up to power of 2 
    static size_t round_size(size_t s) 
    { 
     // enforce minimum size 
     if (s <= min_size) 
     return min_size; 

     // find rounded value 
     --s; 
     size_t r = 1; 
     while (s) 
     { 
     s >>= 1; 
     r <<= 1; 
     }; 
     return r; 
    } 

    slot_detail& at(long long sequence) 
    { 
     // find index from sequence number and return slot at found index of the wheel 
     return *((slot_detail*) &wheel_[(sequence & (size_ - 1)) * blocks_]); 
    } 

    public: 
    wheel(size_t capacity, size_t size) 
     : head_(0) , wrtn_(0) , rdng_(0) , tail_(0) , event_() 
     , blocks_(sizeof_slot(capacity)) , capacity_(slot_capacity(blocks_)) , size_(round_size(size)) 
    { 
     static_assert(boost::is_pod<Header>::value, "Header type must be POD"); 
     static_assert(sizeof(slot_block) == 64, "This was unexpected"); 

     wheel_ = new slot_block[size_ * blocks_]; 
     // all slots must be initialised to 0 
     memset(wheel_, 0, size_ * 64 * blocks_); 
     active_ = 1; 
    } 

    ~wheel() 
    { 
     stop(); 
     delete[] wheel_; 
    } 

    // all accessors needed 
    size_t capacity() const { return capacity_; } // capacity of a single slot 
    size_t size() const  { return size_; }  // number of slots available 
    size_t queue() const { return (size_t)head_ - (size_t)tail_; } 
    bool active() const  { return active_ == 1; } 

    // enough to call it just once, to fine tune slot capacity 
    template <typename T> 
    void check() const 
    { 
     static_assert(boost::is_pod<T>::value, "Data type must be POD"); 
     if (sizeof(T) > capacity_) 
     throw bad_size(T(), capacity_); 
    } 

    // stop the wheel - safe to execute many times 
    size_t stop() 
    { 
     InterlockedExchange(&active_, 0); 
     // must wait for current read to complete 
     while (rdng_ != tail_) 
     Sleep(10); 

     return size_t(head_ - tail_); 
    } 

    // return first available slot for write 
    slot<true> post() 
    { 
     if (!active_) 
     throw stopped(); 

     // the only memory barrier on head seq. number we need, if not overflowing 
     long long h = InterlockedIncrement64(&head_); 
     while(h - (long long) size_ > tail_) 
     { 
     if (InterlockedDecrement64(&head_) == h - 1) 
      throw overflowing(); 

     // protection against case of race condition when we are overflowing 
     // and two or more threads try to post and two or more messages are read, 
     // all at the same time. If this happens we must re-try, otherwise we 
     // could have skipped a sequence number - causing infinite wait in post_done 
     Sleep(0); 
     h = InterlockedIncrement64(&head_); 
     } 

     slot_detail& r = at(h); 
     r.sequence = h; 

     // wrap in writeable slot 
     return slot<true>(&r, this, capacity_); 
    } 

    // return first available slot for write, nothrow variant 
    slot<true> post(std::nothrow_t) 
    { 
     if (!active_) 
     return slot<true>(NULL, this, capacity_); 

     // the only memory barrier on head seq. number we need, if not overflowing 
     long long h = InterlockedIncrement64(&head_); 
     while(h - (long long) size_ > tail_) 
     { 
     if (InterlockedDecrement64(&head_) == h - 1) 
      return slot<true>(NULL, this, capacity_); 

     // must retry if race condition described above 
     Sleep(0); 
     h = InterlockedIncrement64(&head_); 
     } 

     slot_detail& r = at(h); 
     r.sequence = h; 

     // wrap in writeable slot 
     return slot<true>(&r, this, capacity_); 
    } 

    // read first available slot for read 
    slot<false> read() 
    { 
     slot_detail* r = NULL; 
     // compare rdng_ and wrtn_ early to avoid unnecessary memory barrier 
     if (active_ && rdng_ < wrtn_) 
     { 
     // the only memory barrier on reading seq. number we need 
     const long long h = InterlockedIncrement64(&rdng_); 
     // check if this slot has been written, step back if not 
     if (h > wrtn_) 
      InterlockedDecrement64(&rdng_); 
     else 
      r = &at(h); 
     } 

     // wrap in readable slot 
     return slot<false>(r , this, capacity_); 
    } 

    // waiting for new post, to be used by non-polling clients 
    void acquire() 
    { 
     event_.acquire(); 
    } 

    bool try_acquire() 
    { 
     return event_.try_acquire(); 
    } 

    bool try_acquire(unsigned long timeout) 
    { 
     return event_.try_acquire(timeout); 
    } 

    void release() 
    {} 

    private: 
    void post_done(long long sequence) 
    { 
     const long long t = sequence - 1; 

     // the only memory barrier on written seq. number we need 
     while(InterlockedCompareExchange64(&wrtn_, sequence, t) != t) 
     Sleep(0); 

     // this is outside of critical path for polling clients 
     event_.set(); 
    } 

    void read_done() 
    { 
     // the only memory barrier on tail seq. number we need 
     InterlockedIncrement64(&tail_); 
    } 

    // each in its own cache line 
    // head_ - wrtn_ = no. of messages being written at this moment 
    // rdng_ - tail_ = no. of messages being read at the moment 
    // head_ - tail_ = no. of messages to read (including those being written and read) 
    // wrtn_ - rdng_ = no. of messages to read (excluding those being written or read) 
    __declspec(align(64)) volatile long long head_; // currently writing or written 
    __declspec(align(64)) volatile long long wrtn_; // written 
    __declspec(align(64)) volatile long long rdng_; // currently reading or read 
    __declspec(align(64)) volatile long long tail_; // read 
    __declspec(align(64)) volatile long active_; // flag switched to 0 when stopped 

    __declspec(align(64)) 
    api::event event_;   // set when new message is posted 
    const size_t blocks_;  // number of 64-byte blocks in a single slot_detail 
    const size_t capacity_;  // capacity of data() section per single slot. Initialisation depends on blocks_ 
    const size_t size_;   // number of slots available, always power of 2 
    slot_block* wheel_; 
    }; 
}} 

Here is what a polling consumer worker thread might look like:

while (wheel.active()) 
    { 
    core::wheel::wheel<int>::slot<false> slot = wheel.read(); 
    if (!slot.empty()) 
    { 
     Data& d = slot.cast<Data>(); 
     // do work 
    } 
    // uncomment below for waiting consumer, saving CPU cycles 
    // else 
    // wheel.try_acquire(10); 
    } 

Edit: added consumer example


Please could you explain what header and data are / what the difference is? If I wanted to store 3 * 64-bit words in each slot (i.e. the whole payload), how would I use this? – 2012-06-26 01:46:42
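
(Going by the code above, one way to store a three-word POD payload might be the sketch below; Payload, the constructor arguments, and the function name are illustrative, not tested.)

struct Payload { long long a, b, c; };   // 3 * 64-bit words, must be POD

// The wheel would be constructed with enough per-slot capacity, e.g.:
// core::wheel::wheel<int> w(sizeof(Payload), 1024);
void post_payload_example(core::wheel::wheel<int>& w)
{
    w.check<Payload>();              // verify slot capacity once up front
    core::wheel::wheel<int>::slot<true> s = w.post();
    s.header() = 1;                  // Header (int here) can tag the stored type
    Payload& p = s.cast<Payload>();  // data() reinterpreted as Payload
    p.a = 1; p.b = 2; p.c = 3;
}                                    // slot destructor runs post_done, publishing the message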

0

My guess is that no such thing exists - and if it does, it is either not portable or not open source.

Conceptually, you want to control two pointers at the same time: the tail pointer and the tail->next pointer. That generally cannot be done with lock-free primitives.


Your guess is wrong. The producer only needs to move the tail. What you are describing is an intrusive queue. In that case you can update tail->next and then atomically move the tail; if you do not succeed, loop and try again. – edwinc 2017-03-01 05:49:51
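
(The intrusive queue edwinc describes is usually credited to Dmitry Vyukov - see the 1024cores links in the next answer. A C++11 sketch of that intrusive MPSC node-based queue follows; names are chosen here for illustration and the code is untested.)

#include <atomic>

// Sketch of Vyukov's intrusive MPSC node-based queue. Producers only swing
// head_; tail_ is owned by the single consumer. A stub node keeps the list
// non-empty so producers never touch tail_.
struct mpsc_node { std::atomic<mpsc_node*> next{nullptr}; };

class mpsc_queue {
    mpsc_node stub_;                        // dummy node, never carries data
    std::atomic<mpsc_node*> head_{&stub_};  // producers push here
    mpsc_node* tail_{&stub_};               // consumer-only
public:
    void push(mpsc_node* n) {               // callable from any thread
        n->next.store(nullptr, std::memory_order_relaxed);
        mpsc_node* prev = head_.exchange(n, std::memory_order_acq_rel);
        prev->next.store(n, std::memory_order_release); // the "tail->next" step
    }
    mpsc_node* pop() {                      // single consumer thread only
        mpsc_node* tail = tail_;
        mpsc_node* next = tail->next.load(std::memory_order_acquire);
        if (tail == &stub_) {               // skip over the stub
            if (!next) return nullptr;      // queue is empty
            tail_ = next;
            tail = next;
            next = next->next.load(std::memory_order_acquire);
        }
        if (next) { tail_ = next; return tail; }
        if (tail != head_.load(std::memory_order_acquire))
            return nullptr;                 // a push is mid-flight; retry later
        push(&stub_);                       // re-insert stub to unstick the tail
        next = tail->next.load(std::memory_order_acquire);
        if (next) { tail_ = next; return tail; }
        return nullptr;
    }
};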

3

The most appropriate implementation depends on the desired properties of the queue. Should it be unbounded, or is bounded OK? Should it be linearizable, or would less strict requirements do? How strong a FIFO guarantee do you need? Are you willing to pay the cost of having the consumer reverse the list (there is a very simple implementation where the consumer grabs the tail of a singly-linked list and thereby obtains all the producers' items at once)? Should it guarantee that no thread is ever blocked, or is a tiny chance of a thread blocking acceptable? And so on.

Some useful links:
Is multiple-producer, single-consumer possible in a lockfree setting?
http://www.1024cores.net/home/lock-free-algorithms/queues
http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
https://groups.google.com/group/comp.programming.threads/browse_frm/thread/33f79c75146582f3

Hope that helps.


I created a version based on Dmitry Vyukov's implementation: https://github.com/samanbarghi/MPSCQ/ – 2016-09-19 22:52:06

0

Below is the technique I use for my cooperative multi-tasking/multi-threading library (MACE) http://bytemaster.github.com/mace/. It has the benefit of being lock-free except when the queue is empty.

struct task { 
    boost::function<void()> func; 
    task* next; 
}; 


boost::mutex      task_ready_mutex; 
boost::condition_variable  task_ready; 
boost::atomic<task*>    task_in_queue; 

// this can be called from any thread 
void thread::post_task(task* t) { 
    // atomically post the task to the queue. 
    task* stale_head = task_in_queue.load(boost::memory_order_relaxed); 
    do { t->next = stale_head; 
    } while(!task_in_queue.compare_exchange_weak(stale_head, t, boost::memory_order_release)); 

    // Because only one thread can post the 'first task', only that thread will attempt 
    // to aquire the lock and therefore there should be no contention on this lock except 
    // when *this thread is about to block on a wait condition. 
    if(!stale_head) { 
     boost::unique_lock<boost::mutex> lock(task_ready_mutex); 
     task_ready.notify_one(); 
    } 
} 

// this is the consumer thread. 
void process_tasks() { 
    while(!done) { 
    // this will atomically pop everything that has been posted so far. 
    pending = task_in_queue.exchange(0,boost::memory_order_consume); 
    // pending is a linked list in 'reverse post order', so process them 
    // from tail to head if you want to maintain order. 

    if(!pending) { // lock scope 
     boost::unique_lock<boost::mutex> lock(task_ready_mutex);     
     // check one last time while holding the lock before blocking. 
     if(!task_in_queue) task_ready.wait(lock); 
    } 
    } 
} 
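
(As the comment in process_tasks notes, pending comes back in reverse post order; below is a sketch of restoring FIFO order before running the tasks. It assumes tasks are heap-allocated; the function name is illustrative.)

// Sketch: restore FIFO order by reversing the privately-owned 'pending' list,
// then run the tasks oldest-first. No atomics needed - after the exchange,
// the consumer is the sole owner of the list.
void run_in_fifo_order(task* pending) {
    task* ordered = 0;
    while (pending) {            // reverse the singly-linked list
        task* next = pending->next;
        pending->next = ordered;
        ordered = pending;
        pending = next;
    }
    while (ordered) {            // execute in original post order
        task* t = ordered;
        ordered = ordered->next;
        t->func();
        delete t;                // assumes tasks were allocated with new
    }
}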

I think it should be pending = task_in_queue.exchange(0, boost::memory_order_acquire); because ISO C++11 29.3.2 states: "An atomic operation A that performs a release operation on an atomic object M synchronizes with an atomic operation B that performs an acquire operation on M and takes its value from any side effect in the release sequence headed by A." – ipapadop 2013-08-30 02:17:37


Because of the memory allocation/reclamation, I would not say it is lock-free even when the "queue" is not empty. – 2017-03-09 13:48:59