2016-02-05 44 views
0

我正在使用pthreads中的生產者 - 消費者示例。這個想法如下。生產者生成一個在所有消費者線程中共享的新值k。有n_consumers線程和只有一個生產者。爲了便於訪問k,使用了一個包含n_consumers元素的數組。這樣,每次生成一個新的k時,它都被複制到整個poolpool[0]=k; pool[1]=k;...; pool[n_consumers-1]=k)。pthreads條件 - 有可能忽略cond信號?

這是我的代碼片段:

void *consumer (void *args) { 

    int id = *(int *) args; 

    while (1) { 

     barrier (&barrier1, id); 

     // 1. lock 
     pthread_mutex_lock (&mu); 

     // 2. wait 
     pthread_cond_wait (&cond_producer_is_ready, &mu); 

     // 3. unlock 
     pthread_mutex_unlock (&mu); 

     // 4. do something with pool[id] value 
     printf ("thread %d using value %d\n", id, pool[id]); 

     // 5. stop? 
     if (stop_condition(pool[id])) 
      break; 
    }  
    return NULL; 
} 

void *producer (void *args) { 

    int i; 
    int id = n_consumers; 

    while (1) { 

     barrier(&barrier1, id); 

     // 1. lock 
     pthread_mutex_lock (&mu);  

     // 2. produce some new values 
     for (i=0; i<n_consumers; i++) 
      pool[i]++; 

     // 3. send message indicating a new value is available 
     printf ("producer sends broadcast...\n"); 
     pthread_cond_broadcast (&cond_producer_is_ready); 

     // 4. unlock 
     pthread_mutex_unlock (&mu); 

     // 5. stop? 
     // it could be pool[x], it does not matter the index 
     if (stop_condition(pool[0])) 
      break; 
    } 
    return NULL; 
} 

這是輸出:

thread 0 in barrier (count 1) 
thread 2 in barrier (count 2) 
thread 1 in barrier (count 3) 
thread 3 in barrier (count 4) 
thread 4 in barrier (count 5) <- all the threads are in the barrier (OK) 
producer sends broadcast... <- producer send the message to access to k 
thread 4 in barrier (count 1) <- the producer waits in the barrier 
thread 1 using value 1   <- consumer 1 received the message and use k=1 
thread 1 in barrier (count 2) 
thread 3 using value 1   <- consumer 2 and 3 received the message too 
thread 3 in barrier (count 3) 
thread 2 using value 1 
thread 2 in barrier (count 4) <- only the consumer 0 did not received the message 

看來,只有少數消費者接收信號cond_producer_is_ready,在這種情況下,消費者線程1,2和3,而消費者線程0仍在等待這樣的消息。屏障工作正常,因爲所有的線程都可以達到它。然而,問題在於消息的接收。是否可以確定給定線程是否收到或忽略給定的已發送消息(如cond_producer_is_ready)?

更新 感謝@caf的回答。這是我原來的代碼修正版本,沒有以前的錯誤,只是爲了完整性(或pastebin複製):

/* 
| barrier_and_pool.c 
| $ gcc barrier_and_pool.c -pthread -o barrier_and_pool.out 
* */ 

#include <stdio.h> 
#include <stdlib.h> 
#include <pthread.h> 
#include <unistd.h> 

// shared resource 
int *pool        = NULL;  

// producer and consumers 
int n_consumers       = 4; 
int numproc        = 5; // n_consumers + 1 producer 
int new_value_available     = 0; // mutex flag 
int last_value_produced     = 0; // mutex flag 

pthread_mutex_t mu      = PTHREAD_MUTEX_INITIALIZER; 
pthread_cond_t cond_producer_is_ready = PTHREAD_COND_INITIALIZER; 

typedef struct 
{ 
    int    cur_count; 
    pthread_mutex_t barrier_mutex; 
    pthread_cond_t barrier_cond; 
}    barrier_t; 

barrier_t pbarrier; 

void barrier_init(barrier_t * mybarrier) 
{ 
    pthread_mutex_init(&(mybarrier->barrier_mutex), NULL); 
    pthread_cond_init(&(mybarrier->barrier_cond), NULL); 
    mybarrier->cur_count = 0; 
} 


void barrier(barrier_t * mybarrier, int id) { 
    pthread_mutex_lock(&(mybarrier->barrier_mutex)); 
    mybarrier->cur_count++; 
    printf("thread %d in barrier (count %d)\n", id, mybarrier->cur_count); 
    if (mybarrier->cur_count!=numproc) { 
     pthread_cond_wait(&(mybarrier->barrier_cond), &(mybarrier->barrier_mutex)); 
    } 
    else 
    { 
     mybarrier->cur_count=0; 
     printf ("broadcast sent by thread %d\n", id); 
     pthread_cond_broadcast(&(mybarrier->barrier_cond)); 
    } 
    pthread_mutex_unlock(&(mybarrier->barrier_mutex)); 

    printf ("thread %d out of barrier\n", id); 
} 

int stop_condition (int k_value) { 
    if (k_value >= 5)  // we stop after five iters 
     return 1; 
    else 
     return 0; 
} 

void *consumer (void *args) { 

    int id = *(int *) args; 
    int last_value_consumed = 0; 

    while (1) { 

     barrier (&pbarrier, id); 

     // 1. lock 
     pthread_mutex_lock (&mu); 

     // 2. wait 
     while (last_value_produced == last_value_consumed) 
      pthread_cond_wait (&cond_producer_is_ready, &mu); 

     last_value_consumed++; 

     // 3. unlock 
     pthread_mutex_unlock (&mu); 

     // 4. do something with pool[id] value 
     printf ("thread %d using value %d\n", id, pool[id]); 

     // 5. stop? 
     if (stop_condition(pool[id])) 
      break; 
    }  
    return NULL; 
} 

void *producer (void *args) { 

    int i; 
    int id = n_consumers; 

    while (1) { 

     barrier(&pbarrier, id); 

     // 1. lock 
     pthread_mutex_lock (&mu);  

     // 2. produce some new values 
     for (i=0; i<n_consumers; i++) 
      pool[i]++; 

     // 3. send message indicating a new value is available 
     last_value_produced++; 
     printf ("producer sends broadcast (last_value_produced: %d)...\n", last_value_produced); 
     pthread_cond_broadcast (&cond_producer_is_ready); 

     // 4. unlock 
     pthread_mutex_unlock (&mu); 

     // 5. stop? 
     // it could be pool[x], it does not matter the index 
     if (stop_condition(pool[0])) 
      break; 
    } 
    return NULL; 
} 

int main (int argc, char *argv[]) { 


    pool = (int*) malloc (sizeof(int)*n_consumers); 

    int *ids = (int*) malloc(sizeof(int)*n_consumers); 

    pthread_t pro; 
    pthread_t cons[n_consumers]; 

    // init barrier 
    barrier_init(&pbarrier); 

    // init consumers 
    int i; 
    for (i=0; i<n_consumers; i++) { 
     ids[i] = i; 
     pool[i] = 0; 
     pthread_create (&cons[i], NULL, consumer, &ids[i]); 
    } 

    // init producer 
    pthread_create (&pro, NULL, producer, NULL); 

    // join 
    for (i=0; i<n_consumers; i++) { 
     pthread_join (cons[i], NULL); 
    } 

    pthread_join (pro, NULL); 

    return 0; 
} 

回答

2

與您的代碼的問題是,生產者可以前的任何或全部獲得互斥體消費者,在這種情況下消費者將在pthread_mutex_lock();處等待條件被髮信號時 - 所以他們將永遠等待pthread_cond_wait()(信號不排隊:如果你不等待條件變量獲得發信號,你會錯過它)。

這就是爲什麼pthread條件變量必須與某個共享狀態的條件配對 - 稱爲謂詞。而不是隻打電話pthread_cond_wait(),你把它的循環中測試謂詞:

while (!new_value_available) 
    pthread_cond_wait (&cond_producer_is_ready, &mu); 

這樣,如果生產商搶先或消費者也沒關係:如果消費者之前達到其臨界區生產者,謂詞將是錯誤的,消費者會等待;如果生產者在消費者之前到達其關鍵部分,則情況將是真實的,並且消費者將繼續。

在這種情況下,要創建謂詞,可以將全局共享變量last_value_produced初始化爲零,並在生成器剛剛廣播條件變量之前將其加1。每個消費者保持一個局部變量last_value_consumed也初始化爲零,且條件變爲:

while (last_value_consumed == last_value_produced) 
    pthread_cond_wait(&cond_producer_is_ready, &mu); 

消費者開始遞增last_value_consumed