2016-12-05 47 views
0

我目前有一個生產者 - 消費者設置的兩個線程,它使用pthread_cond_wait()pthread_cond_signal()來交替讀取數據和處理數據。如何將第二個消費者添加到基於pthread的生產者 - 消費者設置?

說我有一個鎖,兩個條件,並且指出如果數據緩衝器中有數據的布爾標誌:

pthread_mutex_t lock; 
pthread_cond_t we_have_data; 
pthread_cond_t we_need_data; 
bool buffer_is_empty = true; 

我有一個使用以下函數來產生數據的pthread_t(讀數據成緩衝液):

static void* produce(void* arg) { 
    pthread_mutex_lock(&lock); 
    for (;;) { 
     while (!buffer_is_empty) { 
      pthread_cond_wait(&we_need_data, &lock); 
     } 
     pthread_mutex_unlock(&lock); 
     // read some data into our buffer 
     pthread_mutex_lock(&lock); 
     buffer_is_empty = false; 
     pthread_cond_signal(&we_have_data); 
    } 
} 

然後我有一個使用以下代碼來消耗該數據,在接收到we_have_data信號的第二pthread_t

static void* consume(void* arg) { 
    pthread_mutex_lock(&lock); 
    for (;;) { 
     while (buffer_is_empty) { 
      pthread_cond_wait(&we_have_data, &lock); 
     } 
     pthread_mutex_unlock(&lock); 
     // process the data in our buffer 
     pthread_mutex_lock(&lock); 
     buffer_is_empty = true; 
     pthread_cond_signal(&we_need_data); 
    } 
} 

這工作正常。

我現在想要做的是添加第三個線程,如果緩衝區中包含某些數據,它將對consume()函數的數據起作用。

我曾嘗試添加第三個條件,但我的程序掛起。

我設置了一個條件和布爾標誌:

bool processing_with_second_consumer; 
pthread_cond_t we_need_to_process_data_with_another_consumer; 

然後我修改消費者:

static void* consume(void* arg) { 
    pthread_mutex_lock(&lock); 
    for (;;) { 
     while (buffer_is_empty && !processing_with_second_consumer) { 
      pthread_cond_wait(&we_have_data, &lock); 
     } 
     pthread_mutex_unlock(&lock); 
     // process the data in our buffer 
     pthread_mutex_lock(&lock); 
     if (data_meets_our_conditions) { 
      processing_with_second_consumer = true; 
      pthread_cond_signal(&we_need_to_process_data_with_another_consumer); 
     } 
     buffer_is_empty = true; 
     pthread_cond_signal(&we_need_data); 
    } 
} 

然後我修改了生產者等待布爾:

static void* produce(void* arg) { 
    pthread_mutex_lock(&lock); 
    for (;;) { 
     while (!buffer_is_empty && !processing_with_second_consumer) { 
      pthread_cond_wait(&we_need_data, &lock); 
     } 
     pthread_mutex_unlock(&lock); 
     // read some data into our buffer 
     pthread_mutex_lock(&lock); 
     buffer_is_empty = false; 
     pthread_cond_signal(&we_have_data); 
    } 
} 

並添加第三個線程從消費者消費:

static void* consume_from_the_consumer(void* arg) { 
    pthread_mutex_lock(&lock); 
    for (;;) { 
     while (!buffer_is_empty && processing_with_second_consumer) { 
      pthread_cond_wait(&we_need_to_process_data_with_another_consumer, &lock); 
     } 
     pthread_mutex_unlock(&lock); 
     // do more specific processing of the data in our buffer 
     pthread_mutex_lock(&lock); 
     processing_with_second_consumer = false; 
    } 
} 

我似乎無法讓程序正確退出 - 它基本上掛在消費者消費的無限循環中。

爲了允許第三個(或第四個或第五個等)線程,如何正確設置pthread條件的信號?

回答

0

爲了解決這個問題的三個線程,我需要做一些修改:

  1. 移動互斥鎖進線迴路;循環應該做的第一件事是鎖定數據,它應該做的最後一件事是解鎖它。
  2. 設置三個bool標誌:is_new_line_availableis_new_subdata_availableis_eof
  3. 設置三個pthread_cond_t條件:new_line_is_available,new_line_is_emptynew_subdata_is_available
  4. 確保每個線程都有調用pthread_exit()來終止該線程的條件。

的生產線:

static void* produce(void* arg) { 
    for (;;) { 
     pthread_mutex_lock(&lock); 
     while (is_new_line_available) { 
      pthread_cond_wait(&new_line_is_empty, &lock); 
     } 
     // ... read a line of data into buffer ... 
     if (EOF) { 
      is_new_line_available = true; 
      is_new_subdata_available = true; 
      is_eof = false; 
      pthread_cond_signal(&new_line_is_available); 
      pthread_cond_signal(&new_subdata_is_available); 
      pthread_mutex_unlock(&lock); 
      pthread_exit(NULL); 
     } 
     is_new_line_available = true; 
     is_new_chromosome_available = false; 
     is_eof = false; 
     pthread_cond_signal(&new_line_is_available); 
     pthread_mutex_unlock(&lock); 
    } 
} 

的消耗螺紋:

static void* consume(void* arg) { 
    for (;;) { 
     pthread_mutex_lock(&lock); 
     while (is_new_line_available) { 
      pthread_cond_wait(&new_line_is_available, &lock); 
     } 
     // ... process line of data to look for subdata type ... 
     if (EOF) { 
      is_eof = true; 
      pthread_cond_signal(&new_subdata_is_available); 
      pthread_mutex_unlock(&lock); 
      pthread_exit(NULL); 
     } 
     else if (subdata_found) { 
      is_new_subdata_available = true; 
      is_new_line_available = false; 
      pthread_cond_signal(&new_line_is_empty); 
     } 
     pthread_mutex_unlock(&lock); 
    } 
} 

然後,第三個 「子數據」 - 處理線程:

static void* consume_subdata_from_the_consumer(void* arg) { 
    for (;;) { 
     if (is_eof) { 
      pthread_exit(NULL); 
     } 
     pthread_mutex_lock(&lock); 
     while (!is_new_subdata_available) { 
      pthread_cond_wait(&new_subdata_is_available, &lock); 
     } 
     // ... process subdata ... 
     is_new_subdata_available = false; 
     is_new_line_available = true; 
     pthread_cond_signal(new_line_is_available); 
     pthread_mutex_unlock(&lock); 
    } 
} 

一些觀察:

  • 所有的線程應該有一個條件,讓他們到pthread_exit(),或父進程將掛起。
  • 需要在鎖定和解鎖指令之間拉取修改狀態的所有代碼,或者無序處理的數據可能會損壞。
  • 任何緩衝區溢出或寫入已初始化的數據都可能導致問題。例如,使用calloc()可以在線程中使用它之前初始化字符緩衝區。
1

您只製作信號we_have_data。但是由於它將buffer_is_empty設置爲false,它可以使consume_from_the_consumer線程就緒,但它不會解除阻塞,因爲它在第二個條件變量上被阻止。

爲了使您的生活更簡單,我建議兩個轉變:

  1. 始終使用pthread_cond_broadcast
  2. 只能使用一個條件變量。

這可能效率會稍低一點,但有幾個完整類別的微妙的錯誤,這是不可能的。

+0

我無法得到這個工作,似乎。通過三個線程和一個「開/關」條件,兩個線程在給定時間將被解除阻塞。你有這樣的例子嗎? –

+0

@AlexReynolds兩個線程將被解鎖,但「錯誤」的線程會立即再次阻止。這就是'while'循環的意義所在。當你嘗試時出了什麼問題? –

+0

我的程序掛在處理數據上。 –