2008-10-15 49 views
6

我想實現的是服從它大致接口生產者/消費者方案:如何實現阻塞讀取使用POSIX線程

class Consumer { 
private: 
    vector<char> read(size_t n) { 
     // If the internal buffer has `n` elements, then dequeue them 
     // Otherwise wait for more data and try again 
    } 
public: 
    void run() { 
     read(10); 
     read(4839); 
     // etc 
    } 
    void feed(const vector<char> &more) { 
     // Safely queue the data 
     // Notify `read` that there is now more data 
    } 
}; 

在這種情況下,feedrun將在單獨的線程和read運行應是一個阻止讀取(如recvfread)。很顯然,我需要在我的雙殼牌上進行某種互斥,我需要某種通知系統來通知read再試一次。

我聽到條件變量是要走的路,但我所有的多線程的經驗在於與Windows和我有一個很難包裝我的頭周圍。

感謝您的幫助!

(是的,我知道這是低效的返回向量。讓我們不要進入該)

回答

8

此代碼尚未準備好生產。 對任何庫調用的結果都沒有進行錯誤檢查。

我已經在LockThread中封裝了互斥鎖的鎖定/解鎖,因此它是異常安全的。但就是這樣。

另外,如果我認真對待這個問題,我會在對象中包含互斥鎖和條件變量,以便它們可以在其他消費者方法中被濫用。但只要你注意到在使用條件變量(以任何方式)之前必須獲取鎖定,那麼這種簡單的情況可以保持原樣。

出於興趣,你檢查了增強線程庫嗎?

#include <iostream> 
#include <vector> 
#include <pthread.h> 

class LockThread 
{ 
    public: 
    LockThread(pthread_mutex_t& m) 
     :mutex(m) 
    { 
     pthread_mutex_lock(&mutex); 
    } 
    ~LockThread() 
    { 
     pthread_mutex_unlock(&mutex); 
    } 
    private: 
     pthread_mutex_t& mutex; 
}; 
class Consumer 
{ 
    pthread_mutex_t  lock; 
    pthread_cond_t  cond; 
    std::vector<char> unreadData; 
    public: 
    Consumer() 
    { 
     pthread_mutex_init(&lock,NULL); 
     pthread_cond_init(&cond,NULL); 
    } 
    ~Consumer() 
    { 
     pthread_cond_destroy(&cond); 
     pthread_mutex_destroy(&lock); 
    } 

    private: 
     std::vector<char> read(size_t n) 
     { 
      LockThread locker(lock); 
      while (unreadData.size() < n) 
      { 
       // Must wait until we have n char. 
       // This is a while loop because feed may not put enough in. 

       // pthread_cond() releases the lock. 
       // Thread will not be allowed to continue until 
       // signal is called and this thread reacquires the lock. 

       pthread_cond_wait(&cond,&lock); 

       // Once released from the condition you will have re-aquired the lock. 
       // Thus feed() must have exited and released the lock first. 
      } 

      /* 
      * Not sure if this is exactly what you wanted. 
      * But the data is copied out of the thread safe buffer 
      * into something that can be returned. 
      */ 
      std::vector<char> result(n); // init result with size n 
      std::copy(&unreadData[0], 
         &unreadData[n], 
         &result[0]); 

      unreadData.erase(unreadData.begin(), 
          unreadData.begin() + n); 
      return (result); 
     } 
public: 
    void run() 
    { 
     read(10); 
     read(4839); 
     // etc 
    } 
    void feed(const std::vector<char> &more) 
    { 
     LockThread locker(lock); 

     // Once we acquire the lock we can safely modify the buffer. 
     std::copy(more.begin(),more.end(),std::back_inserter(unreadData)); 

     // Only signal the thread if you have the lock 
     // Otherwise race conditions happen. 
     pthread_cond_signal(&cond); 

     // destructor releases the lock and thus allows read thread to continue. 
    } 
}; 


int main() 
{ 
    Consumer c; 
} 
1

我會扔了一些半僞代碼。這裏是我的意見:

1)在這裏非常大的晶粒鎖定。如果你需要更快的訪問,你會想重新考慮你的數據結構。 STL不是線程安全的。

2)鎖將阻塞,直到互斥量讓它通過。互斥體結構是,它可以讓鎖定/解鎖機制一次穿過它。無需輪詢或某種異常類型的結構。

3)這個問題在語法上很有效。我對API和C++語法並不精確,但我相信它提供了一個語義上正確的解決方案。

4)編輯迴應評論。

class piper 
{ 
pthread_mutex queuemutex; 
pthread_mutex readymutex; 
bool isReady; //init to false by constructor 

//whatever else 
}; 

piper::read() 
{//whatever 
pthread_mutex_lock(&queuemutex) 
if(myqueue.size() >= n) 
{ 
    return_queue_vector.push_back(/* you know what to do here */) 

    pthread_mutex_lock(&readymutex) 
    isReady = false; 
    pthread_mutex_unlock(&readymutex) 
} 
pthread_mutex_unlock(&queuemutex) 
} 

piper::push_em_in() 
{ 
//more whatever 
pthread_mutex_lock(&queuemutex) 
//push push push 
if(myqueue.size() >= n) 
{ 
    pthread_mutex_lock(&readymutex) 
    isReady = true; 
    pthread_mutex_unlock(&readymutex) 
} 
pthread_mutex_unlock(&queuemutex) 
} 
+0

好的開始,但請記住我希望我的閱讀成功。不能保證`push_em_in`會保存足夠的數據以便發生。所以閱讀需要等到足夠了。這是我想確保高效(不旋轉)的循環。 – 2008-10-15 23:19:09

+0

您也可以使用RAII來確保您的鎖()unlock()是異常安全的。 – 2008-10-15 23:20:30

+0

@Frank對這個概念採取了另一種方式。你現在更好地學習如何更好地使用pthread mutex嗎? – 2008-10-15 23:24:41

2

我傾向於使用我稱之爲「同步隊列」的東西。我換了正常的隊列,並使用Semaphore類兩個鎖,使讀取塊,就像你的願望:

#ifndef SYNCQUEUE_20061005_H_ 
#define SYNCQUEUE_20061005_H_ 

#include <queue> 
#include "Semaphore.h" 

// similar, but slightly simpler interface to std::queue 
// this queue implementation will serialize pushes and pops 
// and block on a pop while empty (as apposed to throwing an exception) 
// it also locks as neccessary on insertion and removal to avoid race 
// conditions 

template <class T, class C = std::deque<T> > class SyncQueue { 
protected: 
    std::queue<T, C> m_Queue; 
    Semaphore   m_Semaphore; 
    Mutex    m_Mutex; 

public: 
    typedef typename std::queue<T, C>::value_type value_type; 
    typedef typename std::queue<T, C>::size_type size_type; 

    explicit SyncQueue(const C& a = C()) : m_Queue(a), m_Semaphore(0) {} 

    bool empty() const    { return m_Queue.empty(); } 
    size_type size() const   { return m_Queue.size(); } 

    void push(const value_type& x); 
    value_type pop(); 
}; 

template <class T, class C> 
void SyncQueue<T, C>::push(const SyncQueue<T, C>::value_type &x) { 
    // atomically push item 
    m_Mutex.lock(); 
    m_Queue.push(x); 
    m_Mutex.unlock(); 

    // let blocking semaphore know another item has arrived 
    m_Semaphore.v(); 
} 

template <class T, class C> 
typename SyncQueue<T, C>::value_type SyncQueue<T, C>::pop() { 
    // block until we have at least one item 
    m_Semaphore.p(); 

    // atomically read and pop front item 
    m_Mutex.lock(); 
    value_type ret = m_Queue.front(); 
    m_Queue.pop(); 
    m_Mutex.unlock(); 

    return ret; 
} 

#endif 

您可以實現信號量和互斥,在你的線程執行相應的原語。

注意:這個實現是一個隊列中單個元素的例子,但是你可以很容易地用一個緩存結果的函數來包裝它,直到N被提供。像這樣的東西,如果它是一個字符隊列:

std::vector<char> func(int size) { 
    std::vector<char> result; 
    while(result.size() != size) { 
     result.push_back(my_sync_queue.pop()); 
    } 
    return result; 
} 
1

只是爲了好玩,這裏是一個快速和骯髒的實施使用Boost。它在支持它的平臺上使用pthreads,而在windows上使用windows操作。

boost::mutex access; 
boost::condition cond; 

// consumer 
data read() 
{ 
    boost::mutex::scoped_lock lock(access); 
    // this blocks until the data is ready 
    cond.wait(lock); 

    // queue is ready 
    return data_from_queue(); 
} 

// producer 
void push(data) 
{ 
    boost::mutex::scoped_lock lock(access); 
    // add data to queue 

    if (queue_has_enough_data()) 
    cond.notify_one(); 
} 
1

更好玩,這裏是我的最終版本。 STL沒有很好的理由。 :-)

#include <algorithm> 
#include <deque> 
#include <pthread.h> 

template<typename T> 
class MultithreadedReader { 
    std::deque<T> buffer; 
    pthread_mutex_t moreDataMutex; 
    pthread_cond_t moreDataCond; 

protected: 
    template<typename OutputIterator> 
    void read(size_t count, OutputIterator result) { 
     pthread_mutex_lock(&moreDataMutex); 

     while (buffer.size() < count) { 
      pthread_cond_wait(&moreDataCond, &moreDataMutex); 
     } 
     std::copy(buffer.begin(), buffer.begin() + count, result); 
     buffer.erase(buffer.begin(), buffer.begin() + count); 

     pthread_mutex_unlock(&moreDataMutex); 
    } 

public: 
    MultithreadedReader() { 
     pthread_mutex_init(&moreDataMutex, 0); 
     pthread_cond_init(&moreDataCond, 0); 
    } 

    ~MultithreadedReader() { 
     pthread_cond_destroy(&moreDataCond); 
     pthread_mutex_destroy(&moreDataMutex); 
    } 

    template<typename InputIterator> 
    void feed(InputIterator first, InputIterator last) { 
     pthread_mutex_lock(&moreDataMutex); 

     buffer.insert(buffer.end(), first, last); 
     pthread_cond_signal(&moreDataCond); 

     pthread_mutex_unlock(&moreDataMutex); 
    } 
};