2014-10-11 242 views
0

我的代碼只用於一個生產者 - 一個消費者的情況。pthread_cond_wait fifo循環隊列中的死鎖

這裏是我的測試代碼:

static void *afunc(void * arg) { 
    Queue* q = arg; 
    for(int i= 0; i< 100000; i++) { 
     *queue_pull(q) = i; //get one element space 
     queue_push(q);  //increase the write pointer 
    } 
    return NULL; 
} 
static void *bfunc(void * arg) { 
    Queue* q = arg; 
    for(;;) { 
     int *i = queue_fetch(q); //get the first element in queue 
     printf("%d\n", *i); 
     queue_pop(q); //increase the read pointer 
    } 
} 
int main() { 
    Queue queue; 
    pthread_t a, b; 
    queue_init(&queue); 
    pthread_create(&a, NULL, afunc, &queue); 
    pthread_create(&b, NULL, bfunc, &queue); 

    sleep(100000); 
    return 0; 
} 

,這裏是循環隊列

#define MAX_QUEUE_SIZE 3 
typedef struct Queue{ 
    int data[MAX_QUEUE_SIZE] ; 
    int read,write; 
    pthread_mutex_t mutex, mutex2; 
    pthread_cond_t not_empty, not_full; 
}Queue; 
int queue_init(Queue *queue) { 
    memset(queue, 0, sizeof(Queue)); 
    pthread_mutex_init(&queue->mutex, NULL); 
    pthread_cond_init(&queue->not_empty, NULL); 
    pthread_mutex_init(&queue->mutex2, NULL); 
    pthread_cond_init(&queue->not_full, NULL); 
    return 0; 
} 
int* queue_fetch(Queue *queue) { 
    int* ret; 
    if (queue->read == queue->write) { 
     pthread_mutex_lock(&queue->mutex); 
     pthread_cond_wait(&queue->not_empty, &queue->mutex); 
     pthread_mutex_unlock(&queue->mutex); 
    } 
    ret = &(queue->data[queue->read]); 
    return ret; 
} 
void queue_pop(Queue *queue) { 
    nx_atomic_set(queue->read, (queue->read+1)%MAX_QUEUE_SIZE); 
    pthread_cond_signal(&queue->not_full); 
} 
int* queue_pull(Queue *queue) { 
    int* ret; 
    if ((queue->write+1)%MAX_QUEUE_SIZE == queue->read) { 
     pthread_mutex_lock(&queue->mutex2); 
     pthread_cond_wait(&queue->not_full, &queue->mutex2); 
     pthread_mutex_unlock(&queue->mutex2); 
    } 
    ret = &(queue->data[queue->write]); 
    return ret; 
} 
void queue_push(Queue *queue) { 
     nx_atomic_set(queue->write, (queue->write+1)%MAX_QUEUE_SIZE); 
     pthread_cond_signal(&queue->not_empty); 
} 

後片刻的實施,似乎兩個子線程會變成僵局..

編輯:我使用兩個信號量,但它也有一些問題..它很漂亮 奇怪,如果只是執行./main,它似乎很好,但如果我重定向到一個文件,如./main> A.TXT,然後WC -l A.TXT,結果不等於所述排隊號碼..

int queue_init(Queue *queue) { 
    memset(queue, 0, sizeof(Queue)); 
    pthread_mutex_init(&queue->mutex, NULL); 
    sem_unlink("/not_empty"); 
    queue->not_empty = sem_open("/not_empty", O_CREAT, 644, 0); 
    sem_unlink("/not_full"); 
    queue->not_full = sem_open("/not_full", O_CREAT, 644, MAX_QUEUE_SIZE); 
    return 0; 
} 

int* queue_fetch(Queue *queue) { 
    sem_wait(queue->not_empty); 
    return &(queue->data[queue->read]); 
} 
void queue_pop(Queue *queue) { 
    nx_atomic_set(queue->read, (queue->read+1)%MAX_QUEUE_SIZE); 
    sem_post(queue->not_full); 
} 

int* queue_pull(Queue *queue) { 
    sem_wait(queue->not_full); 
    return &(queue->data[queue->write]); 
} 
void queue_push(Queue *queue) { 
    nx_atomic_set(queue->write, (queue->write+1)%MAX_QUEUE_SIZE); 
    sem_post(queue->not_empty); 
} 
+0

pthreads條件不是標誌......它們不保留「設置」。如果您沒有等待*條件發出信號*,您將錯過信號。 – Dmitri 2014-10-11 05:46:45

+0

更好的解決方案嗎?一個sem_t是不夠的 – user3682618 2014-10-11 06:03:11

+0

如果你打算使用信號量,一個sem_t和一個互斥對於一個無界隊列就足夠了(對任何數量的生產者/消費者來說都是安全的)。如果你需要一個有界的隊列,你需要另一個sem_t。 – 2014-10-11 13:21:03

回答

1

很有可能你的線程中的一個等待以信號的條件信號發生後,導致兩個線程無限期地等待對方。

Pthreads條件變量不保持發信號 - 信號是一個瞬間動作。條件變量不用來決定是否等待 - 它只是用來喚醒已經在等待的線程;您需要使用不同的方法來確定是否等待,如檢查標誌或某種測試條件。

通常情況下,你的信號如下:

  1. 鎖定互斥
  2. 做你的更新,一般留下您的測試條件「真」(例如,設置你的標誌)
  3. 呼叫pthread_cond_signal()pthread_cond_broadcast()
  4. 解鎖互斥

...並等待如下:

  1. 鎖定互斥
  2. 循環,直到你的測試表達式是「真」(例如:直到您的標誌被設置),僅當測試爲假時才呼叫pthread_cond_wait()(在循環內)。
  3. 循環結束後,當您的測試成功時,請執行您的工作。
  4. 解鎖互斥

例如,信號可能會去是這樣的:

pthread_mutex_lock(&mtx);  /* 1: lock mutex */ 
    do_something_important(); /* 2: do your work... */ 
    ready_flag = 1;    /* ...and set the flag */ 
    pthread_cond_signal(&cond); /* 3: signal the condition (before unlocking) */ 
pthread_mutex_unlock(&mtx); /* 4: unlock mutex */ 

,並等待可能是這樣的:

pthread_mutex_lock(&mtx);   /* 1: lock mutex */ 
    while (ready_flag == 0)   /* 2: Loop until flag is set... */ 
    pthread_cond_wait(&cond, &mtx); /* ...waiting when it isn't */ 
    do_something_else();    /* 3: Do your work... */ 
    ready_flag = 0;      /* ...and clear the flag if it's all done */ 
pthread_mutex_unlock(&mtx);   /* 4: unlock mutex */ 

服務員不會錯過這樣的條件,因爲互斥鎖確保服務員的測試和等待以及信號員的設置和信號不能同時發生。


queue_fetch()功能的這一部分:

if (queue->read == queue->write) { 
    pthread_mutex_lock(&queue->mutex); 
    pthread_cond_wait(&queue->not_empty, &queue->mutex); 
    pthread_mutex_unlock(&queue->mutex); 
} 
ret = &(queue->data[queue->read]); 

..might被改寫爲:

pthread_mutex_lock(&queue->mutex); 
    while (queue->read == queue->write) 
     pthread_cond_wait(&queue->not_empty, &queue->mutex); 
    ret = &(queue->data[queue->read]); 
pthread_mutex_unlock(&queue->mutex); 

...其中:

  1. 上鎖/解鎖互斥體的方法是在附近移動,所以直到條件等待開始
  2. if萬一條件等待過早中斷變更爲while互斥體,而測試表達式,並仍持有舉行
  3. queue->readqueue->write訪問完成與互斥鎖舉行

類似的更改將作出queue_pull()

對於信令碼,的queue_pop()以下部分:

nx_atomic_set(queue->read, (queue->read+1)%MAX_QUEUE_SIZE); 
pthread_cond_signal(&queue->not_full); 

..might改爲:

pthread_mutex_lock(&queue->mutex); 
    queue->read = (queue->read + 1) % MAX_QUEUE_SIZE; 
    pthread_cond_signal(&queue->not_full); 
pthread_mutex_unlock(&queue->mutex); 

..where:

  1. 互斥是在發信號通知情況下保持(這確保了在決定是否等待的服務員之間不能發出信號)真正開始等待,因爲服務員將持有該間隔期間互斥)
  2. 互斥舉行而改變queue->read以及而非因爲信號的條件時,無論如何

類似的變化需要互斥使用nx_atomic_set()將作出queue_push()


此外,(訪問readwrite時,使相同的互斥總是保持),你應該只使用一個單一的互斥體,一旦while循環被添加到條件等待幾乎沒有令人信服的理由使用更多比一個條件變量。如果切換到一個條件變量,剛剛完成的等待後再次信號的條件:

pthread_mutex_lock(&queue->mutex); 
    while (queue->read == queue->write) { 
     pthread_cond_wait(&queue->cond, &queue->mutex); 
     pthread_cond_signal(&queue->cond); /* <-- signal next waiter, if any */ 
    } 
    ret = &(queue->data[queue->read]); 
pthread_mutex_unlock(&queue->mutex); 
+0

你的回答是對的,但是你有什麼建議? – user3682618 2014-10-11 07:30:07

1

你操縱的互斥體外面的隊列的狀態,這是天生racey。

我會建議使用單個互斥量,但每當您更改或測試讀取&寫入標記時都會採用它。這也意味着你不需要原子集。

+0

單個互斥體對此不夠,因爲這裏將存儲器函數(fetch&pull)和索引函數(pop&push)分開,在將內存從隊列中拉出後,我會對存儲器做一些耗時的計算,比增加索引 – user3682618 2014-10-11 07:07:57

+0

從隊列中獲取元素後,不需要鎖定互斥鎖。只有在測試或更改讀取和寫入標記時才需要保持互斥鎖。除非你打算互斥體覆蓋隊列元素的*內容* – Rich 2014-10-11 14:55:12