Merge pull request #16 from BlackMATov/dev

Dev
This commit is contained in:
BlackMat MATov
2018-12-22 04:05:34 +07:00
committed by GitHub
2 changed files with 94 additions and 19 deletions

View File

@@ -53,6 +53,10 @@ namespace jobber_hpp
explicit jobber(std::size_t threads);
~jobber() noexcept;
using active_wait_result_t = std::pair<
jobber_wait_status,
std::size_t>;
template < typename F, typename... Args >
using async_invoke_result_t = invoke_hpp::invoke_result_t<
std::decay_t<F>,
@@ -71,7 +75,8 @@ namespace jobber_hpp
bool is_paused() const noexcept;
jobber_wait_status wait_all() const noexcept;
jobber_wait_status active_wait_all() noexcept;
active_wait_result_t active_wait_all() noexcept;
active_wait_result_t active_wait_one() noexcept;
template < typename Rep, typename Period >
jobber_wait_status wait_all_for(
@@ -82,11 +87,11 @@ namespace jobber_hpp
const std::chrono::time_point<Clock, Duration>& timeout_time) const;
template < typename Rep, typename Period >
jobber_wait_status active_wait_all_for(
active_wait_result_t active_wait_all_for(
const std::chrono::duration<Rep, Period>& timeout_duration);
template < typename Clock, typename Duration >
jobber_wait_status active_wait_all_until(
active_wait_result_t active_wait_all_until(
const std::chrono::time_point<Clock, Duration>& timeout_time);
private:
class task;
@@ -210,7 +215,8 @@ namespace jobber_hpp
: jobber_wait_status::no_timeout;
}
inline jobber_wait_status jobber::active_wait_all() noexcept {
inline jobber::active_wait_result_t jobber::active_wait_all() noexcept {
std::size_t processed_tasks = 0;
while ( !cancelled_ && active_task_count_ ) {
std::unique_lock<std::mutex> lock(tasks_mutex_);
cond_var_.wait(lock, [this](){
@@ -218,11 +224,26 @@ namespace jobber_hpp
});
if ( !tasks_.empty() ) {
process_task_(std::move(lock));
++processed_tasks;
}
}
return cancelled_
? jobber_wait_status::cancelled
: jobber_wait_status::no_timeout;
return std::make_pair(
cancelled_
? jobber_wait_status::cancelled
: jobber_wait_status::no_timeout,
processed_tasks);
}
inline jobber::active_wait_result_t jobber::active_wait_one() noexcept {
std::unique_lock<std::mutex> lock(tasks_mutex_);
if ( cancelled_ ) {
return std::make_pair(jobber_wait_status::cancelled, 0u);
}
if ( tasks_.empty() ) {
return std::make_pair(jobber_wait_status::no_timeout, 0u);
}
process_task_(std::move(lock));
return std::make_pair(jobber_wait_status::no_timeout, 1u);
}
template < typename Rep, typename Period >
@@ -245,7 +266,7 @@ namespace jobber_hpp
}
template < typename Rep, typename Period >
jobber_wait_status jobber::active_wait_all_for(
jobber::active_wait_result_t jobber::active_wait_all_for(
const std::chrono::duration<Rep, Period>& timeout_duration)
{
return active_wait_all_until(
@@ -253,12 +274,15 @@ namespace jobber_hpp
}
template < typename Clock, typename Duration >
jobber_wait_status jobber::active_wait_all_until(
jobber::active_wait_result_t jobber::active_wait_all_until(
const std::chrono::time_point<Clock, Duration>& timeout_time)
{
std::size_t processed_tasks = 0;
while ( !cancelled_ && active_task_count_ ) {
if ( !(Clock::now() < timeout_time) ) {
return jobber_wait_status::timeout;
return std::make_pair(
jobber_wait_status::timeout,
processed_tasks);
}
std::unique_lock<std::mutex> lock(tasks_mutex_);
cond_var_.wait_until(lock, timeout_time, [this](){
@@ -266,11 +290,14 @@ namespace jobber_hpp
});
if ( !tasks_.empty() ) {
process_task_(std::move(lock));
++processed_tasks;
}
}
return cancelled_
? jobber_wait_status::cancelled
: jobber_wait_status::no_timeout;
return std::make_pair(
cancelled_
? jobber_wait_status::cancelled
: jobber_wait_status::no_timeout,
processed_tasks);
}
inline void jobber::push_task_(jobber_priority priority, task_ptr task) {

View File

@@ -117,6 +117,45 @@ TEST_CASE("jobber") {
j.active_wait_all();
REQUIRE(counter == 10);
}
{
jb::jobber j(1);
std::atomic<int> counter = ATOMIC_VAR_INIT(0);
j.pause();
for ( std::size_t i = 0; i < 3; ++i ) {
j.async([&counter](){
++counter;
});
}
REQUIRE(counter == 0);
{
auto r = j.active_wait_one();
REQUIRE(r.first == jb::jobber_wait_status::no_timeout);
REQUIRE(r.second == 1);
}
REQUIRE(counter == 1);
{
auto r = j.active_wait_one();
REQUIRE(r.first == jb::jobber_wait_status::no_timeout);
REQUIRE(r.second == 1);
}
REQUIRE(counter == 2);
{
auto r = j.active_wait_one();
REQUIRE(r.first == jb::jobber_wait_status::no_timeout);
REQUIRE(r.second == 1);
}
REQUIRE(counter == 3);
{
auto r = j.active_wait_one();
REQUIRE(r.first == jb::jobber_wait_status::no_timeout);
REQUIRE(r.second == 0);
}
REQUIRE(counter == 3);
j.resume();
REQUIRE(j.wait_all() == jb::jobber_wait_status::no_timeout);
REQUIRE(j.active_wait_one().first == jb::jobber_wait_status::no_timeout);
REQUIRE(counter == 3);
}
{
jb::jobber j(1);
@@ -126,16 +165,16 @@ TEST_CASE("jobber") {
REQUIRE(jb::jobber_wait_status::no_timeout == j.wait_all_for(std::chrono::milliseconds(-1)));
REQUIRE(jb::jobber_wait_status::no_timeout == j.wait_all_until(time_now() + std::chrono::milliseconds(-1)));
REQUIRE(jb::jobber_wait_status::no_timeout == j.active_wait_all_for(std::chrono::milliseconds(-1)));
REQUIRE(jb::jobber_wait_status::no_timeout == j.active_wait_all_until(time_now() + std::chrono::milliseconds(-1)));
REQUIRE(jb::jobber_wait_status::no_timeout == j.active_wait_all_for(std::chrono::milliseconds(-1)).first);
REQUIRE(jb::jobber_wait_status::no_timeout == j.active_wait_all_until(time_now() + std::chrono::milliseconds(-1)).first);
j.pause();
j.async([]{});
REQUIRE(jb::jobber_wait_status::timeout == j.wait_all_for(std::chrono::milliseconds(-1)));
REQUIRE(jb::jobber_wait_status::timeout == j.wait_all_until(time_now() + std::chrono::milliseconds(-1)));
REQUIRE(jb::jobber_wait_status::timeout == j.active_wait_all_for(std::chrono::milliseconds(-1)));
REQUIRE(jb::jobber_wait_status::timeout == j.active_wait_all_until(time_now() + std::chrono::milliseconds(-1)));
REQUIRE(jb::jobber_wait_status::timeout == j.active_wait_all_for(std::chrono::milliseconds(-1)).first);
REQUIRE(jb::jobber_wait_status::timeout == j.active_wait_all_until(time_now() + std::chrono::milliseconds(-1)).first);
}
{
jb::jobber j(1);
@@ -244,9 +283,18 @@ TEST_CASE("jobber") {
std::this_thread::sleep_for(std::chrono::milliseconds(5));
});
}
REQUIRE(jb::jobber_wait_status::timeout == j.active_wait_all_for(std::chrono::milliseconds(50)));
{
auto r = j.active_wait_all_for(std::chrono::milliseconds(50));
REQUIRE(jb::jobber_wait_status::timeout == r.first);
REQUIRE(r.second > 0);
}
REQUIRE(counter > 0);
REQUIRE(jb::jobber_wait_status::no_timeout == j.active_wait_all_for(std::chrono::seconds(5)));
{
auto r = j.active_wait_all_for(std::chrono::seconds(3));
REQUIRE(jb::jobber_wait_status::no_timeout == r.first);
REQUIRE(r.second > 0);
REQUIRE(r.second < 30);
}
REQUIRE(counter == 30);
}
{