From 59778fd048ab0cfab048d30ab7d30fef1b44bae4 Mon Sep 17 00:00:00 2001 From: BlackMATov Date: Wed, 12 Dec 2018 22:36:44 +0700 Subject: [PATCH] bonuses: scheduler --- scheduler.hpp | 307 ++++++++++++++++++++++++++++++++++++++++++++++++++ tests.cpp | 103 ++++++++++++++++- 2 files changed, 409 insertions(+), 1 deletion(-) create mode 100644 scheduler.hpp diff --git a/scheduler.hpp b/scheduler.hpp new file mode 100644 index 0000000..303fa57 --- /dev/null +++ b/scheduler.hpp @@ -0,0 +1,307 @@ +/******************************************************************************* + * This file is part of the "promise.hpp" + * For conditions of distribution and use, see copyright notice in LICENSE.md + * Copyright (C) 2018 Matvey Cherevko + ******************************************************************************/ + +#pragma once + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "promise.hpp" + +namespace scheduler_hpp +{ + using namespace promise_hpp; + + enum class scheduler_priority { + lowest, + below_normal, + normal, + above_normal, + highest + }; + + enum class scheduler_wait_status { + no_timeout, + cancelled, + timeout + }; + + class scheduler_cancelled_exception : public std::runtime_error { + public: + scheduler_cancelled_exception() + : std::runtime_error("scheduler has stopped working") {} + }; + + class scheduler final : private detail::noncopyable { + public: + scheduler(); + ~scheduler() noexcept; + + template < typename F, typename... Args > + using schedule_invoke_result_t = invoke_hpp::invoke_result_t< + std::decay_t, + std::decay_t...>; + + template < typename F, typename... Args + , typename R = schedule_invoke_result_t > + promise schedule(F&& f, Args&&... args); + + template < typename F, typename... Args + , typename R = schedule_invoke_result_t > + promise schedule(scheduler_priority scheduler_priority, F&& f, Args&&... args); + + scheduler_wait_status process_all_tasks() noexcept; + + template < typename Rep, typename Period > + scheduler_wait_status process_tasks_for( + const std::chrono::duration& timeout_duration) noexcept; + + template < typename Clock, typename Duration > + scheduler_wait_status process_tasks_until( + const std::chrono::time_point& timeout_time) noexcept; + private: + class task; + using task_ptr = std::unique_ptr; + template < typename R, typename F, typename... Args > + class concrete_task; + private: + void push_task_(scheduler_priority scheduler_priority, task_ptr task); + task_ptr pop_task_() noexcept; + void shutdown_() noexcept; + void process_task_(std::unique_lock lock) noexcept; + private: + std::vector> tasks_; + std::atomic cancelled_{false}; + std::atomic active_task_count_{0}; + mutable std::mutex tasks_mutex_; + mutable std::condition_variable cond_var_; + }; + + class scheduler::task : private noncopyable { + public: + virtual ~task() noexcept = default; + virtual void run() noexcept = 0; + virtual void cancel() noexcept = 0; + }; + + template < typename R, typename F, typename... Args > + class scheduler::concrete_task : public task { + F f_; + std::tuple args_; + promise promise_; + public: + template < typename U > + concrete_task(U&& u, std::tuple&& args); + void run() noexcept final; + void cancel() noexcept final; + promise future() noexcept; + }; + + template < typename F, typename... Args > + class scheduler::concrete_task : public task { + F f_; + std::tuple args_; + promise promise_; + public: + template < typename U > + concrete_task(U&& u, std::tuple&& args); + void run() noexcept final; + void cancel() noexcept final; + promise future() noexcept; + }; +} + +namespace scheduler_hpp +{ + inline scheduler::scheduler() = default; + + inline scheduler::~scheduler() noexcept { + shutdown_(); + } + + template < typename F, typename... Args, typename R > + promise scheduler::schedule(F&& f, Args&&... args) { + return schedule( + scheduler_priority::normal, + std::forward(f), + std::forward(args)...); + } + + template < typename F, typename... Args, typename R > + promise scheduler::schedule(scheduler_priority priority, F&& f, Args&&... args) { + using task_t = concrete_task< + R, + std::decay_t, + std::decay_t...>; + std::unique_ptr task = std::make_unique( + std::forward(f), + std::make_tuple(std::forward(args)...)); + promise future = task->future(); + std::lock_guard guard(tasks_mutex_); + push_task_(priority, std::move(task)); + return future; + } + + inline scheduler_wait_status scheduler::process_all_tasks() noexcept { + while ( !cancelled_ && active_task_count_ ) { + std::unique_lock lock(tasks_mutex_); + cond_var_.wait(lock, [this](){ + return cancelled_ || !active_task_count_ || !tasks_.empty(); + }); + if ( !tasks_.empty() ) { + process_task_(std::move(lock)); + } + } + return cancelled_ + ? scheduler_wait_status::cancelled + : scheduler_wait_status::no_timeout; + } + + template < typename Rep, typename Period > + scheduler_wait_status scheduler::process_tasks_for( + const std::chrono::duration& timeout_duration) noexcept + { + return process_tasks_until( + std::chrono::steady_clock::now() + timeout_duration); + } + + template < typename Clock, typename Duration > + scheduler_wait_status scheduler::process_tasks_until( + const std::chrono::time_point& timeout_time) noexcept + { + while ( !cancelled_ && active_task_count_ ) { + if ( !(Clock::now() < timeout_time) ) { + return scheduler_wait_status::timeout; + } + std::unique_lock lock(tasks_mutex_); + cond_var_.wait_until(lock, timeout_time, [this](){ + return cancelled_ || !active_task_count_ || !tasks_.empty(); + }); + if ( !tasks_.empty() ) { + process_task_(std::move(lock)); + } + } + return cancelled_ + ? scheduler_wait_status::cancelled + : scheduler_wait_status::no_timeout; + } + + inline void scheduler::push_task_(scheduler_priority priority, task_ptr task) { + tasks_.emplace_back(priority, std::move(task)); + std::push_heap(tasks_.begin(), tasks_.end()); + ++active_task_count_; + cond_var_.notify_all(); + } + + inline scheduler::task_ptr scheduler::pop_task_() noexcept { + if ( !tasks_.empty() ) { + std::pop_heap(tasks_.begin(), tasks_.end()); + task_ptr task = std::move(tasks_.back().second); + tasks_.pop_back(); + return task; + } + return nullptr; + } + + inline void scheduler::shutdown_() noexcept { + std::lock_guard guard(tasks_mutex_); + while ( !tasks_.empty() ) { + task_ptr task = pop_task_(); + if ( task ) { + task->cancel(); + --active_task_count_; + } + } + cancelled_.store(true); + cond_var_.notify_all(); + } + + inline void scheduler::process_task_(std::unique_lock lock) noexcept { + assert(lock.owns_lock()); + task_ptr task = pop_task_(); + if ( task ) { + lock.unlock(); + task->run(); + --active_task_count_; + cond_var_.notify_all(); + } + } +} + +namespace scheduler_hpp +{ + // + // concrete_task + // + + template < typename R, typename F, typename... Args > + template < typename U > + scheduler::concrete_task::concrete_task(U&& u, std::tuple&& args) + : f_(std::forward(u)) + , args_(std::move(args)) {} + + template < typename R, typename F, typename... Args > + void scheduler::concrete_task::run() noexcept { + try { + R value = invoke_hpp::apply(std::move(f_), std::move(args_)); + promise_.resolve(std::move(value)); + } catch (...) { + promise_.reject(std::current_exception()); + } + } + + template < typename R, typename F, typename... Args > + void scheduler::concrete_task::cancel() noexcept { + promise_.reject(scheduler_cancelled_exception()); + } + + template < typename R, typename F, typename... Args > + promise scheduler::concrete_task::future() noexcept { + return promise_; + } + + // + // concrete_task + // + + template < typename F, typename... Args > + template < typename U > + scheduler::concrete_task::concrete_task(U&& u, std::tuple&& args) + : f_(std::forward(u)) + , args_(std::move(args)) {} + + template < typename F, typename... Args > + void scheduler::concrete_task::run() noexcept { + try { + invoke_hpp::apply(std::move(f_), std::move(args_)); + promise_.resolve(); + } catch (...) { + promise_.reject(std::current_exception()); + } + } + + template < typename F, typename... Args > + void scheduler::concrete_task::cancel() noexcept { + promise_.reject(scheduler_cancelled_exception()); + } + + template < typename F, typename... Args > + promise scheduler::concrete_task::future() noexcept { + return promise_; + } +} diff --git a/tests.cpp b/tests.cpp index dd06958..a3500de 100644 --- a/tests.cpp +++ b/tests.cpp @@ -12,9 +12,11 @@ #include "jobber.hpp" #include "promise.hpp" +#include "scheduler.hpp" namespace jb = jobber_hpp; namespace pr = promise_hpp; +namespace sd = scheduler_hpp; namespace { @@ -1227,7 +1229,7 @@ TEST_CASE("jobber") { for ( std::size_t i = 0; i < gp.size(); ++i ) { gp[i] = g.async([](float angle){ return std::sin(angle); - }, i); + }, static_cast(i)); } return std::accumulate(gp.begin(), gp.end(), 0.f, [](float r, pr::promise& f){ @@ -1246,3 +1248,102 @@ TEST_CASE("jobber") { REQUIRE(r0 == Approx(r1 * 50.f).margin(0.01f)); } } + +TEST_CASE("scheduler") { + { + sd::scheduler s; + auto pv0 = s.schedule([](){ + throw std::exception(); + }); + s.process_all_tasks(); + REQUIRE_THROWS_AS(pv0.get(), std::exception); + } + { + auto pv0 = pr::promise(); + { + sd::scheduler s; + pv0 = s.schedule([](){ + return 42; + }); + } + REQUIRE_THROWS_AS(pv0.get(), sd::scheduler_cancelled_exception); + } + { + sd::scheduler s; + int counter = 0; + s.schedule([&counter](){ ++counter; }); + REQUIRE(counter == 0); + s.process_all_tasks(); + REQUIRE(counter == 1); + s.schedule([&counter](){ ++counter; }); + s.schedule([&counter](){ ++counter; }); + REQUIRE(counter == 1); + s.process_all_tasks(); + REQUIRE(counter == 3); + } + { + sd::scheduler s; + int counter = 0; + for ( std::size_t i = 0; i < 50; ++i ) { + s.schedule([&counter](){ + ++counter; + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + }); + } + s.process_tasks_for(std::chrono::milliseconds(-1)); + s.process_tasks_for(std::chrono::milliseconds(0)); + REQUIRE(counter == 0); + s.process_tasks_for(std::chrono::milliseconds(100)); + REQUIRE(counter > 2); + REQUIRE(counter < 50); + s.process_tasks_for(std::chrono::seconds(3)); + REQUIRE(counter == 50); + } + { + sd::scheduler s; + int counter = 0; + for ( std::size_t i = 0; i < 50; ++i ) { + s.schedule([&counter](){ + ++counter; + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + }); + } + + const auto time_now = [](){ + return std::chrono::high_resolution_clock::now(); + }; + + const auto b = time_now(); + + s.process_tasks_until(time_now() - std::chrono::milliseconds(1)); + s.process_tasks_until(time_now()); + REQUIRE(counter == 0); + s.process_tasks_until(time_now() + std::chrono::milliseconds(100)); + REQUIRE(time_now() - b > std::chrono::milliseconds(50)); + REQUIRE(counter > 2); + REQUIRE(counter < 50); + s.process_tasks_until(time_now() + std::chrono::seconds(3)); + REQUIRE(counter == 50); + } + { + sd::scheduler s; + std::string accumulator; + s.schedule(sd::scheduler_priority::lowest, [](std::string& acc){ + acc.append("o"); + }, std::ref(accumulator)); + s.schedule(sd::scheduler_priority::below_normal, [](std::string& acc){ + acc.append("l"); + }, std::ref(accumulator)); + s.schedule(sd::scheduler_priority::highest, [](std::string& acc){ + acc.append("h"); + }, std::ref(accumulator)); + s.schedule(sd::scheduler_priority::above_normal, [](std::string& acc){ + acc.append("e"); + }, std::ref(accumulator)); + s.schedule(sd::scheduler_priority::normal, [](std::string& acc){ + acc.append("l"); + }, std::ref(accumulator)); + s.process_all_tasks(); + REQUIRE(accumulator == "hello"); + } +}