extended scheduler processing status

This commit is contained in:
2018-12-17 17:47:54 +07:00
parent 5dfcf5911c
commit e865f3d679
2 changed files with 106 additions and 44 deletions

View File

@@ -35,10 +35,10 @@ namespace scheduler_hpp
highest
};
enum class scheduler_wait_status {
no_timeout,
cancelled,
timeout
enum class scheduler_processing_status {
done,
timeout,
cancelled
};
class scheduler_cancelled_exception : public std::runtime_error {
@@ -52,6 +52,10 @@ namespace scheduler_hpp
scheduler();
~scheduler() noexcept;
using processing_result_t = std::pair<
scheduler_processing_status,
std::size_t>;
template < typename F, typename... Args >
using schedule_invoke_result_t = invoke_hpp::invoke_result_t<
std::decay_t<F>,
@@ -65,15 +69,15 @@ namespace scheduler_hpp
, typename R = schedule_invoke_result_t<F, Args...> >
promise<R> schedule(scheduler_priority scheduler_priority, F&& f, Args&&... args);
bool process_one_task() noexcept;
scheduler_wait_status process_all_tasks() noexcept;
processing_result_t process_one_task() noexcept;
processing_result_t process_all_tasks() noexcept;
template < typename Rep, typename Period >
scheduler_wait_status process_tasks_for(
processing_result_t process_tasks_for(
const std::chrono::duration<Rep, Period>& timeout_duration) noexcept;
template < typename Clock, typename Duration >
scheduler_wait_status process_tasks_until(
processing_result_t process_tasks_until(
const std::chrono::time_point<Clock, Duration>& timeout_time) noexcept;
private:
class task;
@@ -158,16 +162,20 @@ namespace scheduler_hpp
return future;
}
inline bool scheduler::process_one_task() noexcept {
inline scheduler::processing_result_t scheduler::process_one_task() noexcept {
std::unique_lock<std::mutex> lock(tasks_mutex_);
if ( cancelled_ ) {
return std::make_pair(scheduler_processing_status::cancelled, 0u);
}
if ( tasks_.empty() ) {
return false;
return std::make_pair(scheduler_processing_status::done, 0u);
}
process_task_(std::move(lock));
return true;
return std::make_pair(scheduler_processing_status::done, 1u);
}
inline scheduler_wait_status scheduler::process_all_tasks() noexcept {
inline scheduler::processing_result_t scheduler::process_all_tasks() noexcept {
std::size_t processed_tasks = 0;
while ( !cancelled_ && active_task_count_ ) {
std::unique_lock<std::mutex> lock(tasks_mutex_);
cond_var_.wait(lock, [this](){
@@ -175,15 +183,18 @@ namespace scheduler_hpp
});
if ( !tasks_.empty() ) {
process_task_(std::move(lock));
++processed_tasks;
}
}
return cancelled_
? scheduler_wait_status::cancelled
: scheduler_wait_status::no_timeout;
return std::make_pair(
cancelled_
? scheduler_processing_status::cancelled
: scheduler_processing_status::done,
processed_tasks);
}
template < typename Rep, typename Period >
scheduler_wait_status scheduler::process_tasks_for(
scheduler::processing_result_t scheduler::process_tasks_for(
const std::chrono::duration<Rep, Period>& timeout_duration) noexcept
{
return process_tasks_until(
@@ -191,12 +202,15 @@ namespace scheduler_hpp
}
template < typename Clock, typename Duration >
scheduler_wait_status scheduler::process_tasks_until(
scheduler::processing_result_t scheduler::process_tasks_until(
const std::chrono::time_point<Clock, Duration>& timeout_time) noexcept
{
std::size_t processed_tasks = 0;
while ( !cancelled_ && active_task_count_ ) {
if ( !(Clock::now() < timeout_time) ) {
return scheduler_wait_status::timeout;
return std::make_pair(
scheduler_processing_status::timeout,
processed_tasks);
}
std::unique_lock<std::mutex> lock(tasks_mutex_);
cond_var_.wait_until(lock, timeout_time, [this](){
@@ -204,18 +218,21 @@ namespace scheduler_hpp
});
if ( !tasks_.empty() ) {
process_task_(std::move(lock));
++processed_tasks;
}
}
return cancelled_
? scheduler_wait_status::cancelled
: scheduler_wait_status::no_timeout;
return std::make_pair(
cancelled_
? scheduler_processing_status::cancelled
: scheduler_processing_status::done,
processed_tasks);
}
inline void scheduler::push_task_(scheduler_priority priority, task_ptr task) {
tasks_.emplace_back(priority, std::move(task));
std::push_heap(tasks_.begin(), tasks_.end());
++active_task_count_;
cond_var_.notify_all();
cond_var_.notify_one();
}
inline scheduler::task_ptr scheduler::pop_task_() noexcept {