jobber: wait_for and wait_until

This commit is contained in:
2018-12-06 18:25:46 +07:00
parent de8299a3ee
commit 719aa6bb6d
4 changed files with 108 additions and 13 deletions

View File

@@ -7,6 +7,7 @@
#pragma once #pragma once
#include "_utils.hpp" #include "_utils.hpp"
#include "time.hpp"
namespace e2d namespace e2d
{ {
@@ -42,6 +43,18 @@ namespace e2d
void wait_all() const noexcept; void wait_all() const noexcept;
void active_wait_all() noexcept; void active_wait_all() noexcept;
template < typename T, typename TimeTag >
void wait_for(const unit<T, TimeTag>& time_for) const noexcept;
template < typename T, typename TimeTag >
void wait_until(const unit<T, TimeTag>& time_until) const noexcept;
template < typename T, typename TimeTag >
void active_wait_for(const unit<T, TimeTag>& time_for) noexcept;
template < typename T, typename TimeTag >
void active_wait_until(const unit<T, TimeTag>& time_until) noexcept;
private: private:
class task; class task;
using task_ptr = std::unique_ptr<task>; using task_ptr = std::unique_ptr<task>;
@@ -123,6 +136,55 @@ namespace e2d
return future; return future;
} }
//
// wait
//
template < typename T, typename TimeTag >
void jobber::wait_for(const unit<T, TimeTag>& time_for) const noexcept {
if ( time_for.value > T(0) ) {
wait_until(
time::now<TimeTag, u64>() +
time_for.template cast_to<u64>());
}
}
template < typename T, typename TimeTag >
void jobber::wait_until(const unit<T, TimeTag>& time_until) const noexcept {
while ( active_task_count_ ) {
const auto time_now = time::now<TimeTag, u64>();
if ( time_now >= time_until.template cast_to<u64>() ) {
break;
}
std::this_thread::yield();
}
}
//
// active_wait
//
template < typename T, typename TimeTag >
void jobber::active_wait_for(const unit<T, TimeTag>& time_for) noexcept {
if ( time_for.value > T(0) ) {
active_wait_until(
time::now<TimeTag, u64>() +
time_for.template cast_to<u64>());
}
}
template < typename T, typename TimeTag >
void jobber::active_wait_until(const unit<T, TimeTag>& time_until) noexcept {
while ( active_task_count_ ) {
const auto time_now = time::now<TimeTag, u64>();
if ( time_now >= time_until.template cast_to<u64>() ) {
break;
}
std::unique_lock<std::mutex> lock(tasks_mutex_);
process_task_(std::move(lock));
}
}
// //
// concrete_task<R, F, Args...> // concrete_task<R, F, Args...>
// //

View File

@@ -40,10 +40,10 @@ namespace e2d
void process_all_tasks() noexcept; void process_all_tasks() noexcept;
template < typename T, typename TimeTag > template < typename T, typename TimeTag >
void process_all_tasks_for(const unit<T, TimeTag>& time_for) noexcept; void process_tasks_for(const unit<T, TimeTag>& time_for) noexcept;
template < typename T, typename TimeTag > template < typename T, typename TimeTag >
void process_all_tasks_until(const unit<T, TimeTag>& time_until) noexcept; void process_tasks_until(const unit<T, TimeTag>& time_until) noexcept;
private: private:
class task; class task;
using task_ptr = std::unique_ptr<task>; using task_ptr = std::unique_ptr<task>;
@@ -119,17 +119,21 @@ namespace e2d
return future; return future;
} }
//
// process_all_tasks
//
template < typename T, typename TimeTag > template < typename T, typename TimeTag >
void scheduler::process_all_tasks_for(const unit<T, TimeTag>& time_for) noexcept { void scheduler::process_tasks_for(const unit<T, TimeTag>& time_for) noexcept {
if ( time_for.value > T(0) ) { if ( time_for.value > T(0) ) {
process_all_tasks_until( process_tasks_until(
time::now<TimeTag, u64>() + time::now<TimeTag, u64>() +
time_for.template cast_to<u64>()); time_for.template cast_to<u64>());
} }
} }
template < typename T, typename TimeTag > template < typename T, typename TimeTag >
void scheduler::process_all_tasks_until(const unit<T, TimeTag>& time_until) noexcept { void scheduler::process_tasks_until(const unit<T, TimeTag>& time_until) noexcept {
while ( active_task_count_ ) { while ( active_task_count_ ) {
const auto time_now = time::now<TimeTag, u64>(); const auto time_now = time::now<TimeTag, u64>();
if ( time_now >= time_until.template cast_to<u64>() ) { if ( time_now >= time_until.template cast_to<u64>() ) {

View File

@@ -69,6 +69,35 @@ TEST_CASE("jobber") {
j.resume(); j.resume();
j.wait_all(); j.wait_all();
} }
{
jobber j(1);
i32 counter = 0;
j.pause();
for ( std::size_t i = 0; i < 10; ++i ) {
j.async([&counter](){
++counter;
std::this_thread::sleep_for(std::chrono::milliseconds(15));
});
}
const auto b = time::now_ms();
j.wait_for(make_milliseconds(15));
REQUIRE(time::now_ms() - b > make_milliseconds<i64>(10));
REQUIRE(counter == 0);
j.wait_until(time::now_ms() + make_milliseconds<i64>(15));
REQUIRE(time::now_ms() - b > make_milliseconds<i64>(20));
REQUIRE(counter == 0);
j.active_wait_for(make_milliseconds(60));
REQUIRE(time::now_ms() - b > make_milliseconds<i64>(70));
REQUIRE(counter > 2);
REQUIRE(counter < 10);
j.active_wait_until(time::now_s() + make_seconds<i64>(1));
REQUIRE(counter == 10);
}
{ {
jobber j(2); jobber j(2);
jobber g(2); jobber g(2);

View File

@@ -29,13 +29,13 @@ TEST_CASE("scheduler") {
std::this_thread::sleep_for(std::chrono::milliseconds(15)); std::this_thread::sleep_for(std::chrono::milliseconds(15));
}); });
} }
s.process_all_tasks_for(make_milliseconds(-1)); s.process_tasks_for(make_milliseconds(-1));
s.process_all_tasks_for(make_milliseconds(0)); s.process_tasks_for(make_milliseconds(0));
REQUIRE(counter == 0); REQUIRE(counter == 0);
s.process_all_tasks_for(make_milliseconds(60)); s.process_tasks_for(make_milliseconds(60));
REQUIRE(counter > 2); REQUIRE(counter > 2);
REQUIRE(counter < 10); REQUIRE(counter < 10);
s.process_all_tasks_for(make_seconds(1)); s.process_tasks_for(make_seconds(1));
REQUIRE(counter == 10); REQUIRE(counter == 10);
} }
{ {
@@ -47,13 +47,13 @@ TEST_CASE("scheduler") {
std::this_thread::sleep_for(std::chrono::milliseconds(15)); std::this_thread::sleep_for(std::chrono::milliseconds(15));
}); });
} }
s.process_all_tasks_until(time::now_ms() - make_milliseconds<i64>(1)); s.process_tasks_until(time::now_ms() - make_milliseconds<i64>(1));
s.process_all_tasks_until(time::now_ms()); s.process_tasks_until(time::now_ms());
REQUIRE(counter == 0); REQUIRE(counter == 0);
s.process_all_tasks_until(time::now_ms() + make_milliseconds<i64>(60)); s.process_tasks_until(time::now_ms() + make_milliseconds<i64>(60));
REQUIRE(counter > 2); REQUIRE(counter > 2);
REQUIRE(counter < 10); REQUIRE(counter < 10);
s.process_all_tasks_until(time::now_s() + make_seconds<i64>(1)); s.process_tasks_until(time::now_s() + make_seconds<i64>(1));
REQUIRE(counter == 10); REQUIRE(counter == 10);
} }
{ {