我正在使用pthreads中的生產者 - 消費者示例。這個想法如下。生產者生成一個在所有消費者線程中共享的新值k
。有n_consumers
線程和只有一個生產者。爲了便於訪問k
,使用了一個包含n_consumers
元素的數組。這樣,每次生成一個新的k
時,它都被複制到整個pool
(pool[0]=k; pool[1]=k;...; pool[n_consumers-1]=k
)。pthreads條件 - 有可能忽略cond信號?
這是我的代碼片段:
void *consumer (void *args) {
int id = *(int *) args;
while (1) {
barrier (&barrier1, id);
// 1. lock
pthread_mutex_lock (&mu);
// 2. wait
pthread_cond_wait (&cond_producer_is_ready, &mu);
// 3. unlock
pthread_mutex_unlock (&mu);
// 4. do something with pool[id] value
printf ("thread %d using value %d\n", id, pool[id]);
// 5. stop?
if (stop_condition(pool[id]))
break;
}
return NULL;
}
void *producer (void *args) {
int i;
int id = n_consumers;
while (1) {
barrier(&barrier1, id);
// 1. lock
pthread_mutex_lock (&mu);
// 2. produce some new values
for (i=0; i<n_consumers; i++)
pool[i]++;
// 3. send message indicating a new value is available
printf ("producer sends broadcast...\n");
pthread_cond_broadcast (&cond_producer_is_ready);
// 4. unlock
pthread_mutex_unlock (&mu);
// 5. stop?
// it could be pool[x], it does not matter the index
if (stop_condition(pool[0]))
break;
}
return NULL;
}
這是輸出:
thread 0 in barrier (count 1)
thread 2 in barrier (count 2)
thread 1 in barrier (count 3)
thread 3 in barrier (count 4)
thread 4 in barrier (count 5) <- all the threads are in the barrier (OK)
producer sends broadcast... <- producer send the message to access to k
thread 4 in barrier (count 1) <- the producer waits in the barrier
thread 1 using value 1 <- consumer 1 received the message and use k=1
thread 1 in barrier (count 2)
thread 3 using value 1 <- consumer 2 and 3 received the message too
thread 3 in barrier (count 3)
thread 2 using value 1
thread 2 in barrier (count 4) <- only the consumer 0 did not received the message
看來,只有少數消費者接收信號cond_producer_is_ready
,在這種情況下,消費者線程1,2和3,而消費者線程0仍在等待這樣的消息。屏障工作正常,因爲所有的線程都可以達到它。然而,問題在於消息的接收。是否可以確定給定線程是否收到或忽略給定的已發送消息(如cond_producer_is_ready
)?
更新 感謝@caf的回答。這是我原來的代碼修正版本,沒有以前的錯誤,只是爲了完整性(或pastebin複製):
/*
| barrier_and_pool.c
| $ gcc barrier_and_pool.c -pthread -o barrier_and_pool.out
* */
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
// shared resource
int *pool = NULL;
// producer and consumers
int n_consumers = 4;
int numproc = 5; // n_consumers + 1 producer
int new_value_available = 0; // mutex flag
int last_value_produced = 0; // mutex flag
pthread_mutex_t mu = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond_producer_is_ready = PTHREAD_COND_INITIALIZER;
typedef struct
{
int cur_count;
pthread_mutex_t barrier_mutex;
pthread_cond_t barrier_cond;
} barrier_t;
barrier_t pbarrier;
void barrier_init(barrier_t * mybarrier)
{
pthread_mutex_init(&(mybarrier->barrier_mutex), NULL);
pthread_cond_init(&(mybarrier->barrier_cond), NULL);
mybarrier->cur_count = 0;
}
void barrier(barrier_t * mybarrier, int id) {
pthread_mutex_lock(&(mybarrier->barrier_mutex));
mybarrier->cur_count++;
printf("thread %d in barrier (count %d)\n", id, mybarrier->cur_count);
if (mybarrier->cur_count!=numproc) {
pthread_cond_wait(&(mybarrier->barrier_cond), &(mybarrier->barrier_mutex));
}
else
{
mybarrier->cur_count=0;
printf ("broadcast sent by thread %d\n", id);
pthread_cond_broadcast(&(mybarrier->barrier_cond));
}
pthread_mutex_unlock(&(mybarrier->barrier_mutex));
printf ("thread %d out of barrier\n", id);
}
int stop_condition (int k_value) {
if (k_value >= 5) // we stop after five iters
return 1;
else
return 0;
}
void *consumer (void *args) {
int id = *(int *) args;
int last_value_consumed = 0;
while (1) {
barrier (&pbarrier, id);
// 1. lock
pthread_mutex_lock (&mu);
// 2. wait
while (last_value_produced == last_value_consumed)
pthread_cond_wait (&cond_producer_is_ready, &mu);
last_value_consumed++;
// 3. unlock
pthread_mutex_unlock (&mu);
// 4. do something with pool[id] value
printf ("thread %d using value %d\n", id, pool[id]);
// 5. stop?
if (stop_condition(pool[id]))
break;
}
return NULL;
}
void *producer (void *args) {
int i;
int id = n_consumers;
while (1) {
barrier(&pbarrier, id);
// 1. lock
pthread_mutex_lock (&mu);
// 2. produce some new values
for (i=0; i<n_consumers; i++)
pool[i]++;
// 3. send message indicating a new value is available
last_value_produced++;
printf ("producer sends broadcast (last_value_produced: %d)...\n", last_value_produced);
pthread_cond_broadcast (&cond_producer_is_ready);
// 4. unlock
pthread_mutex_unlock (&mu);
// 5. stop?
// it could be pool[x], it does not matter the index
if (stop_condition(pool[0]))
break;
}
return NULL;
}
int main (int argc, char *argv[]) {
pool = (int*) malloc (sizeof(int)*n_consumers);
int *ids = (int*) malloc(sizeof(int)*n_consumers);
pthread_t pro;
pthread_t cons[n_consumers];
// init barrier
barrier_init(&pbarrier);
// init consumers
int i;
for (i=0; i<n_consumers; i++) {
ids[i] = i;
pool[i] = 0;
pthread_create (&cons[i], NULL, consumer, &ids[i]);
}
// init producer
pthread_create (&pro, NULL, producer, NULL);
// join
for (i=0; i<n_consumers; i++) {
pthread_join (cons[i], NULL);
}
pthread_join (pro, NULL);
return 0;
}