2016-06-15 83 views
2

我正在嘗試編寫一個C++ 11/14程序,其中固定數量的線程(比如4個)持續從線程安全隊列中取出工作,直到那裏在隊列中沒有工作。具有固定數量的線程和線程安全隊列的C++多線程

線程安全的隊列實現:

template<typename T> 
class threadsafe_queue 
{ 
private: 
    mutable std::mutex mut; 
    std::queue<T> data_queue; 
    std::condition_variable data_cond; 
public: 
    threadsafe_queue() {} 
    threadsafe_queue(threadsafe_queue const &other) 
    { 
    std::lock_guard<std::mutex> lk(other.mut); 
    data_queue = other.data_queue; 
    } 

    void push(T new_value) 
    { 
    std::lock_guard<std::mutex> lk(mut); 
    data_queue.push(new_value); 
    data_cond.notify_one(); 
    } 

    void wait_and_pop(T &value) 
    { 
    std::unique_lock<std::mutex> lk(mut); 
    data_cond.wait(lk, [this]{return !data_queue.empty();}); 
    value = data_queue.front(); 
    data_queue.pop(); 
    } 

    std::shared_ptr<T> wait_and_pop() 
    { 
    std::unique_lock<std::mutex> lk(mut); 
    data_cond.wait(lk, [this]{return !data_queue.empty();}); 
    std::shared_ptr<T> res(std::make_shared<T>(data_queue.front())); 
    data_queue.pop(); 
    return res; 
    } 

    bool try_pop(T &value) 
    { 
    std::lock_guard<std::mutex> lk(mut); 
    if (data_queue.empty()) 
     return false; 
    value = data_queue.front(); 
    data_queue.pop(); 
    return true; 
    } 

    std::shared_ptr<T> try_pop() 
    { 
    std::lock_guard<std::mutex> lk(mut); 
    if (data_queue.empty()) 
     return std::shared_ptr<T>(); 
    std::shared_ptr<T> res(std::make_shared<T>(data_queue.front())); 
    data_queue.pop(); 
    return res; 
    } 

    bool empty() const 
    { 
    std::lock_guard<std::mutex> lk(mut); 
    return data_queue.empty(); 
    } 
}; 

功能每個線程運行:

void insertintobidask(std::string connstring, std::string ziparchivename, OFStreamWriter &errlog) { /.../ } 

這些線程應該直到沒有留在工作地脫下工作隊列工作的主隊列:

int main() 
{ 
    std::ofstream errlog 
    errlog.open("/home/vorlket/Desktop/Project/Code/Test/errlog.txt", std::ofstream::out); 
    OFStreamWriter ofsw(&errlog); 

    threadsafe_queue<std::string> wqueue; 
    boost::filesystem::path fx_dir("/home/vorlket/Desktop/Project/Code/Test/Data"); 
    std::regex pattern_fx("HISTDATA_COM_ASCII_.*.zip"); 
    for (boost::filesystem::recursive_directory_iterator iter(fx_dir), end; iter != end; ++iter) 
    { 
    std::string name = iter->path().filename().string(); 
    if (std::regex_match(name, pattern_fx)) 
    { 
     wqueue.push(name); 
    } 
    } 

    /* Each thread below would run once, how do I modify it to make it continuously take a work off the queue and run until there is no work left in the queue? 
    std::thread consumer1 (insertintobidask, "hostaddr=192.168.2.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw); 
    std::thread consumer2 (insertintobidask, "hostaddr=192.168.2.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw); 
    std::thread consumer3 (insertintobidask, "hostaddr=192.168.3.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw); 
    std::thread consumer4 (insertintobidask, "hostaddr=192.168.3.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw); 

    consumer1.join(); 
    consumer2.join(); 
    consumer3.join(); 
    consumer4.join(); 
    */ 

    errlog.close(); 
    return 0; 
} 

我嘗試了另一種方法,基於尼姆的答案下面和 有用。

/* g++ -std=gnu++11 fxetl.cxx -o fxetl -lboost_system -lboost_filesystem -lzip -lpqxx -lpq -pthread */ 

#include <boost/filesystem.hpp> 
#include <regex> 
#include <iostream> 
#include <fstream> 
#include <string> 
#include <pqxx/pqxx> 
#include <zip.h> 
#include <thread> 
#include <boost/asio.hpp> 
#include "threadsafe_oerrlog.h" 

void insertintobidask(pqxx::nontransaction &txn, std::string ziparchivename, OFStreamWriter &errlog) 
{ 
    std::string fileyearmonth = ziparchivename.substr(27, 6); 
    std::string ziparchivepath = "/home/vorlket/Desktop/Project/Code/Test/Data/HISTDATA_COM_ASCII_AUDUSD_T" + fileyearmonth + ".zip"; 
    std::string zipfilepath = "DAT_ASCII_AUDUSD_T_" + fileyearmonth + ".csv"; 
    int err, r; 
    char buffer[39]; // each line takes up 39 bytes 

    struct zip *ziparchive = zip_open(ziparchivepath.c_str(), 0, &err); 
    if (ziparchive) 
    { 
    struct zip_file *zipfile = zip_fopen(ziparchive, zipfilepath.c_str(), 0); 
    if (zipfile) 
    { 
     while ((r = zip_fread(zipfile, buffer, sizeof(buffer))) > 0) 
     { 
     std::string str(buffer); 
     txn.exec("INSERT INTO fx.bidask VALUES('AUDUSD', to_timestamp(" +txn.quote(str.substr(0, 18)) + ", 'YYYYMMDD HH24MISSMS'), " + txn.quote(str.substr(19, 8)) + ", " + txn.quote(str.substr(28, 8)) + ")"); 
     } 
     zip_fclose(zipfile); 
     std::cout << fileyearmonth << std::endl; 
    } 
    else 
    { 
     errlog << zipfilepath; 
    } 
    } 
    else 
    { 
    errlog << ziparchivepath; 
    } 

    zip_close(ziparchive); 
} 


int main() 
{ 
    pqxx::connection conn1("hostaddr=192.168.2.104 port=5433 dbname=fxproj user=vorlket password=K1156312J"); 
    pqxx::nontransaction txn1(conn1); 
    pqxx::connection conn2("hostaddr=192.168.3.104 port=5433 dbname=fxproj user=vorlket password=K1156312J"); 
    pqxx::nontransaction txn2(conn2); 
    pqxx::connection conn3("hostaddr=192.168.2.104 port=5433 dbname=fxproj user=vorlket password=K1156312J"); 
    pqxx::nontransaction txn3(conn3); 
    pqxx::connection conn4("hostaddr=192.168.3.104 port=5433 dbname=fxproj user=vorlket password=K1156312J"); 
    pqxx::nontransaction txn4(conn4); 

    std::ofstream errlog("/home/vorlket/Desktop/Project/Code/Test/errlog.txt"); 
    OFStreamWriter ofsw(&errlog); 

    boost::asio::io_service service1; // queue 
    boost::asio::io_service service2; 
    boost::asio::io_service service3; 
    boost::asio::io_service service4; 

    boost::filesystem::path fx_dir("/home/vorlket/Desktop/Project/Code/Test/Data"); 
    std::regex pattern_fx("HISTDATA_COM_ASCII_.*.zip"); 
    int serviceid = 0; 
    for (boost::filesystem::recursive_directory_iterator iter(fx_dir), end; iter != end; ++iter) 
    { 
    std::string name = iter->path().filename().string(); 
    if (std::regex_match(name, pattern_fx)) 
    { 
     serviceid %= 3; 
     switch (serviceid) 
     { 
     case 0 : 
      service1.post([&txn1, name, &ofsw]() { insertintobidask(txn1, name, ofsw); }); 
      break; 
     case 1 : 
      service2.post([&txn2, name, &ofsw]() { insertintobidask(txn2, name, ofsw); }); 
      break; 
     case 2 : 
      service3.post([&txn3, name, &ofsw]() { insertintobidask(txn3, name, ofsw); }); 
      break; 
     case 3 : 
      service4.post([&txn4, name, &ofsw]() { insertintobidask(txn4, name, ofsw); }); 
      break; 
     } 
     ++serviceid; 
    } 
    } 

    std::thread t1([&service1]() { service1.run(); }); 
    std::thread t2([&service2]() { service2.run(); }); 
    std::thread t3([&service3]() { service3.run(); }); 
    std::thread t4([&service4]() { service4.run(); }); 

    t1.join(); 
    t2.join(); 
    t3.join(); 
    t4.join(); 

} 

不知道哪種方法更快,但我想這取決於工作負載和正在開發的平臺。值得一試,看看哪個更快。任何意見,哪種方法會更快,誰讚賞。

+0

類似這樣的問題在這裏不歡迎,並經常導致重downvoting。與我們分享您已經實施/嘗試到現在,並詢問您的代碼無法工作的具體問題。說,歡迎來到SO! – Arunmu

+0

看看[線程支持庫](http://en.cppreference.com/w/cpp/thread)。你會在那裏找到你需要的大部分東西。 – Aconcagua

+0

編輯這個問題來分享我所嘗試過的。 – vorlket

回答

2

除非這是爲了學習/它不夠快的事情,否則我會將這些crud操作委託給現有機制。而我更喜歡使用boost::asio::io_service這個確切類型的事情..

代碼如下:

// Additional header 
#include <boost/asio.hpp> 

int main() 
{ 
    std::ofstream errlog 
    errlog.open("/home/vorlket/Desktop/Project/Code/Test/errlog.txt", std::ofstream::out); 
    OFStreamWriter ofsw(&errlog); 

    boost::asio::io_service service; // queue 
    boost::filesystem::path fx_dir("/home/vorlket/Desktop/Project/Code/Test/Data"); 
    std::regex pattern_fx("HISTDATA_COM_ASCII_.*.zip"); 
    for (boost::filesystem::recursive_directory_iterator iter(fx_dir), end; iter != end; ++iter) 
    { 
    std::string name = iter->path().filename().string(); 
    if (std::regex_match(name, pattern_fx)) 
    { 
     service.post([name]() { 
     // Do something with this file 
     }); 
    } 
    } 
    // Now start-up n-threads to dispatch on the io_service 
    std::thread t1([&service]() { service.run(); }); // this will dispatch on queue until there is nothing left to do... 
    std::thread t2([&service]() { service.run(); }); 
    std::thread t3([&service]() { service.run(); }); 
    std::thread t4([&service]() { service.run(); }); 
    : 

    // Wait for them to complete 
    t1.join(); 
    t2.join(); 
    t3.join(); 
    t4.join(); 
} 
+0

Nim,我在lambda中插入了一個函數調用,但這些線程似乎沒有做任何工作。我試過的代碼在上面編輯的問題中。 – vorlket

+0

@vorlket,這些變化看起來不錯,我看不出有什麼特別的錯誤 - 可能是一些日誌會幫助我們看看事情是在哪裏? – Nim

+0

inserintobidask函數中存在相對/絕對路徑的錯誤。有用。感謝分享。 – vorlket