我嘗試使用Intel TBB使用文件讀取,排序,文件寫入階段來編寫流水線版本的Bitonic Sort,如下所示。代碼在while((outQueue.try_pop(line))的spinlock處凍結;在FileWriter過濾器中。有人可以解釋爲什麼這可能是?使用Intel TBB的並行C++代碼中的塊/凍結
更新: 我做了一些進一步的測試,發現由try_pop從頭文件_concurrent_queue_internal.h調用的internal_try_pop有一個compare_and_swap操作,該操作永遠爲這個特定的try_pop失敗。以下是我從internal_try_pop中提取的值
head counter(k)15
tail counter1605177747
item ticket(tk)15
k after head CAS 15
(k=tk)15,15---break!!
我認爲尾部計數器值是垃圾。我能想到這種情況的唯一原因是分揀機添加到隊列中的值可能會被隱式修改,從而導致不可用。
任何想法?
謝謝:)
#include <iostream>
#include <sstream>
#include <string>
#include <algorithm>
#include <fstream>
#include "tbb\parallel_for.h"
#include "tbb\blocked_range.h"
#include "tbb\pipeline.h"
#include "tbb\concurrent_queue.h"
using namespace tbb;
using namespace std;
// Filter that writes lines to the output file.
class FileWriterFilter: public tbb::filter {
string outPath;
public:
FileWriterFilter(string outPath);
/*override*/void* operator()(void* item);
};
FileWriterFilter::FileWriterFilter(string outPath) :
tbb::filter(/*is_serial=*/true),
outPath(outPath)
{
}
void* FileWriterFilter::operator()(void* item) {
concurrent_queue<string>& outQueue = *static_cast<concurrent_queue<string>*>(item);
string line;
while(!outQueue.try_pop(line));
ofstream myfile(outPath);
if (myfile.is_open())
{
myfile <<line<<endl;
}
//myfile.close();
return NULL;
}
class FileReaderFilter: public tbb::filter {
public:
FileReaderFilter(string inPath);
private:
ifstream ifs;
tbb::concurrent_queue<string> queue;
/*override*/ void* operator()(void*);
};
FileReaderFilter::FileReaderFilter(string inPath) :
filter(/*is_serial=*/true),
ifs(inPath)
{
}
void* FileReaderFilter::operator()(void*) {
string temp;
if(getline(ifs, temp))
{
queue.push(temp);
}
return &queue;
}
class BitonicSort: public tbb::filter{
public:
BitonicSort();
/*override*/void* operator()(void* item);
size_t *a;
private : static const bool ASCENDING=true, DESCENDING=false;
public :void sort(size_t *b,int n)
{
a=b;
bitonicSort(0,n,ASCENDING);
}
private: void bitonicSort(int lo,int n,bool dir)
{
if(n>1)
{
int m=n/2;
bitonicSort(lo,m,ASCENDING);
bitonicSort(lo+m,m,DESCENDING);
bitonicMerge(lo,n,dir);
}
}
private : void bitonicMerge(int lo,int n,bool dir)
{
if(n>1)
{
int m=n/2;
for(int i=lo;i<lo+m;i++)
{
compare(i,i+m,dir);
}
bitonicMerge(lo,m,dir);
bitonicMerge(lo+m,m,dir);
}
}
private : void compare(int i,int j, bool dir)
{
if(dir==a[i]>a[j])
{
exchange(i,j);
}
}
private : void exchange(int i,int j)
{
/* cout<<a[i]<<" "<<a[j]<<endl;*/
int t=a[i];
a[i]=a[j];
a[j]=t;
/*cout<<a[i]<<" "<<a[j]<<endl<<endl;*/
}
private :string convertInt(int number)
{
stringstream ss;//create a stringstream
ss << number;//add number to the stream
return ss.str();//return a string with the contents of the stream
}
};
BitonicSort::BitonicSort() :
tbb::filter(/*serial=*/false)
{}
/*override*/void* BitonicSort::operator()(void* item) {
int num_elem=2048;
size_t *max = new size_t[num_elem];
concurrent_queue<string>& queue = *static_cast<concurrent_queue<string>*>(item);
concurrent_queue<string> outQueue;
string line;
while(!queue.try_pop(line));
istringstream iss(line);
int i=0;
do
{
string sub;
iss >> sub;
max[i]=atoi(sub.c_str());;
i++;
} while (iss);
sort(max,num_elem);
string out;
for(int i=0;i<num_elem;i++)
{
out.append(convertInt(max[i]).append(" "));
}
outQueue.push(out);
return &outQueue;
}
int main() {
tbb::pipeline pipeline;
FileReaderFilter reader("sample.txt");
pipeline.add_filter(reader);
BitonicSort sorter;
pipeline.add_filter(sorter);
FileWriterFilter writer("test.txt");
pipeline.add_filter(writer);
pipeline.run(3);
pipeline.clear();
system("PAUSE");
}
我試着添加this_tbb_thread yield();在while循環中嘗試從隊列中彈出。沒有效果。它看起來像循環本身沒有發生,線程第一次不退出try_pop。 – Danaja 2011-03-23 06:34:17