我讀得越多,我就變得越困惑...我會認爲找到一個在C++中實現的正式正確的mpsc隊列是微不足道的。C++中是否存在多生產者單消費者無鎖隊列?
每當我找到另一個刺它,進一步研究似乎表明有如ABA或其他微妙的競爭條件的問題。
很多人都在談論垃圾收集的必要性。這是我想避免的。
有沒有一個公認的正確的開放源代碼實現了嗎?
我讀得越多,我就變得越困惑...我會認爲找到一個在C++中實現的正式正確的mpsc隊列是微不足道的。C++中是否存在多生產者單消費者無鎖隊列?
每當我找到另一個刺它,進一步研究似乎表明有如ABA或其他微妙的競爭條件的問題。
很多人都在談論垃圾收集的必要性。這是我想避免的。
有沒有一個公認的正確的開放源代碼實現了嗎?
你可能想檢查干擾器;在C的可用++這裏:http://lmax-exchange.github.io/disruptor/
您還可以找到解釋它是如何工作here on stackoverflow基本上它是沒有鎖定,在一個固定大小的槽在線程之間傳遞消息的FIFO優化循環緩衝區。
這裏有兩種實現方式,我發現有用:Lock-free Multi-producer Multi-consumer Queue on Ring Buffer @ NatSys Lab. Blog和
Yet another implementation of a lock-free circular array queue @ CodeProject
注意:下面的代碼是不正確的,我把它僅僅作爲一個例子,這些東西是如何棘手的是。
如果你不喜歡谷歌版本的複雜性,這裏有點類似於我 - 它更簡單,但我把它作爲一個練習給讀者使其工作(它是大型項目的一部分,而不是便攜式)。整個想法是保持數據的緩衝區和一小組計數器來識別寫入/寫入和讀取/讀取的時隙。由於每個計數器都在其自己的緩存行中,並且(通常)每個計數器在消息的實況中僅被原子更新一次,所以它們都可以在沒有任何同步的情況下被讀取。在寫入線程post_done
之間存在一個潛在的爭用點,它是FIFO保證所需的。計數器(head_,wrtn_,rdng_,tail_)的選擇,以確保正確性和 FIFO,所以FIFO下降也需要計數器的變化(這可能是困難的,而不sacrifying正確性做)。有可能略微提高一個用戶情景下的性能,但我不會打擾 - 如果發現有多個讀者的其他用例,則必須將其撤銷。
在我的機器延遲看起來像以下(百分位在左邊,這個百分比在右側中的意思是,單位爲毫秒,通過RDTSC測量):
total=1000000 samples, avg=0.24us
50%=0.214us, avg=0.093us
90%=0.23us, avg=0.151us
99%=0.322us, avg=0.159us
99.9%=15.566us, avg=0.173us
這些結果是單一投票的消費者,即工作線程在緊密循環中調用wheel.read()並檢查是否爲空(例如,滾動到底部)。等待消費者(CPU利用率低得多)將等待事件(acquire...
函數中的一個),由於上下文切換,這會爲平均延遲增加大約1-2us。由於在讀取上幾乎沒有爭用,因此消費者通過工作者線程的數量非常好地擴展,例如,我的機器上的3個線程:
total=1500000 samples, avg=0.07us
50%=0us, avg=0us
90%=0.155us, avg=0.016us
99%=0.361us, avg=0.038us
99.9%=8.723us, avg=0.044us
補丁會受到歡迎:)
// Copyright (c) 2011-2012, Bronislaw (Bronek) Kozicki
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
#pragma once
#include <core/api.hxx>
#include <core/wheel/exception.hxx>
#include <boost/noncopyable.hpp>
#include <boost/type_traits.hpp>
#include <boost/lexical_cast.hpp>
#include <typeinfo>
namespace core { namespace wheel
{
struct bad_size : core::exception
{
template<typename T> explicit bad_size(const T&, size_t m)
: core::exception(std::string("Slot capacity exceeded, sizeof(")
+ typeid(T).name()
+ ") = "
+ boost::lexical_cast<std::string>(sizeof(T))
+ ", capacity = "
+ boost::lexical_cast<std::string>(m)
)
{}
};
// inspired by Disruptor
template <typename Header>
class wheel : boost::noncopyable
{
__declspec(align(64))
struct slot_detail
{
// slot write: (memory barrier in wheel) > post_done > (memory barrier in wheel)
// slot read: (memory barrier in wheel) > read_done > (memory barrier in wheel)
// done writing or reading, must update wrtn_ or tail_ in wheel, as appropriate
template <bool Writing>
void done(wheel* w)
{
if (Writing)
w->post_done(sequence);
else
w->read_done();
}
// cache line for sequence number and header
long long sequence;
Header header;
// there is no such thing as data type with variable size, but we need it to avoid thrashing
// cache - so we invent one. The memory is reserved in runtime and we simply go beyond last element.
// This is well into UB territory! Using template parameter for this is not good, since it
// results in this small implementation detail leaking to all possible user interfaces.
__declspec(align(8))
char data[8];
};
// use this as a storage space for slot_detail, to guarantee 64 byte alignment
_declspec(align(64))
struct slot_block { long long padding[8]; };
public:
// wrap slot data to outside world
template <bool Writable>
class slot
{
template<typename> friend class wheel;
slot& operator=(const slot&); // moveable but non-assignable
// may only be constructed by wheel
slot(slot_detail* impl, wheel<Header>* w, size_t c)
: slot_(impl) , wheel_(w) , capacity_(c)
{}
public:
slot(slot&& s)
: slot_(s.slot_) , wheel_(s.wheel_) , capacity_(s.capacity_)
{
s.slot_ = NULL;
}
~slot()
{
if (slot_)
{
slot_->done<Writable>(wheel_);
}
}
// slot accessors - use Header to store information on what type is actually stored in data
bool empty() const { return !slot_; }
long long sequence() const { return slot_->sequence; }
Header& header() { return slot_->header; }
char* data() { return slot_->data; }
template <typename T> T& cast()
{
static_assert(boost::is_pod<T>::value, "Data type must be POD");
if (sizeof(T) > capacity_)
throw bad_size(T(), capacity_);
if (empty())
throw no_data();
return *((T*) data());
}
private:
slot_detail* slot_;
wheel<Header>* wheel_;
const size_t capacity_;
};
private:
// dynamic size of slot, with extra capacity, expressed in 64 byte blocks
static size_t sizeof_slot(size_t s)
{
size_t m = sizeof(slot_detail);
// add capacity less 8 bytes already within sizeof(slot_detail)
m += max(8, s) - 8;
// round up to 64 bytes, i.e. alignment of slot_detail
size_t r = m & ~(unsigned int)63;
if (r < m)
r += 64;
r /= 64;
return r;
}
// calculate actual slot capacity back from number of 64 byte blocks
static size_t slot_capacity(size_t s)
{
return s*64 - sizeof(slot_detail) + 8;
}
// round up to power of 2
static size_t round_size(size_t s)
{
// enfore minimum size
if (s <= min_size)
return min_size;
// find rounded value
--s;
size_t r = 1;
while (s)
{
s >>= 1;
r <<= 1;
};
return r;
}
slot_detail& at(long long sequence)
{
// find index from sequence number and return slot at found index of the wheel
return *((slot_detail*) &wheel_[(sequence & (size_ - 1)) * blocks_]);
}
public:
wheel(size_t capacity, size_t size)
: head_(0) , wrtn_(0) , rdng_(0) , tail_(0) , event_()
, blocks_(sizeof_slot(capacity)) , capacity_(slot_capacity(blocks_)) , size_(round_size(size))
{
static_assert(boost::is_pod<Header>::value, "Header type must be POD");
static_assert(sizeof(slot_block) == 64, "This was unexpected");
wheel_ = new slot_block[size_ * blocks_];
// all slots must be initialised to 0
memset(wheel_, 0, size_ * 64 * blocks_);
active_ = 1;
}
~wheel()
{
stop();
delete[] wheel_;
}
// all accessors needed
size_t capacity() const { return capacity_; } // capacity of a single slot
size_t size() const { return size_; } // number of slots available
size_t queue() const { return (size_t)head_ - (size_t)tail_; }
bool active() const { return active_ == 1; }
// enough to call it just once, to fine tune slot capacity
template <typename T>
void check() const
{
static_assert(boost::is_pod<T>::value, "Data type must be POD");
if (sizeof(T) > capacity_)
throw bad_size(T(), capacity_);
}
// stop the wheel - safe to execute many times
size_t stop()
{
InterlockedExchange(&active_, 0);
// must wait for current read to complete
while (rdng_ != tail_)
Sleep(10);
return size_t(head_ - tail_);
}
// return first available slot for write
slot<true> post()
{
if (!active_)
throw stopped();
// the only memory barrier on head seq. number we need, if not overflowing
long long h = InterlockedIncrement64(&head_);
while(h - (long long) size_ > tail_)
{
if (InterlockedDecrement64(&head_) == h - 1)
throw overflowing();
// protection against case of race condition when we are overflowing
// and two or more threads try to post and two or more messages are read,
// all at the same time. If this happens we must re-try, otherwise we
// could have skipped a sequence number - causing infinite wait in post_done
Sleep(0);
h = InterlockedIncrement64(&head_);
}
slot_detail& r = at(h);
r.sequence = h;
// wrap in writeable slot
return slot<true>(&r, this, capacity_);
}
// return first available slot for write, nothrow variant
slot<true> post(std::nothrow_t)
{
if (!active_)
return slot<true>(NULL, this, capacity_);
// the only memory barrier on head seq. number we need, if not overflowing
long long h = InterlockedIncrement64(&head_);
while(h - (long long) size_ > tail_)
{
if (InterlockedDecrement64(&head_) == h - 1)
return slot<true>(NULL, this, capacity_);
// must retry if race condition described above
Sleep(0);
h = InterlockedIncrement64(&head_);
}
slot_detail& r = at(h);
r.sequence = h;
// wrap in writeable slot
return slot<true>(&r, this, capacity_);
}
// read first available slot for read
slot<false> read()
{
slot_detail* r = NULL;
// compare rdng_ and wrtn_ early to avoid unnecessary memory barrier
if (active_ && rdng_ < wrtn_)
{
// the only memory barrier on reading seq. number we need
const long long h = InterlockedIncrement64(&rdng_);
// check if this slot has been written, step back if not
if (h > wrtn_)
InterlockedDecrement64(&rdng_);
else
r = &at(h);
}
// wrap in readable slot
return slot<false>(r , this, capacity_);
}
// waiting for new post, to be used by non-polling clients
void acquire()
{
event_.acquire();
}
bool try_acquire()
{
return event_.try_acquire();
}
bool try_acquire(unsigned long timeout)
{
return event_.try_acquire(timeout);
}
void release()
{}
private:
void post_done(long long sequence)
{
const long long t = sequence - 1;
// the only memory barrier on written seq. number we need
while(InterlockedCompareExchange64(&wrtn_, sequence, t) != t)
Sleep(0);
// this is outside of critical path for polling clients
event_.set();
}
void read_done()
{
// the only memory barrier on tail seq. number we need
InterlockedIncrement64(&tail_);
}
// each in its own cache line
// head_ - wrtn_ = no. of messages being written at this moment
// rdng_ - tail_ = no. of messages being read at the moment
// head_ - tail_ = no. of messages to read (including those being written and read)
// wrtn_ - rdng_ = no. of messages to read (excluding those being written or read)
__declspec(align(64)) volatile long long head_; // currently writing or written
__declspec(align(64)) volatile long long wrtn_; // written
__declspec(align(64)) volatile long long rdng_; // currently reading or read
__declspec(align(64)) volatile long long tail_; // read
__declspec(align(64)) volatile long active_; // flag switched to 0 when stopped
__declspec(align(64))
api::event event_; // set when new message is posted
const size_t blocks_; // number of 64-byte blocks in a single slot_detail
const size_t capacity_; // capacity of data() section per single slot. Initialisation depends on blocks_
const size_t size_; // number of slots available, always power of 2
slot_block* wheel_;
};
}}
這裏是輪詢消費者工作者線程可能是什麼樣子:
while (wheel.active())
{
core::wheel::wheel<int>::slot<false> slot = wheel.read();
if (!slot.empty())
{
Data& d = slot.cast<Data>();
// do work
}
// uncomment below for waiting consumer, saving CPU cycles
// else
// wheel.try_acquire(10);
}
編輯增加消費示例
請你能解釋標題和數據是什麼/有什麼區別?如果我想在每個插槽中存儲3 * 64位字(即整個有效載荷),我將如何使用它? – 2012-06-26 01:46:42
我猜沒有這樣的事情存在 - 如果確實如此,它要麼是無法移植或不是開源。
概念,您要同時控制兩個指針:在tail
指針和tail->next
指針。這通常不能用無鎖基元來完成。
您的猜測是錯誤的。生產者只需要移動尾巴。 你所描述的是一個入侵隊列。在這種情況下,你可以更新tail-> next,然後原子地移動尾部。如果你沒有成功,再次循環。 – edwinc 2017-03-01 05:49:51
最合適的實現取決於隊列的所需性能。它應該是無界的還是有界的呢?應該是linearizable,還是不太嚴格的要求會好?先進先出保證您需要多強?你是否願意支付消費者恢復列表的成本(存在一個非常簡單的實現,即消費者抓住單鏈表的尾部,從而立即獲得生產者的所有項目)。它是否應該保證沒有線程被阻塞,或者線程被阻塞的機率很小?等
一些有用的鏈接:
Is multiple-producer, single-consumer possible in a lockfree setting?
http://www.1024cores.net/home/lock-free-algorithms/queues
http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
https://groups.google.com/group/comp.programming.threads/browse_frm/thread/33f79c75146582f3
希望有所幫助。
我創建了一個基於Dmitry Vyukov實現的版本:https://github.com/samanbarghi/MPSCQ/ – 2016-09-19 22:52:06
以下是技術我用於我的合作多任務/多線程庫(MACE)http://bytemaster.github.com/mace/。除了隊列爲空時外,它具有無鎖的好處。
struct task {
boost::function<void()> func;
task* next;
};
boost::mutex task_ready_mutex;
boost::condition_variable task_ready;
boost::atomic<task*> task_in_queue;
// this can be called from any thread
void thread::post_task(task* t) {
// atomically post the task to the queue.
task* stale_head = task_in_queue.load(boost::memory_order_relaxed);
do { t->next = stale_head;
} while(!task_in_queue.compare_exchange_weak(stale_head, t, boost::memory_order_release));
// Because only one thread can post the 'first task', only that thread will attempt
// to aquire the lock and therefore there should be no contention on this lock except
// when *this thread is about to block on a wait condition.
if(!stale_head) {
boost::unique_lock<boost::mutex> lock(task_ready_mutex);
task_ready.notify_one();
}
}
// this is the consumer thread.
void process_tasks() {
while(!done) {
// this will atomically pop everything that has been posted so far.
pending = task_in_queue.exchange(0,boost::memory_order_consume);
// pending is a linked list in 'reverse post order', so process them
// from tail to head if you want to maintain order.
if(!pending) { // lock scope
boost::unique_lock<boost::mutex> lock(task_ready_mutex);
// check one last time while holding the lock before blocking.
if(!task_in_queue) task_ready.wait(lock);
}
}
我認爲它應該是pending = task_in_queue.exchange(0,boost :: memory_order_acquire);因爲在ISO C++ 11標準中聲明爲29.3.2「對原子對象M執行釋放操作的原子操作A與原子操作B同步,操作B對M執行獲取操作並從任意一側取其值在以A爲首的版本 中發揮作用。「 – ipapadop 2013-08-30 02:17:37
由於內存分配/回收,當「隊列」不爲空時,我不會說它的鎖定空閒 – 2017-03-09 13:48:59
我的建議是尋找替代品。對於性能來說,鎖定空閒隊列並不是那麼好。只要有多個線程寫入相同的緩存行,就無法快速創建任何內容。爲每個生產者使用單獨的SPSC隊列的速度更快。缺點是你失去了物品的順序。如果你需要一個有序的隊列,並且想要爲自己節省很多頭痛,只需使用自旋鎖。在實踐中,它幾乎與無鎖定一樣好,比例如數百倍快。 Win32關鍵部分。 – Timo 2012-01-19 03:06:03
您是否考慮使用環形緩衝區而不是隊列?有一定的性能優勢 - 固定內存位置,分配一次插槽,非常簡單的計數器指向尾部/頭部,不需要鎖定,可預測緩存友好的內存訪問模式等。 – bronekk 2012-01-19 12:16:43
@bronekk請你詳細說明一下嗎?你有沒有一個可行的例子? – 2012-01-19 17:51:40