我在寫一個當前從UDP接收數據的UDP服務器將其包裝在一個對象中並將它們放入一個併發隊列中。併發隊列是這裏提供的實現:http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html全局靜態變量的副作用
工作線程池將數據拉出隊列進行處理。
隊列被定義爲全球:
static concurrent_queue<boost::shared_ptr<Msg> > g_work_queue_;
現在我遇到的問題是,如果我只是寫一個函數來產生數據,並將其插入到隊列中,並創建一些消費者線程拉他們它工作正常。 但是,當我添加基於UDP的生產者時,工作線程停止收到隊列中數據到達的通知。
我已經將問題跟蹤到了concurrent_queue中推送函數的末尾。 具體而言:the_condition_variable.notify_one(); 使用我的網絡代碼時不返回。
所以這個問題與我寫網絡代碼的方式有關。
下面是它的樣子。
enum
{
MAX_LENGTH = 1500
};
class Msg
{
public:
Msg()
{
static int i = 0;
i_ = i++;
printf("Construct ObbsMsg: %d\n", i_);
}
~Msg()
{
printf("Destruct ObbsMsg: %d\n", i_);
}
const char* toString() { return data_; }
private:
friend class server;
udp::endpoint sender_endpoint_;
char data_[MAX_LENGTH];
int i_;
};
class server
{
public:
server::server(boost::asio::io_service& io_service)
: io_service_(io_service),
socket_(io_service, udp::endpoint(udp::v4(), PORT))
{
waitForNextMessage();
}
void server::waitForNextMessage()
{
printf("Waiting for next msg\n");
next_msg_.reset(new Msg());
socket_.async_receive_from(
boost::asio::buffer(next_msg_->data_, MAX_LENGTH), sender_endpoint_,
boost::bind(&server::handleReceiveFrom, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void server::handleReceiveFrom(const boost::system::error_code& error, size_t bytes_recvd)
{
if (!error && bytes_recvd > 0) {
printf("got data: %s. Adding to work queue\n", next_msg_->toString());
g_work_queue.push(next_msg_); // Add received msg to work queue
waitForNextMessage();
} else {
waitForNextMessage();
}
}
private:
boost::asio::io_service& io_service_;
udp::socket socket_;
udp::endpoint sender_endpoint_;
boost::shared_ptr<Msg> next_msg_;
}
int main(int argc, char* argv[])
{
try{
boost::asio::io_service io_service;
server s(io_service);
io_service.run();
catch(std::exception& e){
std::err << "Exception: " << e.what() << std::endl;
}
return 0;
}
現在我發現如果handle_receive_from能夠返回,那麼concurrent_queue返回的notify_one()。所以我認爲這是因爲我有一個遞歸循環。 那麼開始監聽新數據的正確方法是什麼?並且是異步udp服務器的例子有缺陷,因爲我根據他們已經在做的事情來制定它。
編輯:好的問題剛剛變得更加怪異。
我在這裏沒有提到的是我有一個叫做處理器的類。 處理器看起來是這樣的:
class processor
{
public:
processor::processor(int thread_pool_size) :
thread_pool_size_(thread_pool_size) { }
void start()
{
boost::thread_group threads;
for (std::size_t i = 0; i < thread_pool_size_; ++i){
threads.create_thread(boost::bind(&ObbsServer::worker, this));
}
}
void worker()
{
while (true){
boost::shared_ptr<ObbsMsg> msg;
g_work_queue.wait_and_pop(msg);
printf("Got msg: %s\n", msg->toString());
}
}
private:
int thread_pool_size_;
};
現在看來,如果我提取職工功能出在它自己的,並從主啓動線程 。有用!有人可以解釋爲什麼一個線程的功能與我在課堂以外期望的一樣,但是在它裏面有副作用嗎?
EDIT2:現在它仍然
我掏出兩個函數(完全一樣)變得更加古怪。
一個被稱爲消費者,另一個工人。
即
void worker()
{
while (true){
boost::shared_ptr<ObbsMsg> msg;
printf("waiting for msg\n");
g_work_queue.wait_and_pop(msg);
printf("Got msg: %s\n", msg->toString());
}
}
void consumer()
{
while (true){
boost::shared_ptr<ObbsMsg> msg;
printf("waiting for msg\n");
g_work_queue.wait_and_pop(msg);
printf("Got msg: %s\n", msg->toString());
}
}
現在,消費者住在server.cpp文件的頂部。即我們的服務器代碼也在這裏生存。
另一方面,工作人員住在processor.cpp文件中。
現在我暫時沒有使用處理器。主要功能現在看起來是這樣的:
void consumer();
void worker();
int main(int argc, char* argv[])
{
try {
boost::asio::io_service io_service;
server net(io_service);
//processor s(7);
boost::thread_group threads;
for (std::size_t i = 0; i < 7; ++i){
threads.create_thread(worker); // this doesn't work
// threads.create_thread(consumer); // THIS WORKS!?!?!?
}
// s.start();
printf("Server Started...\n");
boost::asio::io_service::work work(io_service);
io_service.run();
printf("exiting...\n");
} catch (std::exception& e) {
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}
爲什麼消費者能夠接收排隊的項目,但工人是沒有的。 它們是具有不同名稱的相同實現。
這沒有任何意義。有任何想法嗎?
下面是示例輸出接收的TXT的 「Hello World」 的時候:
輸出1:不工作。在調用工作者函數或使用處理器類時。
Construct ObbsMsg: 0
waiting for msg
waiting for msg
waiting for msg
waiting for msg
waiting for msg
waiting for msg
Server Started...
waiting for msg
got data: hello world. Adding to work queue
Construct ObbsMsg: 1
輸出2:在調用與輔助函數相同的使用者函數時工作。
Construct ObbsMsg: 0
waiting for msg
waiting for msg
waiting for msg
waiting for msg
waiting for msg
waiting for msg
Server Started...
waiting for msg
got data: hello world. Adding to work queue
Construct ObbsMsg: 1
Got msg: hello world <----- this is what I've been wanting to see!
Destruct ObbsMsg: 0
waiting for msg
一個更好的名字將有助於其他人在未來找到這個。 – 2010-08-05 08:25:12
謝謝,我希望現在這個名字更有意義。在這裏學到了一個重要的教訓。 – Matt 2010-08-05 23:12:55