2012-03-22 150 views
1

前言一個「排隊」:我是新的多線程編程,並用C++有點生疏。我的要求是使用一個互斥體,以及兩個條件mNotEmptymEmpty。我還必須按照下面提到的方式創建和填充矢量。一個生產者,兩位消費者作用於由生產者生產

我有一個生產者線程創建尺寸n*2的隨機數,以及兩個消費者插入這些值成大小n的兩個單獨的向量的向量。

我做的製片人以下:

  1. 鎖定互斥:pthread_mutex_lock(&mMutex1)
  2. 等待消費者說向量是空的:pthread_cond_wait(&mEmpty,&mMutex1)
  3. 推背值到載體
  4. 信號對消費者的載體不是空了:pthread_cond_signal(&mNotEmpty)
  5. 解鎖互斥:pthread_mutex_unlock(&mMutex1)
  6. 返回到步驟1

在消費者:

  1. 鎖定互斥:pthread_mutex_lock(&mMutex1)
  2. 檢查,看是否載體是空的,如果是這樣的信號生產商:pthread_cond_signal(&mEmpty)
  3. 其他刀片值成兩個新的矢量中的一個(取決於哪個線程),並從原來的矢量
  4. 除去解鎖互斥:pthread_mutex_unlock(&mMutex1)
  5. 返回到步驟1

我的過程出了什麼問題?我不斷收到分段錯誤或無限循環。

編輯:下面的代碼:

void Producer() 
{ 
    srand(time(NULL)); 
    for(unsigned int i = 0; i < mTotalNumberOfValues; i++){ 
     pthread_mutex_lock(&mMutex1); 

     pthread_cond_wait(&mEmpty,&mMutex1); 
     mGeneratedNumber.push_back((rand() % 100) + 1); 

     pthread_cond_signal(&mNotEmpty); 
     pthread_mutex_unlock(&mMutex1); 
    } 
} 

void Consumer(const unsigned int index) 
{ 
    for(unsigned int i = 0; i < mNumberOfValuesPerVector; i++){ 
     pthread_mutex_lock(&mMutex1); 
     if(mGeneratedNumber.empty()){ 
      pthread_cond_signal(&mEmpty); 
     }else{ 
      mThreadVector.at(index).push_back[mGeneratedNumber.at(0)]; 
      mGeneratedNumber.pop_back(); 
     } 

     pthread_mutex_unlock(&mMutex1); 
    } 
} 
+1

如果您只是提供了一個可以展示您的問題的最小可編譯程序,這將更容易回答。 – Mankarse 2012-03-22 13:09:51

+0

那麼,如果你在一個鎖內等待,那麼在未來的某個時間點會有一個死鎖。 – ActiveTrayPrntrTagDataStrDrvr 2012-03-22 13:16:42

+0

生產者的第4步說用mNotEmpty向消費者發信號,但mNotEmpty不被消費者使用。 – 2012-03-22 13:21:31

回答

4

我不知道我是否理解你在做 事情的方式背後的基本原理。在通常的消費者提供者成語,提供商推動作爲 許多項目儘可能進入通道,只有等待,如果有在通道 空間不足;它不會空着。所以 常用的成語是:

提供商(推一個項目):

pthread_mutex_lock(&mutex); 
while (! spaceAvailable()) { 
    pthread_cond_wait(&spaceAvailableCondition, &mutex); 
} 
pushTheItem(); 
pthread_cond_signal(&itemAvailableCondition); 
pthread_mutex_unlock(&mutex); 

,並在消費者一方,拿到一個項目:

pthread_mutex_lock(&mutex); 
while (! itemAvailable()) { 
    pthread_cond_wait(&itemAvailableCondition, &mutex); 
} 
getTheItem(); 
pthread_cond_signal(&spaceAvailableCondition); 
pthread_mutex_unlock(&mutex); 

請注意,每個條件,一邊發信號,另一邊等待。 (我 沒有看到你的消費者等待。)如果在任一方有多於一個的 處理,我建議使用pthread_cond_broadcast, 而不是pthread_cond_signal

您的代碼中還有其他一些問題。其中一些看起來像錯別字 :你應該複製/粘貼實際的代碼,以避免這種情況。你 真的意味着讀取和彈出mGeneratedValues,當你推入 mGeneratedNumber,並檢查是否是空的? (如果你實際上有 確實有兩個不同的隊列,那麼你就從隊列中彈出,沒有 人推送。)並且你沒有任何循環等待 條件;你繼續迭代你期望的元素數量(每次遞增計數器,所以你很可能在你應該之前很長時間)—我看不到無限循環,但我可以很容易地看到無盡的等待在pthread_cond_wait中的 生產者。我沒有看到核心轉儲,但當進程終止時(可能是消費者,因爲它從不 等待任何東西)會發生什麼情況;如果它最終破壞互斥或變量的條件,則當另一個進程嘗試使用它們時,可能會獲得核心轉儲。

0

您可能需要考慮採取互斥條件滿足後,才,例如

producer() 
{ 
    while true 
    { 
     waitForEmpty(); 
     takeMutex(); 
     produce(); 
     releaseMutex(); 
    } 
} 

consumer() 
{ 
    while true 
    { 
     waitForNotEmpty(); 
     takeMutex(); 
     consume(); 
     releaseMutex(); 
    } 
} 
+0

這就是我最初的想法,但是我查了一下pthread_cond_wait,它期望傳遞一個鎖定的互斥鎖,它在開始等待之前解鎖。 – 2012-03-22 13:20:03

+0

-1,因爲這是完全錯誤的。如果沒有持有互斥鎖,則不能'waitForEmpty',並且無法釋放'waitForEmpty'和'produce'之間的互斥鎖,而不會出現競爭條件。 – 2012-03-22 13:29:22

+0

對不起,你是對的。考慮後,我認爲按照我寫的做法可能會導致競爭條件,例如,如果兩個消費者完成了等待,只有一個接受互斥體並清空向量,另一個進入受保護的部分。 無論如何,下面是你正在嘗試做的一個很好的工作示例: http://www.cs.loyola.edu/~jglenn/702/S2008/Examples/ProducerConsumer/pc_cpp.html – 2012-03-22 13:30:34

1

在生產者中,只有在隊列不爲空時才調用pthread_cond_wait。否則,由於競賽條件,你永遠被阻擋。

0
Here is a solution to a similar problem like you. In this program producer produces a no and writes it to a array(buffer) and a maintains a file then update a status(status array) about it, while on getting data in the array(buffer) consumers start to consume(read and write to their file) and update a status that it has consumed. when producer looks that both the consumer has consumed the data it overrides the data with a new value and goes on. for convenience here i have restricted the code to run for 2000 nos. 

// Producer-consumer // 

#include <iostream> 
#include <fstream> 
#include <pthread.h> 

#define MAX 100 

using namespace std; 

int dataCount = 2000; 

int buffer_g[100]; 
int status_g[100]; 

void *producerFun(void *); 
void *consumerFun1(void *); 
void *consumerFun2(void *); 

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 

pthread_cond_t dataNotProduced = PTHREAD_COND_INITIALIZER; 
pthread_cond_t dataNotConsumed = PTHREAD_COND_INITIALIZER; 

int main() 
{ 

    for(int i = 0; i < MAX; i++) 
    status_g[i] = 0; 

    pthread_t producerThread, consumerThread1, consumerThread2; 

    int retProducer = pthread_create(&producerThread, NULL, producerFun, NULL); 
    int retConsumer1 = pthread_create(&consumerThread1, NULL, consumerFun1, NULL); 
    int retConsumer2 = pthread_create(&consumerThread2, NULL, consumerFun2, NULL); 

    pthread_join(producerThread, NULL); 
    pthread_join(consumerThread1, NULL); 
    pthread_join(consumerThread2, NULL); 

    return 0; 

} 

void *producerFun(void *) 
{ 
    //file to write produced data by producer 
    const char *producerFileName = "producer.txt"; 
    ofstream producerFile(producerFileName); 
    int index = 0, producerCount = 0; 

    while(1) 
    { 
     pthread_mutex_lock(&mutex); 

     if(index == MAX) 
     { 
     index = 0; 
     } 
     if(status_g[index] == 0) 
     { 

     static int data = 0; 
     data++; 

     cout << "Produced: " << data << endl; 

     buffer_g[index] = data; 

     producerFile << data << endl; 

     status_g[index] = 5; 

     index ++; 
     producerCount ++; 

     pthread_cond_broadcast(&dataNotProduced); 
     } 
     else 
     { 
     cout << ">> Producer is in wait.." << endl; 
     pthread_cond_wait(&dataNotConsumed, &mutex); 
     } 
     pthread_mutex_unlock(&mutex); 

     if(producerCount == dataCount) 
     { 
     producerFile.close(); 
     return NULL; 
     } 
    } 
} 

void *consumerFun1(void *) 
{ 
    const char *consumerFileName = "consumer1.txt"; 
    ofstream consumerFile(consumerFileName); 
    int index = 0, consumerCount = 0; 

    while(1) 
    { 
    pthread_mutex_lock(&mutex); 
    if(index == MAX) 
    { 
     index = 0; 
    } 

    if(status_g[index] != 0 && status_g[index] != 2) 
    { 
     int data = buffer_g[index]; 

     cout << "Cosumer1 consumed: " << data << endl; 
     consumerFile << data << endl; 

     status_g[index] -= 3; 

     index ++; 
     consumerCount ++; 

     pthread_cond_signal(&dataNotConsumed); 
    } 
    else 
    { 
     cout << "Consumer1 is in wait.." << endl; 
     pthread_cond_wait(&dataNotProduced, &mutex); 
    } 
     pthread_mutex_unlock(&mutex); 
    if(consumerCount == dataCount) 
    { 
     consumerFile.close(); 
     return NULL; 
    } 
    } 
} 

void *consumerFun2(void *) 
{ 
    const char *consumerFileName = "consumer2.txt"; 
    ofstream consumerFile(consumerFileName); 
    int index = 0, consumerCount = 0; 

    while(1) 
    { 
    pthread_mutex_lock(&mutex); 
    if(index == MAX) 
    { 
     index = 0; 
    } 

    if(status_g[index] != 0 && status_g[index] != 3) 
    { 


     int data = buffer_g[index]; 
     cout << "Consumer2 consumed: " << data << endl; 
     consumerFile << data << endl; 

     status_g[index] -= 2; 

     index ++; 
     consumerCount ++; 

     pthread_cond_signal(&dataNotConsumed); 
    } 
    else 
    { 
     cout << ">> Consumer2 is in wait.." << endl; 
     pthread_cond_wait(&dataNotProduced, &mutex); 
    } 
    pthread_mutex_unlock(&mutex); 

    if(consumerCount == dataCount) 
    { 
     consumerFile.close(); 
     return NULL; 
    } 
    } 
} 

Here is only one problem that producer in not independent to produce, that is it needs to take lock on the whole array(buffer) before it produces new data, and if the mutex is locked by consumer it waits for that and vice versa, i am trying to look for it.