4

我是新來的線程編程,我有一個概念問題。我正在做矩陣乘法作爲我班的項目。然而,我沒有使用線程,然後使用線程爲回答矩陣的每個單元計算標量乘積,然後再次將第一個矩陣分成比例,以便每個線程都有相等的部分進行計算。我的問題是標量產品實現非常快速完成,這是我所期望的,但第三個實現不會比非線程實現計算出的答案快得多。例如,如果它使用2個線程,它會在大約一半時間內複製它,因爲它可以同時在矩陣的兩個半部分上工作,但事實並非如此。我覺得在第三個實現中有一個問題,我不認爲它並行運行,代碼如下。任何人都可以直接在我這裏?並非所有的代碼都與問題相關,但我將它包括在內以防問題不在本地。 謝謝,關於線程的問題

主要課程:

#include <iostream> 
#include <cstdio> 
#include <cstdlib> 
#include <cmath> 
#include<fstream> 
#include<string> 
#include<sstream> 

#include <matrix.h> 
#include <timer.h> 
#include <random_generator2.h> 

const float averager=2.0; //used to find the average of the time taken to multiply the matrices. 

//Precondition: The matrix has been manipulated in some way and is ready to output the statistics 
//Outputs the size of the matrix along with the user elapsed time. 
//Postconidition: The stats are outputted to the file that is specified with the number of threads used 
//file name example: "Nonparrallel2.dat" 
void output(string file, int numThreads , long double time, int n); 

//argv[1] = the size of the matrix 
//argv[2] = the number of threads to be used. 
//argv[3] = 
int main(int argc, char* argv[]) 
{ 
    random_generator rg; 
    timer t, nonparallel, scalar, variant; 
    int n, total = 0, numThreads = 0; 
    long double totalNonP = 0, totalScalar = 0, totalVar = 0; 

    n = 100; 

/* 
* check arguments 
*/ 
     n = atoi(argv[1]); 
     n = (n < 1) ? 1 : n; 
     numThreads = atoi(argv[2]); 
/* 
* allocated and generate random strings 
*/ 
    int** C; 
    int** A; 
    int** B; 

    cout << "**NOW STARTING ANALYSIS FOR " << n << " X " << n << " MATRICES WITH " << numThreads << "!**"<< endl; 

    for (int timesThrough = 0; timesThrough < averager; timesThrough++) 
    { 

     cout << "Creating the matrices." << endl; 
     t.start(); 
     C = create_matrix(n); 
     A = create_random_matrix(n, rg); 
     B = create_random_matrix(n, rg); 
     t.stop(); 

     cout << "Timer (generate): " << t << endl; 

     //---------------------------------------------------------Ends non parallel----------------------------- 
     /* 
     * run algorithms 
     */ 
      cout << "Running non-parallel matrix multiplication: " << endl; 
      nonparallel.start(); 
      multiply(C, A, B, n); 
      nonparallel.stop(); 
     //-----------------------------------------Ends non parallel---------------------------------------------- 


     //cout << "The correct matrix" <<endl; 
     //output_matrix(C, n); 

      cout << "Timer (multiplication): " << nonparallel << endl; 
      totalNonP += nonparallel.user(); 


      //D is the transpose of B so that the p_scalarproduct function does not have to be rewritten 
      int** D = create_matrix(n); 
      for (int i = 0; i < n; i++) 
      for(int j = 0; j < n; j++) 
       D[i][j] = B[j][i]; 
     //---------------------------------------------------Start Threaded Scalar Poduct-------------------------- 
      cout << "Running scalar product in parallel" << endl; 
      scalar.start(); 
      //Does the scalar product in parallel to multiply the two matrices. 
      for (int i = 0; i < n; i++) 
      for (int j = 0; j < n; j++){ 
      C[i][j] = 0; 
      C[i][j] = p_scalarproduct(A[i],D[j],n,numThreads); 
      }//ends the for loop with j 
      scalar.stop(); 

      cout << "Timer (scalar product in parallel): " << scalar << endl; 
      totalScalar += scalar.user(); 
     //---------------------------------------------------Ends Threaded Scalar Poduct------------------------ 


     //---------------------------------------------------Starts Threaded Variant For Loop--------------- 
      cout << "Running the variation on the for loop." << endl; 
      boost :: thread** thrds; 


      //create threads and bind to p_variantforloop_t 
      thrds = new boost::thread*[numThreads]; 

      variant.start(); 
      for (int i = 1; i <= numThreads; i++) 
       thrds[i-1] = new boost::thread(boost::bind(&p_variantforloop_t, 
         C, A, B, ((i)*n - n)/numThreads ,(i * n)/numThreads, numThreads, n)); 
cout << "before join" <<endl; 
      // join threads 
       for (int i = 0; i < numThreads; i++) 
      thrds[i]->join(); 
      variant.stop(); 

      // cleanup 
       for (int i = 0; i < numThreads; i++) 
      delete thrds[i]; 
       delete[] thrds; 

     cout << "Timer (variation of for loop): " << variant <<endl; 
     totalVar += variant.user(); 
     //---------------------------------------------------Ends Threaded Variant For Loop------------------------ 

     // output_matrix(A, n); 
     // output_matrix(B, n); 
     // output_matrix(E,n); 

     /* 
     * free allocated storage 
     */ 

     cout << "Deleting Storage" <<endl; 

      delete_matrix(A, n); 
      delete_matrix(B, n); 
      delete_matrix(C, n); 
      delete_matrix(D, n); 

     //avoids dangling pointers 
      A = NULL; 
      B = NULL; 
      C = NULL; 
      D = NULL; 
    }//ends the timesThrough for loop 

    //output the results to .dat files 
    output("Nonparallel", numThreads, (totalNonP/averager) , n); 
    output("Scalar", numThreads, (totalScalar/averager), n); 
    output("Variant", numThreads, (totalVar/averager), n); 

    cout << "Nonparallel = " << (totalNonP/averager) << endl; 
    cout << "Scalar = " << (totalScalar/averager) << endl; 
    cout << "Variant = " << (totalVar/averager) << endl; 

    return 0; 
} 

void output(string file, int numThreads , long double time, int n) 
{ 
    ofstream dataFile; 
    stringstream ss; 

    ss << numThreads; 
    file += ss.str(); 
    file += ".dat"; 

    dataFile.open(file.c_str(), ios::app); 
    if(dataFile.fail()) 
    { 
     cout << "The output file didn't open." << endl; 
     exit(1); 
    }//ends the if statement. 
    dataFile << n << "  " << time << endl; 
    dataFile.close(); 
}//ends optimalOutput function 

矩陣文件:

#include <matrix.h> 
#include <stdlib.h> 

using namespace std; 

int** create_matrix(int n) 
{ 
    int** matrix; 

    if (n < 1) 
    return 0; 

    matrix = new int*[n]; 
    for (int i = 0; i < n; i++) 
    matrix[i] = new int[n]; 

    return matrix; 
} 

int** create_random_matrix(int n, random_generator& rg) 
{ 
    int** matrix; 

    if (n < 1) 
    return 0; 

    matrix = new int*[n]; 
    for (int i = 0; i < n; i++) 
    { 
     matrix[i] = new int[n]; 
     for (int j = 0; j < n; j++) 
    //rg >> matrix[i][j]; 
    matrix[i][j] = rand() % 100; 
    } 

    return matrix; 
} 

void delete_matrix(int** matrix, int n) 
{ 
    for (int i = 0; i < n; i++) 
     delete[] matrix[i]; 

    delete[] matrix; 

    //avoids dangling pointers. 
    matrix = NULL; 
} 

/* 
* non-parallel matrix multiplication 
*/ 
void multiply(int** C, int** A, int** B, int n) 
{ 
    if ((C == A) || (C == B)) 
    { 
     cout << "ERROR: C equals A or B!" << endl; 
     return; 
    } 

    for (int i = 0; i < n; i++) 
    for (int j = 0; j < n; j++) 
     { 
    C[i][j] = 0; 
    for (int k = 0; k < n; k++) 
     C[i][j] += A[i][k] * B[k][j]; 
    } 
} 

void p_scalarproduct_t(int* c, int* a, int* b, 
        int s, int e, boost::mutex* lock) 
{ 
    int tmp; 

    tmp = 0; 
    for (int k = s; k < e; k++){ 
    tmp += a[k] * b[k]; 
//cout << "a[k]= "<<a[k]<<"b[k]= "<< b[k] <<" "<<k<<endl; 
} 
    lock->lock(); 
    *c = *c + tmp; 
    lock->unlock(); 
} 

int p_scalarproduct(int* a, int* b, int n, int m) 
{ 
    int c; 
    boost::mutex lock; 
    boost::thread** thrds; 

    c = 0; 

/* create threads and bind to p_merge_sort_t */ 
    thrds = new boost::thread*[m]; 
    for (int i = 0; i < m; i++) 
    thrds[i] = new boost::thread(boost::bind(&p_scalarproduct_t, 
          &c, a, b, i*n/m, (i+1)*n/m, &lock)); 
/* join threads */ 
    for (int i = 0; i < m; i++) 
    thrds[i]->join(); 

/* cleanup */ 
    for (int i = 0; i < m; i++) 
    delete thrds[i]; 
    delete[] thrds; 

    return c; 
} 

void output_matrix(int** matrix, int n) 
{ 
    cout << "["; 
    for (int i = 0; i < n; i++) 
    { 
     cout << "[ "; 
     for (int j = 0; j < n; j++) 
    cout << matrix[i][j] << " "; 
     cout << "]" << endl; 
    } 
    cout << "]" << endl; 
} 





void p_variantforloop_t(int** C, int** A, int** B, int s, int e, int numThreads, int n) 
{ 
//cout << "s= " <<s<<endl<< "e= " << e << endl; 
    for(int i = s; i < e; i++) 
     for(int j = 0; j < n; j++){ 
      C[i][j] = 0; 
//cout << "i " << i << " j " << j << endl; 
      for (int k = 0; k < n; k++){ 
      C[i][j] += A[i][k] * B[k][j];} 
     } 
}//ends the function 
+0

如果您隔離並運行第三種變體,您是否看到所有CPU內核同時處於活動狀態(並希望與之掛鉤)? ...從代碼看來它應該並行工作,但如果出現問題,至少可以快速進行完整性檢查。 – Jason 2011-05-01 20:38:10

+0

@jason我不確定如何檢查這與Linux,你有什麼建議如何做到這一點?我在學校使用電腦,所以我沒有root權限。 – tpar44 2011-05-01 21:12:23

+0

什麼發行版?在Ubuntu上,我只需進入系統 - >管理 - >系統監視器查看CPU負載... Fedora/RH/SuSE/etc。儘管菜單可能位於不同的地方,但它們具有相同的功能。另外,如果你只有一個命令行,你可以輸入'top'來獲得一個基於進程的CPU監視器(也提供了許多其他統計信息)。 – Jason 2011-05-01 21:18:46

回答

3

我的猜測是,你正在運行到False Sharing。嘗試在p_variantforloop_t使用本地變量:

void p_variantforloop_t(int** C, int** A, int** B, int s, int e, int numThreads, int n) 
{ 
    for(int i = s; i < e; i++) 
     for(int j = 0; j < n; j++){ 
      int accu = 0; 
      for (int k = 0; k < n; k++) 
      accu += A[i][k] * B[k][j]; 
      C[i][j] = accu; 
     } 
} 
+0

好吧,這對我來說很合理,但是當我將它添加到我的程序中時,即使我完全添加了該文章中的內容,它也不會反映在我的輸出中 – tpar44 2011-05-01 20:22:32

+0

如果您將整個臨時矩陣用作累加器在你的函數中,只是將結果從臨時矩陣複製到'p_variantforloop_t'函數末尾的C處,而不是在for循環中間逐個單元地複製? – Jason 2011-05-01 20:43:47

+0

@Jason:我在想同樣的事情,但是OP沿着最大步長維度(第一個索引,正確?)劃分工作,寫入沿着最小步長維度,所以我不太確定它實際上是False Sharing ,除非範圍很小。 – 2011-05-01 20:52:23

0

基於在評論你的迴應,從理論上講,因爲你只能有一個線程(即CPU),所有的線程版本應該是相同的時間作爲單線程版本或更長的時間,因爲線程管理開銷。你不應該看到任何加速,因爲解決矩陣的一部分的時間片是從另一個並行任務中竊取的時間片。只有一個CPU,你只能分時分享CPU的資源 - 在給定的一段時間內沒有真正的並行工作。那麼我會懷疑你的第二個實現運行得更快的原因是因爲你在內部循環中做了更少的指針解引用和內存訪問。例如,在multiplyp_variantforloop_t的主操作C[i][j] += A[i][k] * B[k][j];中,您正在查看彙編級別的許多操作,其中許多與內存相關。它看起來像在「組裝僞碼」下列:

1)從由A棧上所引用的地址移動指針值到寄存器R1
2)由關的值增量寄存器R1地址棧由可變ij,或k
3)將指針地址值的地址所指向的R1R1
4)由值增量R1地址關閉由可變i引用的堆棧引用j,或k
5)從所述地址移動值指向R1R1(所以R1現在持有的A[i][k]的值)
6)執行用於通過B堆棧到寄存器上引用的地址的步驟1-5 R2(所以R2現在持有的B[k][j]的值)
7)執行用於通過C在棧上所引用到寄存器R3
8的地址步驟1-4)將值從地址指向R3R4(即,R4保持在C[i][j]實際值)
9)乘在寄存器寄存器R1R2和存儲R5
10)添加寄存器R4R5和存儲在R4
11)從R4移動終值回到存儲器地址R3(現在的C[i][j]有最終結果)

這就是假設我們有5個通用寄存器可供使用,並且編譯器正確地優化了你的C代碼以利用它們。我將循環索引變量i,jk留在堆棧上,因此訪問它們需要的時間比它們在寄存器中的時間要多得多......這取決於您的編譯器在您的平臺上需要使用多少個寄存器。另外,如果你沒有進行任何優化編譯,你可能會從堆棧中進行更多的內存訪問,其中一些臨時值存儲在堆棧而不是寄存器中,然後從堆棧中重新獲取,這需要花費更長的時間比在寄存器之間移動值。無論哪種方式,上面的代碼都很難優化。它可以工作,但是如果你使用的是32位x86平臺,那麼你不會有那麼多的通用寄存器來玩(儘管你至少應該有6個)。 x86_64有更多的寄存器可供使用,但仍然有所有的內存訪問需要處理。

另一方面,在一個緊密的內部循環中,類似於tmp += a[k] * b[k]p_scalarproduct_t的操作將會移動得更快......這裏是在裝配的僞代碼的上述操作:

會有爲循環

1)請tmp寄存器R1小初始化步驟,而不是堆棧變量,和初始化它的值設置爲0
2 )移動通過a棧上所引用的地址值到R2
3)R2
4添加的s值從堆棧到R2和保存得到的地址)移動通過引用的地址值0堆疊成R3
5)添加的s值從堆棧到R3R3
6)設置保存所得地址初始化爲e - s

我們將一次性初始化之後在R6的計數器上開始實際的內環

7)移動的值的地址所指向的R2R4
8)將值從地址指向R3R5
9)乘法R4R5並存儲在R5
10)的結果添加R5R1並存儲在R1
11的結果)增量在R6R2R3
12)遞減計數器直到達到零,我們終止循環

我不能保證這正是你的編譯器設置這個循環的方式,但是你可以通過你的標量例子看到在內部循環中需要的步驟少,更重要的是l ess內存訪問。因此,更多的操作可以通過僅使用寄存器的操作完成,而不是包含存儲器位置的操作,並且需要進行存儲器提取,這比僅寄存器操作要慢得多。所以總的來說,它會更快地移動,而這與線程無關。

最後,我注意到你只有標產品兩個嵌套的循環,所以它的複雜度爲O(N^2),其中,爲您的另外兩種方法你有Ø三個嵌套循環(N^3)複雜。這也會有所作爲。