我正在使用Boost ASIO庫作爲線程池,這是廣泛描述的。但是,如果線程處理的時間超過1秒並且移動到線程的下一個發佈任務,我想要中斷每個線程。用線程超時提升ASIO線程池
我可以在此使用單獨deadline_timer,這是復位如果線程在最後期限之前完成或中斷該線程應的任務去太久很容易地實現。不過,我認爲這將被納入ASIO。由於網絡操作超時,有一項任務似乎很自然。但是我無法在API中看到任何內容,只能簡單地做到這一點。
任何人都可以告訴我該功能是否已經存在?還是應該按照我描述的方式實施?
我正在使用Boost ASIO庫作爲線程池,這是廣泛描述的。但是,如果線程處理的時間超過1秒並且移動到線程的下一個發佈任務,我想要中斷每個線程。用線程超時提升ASIO線程池
我可以在此使用單獨deadline_timer,這是復位如果線程在最後期限之前完成或中斷該線程應的任務去太久很容易地實現。不過,我認爲這將被納入ASIO。由於網絡操作超時,有一項任務似乎很自然。但是我無法在API中看到任何內容,只能簡單地做到這一點。
任何人都可以告訴我該功能是否已經存在?還是應該按照我描述的方式實施?
這是一個快速解決方案,我敲在一起。
它要求你提交的函數對象接受exec_context
類型的參數。
在io_service對象運行可以查詢.canceled()
訪問(這是原子),以確定它是否應該取消初期的任務。
然後,它可以拋出一個異常或返回它打算返回的任何值。
呼叫者通過submit
功能提交。該函數使用上下文對象包裝worker函數,並將其返回值和/或異常編組到std :: future中。
調用者可以根據需要查詢或等待這個未來(或忽略它)。
呼叫者得到一個處理對象,其具有方法cancel()
就可以了。使用這個句柄,調用者可以取消,查詢或等待提交的任務。
希望它有幫助。寫作很有趣。
#include <boost/asio.hpp>
#include <iostream>
#include <atomic>
#include <thread>
#include <chrono>
#include <future>
#include <stdexcept>
#include <exception>
#include <utility>
#include <type_traits>
//
// an object to allow the caller to communicate a cancellation request to the
// submitted task
//
struct exec_controller
{
/// @returns previous cancellation request state;
bool notify_cancel()
{
return _should_cancel.exchange(true);
}
bool should_cancel() const {
return _should_cancel;
}
private:
std::atomic<bool> _should_cancel = { false };
};
template<class Ret>
struct exec_state : std::enable_shared_from_this<exec_state<Ret>>
{
using return_type = Ret;
bool notify_cancel() {
return _controller.notify_cancel();
}
std::shared_ptr<exec_controller>
get_controller_ptr() {
return std::shared_ptr<exec_controller>(this->shared_from_this(),
std::addressof(_controller));
}
std::promise<return_type>& promise() { return _promise; }
private:
std::promise<return_type> _promise;
exec_controller _controller;
};
struct applyer;
struct exec_context
{
exec_context(std::shared_ptr<exec_controller> impl)
: _impl(impl)
{}
bool canceled() const {
return _impl->should_cancel();
}
private:
friend applyer;
std::shared_ptr<exec_controller> _impl;
};
struct applyer
{
template<class F, class Ret>
void operator()(F& f, std::shared_ptr<exec_state<Ret>> const& p) const
{
try {
p->promise().set_value(f(exec_context { p->get_controller_ptr() }));
}
catch(...) {
p->promise().set_exception(std::current_exception());
}
}
template<class F>
void operator()(F& f, std::shared_ptr<exec_state<void>> const& p) const
{
try {
f(exec_context { p->get_controller_ptr() });
p->promise().set_value();
}
catch(...) {
p->promise().set_exception(std::current_exception());
}
}
};
template<class Ret>
struct exec_result
{
using return_type = Ret;
exec_result(std::shared_ptr<exec_state<return_type>> p)
: _impl(p)
{}
bool cancel() {
return _impl->notify_cancel();
}
std::future<Ret>& get_future()
{
return _future;
}
private:
std::shared_ptr<exec_state<return_type>> _impl;
std::future<return_type> _future { _impl->promise().get_future() };
};
template<class Executor, class F>
auto submit(Executor& exec, F&& f)
{
using function_type = std::decay_t<F>;
using result_type = std::result_of_t<function_type(exec_context)>;
using state_type = exec_state<result_type>;
auto shared_state = std::make_shared<state_type>();
exec.post([shared_state, f = std::forward<F>(f)]
{
applyer()(f, shared_state);
});
return exec_result<result_type>(std::move(shared_state));
}
int main()
{
using namespace std::literals;
boost::asio::io_service ios;
boost::asio::io_service::strand strand(ios);
boost::asio::io_service::work work(ios);
std::thread runner([&] { ios.run(); });
std::thread runner2([&] { ios.run(); });
auto func = [](auto context)
{
for(int i = 0 ; i < 1000 ; ++i)
{
if (context.canceled())
throw std::runtime_error("canceled");
std::this_thread::sleep_for(100ms);
}
};
auto handle = submit(strand, func);
auto handle2 = submit(ios, [](auto context) { return 2 + 2; });
// cancel the handle, or wait on it as you wish
std::this_thread::sleep_for(1s);
handle.cancel();
handle2.cancel(); // prove that late cancellation is a nop
try {
std::cout << "2 + 2 is " << handle2.get_future().get() << std::endl;
}
catch(std::exception& e)
{
std::cerr << "failed to add 2 + 2 : " << e.what() << std::endl;
}
try {
handle.get_future().get();
std::cout << "task completed" << std::endl;
}
catch(std::exception const& e) {
std::cout << "task threw exception: " << e.what() << std::endl;
}
ios.stop();
runner.join();
runner2.join();
}
更新:v2爲類增加了一些隱私保護,演示了2個同步任務。
預期輸出:如果沒有執行處理
2 + 2 is 4
task threw exception: canceled
的截止時間定時器的做法只會中斷線程。我假設(因爲你正在使用它作爲線程池),你想要中斷已經執行超過1秒的處理程序。這要複雜得多(並且在ASIO庫中沒有直接的支持)。 – Chad
定義「中斷」。你的意思是你希望在處理程序本身之外終止正在運行的完成處理程序嗎? –
@RichardHodges是的,我可以在執行該工作的線程上調用interrupt(),並在我的代碼中放置一個boost :: this_thread :: interruption_point()來促進此操作。 – Alex