update modules

This commit is contained in:
2018-12-17 19:56:18 +07:00
parent f9fbc5259b
commit efb2d0fb60
4 changed files with 140 additions and 58 deletions

View File

@@ -277,7 +277,7 @@ namespace jobber_hpp
tasks_.emplace_back(priority, std::move(task)); tasks_.emplace_back(priority, std::move(task));
std::push_heap(tasks_.begin(), tasks_.end()); std::push_heap(tasks_.begin(), tasks_.end());
++active_task_count_; ++active_task_count_;
cond_var_.notify_all(); cond_var_.notify_one();
} }
inline jobber::task_ptr jobber::pop_task_() noexcept { inline jobber::task_ptr jobber::pop_task_() noexcept {

View File

@@ -280,7 +280,8 @@ namespace promise_hpp
state_->attach( state_->attach(
next, next,
std::forward<ResolveF>(on_resolve), std::forward<ResolveF>(on_resolve),
std::forward<RejectF>(on_reject)); std::forward<RejectF>(on_reject),
true);
return next; return next;
} }
@@ -290,9 +291,15 @@ namespace promise_hpp
!is_promise<ResolveFR>::value, !is_promise<ResolveFR>::value,
promise<ResolveFR>> promise<ResolveFR>>
then(ResolveF&& on_resolve) { then(ResolveF&& on_resolve) {
return then( promise<ResolveFR> next;
state_->attach(
next,
std::forward<ResolveF>(on_resolve), std::forward<ResolveF>(on_resolve),
[](std::exception_ptr){}); [](std::exception_ptr e) -> ResolveFR {
std::rethrow_exception(e);
},
false);
return next;
} }
template < typename RejectF > template < typename RejectF >
@@ -422,18 +429,28 @@ namespace promise_hpp
template < typename U, typename ResolveF, typename RejectF > template < typename U, typename ResolveF, typename RejectF >
std::enable_if_t<std::is_void<U>::value, void> std::enable_if_t<std::is_void<U>::value, void>
attach(promise<U>& next, ResolveF&& on_resolve, RejectF&& on_reject) { attach(
promise<U>& next,
ResolveF&& on_resolve,
RejectF&& on_reject,
bool has_reject)
{
auto reject_h = [ auto reject_h = [
n = next, n = next,
f = std::forward<RejectF>(on_reject) f = std::forward<RejectF>(on_reject),
has_reject
](std::exception_ptr e) mutable { ](std::exception_ptr e) mutable {
try { if ( has_reject ) {
invoke_hpp::invoke( try {
std::forward<decltype(f)>(f), invoke_hpp::invoke(
e); std::forward<decltype(f)>(f),
e);
n.resolve();
} catch (...) {
n.reject(std::current_exception());
}
} else {
n.reject(e); n.reject(e);
} catch (...) {
n.reject(std::current_exception());
} }
}; };
@@ -457,18 +474,28 @@ namespace promise_hpp
template < typename U, typename ResolveF, typename RejectF > template < typename U, typename ResolveF, typename RejectF >
std::enable_if_t<!std::is_void<U>::value, void> std::enable_if_t<!std::is_void<U>::value, void>
attach(promise<U>& next, ResolveF&& on_resolve, RejectF&& on_reject) { attach(
promise<U>& next,
ResolveF&& on_resolve,
RejectF&& on_reject,
bool has_reject)
{
auto reject_h = [ auto reject_h = [
n = next, n = next,
f = std::forward<RejectF>(on_reject) f = std::forward<RejectF>(on_reject),
has_reject
](std::exception_ptr e) mutable { ](std::exception_ptr e) mutable {
try { if ( has_reject ) {
invoke_hpp::invoke( try {
std::forward<decltype(f)>(f), auto r = invoke_hpp::invoke(
e); std::forward<decltype(f)>(f),
e);
n.resolve(std::move(r));
} catch (...) {
n.reject(std::current_exception());
}
} else {
n.reject(e); n.reject(e);
} catch (...) {
n.reject(std::current_exception());
} }
}; };
@@ -683,7 +710,8 @@ namespace promise_hpp
state_->attach( state_->attach(
next, next,
std::forward<ResolveF>(on_resolve), std::forward<ResolveF>(on_resolve),
std::forward<RejectF>(on_reject)); std::forward<RejectF>(on_reject),
true);
return next; return next;
} }
@@ -693,9 +721,15 @@ namespace promise_hpp
!is_promise<ResolveFR>::value, !is_promise<ResolveFR>::value,
promise<ResolveFR>> promise<ResolveFR>>
then(ResolveF&& on_resolve) { then(ResolveF&& on_resolve) {
return then( promise<ResolveFR> next;
state_->attach(
next,
std::forward<ResolveF>(on_resolve), std::forward<ResolveF>(on_resolve),
[](std::exception_ptr){}); [](std::exception_ptr e) -> ResolveFR {
std::rethrow_exception(e);
},
false);
return next;
} }
template < typename RejectF > template < typename RejectF >
@@ -821,18 +855,28 @@ namespace promise_hpp
template < typename U, typename ResolveF, typename RejectF > template < typename U, typename ResolveF, typename RejectF >
std::enable_if_t<std::is_void<U>::value, void> std::enable_if_t<std::is_void<U>::value, void>
attach(promise<U>& next, ResolveF&& on_resolve, RejectF&& on_reject) { attach(
promise<U>& next,
ResolveF&& on_resolve,
RejectF&& on_reject,
bool has_reject)
{
auto reject_h = [ auto reject_h = [
n = next, n = next,
f = std::forward<RejectF>(on_reject) f = std::forward<RejectF>(on_reject),
has_reject
](std::exception_ptr e) mutable { ](std::exception_ptr e) mutable {
try { if ( has_reject ) {
invoke_hpp::invoke( try {
std::forward<decltype(f)>(f), invoke_hpp::invoke(
e); std::forward<decltype(f)>(f),
e);
n.resolve();
} catch (...) {
n.reject(std::current_exception());
}
} else {
n.reject(e); n.reject(e);
} catch (...) {
n.reject(std::current_exception());
} }
}; };
@@ -855,18 +899,28 @@ namespace promise_hpp
template < typename U, typename ResolveF, typename RejectF > template < typename U, typename ResolveF, typename RejectF >
std::enable_if_t<!std::is_void<U>::value, void> std::enable_if_t<!std::is_void<U>::value, void>
attach(promise<U>& next, ResolveF&& on_resolve, RejectF&& on_reject) { attach(
promise<U>& next,
ResolveF&& on_resolve,
RejectF&& on_reject,
bool has_reject)
{
auto reject_h = [ auto reject_h = [
n = next, n = next,
f = std::forward<RejectF>(on_reject) f = std::forward<RejectF>(on_reject),
has_reject
](std::exception_ptr e) mutable { ](std::exception_ptr e) mutable {
try { if ( has_reject ) {
invoke_hpp::invoke( try {
std::forward<decltype(f)>(f), auto r = invoke_hpp::invoke(
e); std::forward<decltype(f)>(f),
e);
n.resolve(std::move(r));
} catch (...) {
n.reject(std::current_exception());
}
} else {
n.reject(e); n.reject(e);
} catch (...) {
n.reject(std::current_exception());
} }
}; };

View File

@@ -35,10 +35,10 @@ namespace scheduler_hpp
highest highest
}; };
enum class scheduler_wait_status { enum class scheduler_processing_status {
no_timeout, done,
cancelled, timeout,
timeout cancelled
}; };
class scheduler_cancelled_exception : public std::runtime_error { class scheduler_cancelled_exception : public std::runtime_error {
@@ -52,6 +52,10 @@ namespace scheduler_hpp
scheduler(); scheduler();
~scheduler() noexcept; ~scheduler() noexcept;
using processing_result_t = std::pair<
scheduler_processing_status,
std::size_t>;
template < typename F, typename... Args > template < typename F, typename... Args >
using schedule_invoke_result_t = invoke_hpp::invoke_result_t< using schedule_invoke_result_t = invoke_hpp::invoke_result_t<
std::decay_t<F>, std::decay_t<F>,
@@ -65,14 +69,15 @@ namespace scheduler_hpp
, typename R = schedule_invoke_result_t<F, Args...> > , typename R = schedule_invoke_result_t<F, Args...> >
promise<R> schedule(scheduler_priority scheduler_priority, F&& f, Args&&... args); promise<R> schedule(scheduler_priority scheduler_priority, F&& f, Args&&... args);
scheduler_wait_status process_all_tasks() noexcept; processing_result_t process_one_task() noexcept;
processing_result_t process_all_tasks() noexcept;
template < typename Rep, typename Period > template < typename Rep, typename Period >
scheduler_wait_status process_tasks_for( processing_result_t process_tasks_for(
const std::chrono::duration<Rep, Period>& timeout_duration) noexcept; const std::chrono::duration<Rep, Period>& timeout_duration) noexcept;
template < typename Clock, typename Duration > template < typename Clock, typename Duration >
scheduler_wait_status process_tasks_until( processing_result_t process_tasks_until(
const std::chrono::time_point<Clock, Duration>& timeout_time) noexcept; const std::chrono::time_point<Clock, Duration>& timeout_time) noexcept;
private: private:
class task; class task;
@@ -157,7 +162,20 @@ namespace scheduler_hpp
return future; return future;
} }
inline scheduler_wait_status scheduler::process_all_tasks() noexcept { inline scheduler::processing_result_t scheduler::process_one_task() noexcept {
std::unique_lock<std::mutex> lock(tasks_mutex_);
if ( cancelled_ ) {
return std::make_pair(scheduler_processing_status::cancelled, 0u);
}
if ( tasks_.empty() ) {
return std::make_pair(scheduler_processing_status::done, 0u);
}
process_task_(std::move(lock));
return std::make_pair(scheduler_processing_status::done, 1u);
}
inline scheduler::processing_result_t scheduler::process_all_tasks() noexcept {
std::size_t processed_tasks = 0;
while ( !cancelled_ && active_task_count_ ) { while ( !cancelled_ && active_task_count_ ) {
std::unique_lock<std::mutex> lock(tasks_mutex_); std::unique_lock<std::mutex> lock(tasks_mutex_);
cond_var_.wait(lock, [this](){ cond_var_.wait(lock, [this](){
@@ -165,15 +183,18 @@ namespace scheduler_hpp
}); });
if ( !tasks_.empty() ) { if ( !tasks_.empty() ) {
process_task_(std::move(lock)); process_task_(std::move(lock));
++processed_tasks;
} }
} }
return cancelled_ return std::make_pair(
? scheduler_wait_status::cancelled cancelled_
: scheduler_wait_status::no_timeout; ? scheduler_processing_status::cancelled
: scheduler_processing_status::done,
processed_tasks);
} }
template < typename Rep, typename Period > template < typename Rep, typename Period >
scheduler_wait_status scheduler::process_tasks_for( scheduler::processing_result_t scheduler::process_tasks_for(
const std::chrono::duration<Rep, Period>& timeout_duration) noexcept const std::chrono::duration<Rep, Period>& timeout_duration) noexcept
{ {
return process_tasks_until( return process_tasks_until(
@@ -181,12 +202,15 @@ namespace scheduler_hpp
} }
template < typename Clock, typename Duration > template < typename Clock, typename Duration >
scheduler_wait_status scheduler::process_tasks_until( scheduler::processing_result_t scheduler::process_tasks_until(
const std::chrono::time_point<Clock, Duration>& timeout_time) noexcept const std::chrono::time_point<Clock, Duration>& timeout_time) noexcept
{ {
std::size_t processed_tasks = 0;
while ( !cancelled_ && active_task_count_ ) { while ( !cancelled_ && active_task_count_ ) {
if ( !(Clock::now() < timeout_time) ) { if ( !(Clock::now() < timeout_time) ) {
return scheduler_wait_status::timeout; return std::make_pair(
scheduler_processing_status::timeout,
processed_tasks);
} }
std::unique_lock<std::mutex> lock(tasks_mutex_); std::unique_lock<std::mutex> lock(tasks_mutex_);
cond_var_.wait_until(lock, timeout_time, [this](){ cond_var_.wait_until(lock, timeout_time, [this](){
@@ -194,18 +218,21 @@ namespace scheduler_hpp
}); });
if ( !tasks_.empty() ) { if ( !tasks_.empty() ) {
process_task_(std::move(lock)); process_task_(std::move(lock));
++processed_tasks;
} }
} }
return cancelled_ return std::make_pair(
? scheduler_wait_status::cancelled cancelled_
: scheduler_wait_status::no_timeout; ? scheduler_processing_status::cancelled
: scheduler_processing_status::done,
processed_tasks);
} }
inline void scheduler::push_task_(scheduler_priority priority, task_ptr task) { inline void scheduler::push_task_(scheduler_priority priority, task_ptr task) {
tasks_.emplace_back(priority, std::move(task)); tasks_.emplace_back(priority, std::move(task));
std::push_heap(tasks_.begin(), tasks_.end()); std::push_heap(tasks_.begin(), tasks_.end());
++active_task_count_; ++active_task_count_;
cond_var_.notify_all(); cond_var_.notify_one();
} }
inline scheduler::task_ptr scheduler::pop_task_() noexcept { inline scheduler::task_ptr scheduler::pop_task_() noexcept {
@@ -237,6 +264,7 @@ namespace scheduler_hpp
if ( task ) { if ( task ) {
lock.unlock(); lock.unlock();
task->run(); task->run();
lock.lock();
--active_task_count_; --active_task_count_;
cond_var_.notify_all(); cond_var_.notify_all();
} }