diff --git a/jobber.hpp b/jobber.hpp index 7719e57..4cef528 100644 --- a/jobber.hpp +++ b/jobber.hpp @@ -17,7 +17,8 @@ #include #include #include -#include +#include +#include #include #include @@ -41,6 +42,12 @@ namespace jobber_hpp timeout }; + class jobber_cancelled_exception : public std::runtime_error { + public: + jobber_cancelled_exception() + : std::runtime_error("jobber has stopped working") {} + }; + class jobber final : private detail::noncopyable { public: explicit jobber(std::size_t threads); @@ -106,6 +113,7 @@ namespace jobber_hpp public: virtual ~task() noexcept = default; virtual void run() noexcept = 0; + virtual void cancel() noexcept = 0; }; template < typename R, typename F, typename... Args > @@ -117,7 +125,8 @@ namespace jobber_hpp template < typename U > concrete_task(U&& u, std::tuple&& args); void run() noexcept final; - promise promise() noexcept; + void cancel() noexcept final; + promise future() noexcept; }; template < typename F, typename... Args > @@ -129,7 +138,8 @@ namespace jobber_hpp template < typename U > concrete_task(U&& u, std::tuple&& args); void run() noexcept final; - promise promise() noexcept; + void cancel() noexcept final; + promise future() noexcept; }; } @@ -137,7 +147,7 @@ namespace jobber_hpp { inline jobber::jobber(std::size_t threads) { try { - threads_.resize(std::max(1u, threads)); + threads_.resize(threads); for ( std::thread& thread : threads_ ) { thread = std::thread(&jobber::worker_main_, this); } @@ -168,10 +178,10 @@ namespace jobber_hpp std::unique_ptr task = std::make_unique( std::forward(f), std::make_tuple(std::forward(args)...)); - promise promise = task->promise(); + promise future = task->future(); std::lock_guard guard(tasks_mutex_); push_task_(priority, std::move(task)); - return promise; + return future; } inline void jobber::pause() noexcept { @@ -283,6 +293,13 @@ namespace jobber_hpp inline void jobber::shutdown_() noexcept { { std::lock_guard guard(tasks_mutex_); + while ( !tasks_.empty() ) { + task_ptr task = pop_task_(); + if ( task ) { + task->cancel(); + --active_task_count_; + } + } cancelled_.store(true); cond_var_.notify_all(); } @@ -341,7 +358,12 @@ namespace jobber_hpp } template < typename R, typename F, typename... Args > - promise jobber::concrete_task::promise() noexcept { + void jobber::concrete_task::cancel() noexcept { + promise_.reject(jobber_cancelled_exception()); + } + + template < typename R, typename F, typename... Args > + promise jobber::concrete_task::future() noexcept { return promise_; } @@ -366,7 +388,12 @@ namespace jobber_hpp } template < typename F, typename... Args > - promise jobber::concrete_task::promise() noexcept { + void jobber::concrete_task::cancel() noexcept { + promise_.reject(jobber_cancelled_exception()); + } + + template < typename F, typename... Args > + promise jobber::concrete_task::future() noexcept { return promise_; } } diff --git a/tests.cpp b/tests.cpp index 1284d44..dd06958 100644 --- a/tests.cpp +++ b/tests.cpp @@ -7,12 +7,12 @@ #define CATCH_CONFIG_FAST_COMPILE #include "catch.hpp" -#include #include #include #include "jobber.hpp" #include "promise.hpp" + namespace jb = jobber_hpp; namespace pr = promise_hpp; @@ -989,6 +989,16 @@ TEST_CASE("jobber") { }); REQUIRE_THROWS_AS(pv0.get(), std::exception); } + { + auto pv0 = pr::promise(); + { + jb::jobber j{0}; + pv0 = j.async([](){ + return 42; + }); + } + REQUIRE_THROWS_AS(pv0.get(), jb::jobber_cancelled_exception); + } { int v5 = 5; @@ -1015,13 +1025,14 @@ TEST_CASE("jobber") { REQUIRE(v5 == 4); } { + const float pi = 3.14159265358979323846264338327950288f; jb::jobber j(1); auto p0 = j.async([](float angle){ return std::sin(angle); - }, M_PI); + }, pi); auto p1 = j.async([](float angle){ return std::cos(angle); - }, M_PI * 2); + }, pi * 2); REQUIRE(p0.get() == Approx(0.f).margin(0.01f)); REQUIRE(p1.get() == Approx(1.f).margin(0.01f)); }