
C++ lock-free stack is broken

I am trying to implement a lock-free stack for use with externally managed memory in a bounded, plain C array. I am aware of the reference implementations (for example Anthony Williams, C++ Concurrency in Action) as well as other books and blog posts/articles.

The implementation follows these references and avoids the ABA problem, because the external memory locations are addressed by unique indices rather than by recycled pointers. It therefore does not have to deal with memory management at all and is quite simple.

I wrote some tests that execute pop and push operations on that stack under high load and contention (stress test) as well as single-threaded. The stress test fails with strange problems that I do not understand and that look quite obscure to me.

Maybe somebody has an idea?

  1. Problem: pushing an already popped node back onto the stack fails, because the precondition that the node has no successor (next) is violated.

    BOOST_ASSERT(!m_aData.m_aNodes[nNode-1].next); 
    

    Repro setup: at least 3 threads and a capacity of 16. After roughly 500 passes the push operation fails.

  2. Problem: the number of elements popped by all threads plus the number of elements left in the stack after joining does not match the capacity (nodes are lost in transition).

    BOOST_ASSERT(aNodes.size()+nPopped == nCapacity); 
    

    Repro setup: 2 threads and a capacity of 2. It takes a lot of passes to occur, for me at least 700. After that the head of the stack is 0, but only one node is present in the popped containers; node {2,0} is dangling.

I compiled with VS2005, VS2013 and VS2015. All show the same problem (VS2005 is also the reason why the code looks like C++03).

Here are the node and the stack:

template <typename sizeT> struct node
{
    sizeT         cur;  //!< construction invariant
    atomic<sizeT> next;
    atomic<sizeT> data;

    explicit node() // invalid node
    : cur(0), next(0), data(0)
    {}

    explicit node(sizeT const& nCur, sizeT const& nNext, sizeT const& nData)
    : cur(nCur), next(nNext), data(nData)
    {}

    node& operator=(node const& rhs)
    {
        cur = rhs.cur;
        next.store(rhs.next.load(memory_order_relaxed));
        data.store(rhs.data.load(memory_order_relaxed));
        return *this;
    }
};

template <typename sizeT> struct stack
{
private:
    static memory_order const relaxed = memory_order_relaxed;
    atomic<sizeT> m_aHead;

public:
    explicit stack(sizeT const& nHead) : m_aHead(nHead) {}

    template <typename tagT, typename T, std::size_t N>
    typename enable_if<is_same<tagT,Synchronized>,sizeT>::type
    pop(T (&aNodes)[N])
    {
        sizeT nOldHead = m_aHead.load();

        for(;;)
        {
            if(!nOldHead) return 0;

            BOOST_ASSERT(nOldHead <= N);
            T& aOldHead = aNodes[nOldHead-1];
            sizeT const nNewHead = aOldHead.next.load(/*relaxed*/);
            BOOST_ASSERT(nNewHead <= N);
            sizeT const nExpected = nOldHead;

            if(m_aHead.compare_exchange_weak(nOldHead,nNewHead
                /*,std::memory_order_acquire,std::memory_order_relaxed*/))
            {
                BOOST_ASSERT(nExpected == nOldHead);

                // <--- from here on aOldHead is thread local ---> //
                aOldHead.next.store(0 /*,relaxed*/);

                return nOldHead;
            }

            // TODO: add back-off strategy under contention (use loop var)
        }
    }

    template <typename tagT, typename T, std::size_t N>
    typename enable_if<is_same<tagT,Synchronized>,void>::type
    push(T (&aNodes)[N], sizeT const& nNewHead)
    {
#ifndef NDEBUG
        {
            BOOST_ASSERT(0 < nNewHead && nNewHead <= N);
            sizeT const nNext = aNodes[nNewHead-1].next;
            BOOST_ASSERT(!nNext);
        }
#endif

        sizeT nOldHead = m_aHead.load(/*relaxed*/);

        for(;;)
        {
            aNodes[nNewHead-1].next.store(nOldHead /*,relaxed*/);
            sizeT const nExpected = nOldHead;
            BOOST_ASSERT(nOldHead <= N);

            if(m_aHead.compare_exchange_weak(nOldHead,nNewHead
                /*,std::memory_order_release,std::memory_order_relaxed*/))
            {
                BOOST_ASSERT(nExpected == nOldHead);
                return;
            }

            // TODO: add back-off strategy under contention (use loop var)
        }
    }
};
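For orientation, here is a stripped-down, single-threaded use of the two templates. This snippet is mine, not part of the original post; it assumes the Synchronized tag and the using-declarations for the atomic/enable_if names from the linked sources.

    node<std::size_t> aNodes[4];
    aNodes[0] = node<std::size_t>(1,0,0);       // tail: node 1 has no successor
    for(std::size_t i=1; i<4; ++i)
    {
        aNodes[i] = node<std::size_t>(i+1,i,0); // node i+1 links down to node i
    }

    stack<std::size_t> aStack(4);                            // head starts at node 4
    std::size_t const n = aStack.pop<Synchronized>(aNodes);  // returns 4, head is now 3
    aStack.push<Synchronized>(aNodes,n);                     // head is 4 again

Index 0 plays the role of a null pointer throughout: an empty stack has head 0 and a popped node has next 0.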

And the rather noisy test class:

class StackTest
{
private:

    typedef boost::mpl::size_t<64> Capacity;
    //typedef boost::uint_t<static_log2_ceil<Capacity::value>::value>::least size_type;
    typedef std::size_t size_type;

    static size_type const nCapacity = Capacity::value;
    static size_type const nNodes = Capacity::value;

    typedef node<size_type>  Node;
    typedef stack<size_type> Stack;

    typedef mt19937                                        Twister;
    typedef random::uniform_int_distribution<std::size_t>  Distribution;
    typedef variate_generator<Twister,Distribution>        Die;

    struct Data //!< shared along threads
    {
        Node  m_aNodes[nNodes];
        Stack m_aStack;

        explicit Data() : m_aStack(nNodes)
        {
            m_aNodes[0] = Node(1,0,0); // tail of stack

            for(size_type i=1; i<nNodes; ++i)
            {
                m_aNodes[i] = Node(static_cast<size_type>(i+1),i,0);
            }
        }

        template <typename syncT>
        void Run(
            uuids::random_generator& aUUIDGen,
            std::size_t const&       nPasses,
            std::size_t const&       nThreads)
        {
            std::vector<ThreadLocalData> aThreadLocalDatas(nThreads,ThreadLocalData(*this));

            {
                static std::size_t const N = 100000;
                Die aRepetition(Twister(hash_value(aUUIDGen())),Distribution(0,N));
                Die aAction(Twister(hash_value(aUUIDGen())),Distribution(0,1));

                for(std::size_t i=0; i<nThreads; ++i)
                {
                    std::vector<bool>& aActions = aThreadLocalDatas[i].m_aActions;
                    std::size_t const nRepetition = aRepetition();
                    aActions.reserve(nRepetition);

                    for(std::size_t k=0; k<nRepetition; ++k)
                    {
                        aActions.push_back(static_cast<bool>(aAction()));
                    }
                }
            }

            std::size_t nPopped = 0;

            if(nThreads == 1)
            {
                std::size_t const i = 0;
                aThreadLocalDatas[i].Run<syncT>(i);
                nPopped += aThreadLocalDatas[i].m_aPopped.size();
            }
            else
            {
                std::vector<boost::shared_ptr<thread> > aThreads;
                aThreads.reserve(nThreads);

                for(std::size_t i=0; i<nThreads; ++i)
                {
                    aThreads.push_back(boost::make_shared<thread>(boost::bind(&ThreadLocalData::Run<syncT>,&aThreadLocalDatas[i],i)));
                }

                for(std::size_t i=0; i<nThreads; ++i)
                {
                    aThreads[i]->join();
                    nPopped += aThreadLocalDatas[i].m_aPopped.size();
                }
            }

            std::vector<size_type> aNodes;
            aNodes.reserve(nCapacity);

            while(size_type const nNode = m_aStack.pop<syncT>(m_aNodes))
            {
                aNodes.push_back(nNode);
            }

            std::clog << dump(m_aNodes,4) << std::endl;

            BOOST_ASSERT(aNodes.size()+nPopped == nCapacity);
        }
    };


    struct ThreadLocalData //!< local to each thread
    {
        Data&                  m_aData;    //!< shared along threads
        std::vector<bool>      m_aActions; //!< either pop or push
        std::vector<size_type> m_aPopped;  //!< popp'ed nodes

        explicit ThreadLocalData(Data& aData)
            : m_aData(aData), m_aActions(), m_aPopped()
        {
            m_aPopped.reserve(nNodes);
        }

        template <typename syncT>
        void Run(std::size_t const& k)
        {
            BOOST_FOREACH(bool const& aAction, m_aActions)
            {
                if(aAction)
                {
                    if(size_type const nNode = m_aData.m_aStack.pop<syncT>(m_aData.m_aNodes))
                    {
                        BOOST_ASSERT(!m_aData.m_aNodes[nNode-1].next);
                        m_aPopped.push_back(nNode);
                    }
                }
                else
                {
                    if(!m_aPopped.empty())
                    {
                        size_type const nNode = m_aPopped.back();
                        size_type const nNext = m_aData.m_aNodes[nNode-1].next;
                        ASSERT_IF(!nNext,"nNext=" << nNext << " for " << m_aData.m_aNodes[nNode-1] << "\n\n" << dump(m_aData.m_aNodes));
                        m_aData.m_aStack.push<syncT>(m_aData.m_aNodes,nNode);
                        m_aPopped.pop_back();
                    }
                }
            }
        }
    };


    template <typename syncT>
    static void PushPop(
        uuids::random_generator& aUUIDGen,
        std::size_t const&       nPasses,
        std::size_t const&       nThreads)
    {
        BOOST_ASSERT(nThreads > 0);
        BOOST_ASSERT(nThreads == 1 || (is_same<syncT,Synchronized>::value));

        std::clog << BOOST_CURRENT_FUNCTION << " with threads=" << nThreads << std::endl;

        for(std::size_t nPass=0; nPass<nPasses; ++nPass)
        {
            std::ostringstream s;
            s << " " << nPass << "/" << nPasses << ": ...";
            std::clog << s.str() << std::endl;

            Data().Run<syncT>(aUUIDGen,nPass,nThreads);
        }
    }

public:

    static void Run()
    {
        typedef StackTest self_t;

        uuids::random_generator aUUIDGen;

        static std::size_t const nMaxPasses = 1000;
        Die aPasses(Twister(hash_value(aUUIDGen())),Distribution(0,nMaxPasses));

        {
            //std::size_t const nThreads = 2; // thread::hardware_concurrency()+1;
            std::size_t const nThreads = thread::hardware_concurrency()+1;
            self_t().PushPop<Synchronized>(aUUIDGen,aPasses(),nThreads);
        }
    }
};

This is the essential code; all required files can be downloaded here: link.


Regarding problem 1: couldn't you set the node's next to NULL before returning it from pop? – AndyG


That is exactly the strange thing. next is set to 0 right before it is returned from pop. When the node is pushed again, its next is no longer 0. Technically that is only possible if another thread modified the node in the meantime. But I do not understand how another thread could get access to a popped node. –


"The implementation follows these references and avoids the ABA problem because the external memory locations are addressed by unique indices rather than recycled pointers." Well, those indices are not unique. You still have an ABA problem. –

Answer


Both problems are just further facets of the ABA problem. The trace below shows one interleaving that corrupts the stack (see the reconstruction sketch after the trace).

Stack: {2,1},{1,0}

  1. Thread A
    1. pop
      new_head = 1
      ... time slice exceeded
  2. Thread B
    1. pop
      Stack: {1,0}, popped: {2,0}
    2. pop
      Stack: {}, popped: {2,0},{1,0}
    3. push({2,0})
      Stack: {2,0}
  3. Thread A
    1. pop continues
      cmp_exch succeeds because the head is 2
      Stack: {}, head = 1 --- WRONG, 0 would be correct
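The following single-threaded reconstruction of this interleaving is my sketch, not part of the original answer: it uses std::atomic for brevity, and plain stores stand in for thread B's uncontended CAS loops. It shows the stale compare-exchange succeeding:

    #include <atomic>
    #include <cassert>
    #include <cstddef>

    int main()
    {
        // Two nodes, 1-based indices, 0 acts as null. Stack: {2,1},{1,0}, head = 2.
        std::atomic<std::size_t> next[2];
        next[0].store(0);                                     // node 1 -> null
        next[1].store(1);                                     // node 2 -> node 1
        std::atomic<std::size_t> head(2);

        // Thread A enters pop: reads the head and its successor, then is preempted.
        std::size_t nOldHead = head.load();                   // 2
        std::size_t const nNewHead = next[nOldHead-1].load(); // 1

        // Thread B runs alone, so its CAS loops reduce to plain stores:
        head.store(1); next[1].store(0);                      // pop     -> popped {2,0}
        head.store(0); next[0].store(0);                      // pop     -> popped {2,0},{1,0}
        next[1].store(head.load()); head.store(2);            // push(2) -> stack {2,0}

        // Thread A resumes: the CAS succeeds because the head is 2 again (ABA),
        // and installs the stale successor 1 although node 1 is still popped.
        bool const bDone = head.compare_exchange_strong(nOldHead,nNewHead);
        assert(bDone);
        assert(head.load() == 1);                             // WRONG, 0 would be correct
        return 0;
    }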

Anything can go wrong from there on, because access to the node is no longer thread-local. That includes unexpected modification of the outstanding node (problem 1) or losing nodes (problem 2).

head + next would have to be modified within a single cmp_exch to avoid that problem.
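Short of exchanging head and next together, the usual mitigation is to pack a modification counter (tag) next to the head index, so that a compare-exchange from a stale snapshot fails even when the same index is back on top. Here is a minimal sketch of that idea; it is my addition, not the answerer's code, uses a C++11 std::atomic over a two-word struct, and assumes the target provides a lock-free 64-bit CAS (the original C++03 code would need the boost equivalent):

    #include <atomic>
    #include <cstdint>

    // Head "pointer" plus a tag that changes on every successful modification.
    struct tagged_head
    {
        std::uint32_t index; // 1-based node index, 0 == empty stack
        std::uint32_t tag;   // bumped on every successful pop/push
    };

    // Swing the head to nNewIndex, but only if neither the index nor the tag
    // changed since aExpected was read. A thread that slept through a pop and
    // re-push of the same index sees a different tag and the exchange fails.
    inline bool try_swing_head(std::atomic<tagged_head>& aHead,
                               tagged_head&              aExpected,
                               std::uint32_t             nNewIndex)
    {
        tagged_head const aDesired = { nNewIndex, aExpected.tag + 1 };
        return aHead.compare_exchange_weak(aExpected, aDesired);
    }

This keeps the index-based addressing intact; it only makes a stale snapshot of the head detectable.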