jobber: extended active wait result

This commit is contained in:
2018-12-22 02:53:14 +07:00
parent ff6f80a5ef
commit 2a24233a8e
2 changed files with 70 additions and 30 deletions

View File

@@ -53,6 +53,10 @@ namespace jobber_hpp
explicit jobber(std::size_t threads);
~jobber() noexcept;
using active_wait_result_t = std::pair<
jobber_wait_status,
std::size_t>;
template < typename F, typename... Args >
using async_invoke_result_t = invoke_hpp::invoke_result_t<
std::decay_t<F>,
@@ -71,8 +75,8 @@ namespace jobber_hpp
bool is_paused() const noexcept;
jobber_wait_status wait_all() const noexcept;
jobber_wait_status active_wait_all() noexcept;
jobber_wait_status active_wait_one() noexcept;
active_wait_result_t active_wait_all() noexcept;
active_wait_result_t active_wait_one() noexcept;
template < typename Rep, typename Period >
jobber_wait_status wait_all_for(
@@ -83,11 +87,11 @@ namespace jobber_hpp
const std::chrono::time_point<Clock, Duration>& timeout_time) const;
template < typename Rep, typename Period >
jobber_wait_status active_wait_all_for(
active_wait_result_t active_wait_all_for(
const std::chrono::duration<Rep, Period>& timeout_duration);
template < typename Clock, typename Duration >
jobber_wait_status active_wait_all_until(
active_wait_result_t active_wait_all_until(
const std::chrono::time_point<Clock, Duration>& timeout_time);
private:
class task;
@@ -211,7 +215,8 @@ namespace jobber_hpp
: jobber_wait_status::no_timeout;
}
inline jobber_wait_status jobber::active_wait_all() noexcept {
inline jobber::active_wait_result_t jobber::active_wait_all() noexcept {
std::size_t processed_tasks = 0;
while ( !cancelled_ && active_task_count_ ) {
std::unique_lock<std::mutex> lock(tasks_mutex_);
cond_var_.wait(lock, [this](){
@@ -219,22 +224,26 @@ namespace jobber_hpp
});
if ( !tasks_.empty() ) {
process_task_(std::move(lock));
++processed_tasks;
}
}
return cancelled_
? jobber_wait_status::cancelled
: jobber_wait_status::no_timeout;
return std::make_pair(
cancelled_
? jobber_wait_status::cancelled
: jobber_wait_status::no_timeout,
processed_tasks);
}
inline jobber_wait_status jobber::active_wait_one() noexcept {
inline jobber::active_wait_result_t jobber::active_wait_one() noexcept {
std::unique_lock<std::mutex> lock(tasks_mutex_);
if ( cancelled_ ) {
return jobber_wait_status::cancelled;
return std::make_pair(jobber_wait_status::cancelled, 0u);
}
if ( !tasks_.empty() ) {
process_task_(std::move(lock));
if ( tasks_.empty() ) {
return std::make_pair(jobber_wait_status::no_timeout, 0u);
}
return jobber_wait_status::no_timeout;
process_task_(std::move(lock));
return std::make_pair(jobber_wait_status::no_timeout, 1u);
}
template < typename Rep, typename Period >
@@ -257,7 +266,7 @@ namespace jobber_hpp
}
template < typename Rep, typename Period >
jobber_wait_status jobber::active_wait_all_for(
jobber::active_wait_result_t jobber::active_wait_all_for(
const std::chrono::duration<Rep, Period>& timeout_duration)
{
return active_wait_all_until(
@@ -265,12 +274,15 @@ namespace jobber_hpp
}
template < typename Clock, typename Duration >
jobber_wait_status jobber::active_wait_all_until(
jobber::active_wait_result_t jobber::active_wait_all_until(
const std::chrono::time_point<Clock, Duration>& timeout_time)
{
std::size_t processed_tasks = 0;
while ( !cancelled_ && active_task_count_ ) {
if ( !(Clock::now() < timeout_time) ) {
return jobber_wait_status::timeout;
return std::make_pair(
jobber_wait_status::timeout,
processed_tasks);
}
std::unique_lock<std::mutex> lock(tasks_mutex_);
cond_var_.wait_until(lock, timeout_time, [this](){
@@ -278,11 +290,14 @@ namespace jobber_hpp
});
if ( !tasks_.empty() ) {
process_task_(std::move(lock));
++processed_tasks;
}
}
return cancelled_
? jobber_wait_status::cancelled
: jobber_wait_status::no_timeout;
return std::make_pair(
cancelled_
? jobber_wait_status::cancelled
: jobber_wait_status::no_timeout,
processed_tasks);
}
inline void jobber::push_task_(jobber_priority priority, task_ptr task) {