8 curl_global_init(CURL_GLOBAL_DEFAULT);
9 multi_ = curl_multi_init();
10 worker_ = std::thread([
this] { runLoop(); });
15 if (worker_.joinable()) worker_.join();
17 for (
auto& [
id, ctx] : active_) {
19 curl_multi_remove_handle(multi_, ctx.easy);
20 curl_easy_cleanup(ctx.easy);
23 curl_slist_free_all(ctx.headerList);
26 curl_multi_cleanup(multi_);
27 curl_global_cleanup();
32 if (running_.compare_exchange_strong(expected,
false)) {
38 std::unique_lock lk(mtx_);
41 ctx.req = std::move(req);
42 ctx.startTime = std::chrono::steady_clock::now();
43 newQueue_.push_back(std::move(ctx));
50 std::lock_guard lk(mtx_);
51 for (
auto& ctx : newQueue_) {
57 auto it = active_.find(
id);
58 if (it != active_.end()) {
59 it->second.canceled =
true;
70 std::lock_guard lk(mtx_);
71 return newQueue_.size() + active_.size();
75 std::lock_guard lk(mtx_);
76 return active_.size();
80 std::lock_guard lk(mtx_);
81 return newQueue_.size();
84void HttpService::runLoop() {
87 std::unique_lock lk(mtx_);
88 cv_.wait_for(lk, std::chrono::milliseconds(50), [
this] {
89 return !newQueue_.empty() || !running_;
91 addNewRequestsLocked();
95 long curlTimeoutMs = -1;
96 curl_multi_timeout(multi_, &curlTimeoutMs);
98 if (curlTimeoutMs < 0) waitMs = 100;
99 else if (curlTimeoutMs == 0) waitMs = 0;
100 else waitMs =
static_cast<int>(curlTimeoutMs);
108void HttpService::addNewRequestsLocked() {
109 for (
auto it = newQueue_.begin(); it != newQueue_.end();) {
110 if (active_.size() >= maxConcurrent_)
break;
112 if (it->headerList) curl_slist_free_all(it->headerList);
113 it = newQueue_.erase(it);
116 RequestCtx moved = std::move(*it);
117 it = newQueue_.erase(it);
118 auto emplaced = active_.emplace(moved.id, std::move(moved));
119 RequestCtx& stableCtx = emplaced.first->second;
120 setupEasy(stableCtx);
121 curl_multi_add_handle(multi_, stableCtx.easy);
122 easyToId_[stableCtx.easy] = stableCtx.id;
126void HttpService::setupEasy(RequestCtx& ctx) {
127 CURL* easy = curl_easy_init();
130 curl_easy_setopt(easy, CURLOPT_URL, ctx.req.url.c_str());
131 curl_easy_setopt(easy, CURLOPT_FOLLOWLOCATION, 1L);
132 curl_easy_setopt(easy, CURLOPT_WRITEFUNCTION, &HttpService::writeCallback);
133 curl_easy_setopt(easy, CURLOPT_WRITEDATA, &ctx);
134 curl_easy_setopt(easy, CURLOPT_HEADERFUNCTION, &HttpService::headerCallback);
135 curl_easy_setopt(easy, CURLOPT_HEADERDATA, &ctx);
136 curl_easy_setopt(easy, CURLOPT_NOPROGRESS, 1L);
139 curl_easy_setopt(easy, CURLOPT_ACCEPT_ENCODING,
"");
141#ifdef HTTP_SERVICE_VERBOSE
142 curl_easy_setopt(easy, CURLOPT_VERBOSE, 1L);
144 curl_easy_setopt(easy, CURLOPT_USERAGENT,
"sw_task/1.0");
146 if (!ctx.req.body.empty()) {
147 curl_easy_setopt(easy, CURLOPT_POSTFIELDS, ctx.req.body.c_str());
148 curl_easy_setopt(easy, CURLOPT_POSTFIELDSIZE, ctx.req.body.size());
150 if (ctx.req.timeoutMs > 0) {
151 curl_easy_setopt(easy, CURLOPT_TIMEOUT_MS, ctx.req.timeoutMs);
155 switch (ctx.req.method) {
157 case HttpMethod::Put: curl_easy_setopt(easy, CURLOPT_CUSTOMREQUEST,
"PUT");
break;
158 case HttpMethod::Delete: curl_easy_setopt(easy, CURLOPT_CUSTOMREQUEST,
"DELETE");
break;
159 case HttpMethod::Patch: curl_easy_setopt(easy, CURLOPT_CUSTOMREQUEST,
"PATCH");
break;
166 for (
auto& h : ctx.req.headers) {
167 std::string line = h.first +
": " + h.second;
168 ctx.headerList = curl_slist_append(ctx.headerList, line.c_str());
170 if (ctx.headerList) {
171 curl_easy_setopt(easy, CURLOPT_HTTPHEADER, ctx.headerList);
175size_t HttpService::writeCallback(
char* ptr,
size_t size,
size_t nmemb,
void* userdata) {
176 auto* ctx =
static_cast<RequestCtx*
>(userdata);
177 assert(ctx->easy !=
nullptr &&
"Dangling RequestCtx pointer");
178 std::size_t total = size * nmemb;
179 if (total) ctx->responseBody.append(ptr, total);
183size_t HttpService::headerCallback(
char* buffer,
size_t size,
size_t nitems,
void* userdata) {
184 auto* ctx =
static_cast<RequestCtx*
>(userdata);
185 std::size_t len = size * nitems;
186 std::string line(buffer, len);
187 auto pos = line.find(
':');
188 if (pos != std::string::npos) {
189 std::string key = line.substr(0, pos);
190 std::string val = line.substr(pos + 1);
191 while (!val.empty() && (val.front()==
' ' || val.front()==
'\t')) val.erase(val.begin());
192 while (!val.empty() && (val.back()==
'\r' || val.back()==
'\n')) val.pop_back();
193 if (!key.empty()) ctx->respHeaders[key] = val;
198void HttpService::performOnce(
int waitTimeoutMs) {
200 curl_multi_wait(multi_,
nullptr, 0, waitTimeoutMs, &numfds);
203 curl_multi_perform(multi_, &running);
204 collectCompletions();
207void HttpService::collectCompletions() {
210 while ((msg = curl_multi_info_read(multi_, &msgs))) {
211 if (msg->msg == CURLMSG_DONE) {
212 CURL* easy = msg->easy_handle;
213 auto idIt = easyToId_.find(easy);
214 if (idIt == easyToId_.end())
continue;
217 std::lock_guard lk(mtx_);
218 auto ctxIt = active_.find(
id);
219 if (ctxIt != active_.end()) {
220 finishRequest(ctxIt->second, msg->data.result);
221 cleanupEasy(ctxIt->second);
222 active_.erase(ctxIt);
224 easyToId_.erase(easy);
229void HttpService::finishRequest(RequestCtx& ctx, CURLcode result) {
231 resp.elapsedMs = std::chrono::duration<double, std::milli>(
232 std::chrono::steady_clock::now() - ctx.startTime).count();
235 curl_easy_getinfo(ctx.easy, CURLINFO_RESPONSE_CODE, &code);
236 resp.statusCode = code;
238 curl_easy_getinfo(ctx.easy, CURLINFO_EFFECTIVE_URL, &eff);
239 if (eff) resp.effectiveUrl = eff;
240 resp.body = std::move(ctx.responseBody);
241 resp.headers = std::move(ctx.respHeaders);
245 }
else if (result != CURLE_OK) {
253 resp.error = HttpError{ec, curl_easy_strerror(result)};
256 if (ctx.req.onComplete) {
258 ctx.req.onComplete(resp);
265void HttpService::cleanupEasy(RequestCtx& ctx) {
266 curl_multi_remove_handle(multi_, ctx.easy);
267 curl_easy_cleanup(ctx.easy);
269 if (ctx.headerList) {
270 curl_slist_free_all(ctx.headerList);
271 ctx.headerList =
nullptr;
void setMaxConcurrent(std::size_t n)
bool cancel(RequestId id)
std::size_t pending() const
RequestId submit(HttpRequest req)
std::size_t queuedCount() const
std::size_t activeCount() const