2014-11-06 85 views
3

我想從輸出文件中逐行讀取。每個線程讀取一行然後處理數據。同時下一個線程必須讀取下一行。std :: thread - 從文件行中逐行讀取

std::ifstream infile("test.txt"); 
std::mutex mtx; 

void read(int id_thread){ 
    while(infile.good()){ 
    mtx.lock(); 
    std::string sLine; 
    getline(infile, sLine); 
    std::cout << "Read by thread: " << id_thread; 
    std::cout << sLine << std::endl; 
    mtx.unlock(); 
    } 
} 

void main(){ 
    std::vector<std::thread> threads; 
    for(int i = 0; i < num; i++){ 
    threads.push_back(std::thread(parallelFun, i)); 
    } 

    for(auto& thread : threads){ 
     thread.join(); 
    } 
    return 0; 
} 

當我運行此代碼時,我得到: 第一個線程讀取所有行。我怎樣才能讓每個線程讀取一行?

enter image description here

編輯

正如評論所有我需要做的是提到大的測試文件。 謝謝你們!

+2

什麼問題?無論如何,讀取操作都是由互斥鎖序列化的。 – 2014-11-06 13:03:43

+3

這就是線程的工作原理,它們運行一段時間,然後讓另一個線程運行一段時間,等等。如果單個線程運行足夠長的時間以讀取短文件中的所有行,那麼您必須找出一些其他方法來一次只讀取一行(如發送一個條件變量?)。 – 2014-11-06 13:06:42

+3

如果您希望閱讀與處理並行進行,您應該在閱讀該行後立即解鎖互斥鎖,並且直到處理完您讀取的數據後纔開啓互斥鎖。 – Angew 2014-11-06 13:07:39

回答

5

我會改變環路成

while(infile.good()){ 
    mtx.lock(); 
    std::string sLine; 
    getline(infile, sLine); 
    mtx.unlock(); 
    std::cout << "Read by thread: " << id_thread; 
    std::cout << sLine << std::endl; 
    } 

你的std ::法院東西是你的測試循環的忙碌一部分以後要換取真正的代碼。這給了其他線程的時間踢。此外,讓你的測試文件。線程初始化花費一些時間,第一個線程吃掉所有數據並不少見。

+1

它在我看來,這是代碼差,使情況變得更糟。原始文件有一個破損的輸入循環,你已經同樣損壞了。通過將輸出移動到互斥部分之外,您允許多個線程同時寫入,因此輸出可能不再一致。 – 2014-11-06 14:44:45

+3

這也不是例外,請使用['std :: lock_guard'](http://en.cppreference.com/w/cpp/thread/lock_guard)或其他RAII互斥處理程序之一來確保始終鎖定在下一次循環迭代或函數退出前釋放 – Mgetz 2014-11-06 14:46:09

+1

@JerryCoffin我的假設是,這只是一個簡約的示例代碼,併發線程應該在真正的代碼中執行一些非常重的東西,而這些東西不一定與同步的I/O相關。我同意這應該被提及,如果問題會是「我如何在線程之間同步I/O」 – Oncaphillis 2014-11-06 16:30:55

1

爲了防止每個線程讀取一行(這在您的描述中很明顯),請刪除while循環,然後您需要確保線程數與文件中的行數相同。

要擺脫上述限制,您可以使用boost線程池。

2

如果你想讓你的5個線程完全讀取每5行,你必須同步讀取,因此每個線程必須知道前一個已經完成讀取它的部分。這個需求可能會導致效率低下,因爲一些線程可能會等待很長時間才能運行。

概念代碼,未經檢驗的使用風險自負。

讓我們先創建一個默認類來處理原子鎖。我們調整它以避免錯誤的共享和相關的緩存乒乓。

constexpr size_t CACHELINESIZE = 64; // could differ on your architecture 
template<class dType> 
class alignas(CACHELINESIZE) lockstep { 
    std::atomic<dType> lock = dType(0); 

public: 
    // spinlock spins until the previous value is prev and then tries to set lock to value 
    // until success, restart the spin if prev changes. 
    dType Spinlock(dType prev = dType(0), dType next = dType(1)) { 
    dType expected = prev; 
    while (!lock.compare_exchange_weak(expected, next)) { // request for locked-exclusiv ~100 cycles? 
     expected = prev; // we wish to continue to wait for expected 
     do { 
     pause(); // on intel waits roughly one L2 latency time. 
     } while(lock.load(std::memory_order_relaxed) != prev); // only one cache miss per change 
    } 
    return expected; 
    } 

    void store(dType value) { 
    lock.store(value); 
    } 
}; 

lockstep<int> lock { 0 }; 

constexpr int NoThreads = 5; 

std::ifstream infile("test.txt"); 

void read(int id_thread) { 
    locks[id_thread].lock = id_thread; 
    bool izNoGood = false; 
    int next = id_thread; 

    while(!izNoGood){ 
    // get lock for next iteration 
    lock.spinlock(next, next); // wait on our number 

    // moved file check into locked region  
    izNoGood = !infile.good(); 
    if (izNoGood) { 
     lock.store(next+1); // release next thread to end run. 
     return; 
    } 

    std::string sLine; 
    getline(infile, sLine); 

    // release next thread 
    lock.store(next+1); 

    // do work asynchronous 
    // ... 

    // debug log, hopefully the whole line gets written in one go (atomic) 
    // but can be in "random" order relative to other lines. 
    std::cout << "Read by thread: " << id_thread << " line no. " << next 
       << " text:" << sLine << std::endl; // endl flushes cout, implicit sync? 
    next += NoThreads; // our next expected line to process 
    } 
} 

void main() { 
    std::vector<std::thread> threads; 
    for(int i = 0; i < NoThreads; i++) { 
    threads.push_back(std::thread(parallelFun, i)); 
    } 

    for(auto& thread : threads){ 
     thread.join(); 
    } 
    return 0; 
}