diff --git a/headers/enduro2d/utils/_all.hpp b/headers/enduro2d/utils/_all.hpp index d59fa257..7df6d4d6 100644 --- a/headers/enduro2d/utils/_all.hpp +++ b/headers/enduro2d/utils/_all.hpp @@ -18,6 +18,7 @@ #include "mesh.hpp" #include "module.hpp" #include "path.hpp" +#include "scheduler.hpp" #include "streams.hpp" #include "strfmts.hpp" #include "strings.hpp" diff --git a/headers/enduro2d/utils/_utils.hpp b/headers/enduro2d/utils/_utils.hpp index 0e1501c5..7506c251 100644 --- a/headers/enduro2d/utils/_utils.hpp +++ b/headers/enduro2d/utils/_utils.hpp @@ -17,6 +17,7 @@ namespace e2d class image; class jobber; class mesh; + class scheduler; class input_stream; class output_stream; class input_sequence; diff --git a/headers/enduro2d/utils/jobber.hpp b/headers/enduro2d/utils/jobber.hpp index df97a034..edd6f323 100644 --- a/headers/enduro2d/utils/jobber.hpp +++ b/headers/enduro2d/utils/jobber.hpp @@ -7,6 +7,7 @@ #pragma once #include "_utils.hpp" +#include "time.hpp" namespace e2d { @@ -42,6 +43,18 @@ namespace e2d void wait_all() const noexcept; void active_wait_all() noexcept; + + template < typename T, typename TimeTag > + void wait_for(const unit& time_for) const noexcept; + + template < typename T, typename TimeTag > + void wait_until(const unit& time_until) const noexcept; + + template < typename T, typename TimeTag > + void active_wait_for(const unit& time_for) noexcept; + + template < typename T, typename TimeTag > + void active_wait_until(const unit& time_until) noexcept; private: class task; using task_ptr = std::unique_ptr; @@ -123,6 +136,55 @@ namespace e2d return future; } + // + // wait + // + + template < typename T, typename TimeTag > + void jobber::wait_for(const unit& time_for) const noexcept { + if ( time_for.value > T(0) ) { + wait_until( + time::now() + + time_for.template cast_to()); + } + } + + template < typename T, typename TimeTag > + void jobber::wait_until(const unit& time_until) const noexcept { + while ( active_task_count_ ) { + const auto time_now = time::now(); + if ( time_now >= time_until.template cast_to() ) { + break; + } + std::this_thread::yield(); + } + } + + // + // active_wait + // + + template < typename T, typename TimeTag > + void jobber::active_wait_for(const unit& time_for) noexcept { + if ( time_for.value > T(0) ) { + active_wait_until( + time::now() + + time_for.template cast_to()); + } + } + + template < typename T, typename TimeTag > + void jobber::active_wait_until(const unit& time_until) noexcept { + while ( active_task_count_ ) { + const auto time_now = time::now(); + if ( time_now >= time_until.template cast_to() ) { + break; + } + std::unique_lock lock(tasks_mutex_); + process_task_(std::move(lock)); + } + } + // // concrete_task // diff --git a/headers/enduro2d/utils/scheduler.hpp b/headers/enduro2d/utils/scheduler.hpp new file mode 100644 index 00000000..d71d9f93 --- /dev/null +++ b/headers/enduro2d/utils/scheduler.hpp @@ -0,0 +1,196 @@ +/******************************************************************************* + * This file is part of the "Enduro2D" + * For conditions of distribution and use, see copyright notice in LICENSE.md + * Copyright (C) 2018 Matvey Cherevko + ******************************************************************************/ + +#pragma once + +#include "_utils.hpp" +#include "time.hpp" + +namespace e2d +{ + class scheduler final : private noncopyable { + public: + enum class priority { + lowest, + below_normal, + normal, + above_normal, + highest + }; + public: + scheduler(); + ~scheduler() noexcept; + + template < typename F, typename... Args > + using schedule_invoke_result_t = stdex::invoke_result_t< + std::decay_t, + std::decay_t...>; + + template < typename F, typename... Args + , typename R = schedule_invoke_result_t > + std::future schedule(F&& f, Args&&... args); + + template < typename F, typename... Args + , typename R = schedule_invoke_result_t > + std::future schedule(priority priority, F&& f, Args&&... args); + + void process_all_tasks() noexcept; + + template < typename T, typename TimeTag > + void process_tasks_for(const unit& time_for) noexcept; + + template < typename T, typename TimeTag > + void process_tasks_until(const unit& time_until) noexcept; + private: + class task; + using task_ptr = std::unique_ptr; + template < typename R, typename F, typename... Args > + class concrete_task; + private: + void push_task_(priority priority, task_ptr task); + task_ptr pop_task_() noexcept; + void process_task_(std::unique_lock lock) noexcept; + private: + std::mutex tasks_mutex_; + std::atomic active_task_count_{0}; + vector> tasks_; + }; + + class scheduler::task : private noncopyable { + public: + virtual ~task() noexcept = default; + virtual void run() noexcept = 0; + }; + + template < typename R, typename F, typename... Args > + class scheduler::concrete_task : public task { + F f_; + std::tuple args_; + std::promise promise_; + public: + template < typename U > + concrete_task(U&& u, std::tuple&& args); + void run() noexcept final; + std::future future() noexcept; + }; + + template < typename F, typename... Args > + class scheduler::concrete_task : public task { + F f_; + std::tuple args_; + std::promise promise_; + public: + template < typename U > + concrete_task(U&& u, std::tuple&& args); + void run() noexcept final; + std::future future() noexcept; + }; +} + +namespace e2d +{ + // + // schedule + // + + template < typename F, typename... Args, typename R > + std::future scheduler::schedule(F&& f, Args&&... args) { + return schedule( + priority::normal, + std::forward(f), + std::forward(args)...); + } + + template < typename F, typename... Args, typename R > + std::future scheduler::schedule(priority priority, F&& f, Args&&... args) { + using task_t = concrete_task< + R, + std::decay_t, + std::decay_t...>; + std::unique_ptr task = std::make_unique( + std::forward(f), + std::make_tuple(std::forward(args)...)); + std::future future = task->future(); + std::lock_guard guard(tasks_mutex_); + push_task_(priority, std::move(task)); + return future; + } + + // + // process_all_tasks + // + + template < typename T, typename TimeTag > + void scheduler::process_tasks_for(const unit& time_for) noexcept { + if ( time_for.value > T(0) ) { + process_tasks_until( + time::now() + + time_for.template cast_to()); + } + } + + template < typename T, typename TimeTag > + void scheduler::process_tasks_until(const unit& time_until) noexcept { + while ( active_task_count_ ) { + const auto time_now = time::now(); + if ( time_now >= time_until.template cast_to() ) { + break; + } + std::unique_lock lock(tasks_mutex_); + process_task_(std::move(lock)); + } + } + + // + // concrete_task + // + + template < typename R, typename F, typename... Args > + template < typename U > + scheduler::concrete_task::concrete_task(U&& u, std::tuple&& args) + : f_(std::forward(u)) + , args_(std::move(args)) {} + + template < typename R, typename F, typename... Args > + void scheduler::concrete_task::run() noexcept { + try { + R value = stdex::apply(std::move(f_), std::move(args_)); + promise_.set_value(std::move(value)); + } catch (...) { + promise_.set_exception(std::current_exception()); + } + } + + template < typename R, typename F, typename... Args > + std::future scheduler::concrete_task::future() noexcept { + return promise_.get_future(); + } + + // + // concrete_task + // + + template < typename F, typename... Args > + template < typename U > + scheduler::concrete_task::concrete_task(U&& u, std::tuple&& args) + : f_(std::forward(u)) + , args_(std::move(args)) {} + + template < typename F, typename... Args > + void scheduler::concrete_task::run() noexcept { + try { + stdex::apply(std::move(f_), std::move(args_)); + promise_.set_value(); + } catch (...) { + promise_.set_exception(std::current_exception()); + } + } + + template < typename F, typename... Args > + std::future scheduler::concrete_task::future() noexcept { + return promise_.get_future(); + } +} diff --git a/sources/enduro2d/utils/scheduler.cpp b/sources/enduro2d/utils/scheduler.cpp new file mode 100644 index 00000000..240423e8 --- /dev/null +++ b/sources/enduro2d/utils/scheduler.cpp @@ -0,0 +1,51 @@ +/******************************************************************************* + * This file is part of the "Enduro2D" + * For conditions of distribution and use, see copyright notice in LICENSE.md + * Copyright (C) 2018 Matvey Cherevko + ******************************************************************************/ + +#include + +namespace +{ + using namespace e2d; +} + +namespace e2d +{ + scheduler::scheduler() = default; + scheduler::~scheduler() noexcept = default; + + void scheduler::process_all_tasks() noexcept { + while ( active_task_count_ ) { + std::unique_lock lock(tasks_mutex_); + process_task_(std::move(lock)); + } + } + + void scheduler::push_task_(priority priority, task_ptr task) { + tasks_.emplace_back(priority, std::move(task)); + std::push_heap(tasks_.begin(), tasks_.end()); + ++active_task_count_; + } + + scheduler::task_ptr scheduler::pop_task_() noexcept { + if ( !tasks_.empty() ) { + std::pop_heap(tasks_.begin(), tasks_.end()); + task_ptr task = std::move(tasks_.back().second); + tasks_.pop_back(); + return task; + } + return nullptr; + } + + void scheduler::process_task_(std::unique_lock lock) noexcept { + E2D_ASSERT(lock.owns_lock()); + task_ptr task = pop_task_(); + if ( task ) { + lock.unlock(); + task->run(); + --active_task_count_; + } + } +} diff --git a/untests/sources/untests_utils/jobber.cpp b/untests/sources/untests_utils/jobber.cpp index 75778500..6bba998b 100644 --- a/untests/sources/untests_utils/jobber.cpp +++ b/untests/sources/untests_utils/jobber.cpp @@ -69,6 +69,35 @@ TEST_CASE("jobber") { j.resume(); j.wait_all(); } + { + jobber j(1); + i32 counter = 0; + j.pause(); + for ( std::size_t i = 0; i < 10; ++i ) { + j.async([&counter](){ + ++counter; + std::this_thread::sleep_for(std::chrono::milliseconds(15)); + }); + } + + const auto b = time::now_ms(); + + j.wait_for(make_milliseconds(15)); + REQUIRE(time::now_ms() - b > make_milliseconds(10)); + REQUIRE(counter == 0); + + j.wait_until(time::now_ms() + make_milliseconds(15)); + REQUIRE(time::now_ms() - b > make_milliseconds(20)); + REQUIRE(counter == 0); + + j.active_wait_for(make_milliseconds(60)); + REQUIRE(time::now_ms() - b > make_milliseconds(70)); + REQUIRE(counter > 2); + REQUIRE(counter < 10); + + j.active_wait_until(time::now_s() + make_seconds(1)); + REQUIRE(counter == 10); + } { jobber j(2); jobber g(2); diff --git a/untests/sources/untests_utils/scheduler.cpp b/untests/sources/untests_utils/scheduler.cpp new file mode 100644 index 00000000..162b294b --- /dev/null +++ b/untests/sources/untests_utils/scheduler.cpp @@ -0,0 +1,80 @@ +/******************************************************************************* + * This file is part of the "Enduro2D" + * For conditions of distribution and use, see copyright notice in LICENSE.md + * Copyright (C) 2018 Matvey Cherevko + ******************************************************************************/ + +#include "_utils.hpp" +using namespace e2d; + +TEST_CASE("scheduler") { + { + scheduler s; + i32 counter = 0; + s.schedule([&counter](){ ++counter; }); + REQUIRE(counter == 0); + s.process_all_tasks(); + REQUIRE(counter == 1); + s.schedule([&counter](){ ++counter; }); + REQUIRE(counter == 1); + s.process_all_tasks(); + REQUIRE(counter == 2); + } + { + scheduler s; + i32 counter = 0; + for ( std::size_t i = 0; i < 10; ++i ) { + s.schedule([&counter](){ + ++counter; + std::this_thread::sleep_for(std::chrono::milliseconds(15)); + }); + } + s.process_tasks_for(make_milliseconds(-1)); + s.process_tasks_for(make_milliseconds(0)); + REQUIRE(counter == 0); + s.process_tasks_for(make_milliseconds(60)); + REQUIRE(counter > 2); + REQUIRE(counter < 10); + s.process_tasks_for(make_seconds(1)); + REQUIRE(counter == 10); + } + { + scheduler s; + i32 counter = 0; + for ( std::size_t i = 0; i < 10; ++i ) { + s.schedule([&counter](){ + ++counter; + std::this_thread::sleep_for(std::chrono::milliseconds(15)); + }); + } + s.process_tasks_until(time::now_ms() - make_milliseconds(1)); + s.process_tasks_until(time::now_ms()); + REQUIRE(counter == 0); + s.process_tasks_until(time::now_ms() + make_milliseconds(60)); + REQUIRE(counter > 2); + REQUIRE(counter < 10); + s.process_tasks_until(time::now_s() + make_seconds(1)); + REQUIRE(counter == 10); + } + { + scheduler s; + str accumulator; + s.schedule(scheduler::priority::lowest, [](str& acc){ + acc.append("o"); + }, std::ref(accumulator)); + s.schedule(scheduler::priority::below_normal, [](str& acc){ + acc.append("l"); + }, std::ref(accumulator)); + s.schedule(scheduler::priority::highest, [](str& acc){ + acc.append("h"); + }, std::ref(accumulator)); + s.schedule(scheduler::priority::above_normal, [](str& acc){ + acc.append("e"); + }, std::ref(accumulator)); + s.schedule(scheduler::priority::normal, [](str& acc){ + acc.append("l"); + }, std::ref(accumulator)); + s.process_all_tasks(); + REQUIRE(accumulator == "hello"); + } +}