2011-03-22 81 views
0

我嘗試使用Intel TBB使用文件讀取,排序,文件寫入階段來編寫流水線版本的Bitonic Sort,如下所示。代碼在while((outQueue.try_pop(line))的spinlock處凍結;在FileWriter過濾器中。有人可以解釋爲什麼這可能是?使用Intel TBB的並行C++代碼中的塊/凍結

更新: 我做了一些進一步的測試,發現由try_pop從頭文件_concurrent_queue_internal.h調用的internal_try_pop有一個compare_and_swap操作,該操作永遠爲這個特定的try_pop失敗。以下是我從internal_try_pop中提取的值

head counter(k)15 
tail counter1605177747 
item ticket(tk)15 
k after head CAS 15 
(k=tk)15,15---break!! 

我認爲尾部計數器值是垃圾。我能想到這種情況的唯一原因是分揀機添加到隊列中的值可能會被隱式修改,從而導致不可用。

任何想法?

謝謝:)

#include <iostream> 
#include <sstream> 
#include <string> 
#include <algorithm> 
#include <fstream> 
#include "tbb\parallel_for.h" 
#include "tbb\blocked_range.h" 
#include "tbb\pipeline.h" 
#include "tbb\concurrent_queue.h" 

using namespace tbb; 
using namespace std; 

// Filter that writes lines to the output file. 
class FileWriterFilter: public tbb::filter { 
string outPath; 
public: 
FileWriterFilter(string outPath); 
/*override*/void* operator()(void* item); 
}; 

FileWriterFilter::FileWriterFilter(string outPath) : 
tbb::filter(/*is_serial=*/true), 
outPath(outPath) 
{ 
} 

void* FileWriterFilter::operator()(void* item) { 

concurrent_queue<string>& outQueue = *static_cast<concurrent_queue<string>*>(item); 
string line; 
while(!outQueue.try_pop(line)); 

ofstream myfile(outPath); 
if (myfile.is_open()) 
{ 
    myfile <<line<<endl; 
} 
//myfile.close(); 
return NULL; 

} 

class FileReaderFilter: public tbb::filter { 
public: 

FileReaderFilter(string inPath); 

private: 
ifstream ifs; 
tbb::concurrent_queue<string> queue; 
/*override*/ void* operator()(void*); 

}; 

FileReaderFilter::FileReaderFilter(string inPath) : 
filter(/*is_serial=*/true), 
ifs(inPath) 
{ 
} 

void* FileReaderFilter::operator()(void*) { 

string temp; 

if(getline(ifs, temp)) 
{ 

    queue.push(temp); 
} 

return &queue; 
} 

class BitonicSort: public tbb::filter{ 

public: 
    BitonicSort(); 
/*override*/void* operator()(void* item); 

size_t *a; 

private : static const bool ASCENDING=true, DESCENDING=false; 

public :void sort(size_t *b,int n) 
{ 
    a=b; 
    bitonicSort(0,n,ASCENDING); 

} 

private: void bitonicSort(int lo,int n,bool dir) 
{ 
    if(n>1) 
    { 
     int m=n/2; 
     bitonicSort(lo,m,ASCENDING); 

     bitonicSort(lo+m,m,DESCENDING); 
     bitonicMerge(lo,n,dir); 


    } 

} 

private : void bitonicMerge(int lo,int n,bool dir) 
    { 
     if(n>1) 
     { 
      int m=n/2; 
      for(int i=lo;i<lo+m;i++) 
      { 
       compare(i,i+m,dir); 

      } 
      bitonicMerge(lo,m,dir); 
      bitonicMerge(lo+m,m,dir); 
     } 
    } 

private : void compare(int i,int j, bool dir) 
      { 
       if(dir==a[i]>a[j]) 
       { 
        exchange(i,j); 

       } 

      } 

private : void exchange(int i,int j) 
      { 
      /* cout<<a[i]<<" "<<a[j]<<endl;*/ 
       int t=a[i]; 
       a[i]=a[j]; 
       a[j]=t; 
       /*cout<<a[i]<<" "<<a[j]<<endl<<endl;*/ 
      } 

    private :string convertInt(int number) 
    { 
    stringstream ss;//create a stringstream 
    ss << number;//add number to the stream 
    return ss.str();//return a string with the contents of the stream 
} 
}; 


BitonicSort::BitonicSort() : 
tbb::filter(/*serial=*/false) 
{} 

/*override*/void* BitonicSort::operator()(void* item) { 

int num_elem=2048; 
size_t *max = new size_t[num_elem]; 
concurrent_queue<string>& queue = *static_cast<concurrent_queue<string>*>(item); 
concurrent_queue<string> outQueue; 
string line; 
while(!queue.try_pop(line)); 
istringstream iss(line); 
int i=0; 
do 
{ 
    string sub; 
    iss >> sub; 
    max[i]=atoi(sub.c_str());; 
    i++; 
} while (iss); 

sort(max,num_elem); 

string out; 

for(int i=0;i<num_elem;i++) 
{ 
    out.append(convertInt(max[i]).append(" ")); 
} 

outQueue.push(out); 

return &outQueue; 
} 


int main() { 

tbb::pipeline pipeline; 

FileReaderFilter reader("sample.txt"); 
pipeline.add_filter(reader); 

BitonicSort sorter; 
pipeline.add_filter(sorter); 

FileWriterFilter writer("test.txt"); 
pipeline.add_filter(writer); 

pipeline.run(3); 


pipeline.clear(); 

system("PAUSE"); 
} 
+0

我試着添加this_tbb_thread yield();在while循環中嘗試從隊列中彈出。沒有效果。它看起來像循環本身沒有發生,線程第一次不退出try_pop。 – Danaja 2011-03-23 06:34:17

回答

1

我找到了!這是一個相當微不足道的錯誤。我已經將第二個concurrent_queue聲明爲管道分揀機過濾器的操作員方法中的方法變量。因此,每次操作員方法執行時,隊列都將重新初始化,使發送到寫入器過濾器的指針無效。隊列必須是分類過濾器的類變量,並且一切正常。文件編寫器出現了另一個錯誤,該錯誤在下面進行了更改。 `

#include <string> 
    #include <algorithm> 
    #include <fstream> 
    #include "tbb\parallel_for.h" 
    #include "tbb\blocked_range.h" 
    #include "tbb\pipeline.h" 
    #include "tbb\concurrent_queue.h" 
    #include "tbb\task_scheduler_init.h" 
    #include "tbb\tbb_thread.h" 
    #include "tbb\task.h" 
    #include <iostream> 
    #include <sstream> 

    using namespace tbb; 
    using namespace std; 


// Filter that writes lines to the output file. 
class FileWriterFilter: public tbb::filter { 

public: 
int count; 
FileWriterFilter(FILE* outFile); 

private: 
    FILE* outFile; 

/*override*/void* operator()(void* item); 
}; 

FileWriterFilter::FileWriterFilter(FILE* outFile) : 
tbb::filter(/*is_serial=*/true), 
outFile(outFile),count(0) 
{ 
} 

/*override*/void* FileWriterFilter::operator()(void* item) { 

tbb::concurrent_queue<string> &outQueue = *static_cast<tbb::concurrent_queue<string>*> (item); 
string outLine; 

while(!outQueue.try_pop(outLine)) 
    this_tbb_thread::yield(); 

fprintf(outFile,outLine.append("\n").c_str()); 

count++; 
if(count==10000){ 
    cout<<"over"<<endl; 

} 
return NULL; 

} 


class FileReaderFilter: public tbb::filter { 
public: 
    FileReaderFilter(char* inPath); 

private: 
int count; 
ifstream ifs; 
tbb::concurrent_queue<string> queue; 
/*override*/ void* operator()(void*); 

}; 

FileReaderFilter::FileReaderFilter(char* inPath) : 
filter(/*is_serial=*/true), 
ifs(inPath),count(0) 
{ 
} 

/*override*/void* FileReaderFilter::operator()(void*) { 

string temp; 
count++; 
if(count<=10000){ 
    if(getline(ifs, temp)) 
    { 

      queue.push(temp); 

    } 
    return &queue; 
} 
else{ 
    return NULL; 
} 
} 


class bitonicMerger : public tbb::task{ 
int lo; 
int n; 
bool dir; 
size_t* a; 
private : static const bool ASCENDING=true, DESCENDING=false; 

public: 
    bitonicMerger(int lo_,int n_,bool dir_,size_t* a_): lo(lo_), n(n_),dir(dir_), a(a_) {} 


    task* execute() { 
    if(n>1) 
    {     
     int m=n/2; 
     for(int i=lo;i<lo+m;i++) 
     { 
     compare(i,i+m,dir); 
     }  

     int count = 1; 
     tbb::task_list list; 
     ++count; 
     list.push_back(*new(allocate_child()) bitonicMerger(lo,m,dir,a)); 
     ++count; 
     list.push_back(*new(allocate_child()) bitonicMerger(lo+m,m,dir,a)); 
     set_ref_count(count); 
     spawn_and_wait_for_all(list); 
    } 
    return NULL; 
} 

    private : void compare(int i,int j, bool dir) 
      { 
       if(dir==a[i]>a[j]) 
       { 
        exchange(i,j); 

       } 

      } 

private : void exchange(int i,int j) 
      { 

       int t=a[i]; 
       a[i]=a[j]; 
       a[j]=t; 

      } 

    }; 


class bitonicSorter : public tbb::task{ 
int lo; 
int n; 
bool dir; 
size_t* a; 

private : static const bool ASCENDING=true, DESCENDING=false; 

public: 
    bitonicSorter(int lo_,int n_,bool dir_,size_t* a_): lo(lo_), n(n_),dir(dir_), a(a_) {} 

task* execute() { 
    if(n>1) 
    {     
     int m=n/2; 
     int count = 1; 
     tbb::task_list list; 
     ++count; 
     list.push_back(*new(allocate_child()) bitonicSorter(lo,m,ASCENDING,a)); 
     ++count; 
     list.push_back(*new(allocate_child()) bitonicSorter(lo+m,m,DESCENDING,a)); 
     set_ref_count(count); 
     spawn_and_wait_for_all(list); 



     count = 1; 
     tbb::task_list list1; 
     ++count; 
     list1.push_back(*new(allocate_child()) bitonicMerger(lo,n,dir,a)); 
     set_ref_count(count); 
     spawn_and_wait_for_all(list1); 

    } 
    return NULL; 
} 

}; 




class TBitonicSort : public tbb::filter{ 


public: 
    TBitonicSort(); 
/*override*/void* operator()(void* item); 


size_t *a; 

private : static const bool ASCENDING=true, DESCENDING=false; 
private : tbb::concurrent_queue<string> outQueue; 

public :void sort(size_t *b,int n) 
{ 
    a=b;   
    bitonicSorter& tt = *new(tbb::task::allocate_root()) bitonicSorter(0,n,ASCENDING,a); 
    tbb::task::spawn_root_and_wait(tt); 
} 




}; 

string convertInt(int number) 
{ 
    stringstream ss;//create a stringstream 
    ss << number;//add number to the stream 
    return ss.str();//return a string with the contents of the stream 
} 




TBitonicSort::TBitonicSort() : 
filter(/*is_serial=*/true) 
{} 

/*override*/void* TBitonicSort::operator()(void* item) { 

int num_elem=2048; 
size_t *max = new size_t[num_elem]; 
tbb::concurrent_queue<string>& queue = *static_cast<tbb::concurrent_queue<string>*>(item); 

string line; 

while(!queue.try_pop(line)) 
    this_tbb_thread::yield(); 

istringstream iss(line); 
int i=0; 
do 
{ 
    string sub; 
    iss >> sub; 
    max[i]=atoi(sub.c_str());; 
    i++; 
} while (iss); 

sort(max,num_elem); 

string out; 


for(int i=0;i<num_elem;i++) 
{ 
    out.append(convertInt(max[i]).append(" ")); 
} 


outQueue.push(out); 

return &outQueue; 
} 

int run_pipe(int threads) 
{ 
    FILE* output_file = fopen("test.txt","w"); 
    if(!output_file) { 
     perror("test.txt"); 
     return 0; 
    } 
    char* input_file="sample.txt"; 


    tbb::pipeline pipeline; 

    FileReaderFilter reader(input_file); 
    pipeline.add_filter(reader); 

    TBitonicSort sorter; 
    pipeline.add_filter(sorter); 

    FileWriterFilter writer(output_file); 
    pipeline.add_filter(writer); 

    tbb::tick_count t0 = tbb::tick_count::now(); 
    pipeline.run(threads); 
    tbb::tick_count t1 = tbb::tick_count::now(); 

    fclose(output_file); 
    pipeline.clear(); 

    if(threads==1){ 
     printf("serial run time = %g\n", (t1-t0).seconds()); 
    } 
    else{ 
     printf("parallel run time = %g\n", (t1-t0).seconds()); 
    } 

    return 0; 
} 

int main() { 

int threads[2]={1,3}; 


for(int i=0;i<2;i++) 
{ 
    run_pipe(threads[i]); 
} 
system("PAUSE"); 
}