diff --git a/jobber.hpp b/jobber.hpp new file mode 100644 index 0000000..4cef528 --- /dev/null +++ b/jobber.hpp @@ -0,0 +1,399 @@ +/******************************************************************************* + * This file is part of the "promise.hpp" + * For conditions of distribution and use, see copyright notice in LICENSE.md + * Copyright (C) 2018 Matvey Cherevko + ******************************************************************************/ + +#pragma once + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "promise.hpp" + +namespace jobber_hpp +{ + using namespace promise_hpp; + + enum class jobber_priority { + lowest, + below_normal, + normal, + above_normal, + highest + }; + + enum class jobber_wait_status { + no_timeout, + cancelled, + timeout + }; + + class jobber_cancelled_exception : public std::runtime_error { + public: + jobber_cancelled_exception() + : std::runtime_error("jobber has stopped working") {} + }; + + class jobber final : private detail::noncopyable { + public: + explicit jobber(std::size_t threads); + ~jobber() noexcept; + + template < typename F, typename... Args > + using async_invoke_result_t = invoke_hpp::invoke_result_t< + std::decay_t, + std::decay_t...>; + + template < typename F, typename... Args + , typename R = async_invoke_result_t > + promise async(F&& f, Args&&... args); + + template < typename F, typename... Args + , typename R = async_invoke_result_t > + promise async(jobber_priority priority, F&& f, Args&&... args); + + void pause() noexcept; + void resume() noexcept; + bool is_paused() const noexcept; + + jobber_wait_status wait_all() const noexcept; + jobber_wait_status active_wait_all() noexcept; + + template < typename Rep, typename Period > + jobber_wait_status wait_all_for( + const std::chrono::duration& timeout_duration) const; + + template < typename Clock, typename Duration > + jobber_wait_status wait_all_until( + const std::chrono::time_point& timeout_time) const; + + template < typename Rep, typename Period > + jobber_wait_status active_wait_all_for( + const std::chrono::duration& timeout_duration); + + template < typename Clock, typename Duration > + jobber_wait_status active_wait_all_until( + const std::chrono::time_point& timeout_time); + private: + class task; + using task_ptr = std::unique_ptr; + template < typename R, typename F, typename... Args > + class concrete_task; + private: + void push_task_(jobber_priority priority, task_ptr task); + task_ptr pop_task_() noexcept; + void shutdown_() noexcept; + void worker_main_() noexcept; + void process_task_(std::unique_lock lock) noexcept; + private: + std::vector threads_; + std::vector> tasks_; + std::atomic paused_{false}; + std::atomic cancelled_{false}; + std::atomic active_task_count_{0}; + mutable std::mutex tasks_mutex_; + mutable std::condition_variable cond_var_; + }; + + class jobber::task : private noncopyable { + public: + virtual ~task() noexcept = default; + virtual void run() noexcept = 0; + virtual void cancel() noexcept = 0; + }; + + template < typename R, typename F, typename... Args > + class jobber::concrete_task : public task { + F f_; + std::tuple args_; + promise promise_; + public: + template < typename U > + concrete_task(U&& u, std::tuple&& args); + void run() noexcept final; + void cancel() noexcept final; + promise future() noexcept; + }; + + template < typename F, typename... Args > + class jobber::concrete_task : public task { + F f_; + std::tuple args_; + promise promise_; + public: + template < typename U > + concrete_task(U&& u, std::tuple&& args); + void run() noexcept final; + void cancel() noexcept final; + promise future() noexcept; + }; +} + +namespace jobber_hpp +{ + inline jobber::jobber(std::size_t threads) { + try { + threads_.resize(threads); + for ( std::thread& thread : threads_ ) { + thread = std::thread(&jobber::worker_main_, this); + } + } catch (...) { + shutdown_(); + throw; + } + } + + inline jobber::~jobber() noexcept { + shutdown_(); + } + + template < typename F, typename... Args, typename R > + promise jobber::async(F&& f, Args&&... args) { + return async( + jobber_priority::normal, + std::forward(f), + std::forward(args)...); + } + + template < typename F, typename... Args, typename R > + promise jobber::async(jobber_priority priority, F&& f, Args&&... args) { + using task_t = concrete_task< + R, + std::decay_t, + std::decay_t...>; + std::unique_ptr task = std::make_unique( + std::forward(f), + std::make_tuple(std::forward(args)...)); + promise future = task->future(); + std::lock_guard guard(tasks_mutex_); + push_task_(priority, std::move(task)); + return future; + } + + inline void jobber::pause() noexcept { + std::lock_guard guard(tasks_mutex_); + paused_.store(true); + cond_var_.notify_all(); + } + + inline void jobber::resume() noexcept { + std::lock_guard guard(tasks_mutex_); + paused_.store(false); + cond_var_.notify_all(); + } + + inline bool jobber::is_paused() const noexcept { + return paused_; + } + + inline jobber_wait_status jobber::wait_all() const noexcept { + std::unique_lock lock(tasks_mutex_); + cond_var_.wait(lock, [this](){ + return cancelled_ || !active_task_count_; + }); + return cancelled_ + ? jobber_wait_status::cancelled + : jobber_wait_status::no_timeout; + } + + inline jobber_wait_status jobber::active_wait_all() noexcept { + while ( !cancelled_ && active_task_count_ ) { + std::unique_lock lock(tasks_mutex_); + cond_var_.wait(lock, [this](){ + return cancelled_ || !active_task_count_ || !tasks_.empty(); + }); + if ( !tasks_.empty() ) { + process_task_(std::move(lock)); + } + } + return cancelled_ + ? jobber_wait_status::cancelled + : jobber_wait_status::no_timeout; + } + + template < typename Rep, typename Period > + jobber_wait_status jobber::wait_all_for( + const std::chrono::duration& timeout_duration) const + { + return wait_all_until( + std::chrono::steady_clock::now() + timeout_duration); + } + + template < typename Clock, typename Duration > + jobber_wait_status jobber::wait_all_until( + const std::chrono::time_point& timeout_time) const + { + std::unique_lock lock(tasks_mutex_); + return cond_var_.wait_until(lock, timeout_time, [this](){ + return cancelled_ || !active_task_count_; + }) ? jobber_wait_status::no_timeout + : jobber_wait_status::timeout; + } + + template < typename Rep, typename Period > + jobber_wait_status jobber::active_wait_all_for( + const std::chrono::duration& timeout_duration) + { + return active_wait_all_until( + std::chrono::steady_clock::now() + timeout_duration); + } + + template < typename Clock, typename Duration > + jobber_wait_status jobber::active_wait_all_until( + const std::chrono::time_point& timeout_time) + { + while ( !cancelled_ && active_task_count_ ) { + if ( !(Clock::now() < timeout_time) ) { + return jobber_wait_status::timeout; + } + std::unique_lock lock(tasks_mutex_); + cond_var_.wait_until(lock, timeout_time, [this](){ + return cancelled_ || !active_task_count_ || !tasks_.empty(); + }); + if ( !tasks_.empty() ) { + process_task_(std::move(lock)); + } + } + return cancelled_ + ? jobber_wait_status::cancelled + : jobber_wait_status::no_timeout; + } + + inline void jobber::push_task_(jobber_priority priority, task_ptr task) { + tasks_.emplace_back(priority, std::move(task)); + std::push_heap(tasks_.begin(), tasks_.end()); + ++active_task_count_; + cond_var_.notify_all(); + } + + inline jobber::task_ptr jobber::pop_task_() noexcept { + if ( !tasks_.empty() ) { + std::pop_heap(tasks_.begin(), tasks_.end()); + task_ptr task = std::move(tasks_.back().second); + tasks_.pop_back(); + return task; + } + return nullptr; + } + + inline void jobber::shutdown_() noexcept { + { + std::lock_guard guard(tasks_mutex_); + while ( !tasks_.empty() ) { + task_ptr task = pop_task_(); + if ( task ) { + task->cancel(); + --active_task_count_; + } + } + cancelled_.store(true); + cond_var_.notify_all(); + } + for ( std::thread& thread : threads_ ) { + if ( thread.joinable() ) { + thread.join(); + } + } + } + + inline void jobber::worker_main_() noexcept { + while ( true ) { + std::unique_lock lock(tasks_mutex_); + cond_var_.wait(lock, [this](){ + return cancelled_ || (!paused_ && !tasks_.empty()); + }); + if ( cancelled_ ) { + break; + } + process_task_(std::move(lock)); + } + } + + inline void jobber::process_task_(std::unique_lock lock) noexcept { + assert(lock.owns_lock()); + task_ptr task = pop_task_(); + if ( task ) { + lock.unlock(); + task->run(); + --active_task_count_; + cond_var_.notify_all(); + } + } +} + +namespace jobber_hpp +{ + // + // concrete_task + // + + template < typename R, typename F, typename... Args > + template < typename U > + jobber::concrete_task::concrete_task(U&& u, std::tuple&& args) + : f_(std::forward(u)) + , args_(std::move(args)) {} + + template < typename R, typename F, typename... Args > + void jobber::concrete_task::run() noexcept { + try { + R value = invoke_hpp::apply(std::move(f_), std::move(args_)); + promise_.resolve(std::move(value)); + } catch (...) { + promise_.reject(std::current_exception()); + } + } + + template < typename R, typename F, typename... Args > + void jobber::concrete_task::cancel() noexcept { + promise_.reject(jobber_cancelled_exception()); + } + + template < typename R, typename F, typename... Args > + promise jobber::concrete_task::future() noexcept { + return promise_; + } + + // + // concrete_task + // + + template < typename F, typename... Args > + template < typename U > + jobber::concrete_task::concrete_task(U&& u, std::tuple&& args) + : f_(std::forward(u)) + , args_(std::move(args)) {} + + template < typename F, typename... Args > + void jobber::concrete_task::run() noexcept { + try { + invoke_hpp::apply(std::move(f_), std::move(args_)); + promise_.resolve(); + } catch (...) { + promise_.reject(std::current_exception()); + } + } + + template < typename F, typename... Args > + void jobber::concrete_task::cancel() noexcept { + promise_.reject(jobber_cancelled_exception()); + } + + template < typename F, typename... Args > + promise jobber::concrete_task::future() noexcept { + return promise_; + } +} diff --git a/promise.hpp b/promise.hpp index 00eb8c8..62678a8 100644 --- a/promise.hpp +++ b/promise.hpp @@ -545,7 +545,7 @@ namespace promise_hpp return state_->get(); } - void wait() const { + void wait() const noexcept { state_->wait(); } @@ -607,7 +607,7 @@ namespace promise_hpp return storage_.value(); } - void wait() const { + void wait() const noexcept { std::unique_lock lock(mutex_); cond_var_.wait(lock, [this](){ return status_ != status::pending; @@ -934,7 +934,7 @@ namespace promise_hpp state_->get(); } - void wait() const { + void wait() const noexcept { state_->wait(); } @@ -993,7 +993,7 @@ namespace promise_hpp assert(status_ == status::resolved); } - void wait() const { + void wait() const noexcept { std::unique_lock lock(mutex_); cond_var_.wait(lock, [this](){ return status_ != status::pending; diff --git a/scheduler.hpp b/scheduler.hpp new file mode 100644 index 0000000..303fa57 --- /dev/null +++ b/scheduler.hpp @@ -0,0 +1,307 @@ +/******************************************************************************* + * This file is part of the "promise.hpp" + * For conditions of distribution and use, see copyright notice in LICENSE.md + * Copyright (C) 2018 Matvey Cherevko + ******************************************************************************/ + +#pragma once + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "promise.hpp" + +namespace scheduler_hpp +{ + using namespace promise_hpp; + + enum class scheduler_priority { + lowest, + below_normal, + normal, + above_normal, + highest + }; + + enum class scheduler_wait_status { + no_timeout, + cancelled, + timeout + }; + + class scheduler_cancelled_exception : public std::runtime_error { + public: + scheduler_cancelled_exception() + : std::runtime_error("scheduler has stopped working") {} + }; + + class scheduler final : private detail::noncopyable { + public: + scheduler(); + ~scheduler() noexcept; + + template < typename F, typename... Args > + using schedule_invoke_result_t = invoke_hpp::invoke_result_t< + std::decay_t, + std::decay_t...>; + + template < typename F, typename... Args + , typename R = schedule_invoke_result_t > + promise schedule(F&& f, Args&&... args); + + template < typename F, typename... Args + , typename R = schedule_invoke_result_t > + promise schedule(scheduler_priority scheduler_priority, F&& f, Args&&... args); + + scheduler_wait_status process_all_tasks() noexcept; + + template < typename Rep, typename Period > + scheduler_wait_status process_tasks_for( + const std::chrono::duration& timeout_duration) noexcept; + + template < typename Clock, typename Duration > + scheduler_wait_status process_tasks_until( + const std::chrono::time_point& timeout_time) noexcept; + private: + class task; + using task_ptr = std::unique_ptr; + template < typename R, typename F, typename... Args > + class concrete_task; + private: + void push_task_(scheduler_priority scheduler_priority, task_ptr task); + task_ptr pop_task_() noexcept; + void shutdown_() noexcept; + void process_task_(std::unique_lock lock) noexcept; + private: + std::vector> tasks_; + std::atomic cancelled_{false}; + std::atomic active_task_count_{0}; + mutable std::mutex tasks_mutex_; + mutable std::condition_variable cond_var_; + }; + + class scheduler::task : private noncopyable { + public: + virtual ~task() noexcept = default; + virtual void run() noexcept = 0; + virtual void cancel() noexcept = 0; + }; + + template < typename R, typename F, typename... Args > + class scheduler::concrete_task : public task { + F f_; + std::tuple args_; + promise promise_; + public: + template < typename U > + concrete_task(U&& u, std::tuple&& args); + void run() noexcept final; + void cancel() noexcept final; + promise future() noexcept; + }; + + template < typename F, typename... Args > + class scheduler::concrete_task : public task { + F f_; + std::tuple args_; + promise promise_; + public: + template < typename U > + concrete_task(U&& u, std::tuple&& args); + void run() noexcept final; + void cancel() noexcept final; + promise future() noexcept; + }; +} + +namespace scheduler_hpp +{ + inline scheduler::scheduler() = default; + + inline scheduler::~scheduler() noexcept { + shutdown_(); + } + + template < typename F, typename... Args, typename R > + promise scheduler::schedule(F&& f, Args&&... args) { + return schedule( + scheduler_priority::normal, + std::forward(f), + std::forward(args)...); + } + + template < typename F, typename... Args, typename R > + promise scheduler::schedule(scheduler_priority priority, F&& f, Args&&... args) { + using task_t = concrete_task< + R, + std::decay_t, + std::decay_t...>; + std::unique_ptr task = std::make_unique( + std::forward(f), + std::make_tuple(std::forward(args)...)); + promise future = task->future(); + std::lock_guard guard(tasks_mutex_); + push_task_(priority, std::move(task)); + return future; + } + + inline scheduler_wait_status scheduler::process_all_tasks() noexcept { + while ( !cancelled_ && active_task_count_ ) { + std::unique_lock lock(tasks_mutex_); + cond_var_.wait(lock, [this](){ + return cancelled_ || !active_task_count_ || !tasks_.empty(); + }); + if ( !tasks_.empty() ) { + process_task_(std::move(lock)); + } + } + return cancelled_ + ? scheduler_wait_status::cancelled + : scheduler_wait_status::no_timeout; + } + + template < typename Rep, typename Period > + scheduler_wait_status scheduler::process_tasks_for( + const std::chrono::duration& timeout_duration) noexcept + { + return process_tasks_until( + std::chrono::steady_clock::now() + timeout_duration); + } + + template < typename Clock, typename Duration > + scheduler_wait_status scheduler::process_tasks_until( + const std::chrono::time_point& timeout_time) noexcept + { + while ( !cancelled_ && active_task_count_ ) { + if ( !(Clock::now() < timeout_time) ) { + return scheduler_wait_status::timeout; + } + std::unique_lock lock(tasks_mutex_); + cond_var_.wait_until(lock, timeout_time, [this](){ + return cancelled_ || !active_task_count_ || !tasks_.empty(); + }); + if ( !tasks_.empty() ) { + process_task_(std::move(lock)); + } + } + return cancelled_ + ? scheduler_wait_status::cancelled + : scheduler_wait_status::no_timeout; + } + + inline void scheduler::push_task_(scheduler_priority priority, task_ptr task) { + tasks_.emplace_back(priority, std::move(task)); + std::push_heap(tasks_.begin(), tasks_.end()); + ++active_task_count_; + cond_var_.notify_all(); + } + + inline scheduler::task_ptr scheduler::pop_task_() noexcept { + if ( !tasks_.empty() ) { + std::pop_heap(tasks_.begin(), tasks_.end()); + task_ptr task = std::move(tasks_.back().second); + tasks_.pop_back(); + return task; + } + return nullptr; + } + + inline void scheduler::shutdown_() noexcept { + std::lock_guard guard(tasks_mutex_); + while ( !tasks_.empty() ) { + task_ptr task = pop_task_(); + if ( task ) { + task->cancel(); + --active_task_count_; + } + } + cancelled_.store(true); + cond_var_.notify_all(); + } + + inline void scheduler::process_task_(std::unique_lock lock) noexcept { + assert(lock.owns_lock()); + task_ptr task = pop_task_(); + if ( task ) { + lock.unlock(); + task->run(); + --active_task_count_; + cond_var_.notify_all(); + } + } +} + +namespace scheduler_hpp +{ + // + // concrete_task + // + + template < typename R, typename F, typename... Args > + template < typename U > + scheduler::concrete_task::concrete_task(U&& u, std::tuple&& args) + : f_(std::forward(u)) + , args_(std::move(args)) {} + + template < typename R, typename F, typename... Args > + void scheduler::concrete_task::run() noexcept { + try { + R value = invoke_hpp::apply(std::move(f_), std::move(args_)); + promise_.resolve(std::move(value)); + } catch (...) { + promise_.reject(std::current_exception()); + } + } + + template < typename R, typename F, typename... Args > + void scheduler::concrete_task::cancel() noexcept { + promise_.reject(scheduler_cancelled_exception()); + } + + template < typename R, typename F, typename... Args > + promise scheduler::concrete_task::future() noexcept { + return promise_; + } + + // + // concrete_task + // + + template < typename F, typename... Args > + template < typename U > + scheduler::concrete_task::concrete_task(U&& u, std::tuple&& args) + : f_(std::forward(u)) + , args_(std::move(args)) {} + + template < typename F, typename... Args > + void scheduler::concrete_task::run() noexcept { + try { + invoke_hpp::apply(std::move(f_), std::move(args_)); + promise_.resolve(); + } catch (...) { + promise_.reject(std::current_exception()); + } + } + + template < typename F, typename... Args > + void scheduler::concrete_task::cancel() noexcept { + promise_.reject(scheduler_cancelled_exception()); + } + + template < typename F, typename... Args > + promise scheduler::concrete_task::future() noexcept { + return promise_; + } +} diff --git a/tests.cpp b/tests.cpp index 7e5d5be..a3500de 100644 --- a/tests.cpp +++ b/tests.cpp @@ -7,12 +7,17 @@ #define CATCH_CONFIG_FAST_COMPILE #include "catch.hpp" -#include "promise.hpp" -namespace pr = promise_hpp; - -#include +#include #include +#include "jobber.hpp" +#include "promise.hpp" +#include "scheduler.hpp" + +namespace jb = jobber_hpp; +namespace pr = promise_hpp; +namespace sd = scheduler_hpp; + namespace { struct obj_t { @@ -977,3 +982,368 @@ TEST_CASE("get_and_wait") { } } } + +TEST_CASE("jobber") { + { + jb::jobber j(1); + auto pv0 = j.async([](){ + throw std::exception(); + }); + REQUIRE_THROWS_AS(pv0.get(), std::exception); + } + { + auto pv0 = pr::promise(); + { + jb::jobber j{0}; + pv0 = j.async([](){ + return 42; + }); + } + REQUIRE_THROWS_AS(pv0.get(), jb::jobber_cancelled_exception); + } + { + int v5 = 5; + + jb::jobber j(1); + auto pv0 = j.async([](int v){ + REQUIRE(v == 5); + throw std::exception(); + }, v5); + REQUIRE_THROWS_AS(pv0.get(), std::exception); + + auto pv1 = j.async([](int& v){ + REQUIRE(v == 5); + return v != 5 + ? 0 + : throw std::exception(); + }, std::ref(v5)); + REQUIRE_THROWS_AS(pv1.get(), std::exception); + + auto pv3 = j.async([](int& v){ + v = 4; + return v; + }, std::ref(v5)); + REQUIRE(pv3.get() == v5); + REQUIRE(v5 == 4); + } + { + const float pi = 3.14159265358979323846264338327950288f; + jb::jobber j(1); + auto p0 = j.async([](float angle){ + return std::sin(angle); + }, pi); + auto p1 = j.async([](float angle){ + return std::cos(angle); + }, pi * 2); + REQUIRE(p0.get() == Approx(0.f).margin(0.01f)); + REQUIRE(p1.get() == Approx(1.f).margin(0.01f)); + } + { + jb::jobber j(1); + j.pause(); + jb::jobber_priority max_priority = jb::jobber_priority::highest; + j.async([](){ + std::this_thread::sleep_for(std::chrono::milliseconds(2)); + }); + for ( std::size_t i = 0; i < 10; ++i ) { + jb::jobber_priority p = static_cast( + i % static_cast(jb::jobber_priority::highest)); + j.async(p, [&max_priority](jb::jobber_priority priority) { + REQUIRE(priority <= max_priority); + max_priority = priority; + }, p); + } + j.resume(); + j.wait_all(); + } + { + jb::jobber j(1); + std::atomic counter = ATOMIC_VAR_INIT(0); + j.pause(); + for ( std::size_t i = 0; i < 10; ++i ) { + j.async([&counter](){ + ++counter; + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + }); + } + + j.resume(); + REQUIRE(counter < 10); + j.wait_all(); + REQUIRE(counter == 10); + } + { + jb::jobber j(1); + std::atomic counter = ATOMIC_VAR_INIT(0); + j.pause(); + for ( std::size_t i = 0; i < 10; ++i ) { + j.async([&counter](){ + ++counter; + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + }); + } + REQUIRE(counter < 10); + j.active_wait_all(); + REQUIRE(counter == 10); + } + { + jb::jobber j(1); + + const auto time_now = [](){ + return std::chrono::high_resolution_clock::now(); + }; + + REQUIRE(jb::jobber_wait_status::no_timeout == j.wait_all_for(std::chrono::milliseconds(-1))); + REQUIRE(jb::jobber_wait_status::no_timeout == j.wait_all_until(time_now() + std::chrono::milliseconds(-1))); + REQUIRE(jb::jobber_wait_status::no_timeout == j.active_wait_all_for(std::chrono::milliseconds(-1))); + REQUIRE(jb::jobber_wait_status::no_timeout == j.active_wait_all_until(time_now() + std::chrono::milliseconds(-1))); + + j.pause(); + j.async([]{}); + + REQUIRE(jb::jobber_wait_status::timeout == j.wait_all_for(std::chrono::milliseconds(-1))); + REQUIRE(jb::jobber_wait_status::timeout == j.wait_all_until(time_now() + std::chrono::milliseconds(-1))); + REQUIRE(jb::jobber_wait_status::timeout == j.active_wait_all_for(std::chrono::milliseconds(-1))); + REQUIRE(jb::jobber_wait_status::timeout == j.active_wait_all_until(time_now() + std::chrono::milliseconds(-1))); + } + { + jb::jobber j(1); + std::atomic counter = ATOMIC_VAR_INIT(0); + j.pause(); + for ( std::size_t i = 0; i < 10; ++i ) { + j.async([&counter](){ + ++counter; + }); + } + + const auto time_now = [](){ + return std::chrono::high_resolution_clock::now(); + }; + + j.wait_all_for(std::chrono::milliseconds(10)); + j.wait_all_until(time_now() + std::chrono::milliseconds(10)); + REQUIRE(counter == 0); + + j.active_wait_all_for(std::chrono::milliseconds(10)); + j.active_wait_all_until(time_now() + std::chrono::milliseconds(10)); + REQUIRE(counter > 0); + } + { + jb::jobber j(1); + std::atomic counter = ATOMIC_VAR_INIT(0); + j.pause(); + for ( std::size_t i = 0; i < 50; ++i ) { + j.async([&counter](){ + ++counter; + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + }); + } + + const auto time_now = [](){ + return std::chrono::high_resolution_clock::now(); + }; + + const auto b = time_now(); + + j.resume(); + j.wait_all_for(std::chrono::milliseconds(100)); + REQUIRE(time_now() - b > std::chrono::milliseconds(50)); + REQUIRE(counter > 2); + REQUIRE(counter < 50); + + j.wait_all_until(time_now() + std::chrono::seconds(3)); + REQUIRE(counter == 50); + } + { + jb::jobber j(1); + std::atomic counter = ATOMIC_VAR_INIT(0); + j.pause(); + for ( std::size_t i = 0; i < 50; ++i ) { + j.async([&counter](){ + ++counter; + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + }); + } + + const auto time_now = [](){ + return std::chrono::high_resolution_clock::now(); + }; + + const auto b = time_now(); + + j.wait_all_for(std::chrono::milliseconds(15)); + REQUIRE(time_now() - b > std::chrono::milliseconds(10)); + REQUIRE(counter == 0); + + j.wait_all_until(time_now() + std::chrono::milliseconds(15)); + REQUIRE(time_now() - b > std::chrono::milliseconds(20)); + REQUIRE(counter == 0); + + j.active_wait_all_for(std::chrono::milliseconds(100)); + REQUIRE(time_now() - b > std::chrono::milliseconds(70)); + REQUIRE(counter > 2); + REQUIRE(counter < 50); + + j.active_wait_all_until(time_now() + std::chrono::seconds(3)); + REQUIRE(counter == 50); + } + { + jb::jobber j(1); + std::atomic counter = ATOMIC_VAR_INIT(0); + j.pause(); + for ( std::size_t i = 0; i < 30; ++i ) { + j.async([&counter](){ + ++counter; + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + }); + } + j.resume(); + REQUIRE(jb::jobber_wait_status::timeout == j.wait_all_for(std::chrono::milliseconds(50))); + REQUIRE(counter > 0); + REQUIRE(jb::jobber_wait_status::no_timeout == j.wait_all_for(std::chrono::seconds(5))); + REQUIRE(counter == 30); + } + { + jb::jobber j(1); + std::atomic counter = ATOMIC_VAR_INIT(0); + j.pause(); + for ( std::size_t i = 0; i < 30; ++i ) { + j.async([&counter](){ + ++counter; + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + }); + } + REQUIRE(jb::jobber_wait_status::timeout == j.active_wait_all_for(std::chrono::milliseconds(50))); + REQUIRE(counter > 0); + REQUIRE(jb::jobber_wait_status::no_timeout == j.active_wait_all_for(std::chrono::seconds(5))); + REQUIRE(counter == 30); + } + { + jb::jobber j(2); + jb::jobber g(2); + + std::vector> jp(50); + for ( auto& jpi : jp ) { + jpi = j.async([&g](){ + std::vector> gp(50); + for ( std::size_t i = 0; i < gp.size(); ++i ) { + gp[i] = g.async([](float angle){ + return std::sin(angle); + }, static_cast(i)); + } + return std::accumulate(gp.begin(), gp.end(), 0.f, + [](float r, pr::promise& f){ + return r + f.get(); + }); + }); + } + float r0 = std::accumulate(jp.begin(), jp.end(), 0.f, + [](float r, pr::promise& f){ + return r + f.get(); + }); + float r1 = 0.f; + for ( std::size_t i = 0; i < 50; ++i ) { + r1 += std::sin(static_cast(i)); + } + REQUIRE(r0 == Approx(r1 * 50.f).margin(0.01f)); + } +} + +TEST_CASE("scheduler") { + { + sd::scheduler s; + auto pv0 = s.schedule([](){ + throw std::exception(); + }); + s.process_all_tasks(); + REQUIRE_THROWS_AS(pv0.get(), std::exception); + } + { + auto pv0 = pr::promise(); + { + sd::scheduler s; + pv0 = s.schedule([](){ + return 42; + }); + } + REQUIRE_THROWS_AS(pv0.get(), sd::scheduler_cancelled_exception); + } + { + sd::scheduler s; + int counter = 0; + s.schedule([&counter](){ ++counter; }); + REQUIRE(counter == 0); + s.process_all_tasks(); + REQUIRE(counter == 1); + s.schedule([&counter](){ ++counter; }); + s.schedule([&counter](){ ++counter; }); + REQUIRE(counter == 1); + s.process_all_tasks(); + REQUIRE(counter == 3); + } + { + sd::scheduler s; + int counter = 0; + for ( std::size_t i = 0; i < 50; ++i ) { + s.schedule([&counter](){ + ++counter; + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + }); + } + s.process_tasks_for(std::chrono::milliseconds(-1)); + s.process_tasks_for(std::chrono::milliseconds(0)); + REQUIRE(counter == 0); + s.process_tasks_for(std::chrono::milliseconds(100)); + REQUIRE(counter > 2); + REQUIRE(counter < 50); + s.process_tasks_for(std::chrono::seconds(3)); + REQUIRE(counter == 50); + } + { + sd::scheduler s; + int counter = 0; + for ( std::size_t i = 0; i < 50; ++i ) { + s.schedule([&counter](){ + ++counter; + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + }); + } + + const auto time_now = [](){ + return std::chrono::high_resolution_clock::now(); + }; + + const auto b = time_now(); + + s.process_tasks_until(time_now() - std::chrono::milliseconds(1)); + s.process_tasks_until(time_now()); + REQUIRE(counter == 0); + s.process_tasks_until(time_now() + std::chrono::milliseconds(100)); + REQUIRE(time_now() - b > std::chrono::milliseconds(50)); + REQUIRE(counter > 2); + REQUIRE(counter < 50); + s.process_tasks_until(time_now() + std::chrono::seconds(3)); + REQUIRE(counter == 50); + } + { + sd::scheduler s; + std::string accumulator; + s.schedule(sd::scheduler_priority::lowest, [](std::string& acc){ + acc.append("o"); + }, std::ref(accumulator)); + s.schedule(sd::scheduler_priority::below_normal, [](std::string& acc){ + acc.append("l"); + }, std::ref(accumulator)); + s.schedule(sd::scheduler_priority::highest, [](std::string& acc){ + acc.append("h"); + }, std::ref(accumulator)); + s.schedule(sd::scheduler_priority::above_normal, [](std::string& acc){ + acc.append("e"); + }, std::ref(accumulator)); + s.schedule(sd::scheduler_priority::normal, [](std::string& acc){ + acc.append("l"); + }, std::ref(accumulator)); + s.process_all_tasks(); + REQUIRE(accumulator == "hello"); + } +}