diff --git a/headers/curly.hpp/curly.hpp b/headers/curly.hpp/curly.hpp index 39282eb..3a5e4e1 100644 --- a/headers/curly.hpp/curly.hpp +++ b/headers/curly.hpp/curly.hpp @@ -417,5 +417,8 @@ namespace curly_hpp { void perform(); void wait_activity(time_ms_t ms); + void cancel_all_pending_requests(); + std::vector get_all_pending_requests(); + void get_all_pending_requests(std::vector& dst); } diff --git a/sources/curly.hpp/curly.cpp b/sources/curly.hpp/curly.cpp index 132d619..213ce54 100644 --- a/sources/curly.hpp/curly.cpp +++ b/sources/curly.hpp/curly.cpp @@ -7,7 +7,7 @@ #include #include -#include +#include #include #include @@ -109,37 +109,32 @@ namespace template < typename T > class mt_queue final { public: - void enqueue(T&& v) { + template < typename U > + void enqueue(U&& u) { std::lock_guard guard(mutex_); - queue_.push(std::move(v)); - cvar_.notify_all(); - } - - void enqueue(const T& v) { - std::lock_guard guard(mutex_); - queue_.push(v); + deque_.push_back(std::forward(u)); cvar_.notify_all(); } bool try_dequeue(T& v) { std::lock_guard guard(mutex_); - if ( queue_.empty() ) { + if ( deque_.empty() ) { return false; } - v = queue_.front(); - queue_.pop(); + v = std::move(deque_.front()); + deque_.pop_front(); return true; } bool empty() const noexcept { std::lock_guard guard(mutex_); - return queue_.empty(); + return deque_.empty(); } void wait() const noexcept { std::unique_lock lock(mutex_); cvar_.wait(lock, [this](){ - return !queue_.empty(); + return !deque_.empty(); }); } @@ -147,7 +142,7 @@ namespace bool wait_for(const std::chrono::duration duration) const noexcept { std::unique_lock lock(mutex_); return cvar_.wait_for(lock, duration, [this](){ - return !queue_.empty(); + return !deque_.empty(); }); } @@ -155,11 +150,17 @@ namespace bool wait_until(const std::chrono::time_point& time) const noexcept { std::unique_lock lock(mutex_); return cvar_.wait_until(lock, time, [this](){ - return !queue_.empty(); + return !deque_.empty(); }); } + + template < typename Container > + void copy_to(Container&& container) const { + std::lock_guard guard(mutex_); + container.insert(container.end(), deque_.begin(), deque_.end()); + } private: - std::queue queue_; + std::deque deque_; private: mutable std::mutex mutex_; mutable std::condition_variable cvar_; @@ -1147,12 +1148,12 @@ namespace curly_hpp } void cancel_all_pending_requests() { + req_state_t sreq; + while ( new_handles.try_dequeue(sreq) ) { + sreq->cancel(); + sreq->call_callback(sreq); + } curl_state::with([](CURLM* curlm){ - req_state_t sreq; - while ( new_handles.try_dequeue(sreq) ) { - sreq->cancel(); - sreq->call_callback(sreq); - } for ( auto iter = active_handles.begin(); iter != active_handles.end(); ) { (*iter)->cancel(); (*iter)->dequeue(curlm); @@ -1161,4 +1162,17 @@ namespace curly_hpp } }); } + + std::vector get_all_pending_requests() { + std::vector requests; + get_all_pending_requests(requests); + return requests; + } + + void get_all_pending_requests(std::vector& dst) { + new_handles.copy_to(dst); + curl_state::with([&dst](CURLM*){ + dst.insert(dst.end(), active_handles.begin(), active_handles.end()); + }); + } } diff --git a/untests/curly_tests.cpp b/untests/curly_tests.cpp index 802cff7..53d3e37 100644 --- a/untests/curly_tests.cpp +++ b/untests/curly_tests.cpp @@ -955,6 +955,68 @@ TEST_CASE("curly/cancel_all_pending_requests") { } } +TEST_CASE("curly/get_all_pending_requests") { + SUBCASE("get new requests") { + std::atomic_size_t call_count{0u}; + + auto req1 = net::request_builder("https://httpbin.org/delay/2") + .callback([&call_count](net::request request){ + REQUIRE(request.status() == net::req_status::cancelled); + ++call_count; + }).send(); + + auto req2 = net::request_builder("https://httpbin.org/delay/2") + .callback([&call_count](net::request request){ + REQUIRE(request.status() == net::req_status::cancelled); + ++call_count; + }).send(); + + std::vector requests = net::get_all_pending_requests(); + REQUIRE(requests.size() == 2u); + + for ( net::request& req : requests ) { + req.cancel(); + } + + net::perform(); + + REQUIRE(call_count == 2u); + REQUIRE(req1.status() == net::req_status::cancelled); + REQUIRE(req2.status() == net::req_status::cancelled); + } + + SUBCASE("get active requests") { + std::atomic_size_t call_count{0u}; + + auto req1 = net::request_builder("https://httpbin.org/delay/2") + .callback([&call_count](net::request request){ + REQUIRE(request.status() == net::req_status::cancelled); + ++call_count; + }).send(); + + auto req2 = net::request_builder("https://httpbin.org/delay/2") + .callback([&call_count](net::request request){ + REQUIRE(request.status() == net::req_status::cancelled); + ++call_count; + }).send(); + + net::perform(); + + std::vector requests = net::get_all_pending_requests(); + REQUIRE(requests.size() == 2u); + + for ( net::request& req : requests ) { + req.cancel(); + } + + net::perform(); + + REQUIRE(call_count == 2u); + REQUIRE(req1.status() == net::req_status::cancelled); + REQUIRE(req2.status() == net::req_status::cancelled); + } +} + TEST_CASE("curly_examples") { net::performer performer;