2010-08-05 123 views
0

我在寫一個當前從UDP接收數據的UDP服務器將其包裝在一個對象中並將它們放入一個併發隊列中。併發隊列是這裏提供的實現:http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html全局靜態變量的副作用

工作線程池將數據拉出隊列進行處理。

隊列被定義爲全球:

static concurrent_queue<boost::shared_ptr<Msg> > g_work_queue_; 

現在我遇到的問題是,如果我只是寫一個函數來產生數據,並將其插入到隊列中,並創建一些消費者線程拉他們它工作正常。 但是,當我添加基於UDP的生產者時,工作線程停止收到隊列中數據到達的通知。

我已經將問題跟蹤到了concurrent_queue中推送函數的末尾。 具體而言:the_condition_variable.notify_one(); 使用我的網絡代碼時不返回。

所以這個問題與我寫網絡代碼的方式有關。

下面是它的樣子。

enum 
{ 
    MAX_LENGTH = 1500 
}; 


class Msg 
{ 
    public: 
    Msg() 
    { 
     static int i = 0; 
     i_ = i++; 
     printf("Construct ObbsMsg: %d\n", i_); 
    } 

    ~Msg() 
    { 
     printf("Destruct ObbsMsg: %d\n", i_); 
    } 

    const char* toString() { return data_; } 

    private: 
    friend class server; 

    udp::endpoint sender_endpoint_; 
    char data_[MAX_LENGTH]; 
    int i_; 
}; 

class server 
{ 
public: 
    server::server(boost::asio::io_service& io_service) 
    : io_service_(io_service), 
     socket_(io_service, udp::endpoint(udp::v4(), PORT)) 
    { 
    waitForNextMessage(); 
    } 

    void server::waitForNextMessage() 
    { 
    printf("Waiting for next msg\n"); 

    next_msg_.reset(new Msg()); 

    socket_.async_receive_from(
     boost::asio::buffer(next_msg_->data_, MAX_LENGTH), sender_endpoint_, 
     boost::bind(&server::handleReceiveFrom, this, 
        boost::asio::placeholders::error, 
        boost::asio::placeholders::bytes_transferred)); 
    } 

    void server::handleReceiveFrom(const boost::system::error_code& error, size_t bytes_recvd) 
    { 
    if (!error && bytes_recvd > 0) { 
     printf("got data: %s. Adding to work queue\n", next_msg_->toString()); 
     g_work_queue.push(next_msg_); // Add received msg to work queue 
     waitForNextMessage(); 
    } else { 
     waitForNextMessage(); 
    } 
    } 

private: 
    boost::asio::io_service& io_service_; 
    udp::socket socket_; 

    udp::endpoint sender_endpoint_; 
    boost::shared_ptr<Msg> next_msg_; 
} 

int main(int argc, char* argv[]) 
{ 
    try{ 
     boost::asio::io_service io_service; 
     server s(io_service); 
     io_service.run(); 
    catch(std::exception& e){ 
     std::err << "Exception: " << e.what() << std::endl; 
    } 
    return 0; 
} 

現在我發現如果handle_receive_from能夠返回,那麼concurrent_queue返回的notify_one()。所以我認爲這是因爲我有一個遞歸循環。 那麼開始監聽新數據的正確方法是什麼?並且是異步udp服務器的例子有缺陷,因爲我根據他們已經在做的事情來制定它。

編輯:好的問題剛剛變得更加怪異。

我在這裏沒有提到的是我有一個叫做處理器的類。 處理器看起來是這樣的:

class processor 
{ 
public: 
    processor::processor(int thread_pool_size) : 
     thread_pool_size_(thread_pool_size) { } 

    void start() 
    { 
    boost::thread_group threads; 
    for (std::size_t i = 0; i < thread_pool_size_; ++i){ 
     threads.create_thread(boost::bind(&ObbsServer::worker, this)); 
    } 
    } 

    void worker() 
    { 
    while (true){ 
     boost::shared_ptr<ObbsMsg> msg; 
     g_work_queue.wait_and_pop(msg); 
     printf("Got msg: %s\n", msg->toString()); 
    } 
    } 

private: 
    int thread_pool_size_; 
}; 

現在看來,如果我提取職工功能出在它自己的,並從主啓動線程 。有用!有人可以解釋爲什麼一個線程的功能與我在課堂以外期望的一樣,但是在它裏面有副作用嗎?

EDIT2:現在它仍然

我掏出兩個函數(完全一樣)變得更加古怪。

一個被稱爲消費者,另一個工人。

void worker() 
{ 
    while (true){ 
     boost::shared_ptr<ObbsMsg> msg; 
     printf("waiting for msg\n"); 
     g_work_queue.wait_and_pop(msg); 
     printf("Got msg: %s\n", msg->toString()); 
    } 
} 

void consumer() 
{ 
    while (true){ 
     boost::shared_ptr<ObbsMsg> msg; 
     printf("waiting for msg\n"); 
     g_work_queue.wait_and_pop(msg); 
     printf("Got msg: %s\n", msg->toString()); 
    } 
} 

現在,消費者住在server.cpp文件的頂部。即我們的服務器代碼也在這裏生存。

另一方面,工作人員住在processor.cpp文件中。

現在我暫時沒有使用處理器。主要功能現在看起來是這樣的:

void consumer(); 
void worker(); 

int main(int argc, char* argv[]) 
{ 
    try { 
     boost::asio::io_service io_service; 
     server net(io_service); 
     //processor s(7); 

     boost::thread_group threads; 
     for (std::size_t i = 0; i < 7; ++i){ 
      threads.create_thread(worker); // this doesn't work 
      // threads.create_thread(consumer); // THIS WORKS!?!?!? 
     } 

//  s.start(); 

     printf("Server Started...\n"); 
     boost::asio::io_service::work work(io_service); 
     io_service.run(); 

     printf("exiting...\n"); 
    } catch (std::exception& e) { 
     std::cerr << "Exception: " << e.what() << "\n"; 
    } 

    return 0; 
} 

爲什麼消費者能夠接收排隊的項目,但工人是沒有的。 它們是具有不同名稱的相同實現。

這沒有任何意義。有任何想法嗎?

下面是示例輸出接收的TXT的 「Hello World」 的時候:

輸出1:不工作。在調用工作者函數或使用處理器類時。

Construct ObbsMsg: 0 
waiting for msg 
waiting for msg 
waiting for msg 
waiting for msg 
waiting for msg 
waiting for msg 
Server Started... 
waiting for msg 
got data: hello world. Adding to work queue 
Construct ObbsMsg: 1 

輸出2:在調用與輔助函數相同的使用者函數時工作。

Construct ObbsMsg: 0 
waiting for msg 
waiting for msg 
waiting for msg 
waiting for msg 
waiting for msg 
waiting for msg 
Server Started... 
waiting for msg 
got data: hello world. Adding to work queue 
Construct ObbsMsg: 1 
Got msg: hello world <----- this is what I've been wanting to see! 
Destruct ObbsMsg: 0 
waiting for msg 
+0

一個更好的名字將有助於其他人在未來找到這個。 – 2010-08-05 08:25:12

+0

謝謝,我希望現在這個名字更有意義。在這裏學到了一個重要的教訓。 – Matt 2010-08-05 23:12:55

回答

1

回答我自己的問題。

看來問題在於g_work_queue的聲明;

在頭文件中聲明爲:static concurrent_queue < boost :: shared_ptr> g_work_queue;

看來,聲明它是靜態的不是我想要做的。 顯然,致力於爲每個編譯的.o文件,顯然 單獨的鎖單獨的隊列對象等

這就解釋了爲什麼當隊列正在同一個源文件 在同一個文件中的消費者和生產者的內部操縱它工作。 但是,當在不同的文件中它並不是因爲線程實際上在不同的對象上等待。

所以我重新聲明瞭這樣的工作隊列。

-- workqueue.h -- 
extern concurrent_queue< boost::shared_ptr<Msg> > g_work_queue; 

-- workqueue.cpp -- 
#include "workqueue.h" 
concurrent_queue< boost::shared_ptr<Msg> > g_work_queue; 

這樣做可以解決問題。

+0

建議您接受您的解決方案,然後:) – 2010-08-05 08:23:12