mirror of
https://github.com/BlackMATov/promise.hpp.git
synced 2025-12-13 11:56:26 +07:00
399
jobber.hpp
Normal file
399
jobber.hpp
Normal file
@@ -0,0 +1,399 @@
|
||||
/*******************************************************************************
|
||||
* This file is part of the "promise.hpp"
|
||||
* For conditions of distribution and use, see copyright notice in LICENSE.md
|
||||
* Copyright (C) 2018 Matvey Cherevko
|
||||
******************************************************************************/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <cstdint>
|
||||
#include <cassert>
|
||||
|
||||
#include <tuple>
|
||||
#include <mutex>
|
||||
#include <atomic>
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
#include <vector>
|
||||
#include <utility>
|
||||
#include <exception>
|
||||
#include <stdexcept>
|
||||
#include <type_traits>
|
||||
#include <condition_variable>
|
||||
|
||||
#include "promise.hpp"
|
||||
|
||||
namespace jobber_hpp
|
||||
{
|
||||
using namespace promise_hpp;
|
||||
|
||||
enum class jobber_priority {
|
||||
lowest,
|
||||
below_normal,
|
||||
normal,
|
||||
above_normal,
|
||||
highest
|
||||
};
|
||||
|
||||
enum class jobber_wait_status {
|
||||
no_timeout,
|
||||
cancelled,
|
||||
timeout
|
||||
};
|
||||
|
||||
class jobber_cancelled_exception : public std::runtime_error {
|
||||
public:
|
||||
jobber_cancelled_exception()
|
||||
: std::runtime_error("jobber has stopped working") {}
|
||||
};
|
||||
|
||||
class jobber final : private detail::noncopyable {
|
||||
public:
|
||||
explicit jobber(std::size_t threads);
|
||||
~jobber() noexcept;
|
||||
|
||||
template < typename F, typename... Args >
|
||||
using async_invoke_result_t = invoke_hpp::invoke_result_t<
|
||||
std::decay_t<F>,
|
||||
std::decay_t<Args>...>;
|
||||
|
||||
template < typename F, typename... Args
|
||||
, typename R = async_invoke_result_t<F, Args...> >
|
||||
promise<R> async(F&& f, Args&&... args);
|
||||
|
||||
template < typename F, typename... Args
|
||||
, typename R = async_invoke_result_t<F, Args...> >
|
||||
promise<R> async(jobber_priority priority, F&& f, Args&&... args);
|
||||
|
||||
void pause() noexcept;
|
||||
void resume() noexcept;
|
||||
bool is_paused() const noexcept;
|
||||
|
||||
jobber_wait_status wait_all() const noexcept;
|
||||
jobber_wait_status active_wait_all() noexcept;
|
||||
|
||||
template < typename Rep, typename Period >
|
||||
jobber_wait_status wait_all_for(
|
||||
const std::chrono::duration<Rep, Period>& timeout_duration) const;
|
||||
|
||||
template < typename Clock, typename Duration >
|
||||
jobber_wait_status wait_all_until(
|
||||
const std::chrono::time_point<Clock, Duration>& timeout_time) const;
|
||||
|
||||
template < typename Rep, typename Period >
|
||||
jobber_wait_status active_wait_all_for(
|
||||
const std::chrono::duration<Rep, Period>& timeout_duration);
|
||||
|
||||
template < typename Clock, typename Duration >
|
||||
jobber_wait_status active_wait_all_until(
|
||||
const std::chrono::time_point<Clock, Duration>& timeout_time);
|
||||
private:
|
||||
class task;
|
||||
using task_ptr = std::unique_ptr<task>;
|
||||
template < typename R, typename F, typename... Args >
|
||||
class concrete_task;
|
||||
private:
|
||||
void push_task_(jobber_priority priority, task_ptr task);
|
||||
task_ptr pop_task_() noexcept;
|
||||
void shutdown_() noexcept;
|
||||
void worker_main_() noexcept;
|
||||
void process_task_(std::unique_lock<std::mutex> lock) noexcept;
|
||||
private:
|
||||
std::vector<std::thread> threads_;
|
||||
std::vector<std::pair<jobber_priority, task_ptr>> tasks_;
|
||||
std::atomic<bool> paused_{false};
|
||||
std::atomic<bool> cancelled_{false};
|
||||
std::atomic<std::size_t> active_task_count_{0};
|
||||
mutable std::mutex tasks_mutex_;
|
||||
mutable std::condition_variable cond_var_;
|
||||
};
|
||||
|
||||
class jobber::task : private noncopyable {
|
||||
public:
|
||||
virtual ~task() noexcept = default;
|
||||
virtual void run() noexcept = 0;
|
||||
virtual void cancel() noexcept = 0;
|
||||
};
|
||||
|
||||
template < typename R, typename F, typename... Args >
|
||||
class jobber::concrete_task : public task {
|
||||
F f_;
|
||||
std::tuple<Args...> args_;
|
||||
promise<R> promise_;
|
||||
public:
|
||||
template < typename U >
|
||||
concrete_task(U&& u, std::tuple<Args...>&& args);
|
||||
void run() noexcept final;
|
||||
void cancel() noexcept final;
|
||||
promise<R> future() noexcept;
|
||||
};
|
||||
|
||||
template < typename F, typename... Args >
|
||||
class jobber::concrete_task<void, F, Args...> : public task {
|
||||
F f_;
|
||||
std::tuple<Args...> args_;
|
||||
promise<void> promise_;
|
||||
public:
|
||||
template < typename U >
|
||||
concrete_task(U&& u, std::tuple<Args...>&& args);
|
||||
void run() noexcept final;
|
||||
void cancel() noexcept final;
|
||||
promise<void> future() noexcept;
|
||||
};
|
||||
}
|
||||
|
||||
namespace jobber_hpp
|
||||
{
|
||||
inline jobber::jobber(std::size_t threads) {
|
||||
try {
|
||||
threads_.resize(threads);
|
||||
for ( std::thread& thread : threads_ ) {
|
||||
thread = std::thread(&jobber::worker_main_, this);
|
||||
}
|
||||
} catch (...) {
|
||||
shutdown_();
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
inline jobber::~jobber() noexcept {
|
||||
shutdown_();
|
||||
}
|
||||
|
||||
template < typename F, typename... Args, typename R >
|
||||
promise<R> jobber::async(F&& f, Args&&... args) {
|
||||
return async(
|
||||
jobber_priority::normal,
|
||||
std::forward<F>(f),
|
||||
std::forward<Args>(args)...);
|
||||
}
|
||||
|
||||
template < typename F, typename... Args, typename R >
|
||||
promise<R> jobber::async(jobber_priority priority, F&& f, Args&&... args) {
|
||||
using task_t = concrete_task<
|
||||
R,
|
||||
std::decay_t<F>,
|
||||
std::decay_t<Args>...>;
|
||||
std::unique_ptr<task_t> task = std::make_unique<task_t>(
|
||||
std::forward<F>(f),
|
||||
std::make_tuple(std::forward<Args>(args)...));
|
||||
promise<R> future = task->future();
|
||||
std::lock_guard<std::mutex> guard(tasks_mutex_);
|
||||
push_task_(priority, std::move(task));
|
||||
return future;
|
||||
}
|
||||
|
||||
inline void jobber::pause() noexcept {
|
||||
std::lock_guard<std::mutex> guard(tasks_mutex_);
|
||||
paused_.store(true);
|
||||
cond_var_.notify_all();
|
||||
}
|
||||
|
||||
inline void jobber::resume() noexcept {
|
||||
std::lock_guard<std::mutex> guard(tasks_mutex_);
|
||||
paused_.store(false);
|
||||
cond_var_.notify_all();
|
||||
}
|
||||
|
||||
inline bool jobber::is_paused() const noexcept {
|
||||
return paused_;
|
||||
}
|
||||
|
||||
inline jobber_wait_status jobber::wait_all() const noexcept {
|
||||
std::unique_lock<std::mutex> lock(tasks_mutex_);
|
||||
cond_var_.wait(lock, [this](){
|
||||
return cancelled_ || !active_task_count_;
|
||||
});
|
||||
return cancelled_
|
||||
? jobber_wait_status::cancelled
|
||||
: jobber_wait_status::no_timeout;
|
||||
}
|
||||
|
||||
inline jobber_wait_status jobber::active_wait_all() noexcept {
|
||||
while ( !cancelled_ && active_task_count_ ) {
|
||||
std::unique_lock<std::mutex> lock(tasks_mutex_);
|
||||
cond_var_.wait(lock, [this](){
|
||||
return cancelled_ || !active_task_count_ || !tasks_.empty();
|
||||
});
|
||||
if ( !tasks_.empty() ) {
|
||||
process_task_(std::move(lock));
|
||||
}
|
||||
}
|
||||
return cancelled_
|
||||
? jobber_wait_status::cancelled
|
||||
: jobber_wait_status::no_timeout;
|
||||
}
|
||||
|
||||
template < typename Rep, typename Period >
|
||||
jobber_wait_status jobber::wait_all_for(
|
||||
const std::chrono::duration<Rep, Period>& timeout_duration) const
|
||||
{
|
||||
return wait_all_until(
|
||||
std::chrono::steady_clock::now() + timeout_duration);
|
||||
}
|
||||
|
||||
template < typename Clock, typename Duration >
|
||||
jobber_wait_status jobber::wait_all_until(
|
||||
const std::chrono::time_point<Clock, Duration>& timeout_time) const
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(tasks_mutex_);
|
||||
return cond_var_.wait_until(lock, timeout_time, [this](){
|
||||
return cancelled_ || !active_task_count_;
|
||||
}) ? jobber_wait_status::no_timeout
|
||||
: jobber_wait_status::timeout;
|
||||
}
|
||||
|
||||
template < typename Rep, typename Period >
|
||||
jobber_wait_status jobber::active_wait_all_for(
|
||||
const std::chrono::duration<Rep, Period>& timeout_duration)
|
||||
{
|
||||
return active_wait_all_until(
|
||||
std::chrono::steady_clock::now() + timeout_duration);
|
||||
}
|
||||
|
||||
template < typename Clock, typename Duration >
|
||||
jobber_wait_status jobber::active_wait_all_until(
|
||||
const std::chrono::time_point<Clock, Duration>& timeout_time)
|
||||
{
|
||||
while ( !cancelled_ && active_task_count_ ) {
|
||||
if ( !(Clock::now() < timeout_time) ) {
|
||||
return jobber_wait_status::timeout;
|
||||
}
|
||||
std::unique_lock<std::mutex> lock(tasks_mutex_);
|
||||
cond_var_.wait_until(lock, timeout_time, [this](){
|
||||
return cancelled_ || !active_task_count_ || !tasks_.empty();
|
||||
});
|
||||
if ( !tasks_.empty() ) {
|
||||
process_task_(std::move(lock));
|
||||
}
|
||||
}
|
||||
return cancelled_
|
||||
? jobber_wait_status::cancelled
|
||||
: jobber_wait_status::no_timeout;
|
||||
}
|
||||
|
||||
inline void jobber::push_task_(jobber_priority priority, task_ptr task) {
|
||||
tasks_.emplace_back(priority, std::move(task));
|
||||
std::push_heap(tasks_.begin(), tasks_.end());
|
||||
++active_task_count_;
|
||||
cond_var_.notify_all();
|
||||
}
|
||||
|
||||
inline jobber::task_ptr jobber::pop_task_() noexcept {
|
||||
if ( !tasks_.empty() ) {
|
||||
std::pop_heap(tasks_.begin(), tasks_.end());
|
||||
task_ptr task = std::move(tasks_.back().second);
|
||||
tasks_.pop_back();
|
||||
return task;
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
inline void jobber::shutdown_() noexcept {
|
||||
{
|
||||
std::lock_guard<std::mutex> guard(tasks_mutex_);
|
||||
while ( !tasks_.empty() ) {
|
||||
task_ptr task = pop_task_();
|
||||
if ( task ) {
|
||||
task->cancel();
|
||||
--active_task_count_;
|
||||
}
|
||||
}
|
||||
cancelled_.store(true);
|
||||
cond_var_.notify_all();
|
||||
}
|
||||
for ( std::thread& thread : threads_ ) {
|
||||
if ( thread.joinable() ) {
|
||||
thread.join();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
inline void jobber::worker_main_() noexcept {
|
||||
while ( true ) {
|
||||
std::unique_lock<std::mutex> lock(tasks_mutex_);
|
||||
cond_var_.wait(lock, [this](){
|
||||
return cancelled_ || (!paused_ && !tasks_.empty());
|
||||
});
|
||||
if ( cancelled_ ) {
|
||||
break;
|
||||
}
|
||||
process_task_(std::move(lock));
|
||||
}
|
||||
}
|
||||
|
||||
inline void jobber::process_task_(std::unique_lock<std::mutex> lock) noexcept {
|
||||
assert(lock.owns_lock());
|
||||
task_ptr task = pop_task_();
|
||||
if ( task ) {
|
||||
lock.unlock();
|
||||
task->run();
|
||||
--active_task_count_;
|
||||
cond_var_.notify_all();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
namespace jobber_hpp
|
||||
{
|
||||
//
|
||||
// concrete_task<R, F, Args...>
|
||||
//
|
||||
|
||||
template < typename R, typename F, typename... Args >
|
||||
template < typename U >
|
||||
jobber::concrete_task<R, F, Args...>::concrete_task(U&& u, std::tuple<Args...>&& args)
|
||||
: f_(std::forward<U>(u))
|
||||
, args_(std::move(args)) {}
|
||||
|
||||
template < typename R, typename F, typename... Args >
|
||||
void jobber::concrete_task<R, F, Args...>::run() noexcept {
|
||||
try {
|
||||
R value = invoke_hpp::apply(std::move(f_), std::move(args_));
|
||||
promise_.resolve(std::move(value));
|
||||
} catch (...) {
|
||||
promise_.reject(std::current_exception());
|
||||
}
|
||||
}
|
||||
|
||||
template < typename R, typename F, typename... Args >
|
||||
void jobber::concrete_task<R, F, Args...>::cancel() noexcept {
|
||||
promise_.reject(jobber_cancelled_exception());
|
||||
}
|
||||
|
||||
template < typename R, typename F, typename... Args >
|
||||
promise<R> jobber::concrete_task<R, F, Args...>::future() noexcept {
|
||||
return promise_;
|
||||
}
|
||||
|
||||
//
|
||||
// concrete_task<void, F, Args...>
|
||||
//
|
||||
|
||||
template < typename F, typename... Args >
|
||||
template < typename U >
|
||||
jobber::concrete_task<void, F, Args...>::concrete_task(U&& u, std::tuple<Args...>&& args)
|
||||
: f_(std::forward<U>(u))
|
||||
, args_(std::move(args)) {}
|
||||
|
||||
template < typename F, typename... Args >
|
||||
void jobber::concrete_task<void, F, Args...>::run() noexcept {
|
||||
try {
|
||||
invoke_hpp::apply(std::move(f_), std::move(args_));
|
||||
promise_.resolve();
|
||||
} catch (...) {
|
||||
promise_.reject(std::current_exception());
|
||||
}
|
||||
}
|
||||
|
||||
template < typename F, typename... Args >
|
||||
void jobber::concrete_task<void, F, Args...>::cancel() noexcept {
|
||||
promise_.reject(jobber_cancelled_exception());
|
||||
}
|
||||
|
||||
template < typename F, typename... Args >
|
||||
promise<void> jobber::concrete_task<void, F, Args...>::future() noexcept {
|
||||
return promise_;
|
||||
}
|
||||
}
|
||||
@@ -545,7 +545,7 @@ namespace promise_hpp
|
||||
return state_->get();
|
||||
}
|
||||
|
||||
void wait() const {
|
||||
void wait() const noexcept {
|
||||
state_->wait();
|
||||
}
|
||||
|
||||
@@ -607,7 +607,7 @@ namespace promise_hpp
|
||||
return storage_.value();
|
||||
}
|
||||
|
||||
void wait() const {
|
||||
void wait() const noexcept {
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
cond_var_.wait(lock, [this](){
|
||||
return status_ != status::pending;
|
||||
@@ -934,7 +934,7 @@ namespace promise_hpp
|
||||
state_->get();
|
||||
}
|
||||
|
||||
void wait() const {
|
||||
void wait() const noexcept {
|
||||
state_->wait();
|
||||
}
|
||||
|
||||
@@ -993,7 +993,7 @@ namespace promise_hpp
|
||||
assert(status_ == status::resolved);
|
||||
}
|
||||
|
||||
void wait() const {
|
||||
void wait() const noexcept {
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
cond_var_.wait(lock, [this](){
|
||||
return status_ != status::pending;
|
||||
|
||||
307
scheduler.hpp
Normal file
307
scheduler.hpp
Normal file
@@ -0,0 +1,307 @@
|
||||
/*******************************************************************************
|
||||
* This file is part of the "promise.hpp"
|
||||
* For conditions of distribution and use, see copyright notice in LICENSE.md
|
||||
* Copyright (C) 2018 Matvey Cherevko
|
||||
******************************************************************************/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <cstdint>
|
||||
#include <cassert>
|
||||
|
||||
#include <tuple>
|
||||
#include <mutex>
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
#include <vector>
|
||||
#include <utility>
|
||||
#include <exception>
|
||||
#include <stdexcept>
|
||||
#include <type_traits>
|
||||
#include <condition_variable>
|
||||
|
||||
#include "promise.hpp"
|
||||
|
||||
namespace scheduler_hpp
|
||||
{
|
||||
using namespace promise_hpp;
|
||||
|
||||
enum class scheduler_priority {
|
||||
lowest,
|
||||
below_normal,
|
||||
normal,
|
||||
above_normal,
|
||||
highest
|
||||
};
|
||||
|
||||
enum class scheduler_wait_status {
|
||||
no_timeout,
|
||||
cancelled,
|
||||
timeout
|
||||
};
|
||||
|
||||
class scheduler_cancelled_exception : public std::runtime_error {
|
||||
public:
|
||||
scheduler_cancelled_exception()
|
||||
: std::runtime_error("scheduler has stopped working") {}
|
||||
};
|
||||
|
||||
class scheduler final : private detail::noncopyable {
|
||||
public:
|
||||
scheduler();
|
||||
~scheduler() noexcept;
|
||||
|
||||
template < typename F, typename... Args >
|
||||
using schedule_invoke_result_t = invoke_hpp::invoke_result_t<
|
||||
std::decay_t<F>,
|
||||
std::decay_t<Args>...>;
|
||||
|
||||
template < typename F, typename... Args
|
||||
, typename R = schedule_invoke_result_t<F, Args...> >
|
||||
promise<R> schedule(F&& f, Args&&... args);
|
||||
|
||||
template < typename F, typename... Args
|
||||
, typename R = schedule_invoke_result_t<F, Args...> >
|
||||
promise<R> schedule(scheduler_priority scheduler_priority, F&& f, Args&&... args);
|
||||
|
||||
scheduler_wait_status process_all_tasks() noexcept;
|
||||
|
||||
template < typename Rep, typename Period >
|
||||
scheduler_wait_status process_tasks_for(
|
||||
const std::chrono::duration<Rep, Period>& timeout_duration) noexcept;
|
||||
|
||||
template < typename Clock, typename Duration >
|
||||
scheduler_wait_status process_tasks_until(
|
||||
const std::chrono::time_point<Clock, Duration>& timeout_time) noexcept;
|
||||
private:
|
||||
class task;
|
||||
using task_ptr = std::unique_ptr<task>;
|
||||
template < typename R, typename F, typename... Args >
|
||||
class concrete_task;
|
||||
private:
|
||||
void push_task_(scheduler_priority scheduler_priority, task_ptr task);
|
||||
task_ptr pop_task_() noexcept;
|
||||
void shutdown_() noexcept;
|
||||
void process_task_(std::unique_lock<std::mutex> lock) noexcept;
|
||||
private:
|
||||
std::vector<std::pair<scheduler_priority, task_ptr>> tasks_;
|
||||
std::atomic<bool> cancelled_{false};
|
||||
std::atomic<std::size_t> active_task_count_{0};
|
||||
mutable std::mutex tasks_mutex_;
|
||||
mutable std::condition_variable cond_var_;
|
||||
};
|
||||
|
||||
class scheduler::task : private noncopyable {
|
||||
public:
|
||||
virtual ~task() noexcept = default;
|
||||
virtual void run() noexcept = 0;
|
||||
virtual void cancel() noexcept = 0;
|
||||
};
|
||||
|
||||
template < typename R, typename F, typename... Args >
|
||||
class scheduler::concrete_task : public task {
|
||||
F f_;
|
||||
std::tuple<Args...> args_;
|
||||
promise<R> promise_;
|
||||
public:
|
||||
template < typename U >
|
||||
concrete_task(U&& u, std::tuple<Args...>&& args);
|
||||
void run() noexcept final;
|
||||
void cancel() noexcept final;
|
||||
promise<R> future() noexcept;
|
||||
};
|
||||
|
||||
template < typename F, typename... Args >
|
||||
class scheduler::concrete_task<void, F, Args...> : public task {
|
||||
F f_;
|
||||
std::tuple<Args...> args_;
|
||||
promise<void> promise_;
|
||||
public:
|
||||
template < typename U >
|
||||
concrete_task(U&& u, std::tuple<Args...>&& args);
|
||||
void run() noexcept final;
|
||||
void cancel() noexcept final;
|
||||
promise<void> future() noexcept;
|
||||
};
|
||||
}
|
||||
|
||||
namespace scheduler_hpp
|
||||
{
|
||||
inline scheduler::scheduler() = default;
|
||||
|
||||
inline scheduler::~scheduler() noexcept {
|
||||
shutdown_();
|
||||
}
|
||||
|
||||
template < typename F, typename... Args, typename R >
|
||||
promise<R> scheduler::schedule(F&& f, Args&&... args) {
|
||||
return schedule(
|
||||
scheduler_priority::normal,
|
||||
std::forward<F>(f),
|
||||
std::forward<Args>(args)...);
|
||||
}
|
||||
|
||||
template < typename F, typename... Args, typename R >
|
||||
promise<R> scheduler::schedule(scheduler_priority priority, F&& f, Args&&... args) {
|
||||
using task_t = concrete_task<
|
||||
R,
|
||||
std::decay_t<F>,
|
||||
std::decay_t<Args>...>;
|
||||
std::unique_ptr<task_t> task = std::make_unique<task_t>(
|
||||
std::forward<F>(f),
|
||||
std::make_tuple(std::forward<Args>(args)...));
|
||||
promise<R> future = task->future();
|
||||
std::lock_guard<std::mutex> guard(tasks_mutex_);
|
||||
push_task_(priority, std::move(task));
|
||||
return future;
|
||||
}
|
||||
|
||||
inline scheduler_wait_status scheduler::process_all_tasks() noexcept {
|
||||
while ( !cancelled_ && active_task_count_ ) {
|
||||
std::unique_lock<std::mutex> lock(tasks_mutex_);
|
||||
cond_var_.wait(lock, [this](){
|
||||
return cancelled_ || !active_task_count_ || !tasks_.empty();
|
||||
});
|
||||
if ( !tasks_.empty() ) {
|
||||
process_task_(std::move(lock));
|
||||
}
|
||||
}
|
||||
return cancelled_
|
||||
? scheduler_wait_status::cancelled
|
||||
: scheduler_wait_status::no_timeout;
|
||||
}
|
||||
|
||||
template < typename Rep, typename Period >
|
||||
scheduler_wait_status scheduler::process_tasks_for(
|
||||
const std::chrono::duration<Rep, Period>& timeout_duration) noexcept
|
||||
{
|
||||
return process_tasks_until(
|
||||
std::chrono::steady_clock::now() + timeout_duration);
|
||||
}
|
||||
|
||||
template < typename Clock, typename Duration >
|
||||
scheduler_wait_status scheduler::process_tasks_until(
|
||||
const std::chrono::time_point<Clock, Duration>& timeout_time) noexcept
|
||||
{
|
||||
while ( !cancelled_ && active_task_count_ ) {
|
||||
if ( !(Clock::now() < timeout_time) ) {
|
||||
return scheduler_wait_status::timeout;
|
||||
}
|
||||
std::unique_lock<std::mutex> lock(tasks_mutex_);
|
||||
cond_var_.wait_until(lock, timeout_time, [this](){
|
||||
return cancelled_ || !active_task_count_ || !tasks_.empty();
|
||||
});
|
||||
if ( !tasks_.empty() ) {
|
||||
process_task_(std::move(lock));
|
||||
}
|
||||
}
|
||||
return cancelled_
|
||||
? scheduler_wait_status::cancelled
|
||||
: scheduler_wait_status::no_timeout;
|
||||
}
|
||||
|
||||
inline void scheduler::push_task_(scheduler_priority priority, task_ptr task) {
|
||||
tasks_.emplace_back(priority, std::move(task));
|
||||
std::push_heap(tasks_.begin(), tasks_.end());
|
||||
++active_task_count_;
|
||||
cond_var_.notify_all();
|
||||
}
|
||||
|
||||
inline scheduler::task_ptr scheduler::pop_task_() noexcept {
|
||||
if ( !tasks_.empty() ) {
|
||||
std::pop_heap(tasks_.begin(), tasks_.end());
|
||||
task_ptr task = std::move(tasks_.back().second);
|
||||
tasks_.pop_back();
|
||||
return task;
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
inline void scheduler::shutdown_() noexcept {
|
||||
std::lock_guard<std::mutex> guard(tasks_mutex_);
|
||||
while ( !tasks_.empty() ) {
|
||||
task_ptr task = pop_task_();
|
||||
if ( task ) {
|
||||
task->cancel();
|
||||
--active_task_count_;
|
||||
}
|
||||
}
|
||||
cancelled_.store(true);
|
||||
cond_var_.notify_all();
|
||||
}
|
||||
|
||||
inline void scheduler::process_task_(std::unique_lock<std::mutex> lock) noexcept {
|
||||
assert(lock.owns_lock());
|
||||
task_ptr task = pop_task_();
|
||||
if ( task ) {
|
||||
lock.unlock();
|
||||
task->run();
|
||||
--active_task_count_;
|
||||
cond_var_.notify_all();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
namespace scheduler_hpp
|
||||
{
|
||||
//
|
||||
// concrete_task<R, F, Args...>
|
||||
//
|
||||
|
||||
template < typename R, typename F, typename... Args >
|
||||
template < typename U >
|
||||
scheduler::concrete_task<R, F, Args...>::concrete_task(U&& u, std::tuple<Args...>&& args)
|
||||
: f_(std::forward<U>(u))
|
||||
, args_(std::move(args)) {}
|
||||
|
||||
template < typename R, typename F, typename... Args >
|
||||
void scheduler::concrete_task<R, F, Args...>::run() noexcept {
|
||||
try {
|
||||
R value = invoke_hpp::apply(std::move(f_), std::move(args_));
|
||||
promise_.resolve(std::move(value));
|
||||
} catch (...) {
|
||||
promise_.reject(std::current_exception());
|
||||
}
|
||||
}
|
||||
|
||||
template < typename R, typename F, typename... Args >
|
||||
void scheduler::concrete_task<R, F, Args...>::cancel() noexcept {
|
||||
promise_.reject(scheduler_cancelled_exception());
|
||||
}
|
||||
|
||||
template < typename R, typename F, typename... Args >
|
||||
promise<R> scheduler::concrete_task<R, F, Args...>::future() noexcept {
|
||||
return promise_;
|
||||
}
|
||||
|
||||
//
|
||||
// concrete_task<void, F, Args...>
|
||||
//
|
||||
|
||||
template < typename F, typename... Args >
|
||||
template < typename U >
|
||||
scheduler::concrete_task<void, F, Args...>::concrete_task(U&& u, std::tuple<Args...>&& args)
|
||||
: f_(std::forward<U>(u))
|
||||
, args_(std::move(args)) {}
|
||||
|
||||
template < typename F, typename... Args >
|
||||
void scheduler::concrete_task<void, F, Args...>::run() noexcept {
|
||||
try {
|
||||
invoke_hpp::apply(std::move(f_), std::move(args_));
|
||||
promise_.resolve();
|
||||
} catch (...) {
|
||||
promise_.reject(std::current_exception());
|
||||
}
|
||||
}
|
||||
|
||||
template < typename F, typename... Args >
|
||||
void scheduler::concrete_task<void, F, Args...>::cancel() noexcept {
|
||||
promise_.reject(scheduler_cancelled_exception());
|
||||
}
|
||||
|
||||
template < typename F, typename... Args >
|
||||
promise<void> scheduler::concrete_task<void, F, Args...>::future() noexcept {
|
||||
return promise_;
|
||||
}
|
||||
}
|
||||
378
tests.cpp
378
tests.cpp
@@ -7,12 +7,17 @@
|
||||
#define CATCH_CONFIG_FAST_COMPILE
|
||||
#include "catch.hpp"
|
||||
|
||||
#include "promise.hpp"
|
||||
namespace pr = promise_hpp;
|
||||
|
||||
#include <thread>
|
||||
#include <numeric>
|
||||
#include <cstring>
|
||||
|
||||
#include "jobber.hpp"
|
||||
#include "promise.hpp"
|
||||
#include "scheduler.hpp"
|
||||
|
||||
namespace jb = jobber_hpp;
|
||||
namespace pr = promise_hpp;
|
||||
namespace sd = scheduler_hpp;
|
||||
|
||||
namespace
|
||||
{
|
||||
struct obj_t {
|
||||
@@ -977,3 +982,368 @@ TEST_CASE("get_and_wait") {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST_CASE("jobber") {
|
||||
{
|
||||
jb::jobber j(1);
|
||||
auto pv0 = j.async([](){
|
||||
throw std::exception();
|
||||
});
|
||||
REQUIRE_THROWS_AS(pv0.get(), std::exception);
|
||||
}
|
||||
{
|
||||
auto pv0 = pr::promise<int>();
|
||||
{
|
||||
jb::jobber j{0};
|
||||
pv0 = j.async([](){
|
||||
return 42;
|
||||
});
|
||||
}
|
||||
REQUIRE_THROWS_AS(pv0.get(), jb::jobber_cancelled_exception);
|
||||
}
|
||||
{
|
||||
int v5 = 5;
|
||||
|
||||
jb::jobber j(1);
|
||||
auto pv0 = j.async([](int v){
|
||||
REQUIRE(v == 5);
|
||||
throw std::exception();
|
||||
}, v5);
|
||||
REQUIRE_THROWS_AS(pv0.get(), std::exception);
|
||||
|
||||
auto pv1 = j.async([](int& v){
|
||||
REQUIRE(v == 5);
|
||||
return v != 5
|
||||
? 0
|
||||
: throw std::exception();
|
||||
}, std::ref(v5));
|
||||
REQUIRE_THROWS_AS(pv1.get(), std::exception);
|
||||
|
||||
auto pv3 = j.async([](int& v){
|
||||
v = 4;
|
||||
return v;
|
||||
}, std::ref(v5));
|
||||
REQUIRE(pv3.get() == v5);
|
||||
REQUIRE(v5 == 4);
|
||||
}
|
||||
{
|
||||
const float pi = 3.14159265358979323846264338327950288f;
|
||||
jb::jobber j(1);
|
||||
auto p0 = j.async([](float angle){
|
||||
return std::sin(angle);
|
||||
}, pi);
|
||||
auto p1 = j.async([](float angle){
|
||||
return std::cos(angle);
|
||||
}, pi * 2);
|
||||
REQUIRE(p0.get() == Approx(0.f).margin(0.01f));
|
||||
REQUIRE(p1.get() == Approx(1.f).margin(0.01f));
|
||||
}
|
||||
{
|
||||
jb::jobber j(1);
|
||||
j.pause();
|
||||
jb::jobber_priority max_priority = jb::jobber_priority::highest;
|
||||
j.async([](){
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(2));
|
||||
});
|
||||
for ( std::size_t i = 0; i < 10; ++i ) {
|
||||
jb::jobber_priority p = static_cast<jb::jobber_priority>(
|
||||
i % static_cast<std::size_t>(jb::jobber_priority::highest));
|
||||
j.async(p, [&max_priority](jb::jobber_priority priority) {
|
||||
REQUIRE(priority <= max_priority);
|
||||
max_priority = priority;
|
||||
}, p);
|
||||
}
|
||||
j.resume();
|
||||
j.wait_all();
|
||||
}
|
||||
{
|
||||
jb::jobber j(1);
|
||||
std::atomic<int> counter = ATOMIC_VAR_INIT(0);
|
||||
j.pause();
|
||||
for ( std::size_t i = 0; i < 10; ++i ) {
|
||||
j.async([&counter](){
|
||||
++counter;
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(5));
|
||||
});
|
||||
}
|
||||
|
||||
j.resume();
|
||||
REQUIRE(counter < 10);
|
||||
j.wait_all();
|
||||
REQUIRE(counter == 10);
|
||||
}
|
||||
{
|
||||
jb::jobber j(1);
|
||||
std::atomic<int> counter = ATOMIC_VAR_INIT(0);
|
||||
j.pause();
|
||||
for ( std::size_t i = 0; i < 10; ++i ) {
|
||||
j.async([&counter](){
|
||||
++counter;
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(5));
|
||||
});
|
||||
}
|
||||
REQUIRE(counter < 10);
|
||||
j.active_wait_all();
|
||||
REQUIRE(counter == 10);
|
||||
}
|
||||
{
|
||||
jb::jobber j(1);
|
||||
|
||||
const auto time_now = [](){
|
||||
return std::chrono::high_resolution_clock::now();
|
||||
};
|
||||
|
||||
REQUIRE(jb::jobber_wait_status::no_timeout == j.wait_all_for(std::chrono::milliseconds(-1)));
|
||||
REQUIRE(jb::jobber_wait_status::no_timeout == j.wait_all_until(time_now() + std::chrono::milliseconds(-1)));
|
||||
REQUIRE(jb::jobber_wait_status::no_timeout == j.active_wait_all_for(std::chrono::milliseconds(-1)));
|
||||
REQUIRE(jb::jobber_wait_status::no_timeout == j.active_wait_all_until(time_now() + std::chrono::milliseconds(-1)));
|
||||
|
||||
j.pause();
|
||||
j.async([]{});
|
||||
|
||||
REQUIRE(jb::jobber_wait_status::timeout == j.wait_all_for(std::chrono::milliseconds(-1)));
|
||||
REQUIRE(jb::jobber_wait_status::timeout == j.wait_all_until(time_now() + std::chrono::milliseconds(-1)));
|
||||
REQUIRE(jb::jobber_wait_status::timeout == j.active_wait_all_for(std::chrono::milliseconds(-1)));
|
||||
REQUIRE(jb::jobber_wait_status::timeout == j.active_wait_all_until(time_now() + std::chrono::milliseconds(-1)));
|
||||
}
|
||||
{
|
||||
jb::jobber j(1);
|
||||
std::atomic<int> counter = ATOMIC_VAR_INIT(0);
|
||||
j.pause();
|
||||
for ( std::size_t i = 0; i < 10; ++i ) {
|
||||
j.async([&counter](){
|
||||
++counter;
|
||||
});
|
||||
}
|
||||
|
||||
const auto time_now = [](){
|
||||
return std::chrono::high_resolution_clock::now();
|
||||
};
|
||||
|
||||
j.wait_all_for(std::chrono::milliseconds(10));
|
||||
j.wait_all_until(time_now() + std::chrono::milliseconds(10));
|
||||
REQUIRE(counter == 0);
|
||||
|
||||
j.active_wait_all_for(std::chrono::milliseconds(10));
|
||||
j.active_wait_all_until(time_now() + std::chrono::milliseconds(10));
|
||||
REQUIRE(counter > 0);
|
||||
}
|
||||
{
|
||||
jb::jobber j(1);
|
||||
std::atomic<int> counter = ATOMIC_VAR_INIT(0);
|
||||
j.pause();
|
||||
for ( std::size_t i = 0; i < 50; ++i ) {
|
||||
j.async([&counter](){
|
||||
++counter;
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(5));
|
||||
});
|
||||
}
|
||||
|
||||
const auto time_now = [](){
|
||||
return std::chrono::high_resolution_clock::now();
|
||||
};
|
||||
|
||||
const auto b = time_now();
|
||||
|
||||
j.resume();
|
||||
j.wait_all_for(std::chrono::milliseconds(100));
|
||||
REQUIRE(time_now() - b > std::chrono::milliseconds(50));
|
||||
REQUIRE(counter > 2);
|
||||
REQUIRE(counter < 50);
|
||||
|
||||
j.wait_all_until(time_now() + std::chrono::seconds(3));
|
||||
REQUIRE(counter == 50);
|
||||
}
|
||||
{
|
||||
jb::jobber j(1);
|
||||
std::atomic<int> counter = ATOMIC_VAR_INIT(0);
|
||||
j.pause();
|
||||
for ( std::size_t i = 0; i < 50; ++i ) {
|
||||
j.async([&counter](){
|
||||
++counter;
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(5));
|
||||
});
|
||||
}
|
||||
|
||||
const auto time_now = [](){
|
||||
return std::chrono::high_resolution_clock::now();
|
||||
};
|
||||
|
||||
const auto b = time_now();
|
||||
|
||||
j.wait_all_for(std::chrono::milliseconds(15));
|
||||
REQUIRE(time_now() - b > std::chrono::milliseconds(10));
|
||||
REQUIRE(counter == 0);
|
||||
|
||||
j.wait_all_until(time_now() + std::chrono::milliseconds(15));
|
||||
REQUIRE(time_now() - b > std::chrono::milliseconds(20));
|
||||
REQUIRE(counter == 0);
|
||||
|
||||
j.active_wait_all_for(std::chrono::milliseconds(100));
|
||||
REQUIRE(time_now() - b > std::chrono::milliseconds(70));
|
||||
REQUIRE(counter > 2);
|
||||
REQUIRE(counter < 50);
|
||||
|
||||
j.active_wait_all_until(time_now() + std::chrono::seconds(3));
|
||||
REQUIRE(counter == 50);
|
||||
}
|
||||
{
|
||||
jb::jobber j(1);
|
||||
std::atomic<int> counter = ATOMIC_VAR_INIT(0);
|
||||
j.pause();
|
||||
for ( std::size_t i = 0; i < 30; ++i ) {
|
||||
j.async([&counter](){
|
||||
++counter;
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(5));
|
||||
});
|
||||
}
|
||||
j.resume();
|
||||
REQUIRE(jb::jobber_wait_status::timeout == j.wait_all_for(std::chrono::milliseconds(50)));
|
||||
REQUIRE(counter > 0);
|
||||
REQUIRE(jb::jobber_wait_status::no_timeout == j.wait_all_for(std::chrono::seconds(5)));
|
||||
REQUIRE(counter == 30);
|
||||
}
|
||||
{
|
||||
jb::jobber j(1);
|
||||
std::atomic<int> counter = ATOMIC_VAR_INIT(0);
|
||||
j.pause();
|
||||
for ( std::size_t i = 0; i < 30; ++i ) {
|
||||
j.async([&counter](){
|
||||
++counter;
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(5));
|
||||
});
|
||||
}
|
||||
REQUIRE(jb::jobber_wait_status::timeout == j.active_wait_all_for(std::chrono::milliseconds(50)));
|
||||
REQUIRE(counter > 0);
|
||||
REQUIRE(jb::jobber_wait_status::no_timeout == j.active_wait_all_for(std::chrono::seconds(5)));
|
||||
REQUIRE(counter == 30);
|
||||
}
|
||||
{
|
||||
jb::jobber j(2);
|
||||
jb::jobber g(2);
|
||||
|
||||
std::vector<pr::promise<float>> jp(50);
|
||||
for ( auto& jpi : jp ) {
|
||||
jpi = j.async([&g](){
|
||||
std::vector<pr::promise<float>> gp(50);
|
||||
for ( std::size_t i = 0; i < gp.size(); ++i ) {
|
||||
gp[i] = g.async([](float angle){
|
||||
return std::sin(angle);
|
||||
}, static_cast<float>(i));
|
||||
}
|
||||
return std::accumulate(gp.begin(), gp.end(), 0.f,
|
||||
[](float r, pr::promise<float>& f){
|
||||
return r + f.get();
|
||||
});
|
||||
});
|
||||
}
|
||||
float r0 = std::accumulate(jp.begin(), jp.end(), 0.f,
|
||||
[](float r, pr::promise<float>& f){
|
||||
return r + f.get();
|
||||
});
|
||||
float r1 = 0.f;
|
||||
for ( std::size_t i = 0; i < 50; ++i ) {
|
||||
r1 += std::sin(static_cast<float>(i));
|
||||
}
|
||||
REQUIRE(r0 == Approx(r1 * 50.f).margin(0.01f));
|
||||
}
|
||||
}
|
||||
|
||||
TEST_CASE("scheduler") {
|
||||
{
|
||||
sd::scheduler s;
|
||||
auto pv0 = s.schedule([](){
|
||||
throw std::exception();
|
||||
});
|
||||
s.process_all_tasks();
|
||||
REQUIRE_THROWS_AS(pv0.get(), std::exception);
|
||||
}
|
||||
{
|
||||
auto pv0 = pr::promise<int>();
|
||||
{
|
||||
sd::scheduler s;
|
||||
pv0 = s.schedule([](){
|
||||
return 42;
|
||||
});
|
||||
}
|
||||
REQUIRE_THROWS_AS(pv0.get(), sd::scheduler_cancelled_exception);
|
||||
}
|
||||
{
|
||||
sd::scheduler s;
|
||||
int counter = 0;
|
||||
s.schedule([&counter](){ ++counter; });
|
||||
REQUIRE(counter == 0);
|
||||
s.process_all_tasks();
|
||||
REQUIRE(counter == 1);
|
||||
s.schedule([&counter](){ ++counter; });
|
||||
s.schedule([&counter](){ ++counter; });
|
||||
REQUIRE(counter == 1);
|
||||
s.process_all_tasks();
|
||||
REQUIRE(counter == 3);
|
||||
}
|
||||
{
|
||||
sd::scheduler s;
|
||||
int counter = 0;
|
||||
for ( std::size_t i = 0; i < 50; ++i ) {
|
||||
s.schedule([&counter](){
|
||||
++counter;
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(5));
|
||||
});
|
||||
}
|
||||
s.process_tasks_for(std::chrono::milliseconds(-1));
|
||||
s.process_tasks_for(std::chrono::milliseconds(0));
|
||||
REQUIRE(counter == 0);
|
||||
s.process_tasks_for(std::chrono::milliseconds(100));
|
||||
REQUIRE(counter > 2);
|
||||
REQUIRE(counter < 50);
|
||||
s.process_tasks_for(std::chrono::seconds(3));
|
||||
REQUIRE(counter == 50);
|
||||
}
|
||||
{
|
||||
sd::scheduler s;
|
||||
int counter = 0;
|
||||
for ( std::size_t i = 0; i < 50; ++i ) {
|
||||
s.schedule([&counter](){
|
||||
++counter;
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(5));
|
||||
});
|
||||
}
|
||||
|
||||
const auto time_now = [](){
|
||||
return std::chrono::high_resolution_clock::now();
|
||||
};
|
||||
|
||||
const auto b = time_now();
|
||||
|
||||
s.process_tasks_until(time_now() - std::chrono::milliseconds(1));
|
||||
s.process_tasks_until(time_now());
|
||||
REQUIRE(counter == 0);
|
||||
s.process_tasks_until(time_now() + std::chrono::milliseconds(100));
|
||||
REQUIRE(time_now() - b > std::chrono::milliseconds(50));
|
||||
REQUIRE(counter > 2);
|
||||
REQUIRE(counter < 50);
|
||||
s.process_tasks_until(time_now() + std::chrono::seconds(3));
|
||||
REQUIRE(counter == 50);
|
||||
}
|
||||
{
|
||||
sd::scheduler s;
|
||||
std::string accumulator;
|
||||
s.schedule(sd::scheduler_priority::lowest, [](std::string& acc){
|
||||
acc.append("o");
|
||||
}, std::ref(accumulator));
|
||||
s.schedule(sd::scheduler_priority::below_normal, [](std::string& acc){
|
||||
acc.append("l");
|
||||
}, std::ref(accumulator));
|
||||
s.schedule(sd::scheduler_priority::highest, [](std::string& acc){
|
||||
acc.append("h");
|
||||
}, std::ref(accumulator));
|
||||
s.schedule(sd::scheduler_priority::above_normal, [](std::string& acc){
|
||||
acc.append("e");
|
||||
}, std::ref(accumulator));
|
||||
s.schedule(sd::scheduler_priority::normal, [](std::string& acc){
|
||||
acc.append("l");
|
||||
}, std::ref(accumulator));
|
||||
s.process_all_tasks();
|
||||
REQUIRE(accumulator == "hello");
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user