SW Task Event Loop Framework v1.0.0
High-performance C++ asynchronous event loop framework with timer management and promise-based programming
Loading...
Searching...
No Matches
HttpService.cpp
Go to the documentation of this file.
1#include "http/HttpService.h"
2#include <cassert>
3#include <iostream> // (Có thể bỏ nếu không cần log)
4
5namespace sw_task::net {
6
8 curl_global_init(CURL_GLOBAL_DEFAULT);
9 multi_ = curl_multi_init();
10 worker_ = std::thread([this] { runLoop(); });
11}
12
14 shutdown();
15 if (worker_.joinable()) worker_.join();
16
17 for (auto& [id, ctx] : active_) {
18 if (ctx.easy) {
19 curl_multi_remove_handle(multi_, ctx.easy);
20 curl_easy_cleanup(ctx.easy);
21 }
22 if (ctx.headerList) {
23 curl_slist_free_all(ctx.headerList);
24 }
25 }
26 curl_multi_cleanup(multi_);
27 curl_global_cleanup();
28}
29
31 bool expected = true;
32 if (running_.compare_exchange_strong(expected, false)) {
33 cv_.notify_all();
34 }
35}
36
38 std::unique_lock lk(mtx_);
39 RequestCtx ctx;
40 ctx.id = nextId_++;
41 ctx.req = std::move(req);
42 ctx.startTime = std::chrono::steady_clock::now();
43 newQueue_.push_back(std::move(ctx));
44 lk.unlock();
45 cv_.notify_one();
46 return ctx.id;
47}
48
50 std::lock_guard lk(mtx_);
51 for (auto& ctx : newQueue_) {
52 if (ctx.id == id) {
53 ctx.canceled = true;
54 return true;
55 }
56 }
57 auto it = active_.find(id);
58 if (it != active_.end()) {
59 it->second.canceled = true;
60 return true;
61 }
62 return false;
63}
64
65void HttpService::setMaxConcurrent(std::size_t n) {
66 maxConcurrent_ = n;
67}
68
69std::size_t HttpService::pending() const {
70 std::lock_guard lk(mtx_);
71 return newQueue_.size() + active_.size();
72}
73
74std::size_t HttpService::activeCount() const {
75 std::lock_guard lk(mtx_);
76 return active_.size();
77}
78
79std::size_t HttpService::queuedCount() const {
80 std::lock_guard lk(mtx_);
81 return newQueue_.size();
82}
83
84void HttpService::runLoop() {
85 while (running_) {
86 {
87 std::unique_lock lk(mtx_);
88 cv_.wait_for(lk, std::chrono::milliseconds(50), [this] {
89 return !newQueue_.empty() || !running_;
90 });
91 addNewRequestsLocked();
92 }
93 if (!running_) break;
94
95 long curlTimeoutMs = -1;
96 curl_multi_timeout(multi_, &curlTimeoutMs);
97 int waitMs;
98 if (curlTimeoutMs < 0) waitMs = 100;
99 else if (curlTimeoutMs == 0) waitMs = 0;
100 else waitMs = static_cast<int>(curlTimeoutMs);
101
102 performOnce(waitMs);
103 }
104 // Drain completions cuối vòng
105 performOnce(0);
106}
107
108void HttpService::addNewRequestsLocked() {
109 for (auto it = newQueue_.begin(); it != newQueue_.end();) {
110 if (active_.size() >= maxConcurrent_) break;
111 if (it->canceled) {
112 if (it->headerList) curl_slist_free_all(it->headerList);
113 it = newQueue_.erase(it);
114 continue;
115 }
116 RequestCtx moved = std::move(*it);
117 it = newQueue_.erase(it);
118 auto emplaced = active_.emplace(moved.id, std::move(moved));
119 RequestCtx& stableCtx = emplaced.first->second;
120 setupEasy(stableCtx);
121 curl_multi_add_handle(multi_, stableCtx.easy);
122 easyToId_[stableCtx.easy] = stableCtx.id;
123 }
124}
125
126void HttpService::setupEasy(RequestCtx& ctx) {
127 CURL* easy = curl_easy_init();
128 ctx.easy = easy;
129
130 curl_easy_setopt(easy, CURLOPT_URL, ctx.req.url.c_str());
131 curl_easy_setopt(easy, CURLOPT_FOLLOWLOCATION, 1L);
132 curl_easy_setopt(easy, CURLOPT_WRITEFUNCTION, &HttpService::writeCallback);
133 curl_easy_setopt(easy, CURLOPT_WRITEDATA, &ctx);
134 curl_easy_setopt(easy, CURLOPT_HEADERFUNCTION, &HttpService::headerCallback);
135 curl_easy_setopt(easy, CURLOPT_HEADERDATA, &ctx);
136 curl_easy_setopt(easy, CURLOPT_NOPROGRESS, 1L);
137
138 // Cho phép tự giải nén gzip/deflate
139 curl_easy_setopt(easy, CURLOPT_ACCEPT_ENCODING, "");
140
141#ifdef HTTP_SERVICE_VERBOSE
142 curl_easy_setopt(easy, CURLOPT_VERBOSE, 1L);
143#endif
144 curl_easy_setopt(easy, CURLOPT_USERAGENT, "sw_task/1.0");
145
146 if (!ctx.req.body.empty()) {
147 curl_easy_setopt(easy, CURLOPT_POSTFIELDS, ctx.req.body.c_str());
148 curl_easy_setopt(easy, CURLOPT_POSTFIELDSIZE, ctx.req.body.size());
149 }
150 if (ctx.req.timeoutMs > 0) {
151 curl_easy_setopt(easy, CURLOPT_TIMEOUT_MS, ctx.req.timeoutMs);
152 }
153
154 if (ctx.req.method != HttpMethod::Get) {
155 switch (ctx.req.method) {
156 case HttpMethod::Post: curl_easy_setopt(easy, CURLOPT_POST, 1L); break;
157 case HttpMethod::Put: curl_easy_setopt(easy, CURLOPT_CUSTOMREQUEST, "PUT"); break;
158 case HttpMethod::Delete: curl_easy_setopt(easy, CURLOPT_CUSTOMREQUEST, "DELETE"); break;
159 case HttpMethod::Patch: curl_easy_setopt(easy, CURLOPT_CUSTOMREQUEST, "PATCH"); break;
160 case HttpMethod::Head: curl_easy_setopt(easy, CURLOPT_NOBODY, 1L); break;
161 case HttpMethod::Options: curl_easy_setopt(easy, CURLOPT_CUSTOMREQUEST, "OPTIONS"); break;
162 default: break;
163 }
164 }
165
166 for (auto& h : ctx.req.headers) {
167 std::string line = h.first + ": " + h.second;
168 ctx.headerList = curl_slist_append(ctx.headerList, line.c_str());
169 }
170 if (ctx.headerList) {
171 curl_easy_setopt(easy, CURLOPT_HTTPHEADER, ctx.headerList);
172 }
173}
174
175size_t HttpService::writeCallback(char* ptr, size_t size, size_t nmemb, void* userdata) {
176 auto* ctx = static_cast<RequestCtx*>(userdata);
177 assert(ctx->easy != nullptr && "Dangling RequestCtx pointer");
178 std::size_t total = size * nmemb;
179 if (total) ctx->responseBody.append(ptr, total);
180 return total;
181}
182
183size_t HttpService::headerCallback(char* buffer, size_t size, size_t nitems, void* userdata) {
184 auto* ctx = static_cast<RequestCtx*>(userdata);
185 std::size_t len = size * nitems;
186 std::string line(buffer, len);
187 auto pos = line.find(':');
188 if (pos != std::string::npos) {
189 std::string key = line.substr(0, pos);
190 std::string val = line.substr(pos + 1);
191 while (!val.empty() && (val.front()==' ' || val.front()=='\t')) val.erase(val.begin());
192 while (!val.empty() && (val.back()=='\r' || val.back()=='\n')) val.pop_back();
193 if (!key.empty()) ctx->respHeaders[key] = val;
194 }
195 return len;
196}
197
198void HttpService::performOnce(int waitTimeoutMs) {
199 int numfds = 0;
200 curl_multi_wait(multi_, nullptr, 0, waitTimeoutMs, &numfds);
201
202 int running = 0;
203 curl_multi_perform(multi_, &running);
204 collectCompletions();
205}
206
207void HttpService::collectCompletions() {
208 int msgs = 0;
209 CURLMsg* msg;
210 while ((msg = curl_multi_info_read(multi_, &msgs))) {
211 if (msg->msg == CURLMSG_DONE) {
212 CURL* easy = msg->easy_handle;
213 auto idIt = easyToId_.find(easy);
214 if (idIt == easyToId_.end()) continue;
215 RequestId id = idIt->second;
216
217 std::lock_guard lk(mtx_);
218 auto ctxIt = active_.find(id);
219 if (ctxIt != active_.end()) {
220 finishRequest(ctxIt->second, msg->data.result);
221 cleanupEasy(ctxIt->second);
222 active_.erase(ctxIt);
223 }
224 easyToId_.erase(easy);
225 }
226 }
227}
228
229void HttpService::finishRequest(RequestCtx& ctx, CURLcode result) {
230 HttpResponse resp;
231 resp.elapsedMs = std::chrono::duration<double, std::milli>(
232 std::chrono::steady_clock::now() - ctx.startTime).count();
233
234 long code = 0;
235 curl_easy_getinfo(ctx.easy, CURLINFO_RESPONSE_CODE, &code);
236 resp.statusCode = code;
237 char* eff = nullptr;
238 curl_easy_getinfo(ctx.easy, CURLINFO_EFFECTIVE_URL, &eff);
239 if (eff) resp.effectiveUrl = eff;
240 resp.body = std::move(ctx.responseBody);
241 resp.headers = std::move(ctx.respHeaders);
242
243 if (ctx.canceled) {
244 resp.error = HttpError{HttpErrorCode::Canceled, "Canceled"};
245 } else if (result != CURLE_OK) {
247 switch (result) {
248 case CURLE_OPERATION_TIMEDOUT: ec = HttpErrorCode::Timeout; break;
249 case CURLE_COULDNT_RESOLVE_HOST: ec = HttpErrorCode::Resolve; break;
250 case CURLE_SSL_CONNECT_ERROR: ec = HttpErrorCode::SSL; break;
251 default: break;
252 }
253 resp.error = HttpError{ec, curl_easy_strerror(result)};
254 }
255
256 if (ctx.req.onComplete) {
257 try {
258 ctx.req.onComplete(resp);
259 } catch (...) {
260 // Nuốt exception để không làm vỡ worker loop.
261 }
262 }
263}
264
265void HttpService::cleanupEasy(RequestCtx& ctx) {
266 curl_multi_remove_handle(multi_, ctx.easy);
267 curl_easy_cleanup(ctx.easy);
268 ctx.easy = nullptr;
269 if (ctx.headerList) {
270 curl_slist_free_all(ctx.headerList);
271 ctx.headerList = nullptr;
272 }
273}
274
275} // namespace sw_task::net
void setMaxConcurrent(std::size_t n)
bool cancel(RequestId id)
std::size_t pending() const
RequestId submit(HttpRequest req)
std::size_t queuedCount() const
std::size_t activeCount() const