diff --git a/jobber.hpp b/jobber.hpp index db4341e..7a74380 100644 --- a/jobber.hpp +++ b/jobber.hpp @@ -53,6 +53,10 @@ namespace jobber_hpp explicit jobber(std::size_t threads); ~jobber() noexcept; + using active_wait_result_t = std::pair< + jobber_wait_status, + std::size_t>; + template < typename F, typename... Args > using async_invoke_result_t = invoke_hpp::invoke_result_t< std::decay_t, @@ -71,7 +75,8 @@ namespace jobber_hpp bool is_paused() const noexcept; jobber_wait_status wait_all() const noexcept; - jobber_wait_status active_wait_all() noexcept; + active_wait_result_t active_wait_all() noexcept; + active_wait_result_t active_wait_one() noexcept; template < typename Rep, typename Period > jobber_wait_status wait_all_for( @@ -82,11 +87,11 @@ namespace jobber_hpp const std::chrono::time_point& timeout_time) const; template < typename Rep, typename Period > - jobber_wait_status active_wait_all_for( + active_wait_result_t active_wait_all_for( const std::chrono::duration& timeout_duration); template < typename Clock, typename Duration > - jobber_wait_status active_wait_all_until( + active_wait_result_t active_wait_all_until( const std::chrono::time_point& timeout_time); private: class task; @@ -210,7 +215,8 @@ namespace jobber_hpp : jobber_wait_status::no_timeout; } - inline jobber_wait_status jobber::active_wait_all() noexcept { + inline jobber::active_wait_result_t jobber::active_wait_all() noexcept { + std::size_t processed_tasks = 0; while ( !cancelled_ && active_task_count_ ) { std::unique_lock lock(tasks_mutex_); cond_var_.wait(lock, [this](){ @@ -218,11 +224,26 @@ namespace jobber_hpp }); if ( !tasks_.empty() ) { process_task_(std::move(lock)); + ++processed_tasks; } } - return cancelled_ - ? jobber_wait_status::cancelled - : jobber_wait_status::no_timeout; + return std::make_pair( + cancelled_ + ? jobber_wait_status::cancelled + : jobber_wait_status::no_timeout, + processed_tasks); + } + + inline jobber::active_wait_result_t jobber::active_wait_one() noexcept { + std::unique_lock lock(tasks_mutex_); + if ( cancelled_ ) { + return std::make_pair(jobber_wait_status::cancelled, 0u); + } + if ( tasks_.empty() ) { + return std::make_pair(jobber_wait_status::no_timeout, 0u); + } + process_task_(std::move(lock)); + return std::make_pair(jobber_wait_status::no_timeout, 1u); } template < typename Rep, typename Period > @@ -245,7 +266,7 @@ namespace jobber_hpp } template < typename Rep, typename Period > - jobber_wait_status jobber::active_wait_all_for( + jobber::active_wait_result_t jobber::active_wait_all_for( const std::chrono::duration& timeout_duration) { return active_wait_all_until( @@ -253,12 +274,15 @@ namespace jobber_hpp } template < typename Clock, typename Duration > - jobber_wait_status jobber::active_wait_all_until( + jobber::active_wait_result_t jobber::active_wait_all_until( const std::chrono::time_point& timeout_time) { + std::size_t processed_tasks = 0; while ( !cancelled_ && active_task_count_ ) { if ( !(Clock::now() < timeout_time) ) { - return jobber_wait_status::timeout; + return std::make_pair( + jobber_wait_status::timeout, + processed_tasks); } std::unique_lock lock(tasks_mutex_); cond_var_.wait_until(lock, timeout_time, [this](){ @@ -266,11 +290,14 @@ namespace jobber_hpp }); if ( !tasks_.empty() ) { process_task_(std::move(lock)); + ++processed_tasks; } } - return cancelled_ - ? jobber_wait_status::cancelled - : jobber_wait_status::no_timeout; + return std::make_pair( + cancelled_ + ? jobber_wait_status::cancelled + : jobber_wait_status::no_timeout, + processed_tasks); } inline void jobber::push_task_(jobber_priority priority, task_ptr task) { diff --git a/jobber_tests.cpp b/jobber_tests.cpp index a279bb9..eb79d6c 100644 --- a/jobber_tests.cpp +++ b/jobber_tests.cpp @@ -117,6 +117,45 @@ TEST_CASE("jobber") { j.active_wait_all(); REQUIRE(counter == 10); } + { + jb::jobber j(1); + std::atomic counter = ATOMIC_VAR_INIT(0); + j.pause(); + for ( std::size_t i = 0; i < 3; ++i ) { + j.async([&counter](){ + ++counter; + }); + } + REQUIRE(counter == 0); + { + auto r = j.active_wait_one(); + REQUIRE(r.first == jb::jobber_wait_status::no_timeout); + REQUIRE(r.second == 1); + } + REQUIRE(counter == 1); + { + auto r = j.active_wait_one(); + REQUIRE(r.first == jb::jobber_wait_status::no_timeout); + REQUIRE(r.second == 1); + } + REQUIRE(counter == 2); + { + auto r = j.active_wait_one(); + REQUIRE(r.first == jb::jobber_wait_status::no_timeout); + REQUIRE(r.second == 1); + } + REQUIRE(counter == 3); + { + auto r = j.active_wait_one(); + REQUIRE(r.first == jb::jobber_wait_status::no_timeout); + REQUIRE(r.second == 0); + } + REQUIRE(counter == 3); + j.resume(); + REQUIRE(j.wait_all() == jb::jobber_wait_status::no_timeout); + REQUIRE(j.active_wait_one().first == jb::jobber_wait_status::no_timeout); + REQUIRE(counter == 3); + } { jb::jobber j(1); @@ -126,16 +165,16 @@ TEST_CASE("jobber") { REQUIRE(jb::jobber_wait_status::no_timeout == j.wait_all_for(std::chrono::milliseconds(-1))); REQUIRE(jb::jobber_wait_status::no_timeout == j.wait_all_until(time_now() + std::chrono::milliseconds(-1))); - REQUIRE(jb::jobber_wait_status::no_timeout == j.active_wait_all_for(std::chrono::milliseconds(-1))); - REQUIRE(jb::jobber_wait_status::no_timeout == j.active_wait_all_until(time_now() + std::chrono::milliseconds(-1))); + REQUIRE(jb::jobber_wait_status::no_timeout == j.active_wait_all_for(std::chrono::milliseconds(-1)).first); + REQUIRE(jb::jobber_wait_status::no_timeout == j.active_wait_all_until(time_now() + std::chrono::milliseconds(-1)).first); j.pause(); j.async([]{}); REQUIRE(jb::jobber_wait_status::timeout == j.wait_all_for(std::chrono::milliseconds(-1))); REQUIRE(jb::jobber_wait_status::timeout == j.wait_all_until(time_now() + std::chrono::milliseconds(-1))); - REQUIRE(jb::jobber_wait_status::timeout == j.active_wait_all_for(std::chrono::milliseconds(-1))); - REQUIRE(jb::jobber_wait_status::timeout == j.active_wait_all_until(time_now() + std::chrono::milliseconds(-1))); + REQUIRE(jb::jobber_wait_status::timeout == j.active_wait_all_for(std::chrono::milliseconds(-1)).first); + REQUIRE(jb::jobber_wait_status::timeout == j.active_wait_all_until(time_now() + std::chrono::milliseconds(-1)).first); } { jb::jobber j(1); @@ -244,9 +283,18 @@ TEST_CASE("jobber") { std::this_thread::sleep_for(std::chrono::milliseconds(5)); }); } - REQUIRE(jb::jobber_wait_status::timeout == j.active_wait_all_for(std::chrono::milliseconds(50))); + { + auto r = j.active_wait_all_for(std::chrono::milliseconds(50)); + REQUIRE(jb::jobber_wait_status::timeout == r.first); + REQUIRE(r.second > 0); + } REQUIRE(counter > 0); - REQUIRE(jb::jobber_wait_status::no_timeout == j.active_wait_all_for(std::chrono::seconds(5))); + { + auto r = j.active_wait_all_for(std::chrono::seconds(3)); + REQUIRE(jb::jobber_wait_status::no_timeout == r.first); + REQUIRE(r.second > 0); + REQUIRE(r.second < 30); + } REQUIRE(counter == 30); } {