minimized waiting on send

This commit is contained in:
2019-06-30 08:37:20 +07:00
parent 1a032caf2a
commit 51baed728b

View File

@@ -50,11 +50,11 @@ namespace
default_uploader(const data_t* src, std::mutex* m) noexcept
: data_(*src)
, mutex_(*m) {}
, mutex_(*m)
, size_(src->size()) {}
std::size_t size() const override {
std::lock_guard<std::mutex> guard(mutex_);
return data_.size();
return size_.load();
}
std::size_t read(char* dst, std::size_t size) override {
@@ -68,6 +68,7 @@ namespace
const data_t& data_;
std::mutex& mutex_;
std::size_t uploaded_{0};
const std::atomic_size_t size_{0};
};
class default_downloader final : public download_handler {
@@ -118,9 +119,14 @@ namespace
return true;
}
void wait_content_for(time_ms_t ms) const noexcept {
bool empty() const noexcept {
std::lock_guard<std::mutex> guard(mutex_);
return queue_.empty();
}
bool wait_content_for(time_ms_t ms) const noexcept {
std::unique_lock<std::mutex> lock(mutex_);
cvar_.wait_for(lock, ms, [this](){
return cvar_.wait_for(lock, ms, [this](){
return !queue_.empty();
});
}
@@ -173,6 +179,7 @@ namespace
using req_state_t = std::shared_ptr<request::internal_state>;
std::vector<req_state_t> active_handles;
mt_queue<req_state_t> new_handles;
class curl_state final {
public:
@@ -309,20 +316,31 @@ namespace curly_hpp
{
class request::internal_state final {
public:
internal_state(curlh_t curlh, request_builder&& rb)
: hlist_(make_header_slist(rb.headers()))
, curlh_(std::move(curlh))
, content_(std::move(rb.content()))
, uploader_(std::move(rb.uploader()))
, downloader_(std::move(rb.downloader()))
internal_state(request_builder&& rb)
: breq_(std::move(rb))
{
if ( !uploader_ ) {
uploader_ = std::make_unique<default_uploader>(&content_.data(), &mutex_);
if ( !breq_.uploader() ) {
breq_.uploader<default_uploader>(&breq_.content().data(), &mutex_);
}
if ( !downloader_ ) {
downloader_ = std::make_unique<default_downloader>(&response_content_, &mutex_);
if ( !breq_.downloader() ) {
breq_.downloader<default_downloader>(&response_content_, &mutex_);
}
}
void enqueue(CURLM* curlm) {
std::lock_guard<std::mutex> guard(mutex_);
assert(!curlh_);
curlh_ = curlh_t{
curl_easy_init(),
&curl_easy_cleanup};
if ( !curlh_ ) {
throw exception("curly_hpp: failed to curl_easy_init");
}
hlist_ = make_header_slist(breq_.headers());
if ( const auto* vi = curl_version_info(CURLVERSION_NOW); vi && vi->version ) {
std::string user_agent("cURL/");
@@ -345,15 +363,15 @@ namespace curly_hpp
curl_easy_setopt(curlh_.get(), CURLOPT_HEADERDATA, this);
curl_easy_setopt(curlh_.get(), CURLOPT_HEADERFUNCTION, &s_header_callback_);
curl_easy_setopt(curlh_.get(), CURLOPT_URL, rb.url().c_str());
curl_easy_setopt(curlh_.get(), CURLOPT_URL, breq_.url().c_str());
curl_easy_setopt(curlh_.get(), CURLOPT_HTTPHEADER, hlist_.get());
curl_easy_setopt(curlh_.get(), CURLOPT_VERBOSE, rb.verbose() ? 1l : 0l);
curl_easy_setopt(curlh_.get(), CURLOPT_VERBOSE, breq_.verbose() ? 1l : 0l);
switch ( rb.method() ) {
switch ( breq_.method() ) {
case methods::put:
curl_easy_setopt(curlh_.get(), CURLOPT_UPLOAD, 1l);
curl_easy_setopt(curlh_.get(), CURLOPT_INFILESIZE_LARGE,
static_cast<curl_off_t>(uploader_->size()));
static_cast<curl_off_t>(breq_.uploader()->size()));
break;
case methods::get:
curl_easy_setopt(curlh_.get(), CURLOPT_HTTPGET, 1l);
@@ -364,13 +382,13 @@ namespace curly_hpp
case methods::post:
curl_easy_setopt(curlh_.get(), CURLOPT_POST, 1l);
curl_easy_setopt(curlh_.get(), CURLOPT_POSTFIELDSIZE_LARGE,
static_cast<curl_off_t>(uploader_->size()));
static_cast<curl_off_t>(breq_.uploader()->size()));
break;
default:
throw exception("curly_hpp: unexpected request method");
}
if ( rb.verification() ) {
if ( breq_.verification() ) {
curl_easy_setopt(curlh_.get(), CURLOPT_SSL_VERIFYPEER, 1l);
curl_easy_setopt(curlh_.get(), CURLOPT_SSL_VERIFYHOST, 2l);
} else {
@@ -378,22 +396,34 @@ namespace curly_hpp
curl_easy_setopt(curlh_.get(), CURLOPT_SSL_VERIFYHOST, 0l);
}
if ( rb.redirections() ) {
if ( breq_.redirections() ) {
curl_easy_setopt(curlh_.get(), CURLOPT_FOLLOWLOCATION, 1l);
curl_easy_setopt(curlh_.get(), CURLOPT_MAXREDIRS,
static_cast<long>(rb.redirections()));
static_cast<long>(breq_.redirections()));
} else {
curl_easy_setopt(curlh_.get(), CURLOPT_FOLLOWLOCATION, 0l);
}
curl_easy_setopt(curlh_.get(), CURLOPT_TIMEOUT,
static_cast<long>(std::max(time_sec_t(1), rb.request_timeout()).count()));
static_cast<long>(std::max(time_sec_t(1), breq_.request_timeout()).count()));
curl_easy_setopt(curlh_.get(), CURLOPT_CONNECTTIMEOUT,
static_cast<long>(std::max(time_sec_t(1), rb.connection_timeout()).count()));
static_cast<long>(std::max(time_sec_t(1), breq_.connection_timeout()).count()));
last_response_ = time_point_t::clock::now();
response_timeout_ = std::max(time_sec_t(1), rb.response_timeout());
response_timeout_ = std::max(time_sec_t(1), breq_.response_timeout());
if ( CURLM_OK != curl_multi_add_handle(curlm, curlh_.get()) ) {
throw exception("curly_hpp: failed to curl_multi_add_handle");
}
}
void dequeue(CURLM* curlm) noexcept {
std::lock_guard<std::mutex> guard(mutex_);
if ( curlh_ ) {
curl_multi_remove_handle(curlm, curlh_.get());
curlh_.reset();
}
}
bool done() noexcept {
@@ -417,8 +447,8 @@ namespace curly_hpp
response_ = response(static_cast<response_code_t>(code));
response_.content = std::move(response_content_);
response_.headers = std::move(response_headers_);
response_.uploader = std::move(uploader_);
response_.downloader = std::move(downloader_);
response_.uploader = std::move(breq_.uploader());
response_.downloader = std::move(breq_.downloader());
} catch (...) {
status_ = statuses::failed;
cvar_.notify_all();
@@ -518,10 +548,6 @@ namespace curly_hpp
return error_;
}
const curlh_t& curlh() const noexcept {
return curlh_;
}
bool check_response_timeout(time_point_t now) const noexcept {
std::lock_guard<std::mutex> guard(mutex_);
return now - last_response_ >= response_timeout_;
@@ -555,8 +581,8 @@ namespace curly_hpp
std::size_t upload_callback_(char* dst, std::size_t size) noexcept {
try {
size = std::min(size, uploader_->size() - uploaded_.load());
const std::size_t read_bytes = uploader_->read(dst, size);
size = std::min(size, breq_.uploader()->size() - uploaded_.load());
const std::size_t read_bytes = breq_.uploader()->read(dst, size);
uploaded_.fetch_add(read_bytes);
return read_bytes;
} catch (...) {
@@ -566,7 +592,7 @@ namespace curly_hpp
std::size_t download_callback_(const char* src, std::size_t size) noexcept {
try {
const std::size_t written_bytes = downloader_->write(src, size);
const std::size_t written_bytes = breq_.downloader()->write(src, size);
downloaded_.fetch_add(written_bytes);
return written_bytes;
} catch (...) {
@@ -597,12 +623,9 @@ namespace curly_hpp
}
}
private:
slist_t hlist_;
curlh_t curlh_;
content_t content_;
uploader_uptr uploader_;
downloader_uptr downloader_;
private:
request_builder breq_;
curlh_t curlh_{nullptr, &curl_easy_cleanup};
slist_t hlist_{nullptr, &curl_slist_free_all};
time_point_t last_response_;
time_point_t::duration response_timeout_;
private:
@@ -801,32 +824,9 @@ namespace curly_hpp
}
request request_builder::send() {
return curl_state::with([this](CURLM* curlm){
curlh_t curlh{
curl_easy_init(),
&curl_easy_cleanup};
if ( !curlh ) {
throw exception("curly_hpp: failed to curl_easy_init");
}
auto sreq = std::make_shared<request::internal_state>(
std::move(curlh),
std::move(*this));
if ( CURLM_OK != curl_multi_add_handle(curlm, sreq->curlh().get()) ) {
throw exception("curly_hpp: failed to curl_multi_add_handle");
}
try {
active_handles.emplace_back(sreq);
} catch (...) {
curl_multi_remove_handle(curlm, sreq->curlh().get());
throw;
}
return request(sreq);
});
auto sreq = std::make_shared<request::internal_state>(std::move(*this));
new_handles.enqueue(sreq);
return request(sreq);
}
}
@@ -872,6 +872,19 @@ namespace curly_hpp
namespace curly_hpp
{
void perform() {
curl_state::with([](CURLM* curlm){
req_state_t sreq;
while ( new_handles.try_dequeue(sreq) ) {
try {
sreq->enqueue(curlm);
active_handles.emplace_back(sreq);
} catch (...) {
sreq->fail(CURLcode::CURLE_FAILED_INIT);
sreq->dequeue(curlm);
}
}
});
curl_state::with([](CURLM* curlm){
int running_handles = 0;
if ( CURLM_OK != curl_multi_perform(curlm, &running_handles) ) {
@@ -904,10 +917,12 @@ namespace curly_hpp
sreq->fail(CURLE_OPERATION_TIMEDOUT);
}
}
});
curl_state::with([](CURLM* curlm){
for ( auto iter = active_handles.begin(); iter != active_handles.end(); ) {
if ( (*iter)->is_ready() ) {
curl_multi_remove_handle(curlm, (*iter)->curlh().get());
(*iter)->dequeue(curlm);
iter = active_handles.erase(iter);
} else {
++iter;
@@ -918,9 +933,13 @@ namespace curly_hpp
void wait_activity(time_ms_t ms) {
curl_state::with([ms](CURLM* curlm){
const int timeout_ms = static_cast<int>(ms.count());
if ( CURLM_OK != curl_multi_wait(curlm, nullptr, 0, timeout_ms, nullptr) ) {
throw exception("curly_hpp: failed to curl_multi_wait");
if ( active_handles.empty() ) {
new_handles.wait_content_for(ms);
} else if ( new_handles.empty() ) {
const int timeout_ms = static_cast<int>(ms.count());
if ( CURLM_OK != curl_multi_wait(curlm, nullptr, 0, timeout_ms, nullptr) ) {
throw exception("curly_hpp: failed to curl_multi_wait");
}
}
});
}