From e865f3d6791d74353c0312e69a12abe24d12a1b2 Mon Sep 17 00:00:00 2001 From: BlackMATov Date: Mon, 17 Dec 2018 17:47:54 +0700 Subject: [PATCH] extended scheduler processing status --- scheduler.hpp | 61 ++++++++++++++++++++----------- scheduler_tests.cpp | 89 ++++++++++++++++++++++++++++++++++----------- 2 files changed, 106 insertions(+), 44 deletions(-) diff --git a/scheduler.hpp b/scheduler.hpp index 309a7c9..226c39d 100644 --- a/scheduler.hpp +++ b/scheduler.hpp @@ -35,10 +35,10 @@ namespace scheduler_hpp highest }; - enum class scheduler_wait_status { - no_timeout, - cancelled, - timeout + enum class scheduler_processing_status { + done, + timeout, + cancelled }; class scheduler_cancelled_exception : public std::runtime_error { @@ -52,6 +52,10 @@ namespace scheduler_hpp scheduler(); ~scheduler() noexcept; + using processing_result_t = std::pair< + scheduler_processing_status, + std::size_t>; + template < typename F, typename... Args > using schedule_invoke_result_t = invoke_hpp::invoke_result_t< std::decay_t, @@ -65,15 +69,15 @@ namespace scheduler_hpp , typename R = schedule_invoke_result_t > promise schedule(scheduler_priority scheduler_priority, F&& f, Args&&... args); - bool process_one_task() noexcept; - scheduler_wait_status process_all_tasks() noexcept; + processing_result_t process_one_task() noexcept; + processing_result_t process_all_tasks() noexcept; template < typename Rep, typename Period > - scheduler_wait_status process_tasks_for( + processing_result_t process_tasks_for( const std::chrono::duration& timeout_duration) noexcept; template < typename Clock, typename Duration > - scheduler_wait_status process_tasks_until( + processing_result_t process_tasks_until( const std::chrono::time_point& timeout_time) noexcept; private: class task; @@ -158,16 +162,20 @@ namespace scheduler_hpp return future; } - inline bool scheduler::process_one_task() noexcept { + inline scheduler::processing_result_t scheduler::process_one_task() noexcept { std::unique_lock lock(tasks_mutex_); + if ( cancelled_ ) { + return std::make_pair(scheduler_processing_status::cancelled, 0u); + } if ( tasks_.empty() ) { - return false; + return std::make_pair(scheduler_processing_status::done, 0u); } process_task_(std::move(lock)); - return true; + return std::make_pair(scheduler_processing_status::done, 1u); } - inline scheduler_wait_status scheduler::process_all_tasks() noexcept { + inline scheduler::processing_result_t scheduler::process_all_tasks() noexcept { + std::size_t processed_tasks = 0; while ( !cancelled_ && active_task_count_ ) { std::unique_lock lock(tasks_mutex_); cond_var_.wait(lock, [this](){ @@ -175,15 +183,18 @@ namespace scheduler_hpp }); if ( !tasks_.empty() ) { process_task_(std::move(lock)); + ++processed_tasks; } } - return cancelled_ - ? scheduler_wait_status::cancelled - : scheduler_wait_status::no_timeout; + return std::make_pair( + cancelled_ + ? scheduler_processing_status::cancelled + : scheduler_processing_status::done, + processed_tasks); } template < typename Rep, typename Period > - scheduler_wait_status scheduler::process_tasks_for( + scheduler::processing_result_t scheduler::process_tasks_for( const std::chrono::duration& timeout_duration) noexcept { return process_tasks_until( @@ -191,12 +202,15 @@ namespace scheduler_hpp } template < typename Clock, typename Duration > - scheduler_wait_status scheduler::process_tasks_until( + scheduler::processing_result_t scheduler::process_tasks_until( const std::chrono::time_point& timeout_time) noexcept { + std::size_t processed_tasks = 0; while ( !cancelled_ && active_task_count_ ) { if ( !(Clock::now() < timeout_time) ) { - return scheduler_wait_status::timeout; + return std::make_pair( + scheduler_processing_status::timeout, + processed_tasks); } std::unique_lock lock(tasks_mutex_); cond_var_.wait_until(lock, timeout_time, [this](){ @@ -204,18 +218,21 @@ namespace scheduler_hpp }); if ( !tasks_.empty() ) { process_task_(std::move(lock)); + ++processed_tasks; } } - return cancelled_ - ? scheduler_wait_status::cancelled - : scheduler_wait_status::no_timeout; + return std::make_pair( + cancelled_ + ? scheduler_processing_status::cancelled + : scheduler_processing_status::done, + processed_tasks); } inline void scheduler::push_task_(scheduler_priority priority, task_ptr task) { tasks_.emplace_back(priority, std::move(task)); std::push_heap(tasks_.begin(), tasks_.end()); ++active_task_count_; - cond_var_.notify_all(); + cond_var_.notify_one(); } inline scheduler::task_ptr scheduler::pop_task_() noexcept { diff --git a/scheduler_tests.cpp b/scheduler_tests.cpp index b2065ad..a583de3 100644 --- a/scheduler_tests.cpp +++ b/scheduler_tests.cpp @@ -38,13 +38,20 @@ TEST_CASE("scheduler") { int counter = 0; s.schedule([&counter](){ ++counter; }); REQUIRE(counter == 0); - s.process_all_tasks(); + REQUIRE(s.process_all_tasks() == std::make_pair( + sd::scheduler_processing_status::done, + std::size_t(1u))); REQUIRE(counter == 1); s.schedule([&counter](){ ++counter; }); s.schedule([&counter](){ ++counter; }); REQUIRE(counter == 1); - s.process_all_tasks(); + REQUIRE(s.process_all_tasks() == std::make_pair( + sd::scheduler_processing_status::done, + std::size_t(2u))); REQUIRE(counter == 3); + REQUIRE(s.process_all_tasks() == std::make_pair( + sd::scheduler_processing_status::done, + std::size_t(0u))); } { sd::scheduler s; @@ -53,13 +60,21 @@ TEST_CASE("scheduler") { s.schedule([&counter](){ ++counter; }); s.schedule([&counter](){ ++counter; }); REQUIRE(counter == 0); - REQUIRE(s.process_one_task()); + REQUIRE(s.process_one_task() == std::make_pair( + sd::scheduler_processing_status::done, + std::size_t(1u))); REQUIRE(counter == 1); - REQUIRE(s.process_one_task()); + REQUIRE(s.process_one_task() == std::make_pair( + sd::scheduler_processing_status::done, + std::size_t(1u))); REQUIRE(counter == 2); - REQUIRE(s.process_one_task()); + REQUIRE(s.process_one_task() == std::make_pair( + sd::scheduler_processing_status::done, + std::size_t(1u))); REQUIRE(counter == 3); - REQUIRE_FALSE(s.process_one_task()); + REQUIRE(s.process_one_task() == std::make_pair( + sd::scheduler_processing_status::done, + std::size_t(0u))); REQUIRE(counter == 3); } { @@ -71,14 +86,28 @@ TEST_CASE("scheduler") { std::this_thread::sleep_for(std::chrono::milliseconds(5)); }); } - s.process_tasks_for(std::chrono::milliseconds(-1)); - s.process_tasks_for(std::chrono::milliseconds(0)); + REQUIRE(s.process_tasks_for(std::chrono::milliseconds(-1)) == std::make_pair( + sd::scheduler_processing_status::timeout, + std::size_t(0u))); + REQUIRE(s.process_tasks_for(std::chrono::milliseconds(0)) == std::make_pair( + sd::scheduler_processing_status::timeout, + std::size_t(0u))); REQUIRE(counter == 0); - s.process_tasks_for(std::chrono::milliseconds(100)); - REQUIRE(counter > 2); - REQUIRE(counter < 50); - s.process_tasks_for(std::chrono::seconds(3)); - REQUIRE(counter == 50); + { + auto r = s.process_tasks_for(std::chrono::milliseconds(100)); + REQUIRE(r.first == sd::scheduler_processing_status::timeout); + REQUIRE(r.second > 2); + REQUIRE(r.second < 50); + REQUIRE(counter > 2); + REQUIRE(counter < 50); + } + { + auto r = s.process_tasks_for(std::chrono::seconds(3)); + REQUIRE(r.first == sd::scheduler_processing_status::done); + REQUIRE(r.second > 0); + REQUIRE(r.second < 50); + REQUIRE(counter == 50); + } } { sd::scheduler s; @@ -96,15 +125,29 @@ TEST_CASE("scheduler") { const auto b = time_now(); - s.process_tasks_until(time_now() - std::chrono::milliseconds(1)); - s.process_tasks_until(time_now()); + REQUIRE(s.process_tasks_until(time_now() - std::chrono::milliseconds(1)) == std::make_pair( + sd::scheduler_processing_status::timeout, + std::size_t(0u))); + REQUIRE(s.process_tasks_until(time_now()) == std::make_pair( + sd::scheduler_processing_status::timeout, + std::size_t(0u))); REQUIRE(counter == 0); - s.process_tasks_until(time_now() + std::chrono::milliseconds(100)); - REQUIRE(time_now() - b > std::chrono::milliseconds(50)); - REQUIRE(counter > 2); - REQUIRE(counter < 50); - s.process_tasks_until(time_now() + std::chrono::seconds(3)); - REQUIRE(counter == 50); + { + auto r = s.process_tasks_until(time_now() + std::chrono::milliseconds(100)); + REQUIRE(time_now() - b > std::chrono::milliseconds(50)); + REQUIRE(r.first == sd::scheduler_processing_status::timeout); + REQUIRE(r.second > 2); + REQUIRE(r.second < 50); + REQUIRE(counter > 2); + REQUIRE(counter < 50); + } + { + auto r = s.process_tasks_until(time_now() + std::chrono::seconds(3)); + REQUIRE(r.first == sd::scheduler_processing_status::done); + REQUIRE(r.second > 0); + REQUIRE(r.second < 50); + REQUIRE(counter == 50); + } } { sd::scheduler s; @@ -124,7 +167,9 @@ TEST_CASE("scheduler") { s.schedule(sd::scheduler_priority::normal, [](std::string& acc){ acc.append("l"); }, std::ref(accumulator)); - s.process_all_tasks(); + REQUIRE(s.process_all_tasks() == std::make_pair( + sd::scheduler_processing_status::done, + std::size_t(5u))); REQUIRE(accumulator == "hello"); } }