Merge branch 'dev' into main

This commit is contained in:
BlackMATov
2021-02-01 11:41:22 +07:00
3 changed files with 164 additions and 17 deletions

View File

@@ -417,4 +417,8 @@ namespace curly_hpp
{
void perform();
void wait_activity(time_ms_t ms);
void cancel_all_pending_requests();
std::vector<request> get_all_pending_requests();
void get_all_pending_requests(std::vector<request>& dst);
}

View File

@@ -7,7 +7,7 @@
#include <curly.hpp/curly.hpp>
#include <mutex>
#include <queue>
#include <deque>
#include <type_traits>
#include <condition_variable>
@@ -109,37 +109,32 @@ namespace
template < typename T >
class mt_queue final {
public:
void enqueue(T&& v) {
template < typename U >
void enqueue(U&& u) {
std::lock_guard<std::mutex> guard(mutex_);
queue_.push(std::move(v));
cvar_.notify_all();
}
void enqueue(const T& v) {
std::lock_guard<std::mutex> guard(mutex_);
queue_.push(v);
deque_.push_back(std::forward<U>(u));
cvar_.notify_all();
}
bool try_dequeue(T& v) {
std::lock_guard<std::mutex> guard(mutex_);
if ( queue_.empty() ) {
if ( deque_.empty() ) {
return false;
}
v = queue_.front();
queue_.pop();
v = std::move(deque_.front());
deque_.pop_front();
return true;
}
bool empty() const noexcept {
std::lock_guard<std::mutex> guard(mutex_);
return queue_.empty();
return deque_.empty();
}
void wait() const noexcept {
std::unique_lock<std::mutex> lock(mutex_);
cvar_.wait(lock, [this](){
return !queue_.empty();
return !deque_.empty();
});
}
@@ -147,7 +142,7 @@ namespace
bool wait_for(const std::chrono::duration<Rep, Period> duration) const noexcept {
std::unique_lock<std::mutex> lock(mutex_);
return cvar_.wait_for(lock, duration, [this](){
return !queue_.empty();
return !deque_.empty();
});
}
@@ -155,11 +150,17 @@ namespace
bool wait_until(const std::chrono::time_point<Clock, Duration>& time) const noexcept {
std::unique_lock<std::mutex> lock(mutex_);
return cvar_.wait_until(lock, time, [this](){
return !queue_.empty();
return !deque_.empty();
});
}
template < typename Container >
void copy_to(Container&& container) const {
std::lock_guard<std::mutex> guard(mutex_);
container.insert(container.end(), deque_.begin(), deque_.end());
}
private:
std::queue<T> queue_;
std::deque<T> deque_;
private:
mutable std::mutex mutex_;
mutable std::condition_variable cvar_;
@@ -1071,6 +1072,10 @@ namespace curly_hpp
curl_state::with([](CURLM* curlm){
req_state_t sreq;
while ( new_handles.try_dequeue(sreq) ) {
if ( !sreq->is_pending() ) {
sreq->call_callback(sreq);
continue;
}
try {
sreq->enqueue(curlm);
active_handles.emplace_back(sreq);
@@ -1141,4 +1146,33 @@ namespace curly_hpp
}
});
}
void cancel_all_pending_requests() {
req_state_t sreq;
while ( new_handles.try_dequeue(sreq) ) {
sreq->cancel();
sreq->call_callback(sreq);
}
curl_state::with([](CURLM* curlm){
for ( auto iter = active_handles.begin(); iter != active_handles.end(); ) {
(*iter)->cancel();
(*iter)->dequeue(curlm);
(*iter)->call_callback(*iter);
iter = active_handles.erase(iter);
}
});
}
std::vector<request> get_all_pending_requests() {
std::vector<request> requests;
get_all_pending_requests(requests);
return requests;
}
void get_all_pending_requests(std::vector<request>& dst) {
new_handles.copy_to(dst);
curl_state::with([&dst](CURLM*){
dst.insert(dst.end(), active_handles.begin(), active_handles.end());
});
}
}

View File

@@ -908,6 +908,115 @@ TEST_CASE("curly") {
}
}
TEST_CASE("curly/cancel_all_pending_requests") {
SUBCASE("cancel new requests") {
std::atomic_size_t call_count{0u};
auto req1 = net::request_builder("https://httpbin.org/delay/2")
.callback([&call_count](net::request request){
REQUIRE(request.status() == net::req_status::cancelled);
++call_count;
}).send();
auto req2 = net::request_builder("https://httpbin.org/delay/2")
.callback([&call_count](net::request request){
REQUIRE(request.status() == net::req_status::cancelled);
++call_count;
}).send();
net::cancel_all_pending_requests();
REQUIRE(call_count == 2u);
REQUIRE(req1.status() == net::req_status::cancelled);
REQUIRE(req2.status() == net::req_status::cancelled);
}
SUBCASE("cancel active requests") {
std::atomic_size_t call_count{0u};
auto req1 = net::request_builder("https://httpbin.org/delay/2")
.callback([&call_count](net::request request){
REQUIRE(request.status() == net::req_status::cancelled);
++call_count;
}).send();
auto req2 = net::request_builder("https://httpbin.org/delay/2")
.callback([&call_count](net::request request){
REQUIRE(request.status() == net::req_status::cancelled);
++call_count;
}).send();
net::perform();
net::cancel_all_pending_requests();
REQUIRE(call_count == 2u);
REQUIRE(req1.status() == net::req_status::cancelled);
REQUIRE(req2.status() == net::req_status::cancelled);
}
}
TEST_CASE("curly/get_all_pending_requests") {
SUBCASE("get new requests") {
std::atomic_size_t call_count{0u};
auto req1 = net::request_builder("https://httpbin.org/delay/2")
.callback([&call_count](net::request request){
REQUIRE(request.status() == net::req_status::cancelled);
++call_count;
}).send();
auto req2 = net::request_builder("https://httpbin.org/delay/2")
.callback([&call_count](net::request request){
REQUIRE(request.status() == net::req_status::cancelled);
++call_count;
}).send();
std::vector<net::request> requests = net::get_all_pending_requests();
REQUIRE(requests.size() == 2u);
for ( net::request& req : requests ) {
req.cancel();
}
net::perform();
REQUIRE(call_count == 2u);
REQUIRE(req1.status() == net::req_status::cancelled);
REQUIRE(req2.status() == net::req_status::cancelled);
}
SUBCASE("get active requests") {
std::atomic_size_t call_count{0u};
auto req1 = net::request_builder("https://httpbin.org/delay/2")
.callback([&call_count](net::request request){
REQUIRE(request.status() == net::req_status::cancelled);
++call_count;
}).send();
auto req2 = net::request_builder("https://httpbin.org/delay/2")
.callback([&call_count](net::request request){
REQUIRE(request.status() == net::req_status::cancelled);
++call_count;
}).send();
net::perform();
std::vector<net::request> requests = net::get_all_pending_requests();
REQUIRE(requests.size() == 2u);
for ( net::request& req : requests ) {
req.cancel();
}
net::perform();
REQUIRE(call_count == 2u);
REQUIRE(req1.status() == net::req_status::cancelled);
REQUIRE(req2.status() == net::req_status::cancelled);
}
}
TEST_CASE("curly_examples") {
net::performer performer;