2016-09-23 190 views
-1

我在寫一個基本的生產者/消費者線程代碼。我得到了大部分的工作,但我有一個問題與蘭特()。或者,也許我的問題比rand()更深,而rand只是冰山一角。我不想做任何太複雜的事情(無法運行或等待)。多線程生產者/消費者

我有一個全局的整數作爲緩衝區。我允許用戶輸入大小和運行時間的限制。 我讓計數器成爲一個靜態全局變量。 這是我的製片人:

DWORD WINAPI producer(LPVOID n) 
{ 
    cout << "\nPRODUCER:The producer is running right now" << endl; 
    int size = (int)n; 
    int num = rand()%10;// this is for making item. 
    while (buffer.size() > size) 
    { 
    } 

    buffer.push_back(num); 
    counter++; 
    return (DWORD)n; 

} 

這是我consumer--

DWORD WINAPI consumer(LPVOID n) 
{ 

    cout << "\nCONSUMER:The consumer is running right now" << endl; 
    while (buffer.empty()) 
    { } 
    int item= buffer.front(); 
    cout << "\nCONSUMER:The consumer ate" << item << endl; 
    counter++; 

    return (DWORD)n; 


    } 
在main-

while (counter < l) 
    { 
    hThreads[0] = CreateThread(NULL, 0, producer, (LPVOID)s, NULL, &id[0]); 
    hThreads[1] = CreateThread(NULL, 0, consumer, (LPVOID)l, NULL, &id[1]); 

    waiter = WaitForMultipleObjects(MAX_THREADS, hThreads, TRUE, INFINITE); 
    } 
    for (int i = 0; i < MAX_THREADS; i++) { 
    CloseHandle(hThreads[i]); 
    } 

我的輸出是這樣的: enter image description here

所以每次只產生1個。 Srand也沒有工作。但它運行正確的次數。

編輯--- 所以我固定的生產和消費,以應對競爭條件:

DWORD WINAPI producer(LPVOID s) 
    { 
    WaitForSingleObject(Empty, INFINITE);  
    WaitForSingleObject(Mutex, INFINITE); 
    cout << "\nPRODUCER...." << endl; 
    int size = (int)s; 
    srand(size); 
    int in = rand() % 10; 
    cout << "THIS IS IN:::" << in << endl; 
    while (buffer.size() == size) 
    { 
     ReleaseMutex(Mutex); 
     } 
    buffer.push_back(in); 
    counter++; 
    cout << "\nThe producer produces " << buffer.front() << endl; 
    ReleaseMutex(Mutex); 
    ReleaseSemaphore(Full, 1, NULL); 

    return (DWORD)s; 
    } 



    DWORD WINAPI consumer(LPVOID l) 
    { 
     WaitForSingleObject(Full, INFINITE); 
     WaitForSingleObject(Mutex, INFINITE); 
     cout << "\nCONSUMER...." << endl; 
     while (buffer.empty()) 
     { 
      ReleaseMutex(Mutex); 

     } 
    int out = buffer.front(); 
    counter++; 
    ReleaseMutex(Mutex); 
    ReleaseSemaphore(Empty, 1, NULL); 
    return (DWORD)l; 
    } 

但隨意的事情仍保持演戲了。它只會一直重複生成一個數字(即使播種時也是如此)。

+0

那麼同步在哪裏呢? – Mysticial

+0

如果消費者沒有任何東西先運行,那麼生產者就會進入,然後消費者就會從中斷。這取決於首先啓動哪個 –

+0

緩衝區上存在未受保護的競爭條件。這是未定義的行爲。 – Mysticial

回答

0

是的,創建(並銷燬)一個線程來創建或處理一個數字是沒有意義的 - 額外的開銷不值得。加上你的代碼(就像)有一些非常明顯的錯誤或誤解。它們是:

  • 在主線程中創建工作線程(在while(){}循環中),但只在最後一次銷燬它們,那就是隻銷燬最後一個循環中創建的句柄。
  • 正如我的消息所述,srand()被調用來生成每個數字,並始終使用相同的初始種子,因此獲得相同的數字是正常的。
  • while()循環檢查緩衝區是空還是滿是沒有意義的,並且不應釋放互斥鎖。
  • counter變量的操作可能是錯誤的。生產者和消費者線程都增加它,主線程使用它來確定生成/打印的數量。
  • 在您的初始代碼片段中,counter是一個全局變量,其上運行着多個線程,因此您應該以線程安全的方式讀取或修改它,而不是以這種方式。您應該使用一些鎖定機制,如關鍵部分或互鎖變量訪問。

我的建議將是創建一個生產者線程(產生的所有數字)和一個消費者線程(打印所有數),通過緩衝器傳送。爲此,您將需要以下項目(您已經實現了其中的大部分):

  • 一個「完整」信號量,計算緩衝區中的數字,最初爲0。
  • 一個互補的「Empty」信號量,計算緩衝區中的空項目,最初設置爲緩衝區大小。這些信號量的「總和」當然總是等於緩衝區大小。
  • 用於訪問緩衝區的互斥鎖(或臨界區)。
  • 一個全局變量,用於告訴線程是否退出。

我在這裏發佈以下代碼示例。不知道是否他們的工作,你可能需要修改或進行調試,但是這僅僅是展示的概念:

// Global 
#define MAX_BUF 5 
BOOL bExit = FALSE; 

// Main Thread 
    Empty = CreateSemaphore(NULL, MAX_BUF, MAX_BUF, NULL); 
    Full = CreateSemaphore(NULL, 0, MAX_BUF, NULL); 
    . 
    . 
    hThreads[0] = CreateThread(NULL, 0, producer, (LPVOID)l, NULL, &id[0]); 
    hThreads[1] = CreateThread(NULL, 0, consumer, (LPVOID)l, NULL, &id[1]); 
    waiter = WaitForMultipleObjects(MAX_THREADS, hThreads, TRUE, INFINITE); 

    for (int i = 0; i < MAX_THREADS; i++) 
     CloseHandle(hThreads[i]); 


DWORD WINAPI producer(LPVOID nCount) 
{ 
    int nItems = (int)nCount; 
    // Initialize rand() seed - each run will be generating a different sequence 
    srand(GetTickCount()); // May need to AND GetTickCount() with RAND_MAX ??? 
    // Generate nCount numbers 
    for (int i = 0; i < nItems; i++) 
    { 
     if (bExit) return 9; // Aborted 
     WaitForSingleObject(Empty, INFINITE); // Wait until at least one item empty 
     // Lock the buffer and add an item 
     WaitForSingleObject(Mutex, INFINITE); // Could be EnterCriticalSection() instead 
     if (buffer.size() >= MAX_BUF) 
     { 
      cout << "\nInternal Error: Buffer-full Check Failed!" << endl; 
      bExit = TRUE; // Tell all threads to exit 
      ReleaseMutex(Mutex); 
      return 1; // Exit with Error 
     } 
     int in = rand() % 10; 
     buffer.push_back(in); 
     cout << "The Producer generated: " << in << endl; 
     ReleaseMutex(Mutex); // Could be LeaveCriticalSection() instead 
     ReleaseSemaphore(Full, 1, NULL); // 1 item added, update semaphore 
    } 
    cout << "\nThe PRODUCER produced " << nItems << " items." << endl; 
    return 0; // OK 
} 

DWORD WINAPI consumer(LPVOID nCount) 
{ 
    int nItems = (int)nCount; 
    // Generate nCount numbers 
    for (int i = 0; i < nItems; i++) 
    { 
     if (bExit) return 9; // Aborted 
     WaitForSingleObject(Full, INFINITE); // Wait until at least one item in buffer 
     // Lock the buffer and get an item 
     WaitForSingleObject(Mutex, INFINITE); // Could be EnterCriticalSection() instead 
     if (buffer.empty()) 
     { 
      cout << "\nInternal Error: Buffer-empty Check Failed!" << endl; 
      bExit = TRUE; // Tell all threads to exit 
      ReleaseMutex(Mutex); 
      return 2; // Exit with Error 
     } 
     int out = buffer.front(); 
     buffer.erase(buffer.begin()); // Remove item from the list 
     cout << "The Consumer ate: " << out << endl; 
     ReleaseMutex(Mutex); // Could be LeaveCriticalSection() instead 
     ReleaseSemaphore(Empty, 1, NULL); // 1 item removed, update semaphore 
    } 
    cout << "\nThe CONSUMER consumed " << nItems << " items." << endl; 
    return 0; // OK 
} 

注:

  • bExit變量是全球性的訪問受到越來越/修改而不是一個線程,但因爲這總是在關鍵部分(互斥鎖)內完成的,所以不需要使用另一個線程或互鎖變量訪問。
  • 診斷消息(例如cout << "The Consumer ate: " << out << endl;)或任何其他類型的「處理」這些數據(或在其上「工作」)可能已經在釋放對象後被放置。這會更高效,將對象提前釋放給其他線程。我已經這樣做了,以更好地說明測試中的事件序列。
  • 如果您將MAX_BUF設置爲1,則應該一次生成/打印一個數字,否則無法確定,但生成的項目減去消耗的項目當然不會超過緩衝區大小。