Line data Source code
1 : /*
2 : Minetest
3 : Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
4 :
5 : This program is free software; you can redistribute it and/or modify
6 : it under the terms of the GNU Lesser General Public License as published by
7 : the Free Software Foundation; either version 2.1 of the License, or
8 : (at your option) any later version.
9 :
10 : This program is distributed in the hope that it will be useful,
11 : but WITHOUT ANY WARRANTY; without even the implied warranty of
12 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 : GNU Lesser General Public License for more details.
14 :
15 : You should have received a copy of the GNU Lesser General Public License along
16 : with this program; if not, write to the Free Software Foundation, Inc.,
17 : 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 : */
19 :
20 : #include "socket.h" // for select()
21 : #include "porting.h" // for sleep_ms(), get_sysinfo()
22 : #include "httpfetch.h"
23 : #include <iostream>
24 : #include <sstream>
25 : #include <list>
26 : #include <map>
27 : #include <errno.h>
28 : #include "jthread/jevent.h"
29 : #include "config.h"
30 : #include "exceptions.h"
31 : #include "debug.h"
32 : #include "log.h"
33 : #include "util/container.h"
34 : #include "util/thread.h"
35 : #include "version.h"
36 : #include "settings.h"
37 :
38 1 : JMutex g_httpfetch_mutex;
39 1 : std::map<unsigned long, std::queue<HTTPFetchResult> > g_httpfetch_results;
40 :
41 1 : HTTPFetchRequest::HTTPFetchRequest()
42 : {
43 1 : url = "";
44 1 : caller = HTTPFETCH_DISCARD;
45 1 : request_id = 0;
46 1 : timeout = g_settings->getS32("curl_timeout");
47 1 : connect_timeout = timeout;
48 1 : multipart = false;
49 :
50 1 : useragent = std::string(PROJECT_NAME_C "/") + g_version_hash + " (" + porting::get_sysinfo() + ")";
51 1 : }
52 :
53 :
54 0 : static void httpfetch_deliver_result(const HTTPFetchResult &fetch_result)
55 : {
56 0 : unsigned long caller = fetch_result.caller;
57 0 : if (caller != HTTPFETCH_DISCARD) {
58 0 : JMutexAutoLock lock(g_httpfetch_mutex);
59 0 : g_httpfetch_results[caller].push(fetch_result);
60 : }
61 0 : }
62 :
63 : static void httpfetch_request_clear(unsigned long caller);
64 :
65 0 : unsigned long httpfetch_caller_alloc()
66 : {
67 0 : JMutexAutoLock lock(g_httpfetch_mutex);
68 :
69 : // Check each caller ID except HTTPFETCH_DISCARD
70 0 : const unsigned long discard = HTTPFETCH_DISCARD;
71 0 : for (unsigned long caller = discard + 1; caller != discard; ++caller) {
72 : std::map<unsigned long, std::queue<HTTPFetchResult> >::iterator
73 0 : it = g_httpfetch_results.find(caller);
74 0 : if (it == g_httpfetch_results.end()) {
75 0 : verbosestream << "httpfetch_caller_alloc: allocating "
76 0 : << caller << std::endl;
77 : // Access element to create it
78 0 : g_httpfetch_results[caller];
79 0 : return caller;
80 : }
81 : }
82 :
83 0 : FATAL_ERROR("httpfetch_caller_alloc: ran out of caller IDs");
84 : return discard;
85 : }
86 :
87 0 : void httpfetch_caller_free(unsigned long caller)
88 : {
89 0 : verbosestream<<"httpfetch_caller_free: freeing "
90 0 : <<caller<<std::endl;
91 :
92 0 : httpfetch_request_clear(caller);
93 0 : if (caller != HTTPFETCH_DISCARD) {
94 0 : JMutexAutoLock lock(g_httpfetch_mutex);
95 0 : g_httpfetch_results.erase(caller);
96 : }
97 0 : }
98 :
99 0 : bool httpfetch_async_get(unsigned long caller, HTTPFetchResult &fetch_result)
100 : {
101 0 : JMutexAutoLock lock(g_httpfetch_mutex);
102 :
103 : // Check that caller exists
104 : std::map<unsigned long, std::queue<HTTPFetchResult> >::iterator
105 0 : it = g_httpfetch_results.find(caller);
106 0 : if (it == g_httpfetch_results.end())
107 0 : return false;
108 :
109 : // Check that result queue is nonempty
110 0 : std::queue<HTTPFetchResult> &caller_results = it->second;
111 0 : if (caller_results.empty())
112 0 : return false;
113 :
114 : // Pop first result
115 0 : fetch_result = caller_results.front();
116 0 : caller_results.pop();
117 0 : return true;
118 : }
119 :
120 : #if USE_CURL
121 : #include <curl/curl.h>
122 :
123 : /*
124 : USE_CURL is on: use cURL based httpfetch implementation
125 : */
126 :
/*
	cURL write callback: appends the received chunk (size * nmemb bytes
	starting at ptr) to the std::ostringstream passed as userdata.
	Returns the number of bytes appended.
*/
static size_t httpfetch_writefunction(
		char *ptr, size_t size, size_t nmemb, void *userdata)
{
	const size_t total = size * nmemb;
	static_cast<std::ostringstream *>(userdata)->write(ptr, total);
	return total;
}
135 :
/*
	cURL write callback that throws the received data away.
	Returns size * nmemb so the whole chunk counts as consumed.
*/
static size_t httpfetch_discardfunction(
		char *ptr, size_t size, size_t nmemb, void *userdata)
{
	const size_t consumed = size * nmemb;
	return consumed;
}
141 :
142 : class CurlHandlePool
143 : {
144 : std::list<CURL*> handles;
145 :
146 : public:
147 0 : CurlHandlePool() {}
148 0 : ~CurlHandlePool()
149 0 : {
150 0 : for (std::list<CURL*>::iterator it = handles.begin();
151 0 : it != handles.end(); ++it) {
152 0 : curl_easy_cleanup(*it);
153 : }
154 0 : }
155 0 : CURL * alloc()
156 : {
157 : CURL *curl;
158 0 : if (handles.empty()) {
159 0 : curl = curl_easy_init();
160 0 : if (curl == NULL) {
161 0 : errorstream<<"curl_easy_init returned NULL"<<std::endl;
162 : }
163 : }
164 : else {
165 0 : curl = handles.front();
166 0 : handles.pop_front();
167 : }
168 0 : return curl;
169 : }
170 0 : void free(CURL *handle)
171 : {
172 0 : if (handle)
173 0 : handles.push_back(handle);
174 0 : }
175 : };
176 :
177 : class HTTPFetchOngoing
178 : {
179 : public:
180 : HTTPFetchOngoing(HTTPFetchRequest request, CurlHandlePool *pool);
181 : ~HTTPFetchOngoing();
182 :
183 : CURLcode start(CURLM *multi);
184 : const HTTPFetchResult * complete(CURLcode res);
185 :
186 0 : const HTTPFetchRequest &getRequest() const { return request; };
187 0 : const CURL *getEasyHandle() const { return curl; };
188 :
189 : private:
190 : CurlHandlePool *pool;
191 : CURL *curl;
192 : CURLM *multi;
193 : HTTPFetchRequest request;
194 : HTTPFetchResult result;
195 : std::ostringstream oss;
196 : struct curl_slist *http_header;
197 : curl_httppost *post;
198 : };
199 :
200 :
201 0 : HTTPFetchOngoing::HTTPFetchOngoing(HTTPFetchRequest request_, CurlHandlePool *pool_):
202 : pool(pool_),
203 : curl(NULL),
204 : multi(NULL),
205 : request(request_),
206 : result(request_),
207 : oss(std::ios::binary),
208 : http_header(NULL),
209 0 : post(NULL)
210 : {
211 0 : curl = pool->alloc();
212 0 : if (curl == NULL) {
213 0 : return;
214 : }
215 :
216 : // Set static cURL options
217 0 : curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
218 0 : curl_easy_setopt(curl, CURLOPT_FAILONERROR, 1);
219 0 : curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1);
220 0 : curl_easy_setopt(curl, CURLOPT_MAXREDIRS, 1);
221 :
222 0 : std::string bind_address = g_settings->get("bind_address");
223 0 : if (!bind_address.empty()) {
224 0 : curl_easy_setopt(curl, CURLOPT_INTERFACE, bind_address.c_str());
225 : }
226 :
227 : #if LIBCURL_VERSION_NUM >= 0x071304
228 : // Restrict protocols so that curl vulnerabilities in
229 : // other protocols don't affect us.
230 : // These settings were introduced in curl 7.19.4.
231 : long protocols =
232 : CURLPROTO_HTTP |
233 : CURLPROTO_HTTPS |
234 : CURLPROTO_FTP |
235 0 : CURLPROTO_FTPS;
236 0 : curl_easy_setopt(curl, CURLOPT_PROTOCOLS, protocols);
237 0 : curl_easy_setopt(curl, CURLOPT_REDIR_PROTOCOLS, protocols);
238 : #endif
239 :
240 : // Set cURL options based on HTTPFetchRequest
241 0 : curl_easy_setopt(curl, CURLOPT_URL,
242 0 : request.url.c_str());
243 0 : curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS,
244 0 : request.timeout);
245 0 : curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT_MS,
246 0 : request.connect_timeout);
247 :
248 0 : if (request.useragent != "")
249 0 : curl_easy_setopt(curl, CURLOPT_USERAGENT, request.useragent.c_str());
250 :
251 : // Set up a write callback that writes to the
252 : // ostringstream ongoing->oss, unless the data
253 : // is to be discarded
254 0 : if (request.caller == HTTPFETCH_DISCARD) {
255 0 : curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION,
256 0 : httpfetch_discardfunction);
257 0 : curl_easy_setopt(curl, CURLOPT_WRITEDATA, NULL);
258 : } else {
259 0 : curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION,
260 0 : httpfetch_writefunction);
261 0 : curl_easy_setopt(curl, CURLOPT_WRITEDATA, &oss);
262 : }
263 :
264 : // Set POST (or GET) data
265 0 : if (request.post_fields.empty()) {
266 0 : curl_easy_setopt(curl, CURLOPT_HTTPGET, 1);
267 0 : } else if (request.multipart) {
268 0 : curl_httppost *last = NULL;
269 0 : for (StringMap::iterator it = request.post_fields.begin();
270 0 : it != request.post_fields.end(); ++it) {
271 0 : curl_formadd(&post, &last,
272 0 : CURLFORM_NAMELENGTH, it->first.size(),
273 0 : CURLFORM_PTRNAME, it->first.c_str(),
274 0 : CURLFORM_CONTENTSLENGTH, it->second.size(),
275 0 : CURLFORM_PTRCONTENTS, it->second.c_str(),
276 0 : CURLFORM_END);
277 : }
278 0 : curl_easy_setopt(curl, CURLOPT_HTTPPOST, post);
279 : // request.post_fields must now *never* be
280 : // modified until CURLOPT_HTTPPOST is cleared
281 0 : } else if (request.post_data.empty()) {
282 0 : curl_easy_setopt(curl, CURLOPT_POST, 1);
283 0 : std::string str;
284 0 : for (StringMap::iterator it = request.post_fields.begin();
285 0 : it != request.post_fields.end(); ++it) {
286 0 : if (str != "")
287 0 : str += "&";
288 0 : str += urlencode(it->first);
289 0 : str += "=";
290 0 : str += urlencode(it->second);
291 : }
292 0 : curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE,
293 0 : str.size());
294 0 : curl_easy_setopt(curl, CURLOPT_COPYPOSTFIELDS,
295 0 : str.c_str());
296 : } else {
297 0 : curl_easy_setopt(curl, CURLOPT_POST, 1);
298 0 : curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE,
299 0 : request.post_data.size());
300 0 : curl_easy_setopt(curl, CURLOPT_POSTFIELDS,
301 0 : request.post_data.c_str());
302 : // request.post_data must now *never* be
303 : // modified until CURLOPT_POSTFIELDS is cleared
304 : }
305 : // Set additional HTTP headers
306 0 : for (std::vector<std::string>::iterator it = request.extra_headers.begin();
307 0 : it != request.extra_headers.end(); ++it) {
308 0 : http_header = curl_slist_append(http_header, it->c_str());
309 : }
310 0 : curl_easy_setopt(curl, CURLOPT_HTTPHEADER, http_header);
311 :
312 0 : if (!g_settings->getBool("curl_verify_cert")) {
313 0 : curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, false);
314 : }
315 : }
316 :
317 0 : CURLcode HTTPFetchOngoing::start(CURLM *multi_)
318 : {
319 0 : if (!curl)
320 0 : return CURLE_FAILED_INIT;
321 :
322 0 : if (!multi_) {
323 : // Easy interface (sync)
324 0 : return curl_easy_perform(curl);
325 : }
326 :
327 : // Multi interface (async)
328 0 : CURLMcode mres = curl_multi_add_handle(multi_, curl);
329 0 : if (mres != CURLM_OK) {
330 0 : errorstream << "curl_multi_add_handle"
331 0 : << " returned error code " << mres
332 0 : << std::endl;
333 0 : return CURLE_FAILED_INIT;
334 : }
335 0 : multi = multi_; // store for curl_multi_remove_handle
336 0 : return CURLE_OK;
337 : }
338 :
339 0 : const HTTPFetchResult * HTTPFetchOngoing::complete(CURLcode res)
340 : {
341 0 : result.succeeded = (res == CURLE_OK);
342 0 : result.timeout = (res == CURLE_OPERATION_TIMEDOUT);
343 0 : result.data = oss.str();
344 :
345 : // Get HTTP/FTP response code
346 0 : result.response_code = 0;
347 0 : if (curl && (curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE,
348 : &result.response_code) != CURLE_OK)) {
349 : // We failed to get a return code, make sure it is still 0
350 0 : result.response_code = 0;
351 : }
352 :
353 0 : if (res != CURLE_OK) {
354 0 : errorstream << request.url << " not found ("
355 0 : << curl_easy_strerror(res) << ")"
356 0 : << " (response code " << result.response_code << ")"
357 0 : << std::endl;
358 : }
359 :
360 0 : return &result;
361 : }
362 :
363 0 : HTTPFetchOngoing::~HTTPFetchOngoing()
364 : {
365 0 : if (multi) {
366 0 : CURLMcode mres = curl_multi_remove_handle(multi, curl);
367 0 : if (mres != CURLM_OK) {
368 0 : errorstream << "curl_multi_remove_handle"
369 0 : << " returned error code " << mres
370 0 : << std::endl;
371 : }
372 : }
373 :
374 : // Set safe options for the reusable cURL handle
375 0 : curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION,
376 0 : httpfetch_discardfunction);
377 0 : curl_easy_setopt(curl, CURLOPT_WRITEDATA, NULL);
378 0 : curl_easy_setopt(curl, CURLOPT_POSTFIELDS, NULL);
379 0 : if (http_header) {
380 0 : curl_easy_setopt(curl, CURLOPT_HTTPHEADER, NULL);
381 0 : curl_slist_free_all(http_header);
382 : }
383 0 : if (post) {
384 0 : curl_easy_setopt(curl, CURLOPT_HTTPPOST, NULL);
385 0 : curl_formfree(post);
386 : }
387 :
388 : // Store the cURL handle for reuse
389 0 : pool->free(curl);
390 0 : }
391 :
392 :
393 2 : class CurlFetchThread : public JThread
394 : {
395 : protected:
396 : enum RequestType {
397 : RT_FETCH,
398 : RT_CLEAR,
399 : RT_WAKEUP,
400 : };
401 :
402 6 : struct Request {
403 : RequestType type;
404 : HTTPFetchRequest fetch_request;
405 : Event *event;
406 : };
407 :
408 : CURLM *m_multi;
409 : MutexedQueue<Request> m_requests;
410 : size_t m_parallel_limit;
411 :
412 : // Variables exclusively used within thread
413 : std::vector<HTTPFetchOngoing*> m_all_ongoing;
414 : std::list<HTTPFetchRequest> m_queued_fetches;
415 :
416 : public:
417 1 : CurlFetchThread(int parallel_limit)
418 1 : {
419 1 : if (parallel_limit >= 1)
420 1 : m_parallel_limit = parallel_limit;
421 : else
422 0 : m_parallel_limit = 1;
423 1 : }
424 :
425 0 : void requestFetch(const HTTPFetchRequest &fetch_request)
426 : {
427 0 : Request req;
428 0 : req.type = RT_FETCH;
429 0 : req.fetch_request = fetch_request;
430 0 : req.event = NULL;
431 0 : m_requests.push_back(req);
432 0 : }
433 :
434 0 : void requestClear(unsigned long caller, Event *event)
435 : {
436 0 : Request req;
437 0 : req.type = RT_CLEAR;
438 0 : req.fetch_request.caller = caller;
439 0 : req.event = event;
440 0 : m_requests.push_back(req);
441 0 : }
442 :
443 1 : void requestWakeUp()
444 : {
445 2 : Request req;
446 1 : req.type = RT_WAKEUP;
447 1 : req.event = NULL;
448 1 : m_requests.push_back(req);
449 1 : }
450 :
451 : protected:
452 : // Handle a request from some other thread
453 : // E.g. new fetch; clear fetches for one caller; wake up
454 0 : void processRequest(const Request &req)
455 : {
456 0 : if (req.type == RT_FETCH) {
457 : // New fetch, queue until there are less
458 : // than m_parallel_limit ongoing fetches
459 0 : m_queued_fetches.push_back(req.fetch_request);
460 :
461 : // see processQueued() for what happens next
462 :
463 : }
464 0 : else if (req.type == RT_CLEAR) {
465 0 : unsigned long caller = req.fetch_request.caller;
466 :
467 : // Abort all ongoing fetches for the caller
468 0 : for (std::vector<HTTPFetchOngoing*>::iterator
469 0 : it = m_all_ongoing.begin();
470 0 : it != m_all_ongoing.end();) {
471 0 : if ((*it)->getRequest().caller == caller) {
472 0 : delete (*it);
473 0 : it = m_all_ongoing.erase(it);
474 : } else {
475 0 : ++it;
476 : }
477 : }
478 :
479 : // Also abort all queued fetches for the caller
480 0 : for (std::list<HTTPFetchRequest>::iterator
481 0 : it = m_queued_fetches.begin();
482 0 : it != m_queued_fetches.end();) {
483 0 : if ((*it).caller == caller)
484 0 : it = m_queued_fetches.erase(it);
485 : else
486 0 : ++it;
487 : }
488 : }
489 0 : else if (req.type == RT_WAKEUP) {
490 : // Wakeup: Nothing to do, thread is awake at this point
491 : }
492 :
493 0 : if (req.event != NULL)
494 0 : req.event->signal();
495 0 : }
496 :
497 : // Start new ongoing fetches if m_parallel_limit allows
498 0 : void processQueued(CurlHandlePool *pool)
499 : {
500 0 : while (m_all_ongoing.size() < m_parallel_limit &&
501 0 : !m_queued_fetches.empty()) {
502 0 : HTTPFetchRequest request = m_queued_fetches.front();
503 0 : m_queued_fetches.pop_front();
504 :
505 : // Create ongoing fetch data and make a cURL handle
506 : // Set cURL options based on HTTPFetchRequest
507 : HTTPFetchOngoing *ongoing =
508 0 : new HTTPFetchOngoing(request, pool);
509 :
510 : // Initiate the connection (curl_multi_add_handle)
511 0 : CURLcode res = ongoing->start(m_multi);
512 0 : if (res == CURLE_OK) {
513 0 : m_all_ongoing.push_back(ongoing);
514 : }
515 : else {
516 0 : httpfetch_deliver_result(*ongoing->complete(res));
517 0 : delete ongoing;
518 : }
519 : }
520 0 : }
521 :
522 : // Process CURLMsg (indicates completion of a fetch)
523 0 : void processCurlMessage(CURLMsg *msg)
524 : {
525 : // Determine which ongoing fetch the message pertains to
526 0 : size_t i = 0;
527 0 : bool found = false;
528 0 : for (i = 0; i < m_all_ongoing.size(); ++i) {
529 0 : if (m_all_ongoing[i]->getEasyHandle() == msg->easy_handle) {
530 0 : found = true;
531 0 : break;
532 : }
533 : }
534 0 : if (msg->msg == CURLMSG_DONE && found) {
535 : // m_all_ongoing[i] succeeded or failed.
536 0 : HTTPFetchOngoing *ongoing = m_all_ongoing[i];
537 0 : httpfetch_deliver_result(*ongoing->complete(msg->data.result));
538 0 : delete ongoing;
539 0 : m_all_ongoing.erase(m_all_ongoing.begin() + i);
540 : }
541 0 : }
542 :
543 : // Wait for a request from another thread, or timeout elapses
544 0 : void waitForRequest(long timeout)
545 : {
546 0 : if (m_queued_fetches.empty()) {
547 : try {
548 0 : Request req = m_requests.pop_front(timeout);
549 0 : processRequest(req);
550 : }
551 0 : catch (ItemNotFoundException &e) {}
552 : }
553 0 : }
554 :
555 : // Wait until some IO happens, or timeout elapses
556 0 : void waitForIO(long timeout)
557 : {
558 : fd_set read_fd_set;
559 : fd_set write_fd_set;
560 : fd_set exc_fd_set;
561 : int max_fd;
562 0 : long select_timeout = -1;
563 : struct timeval select_tv;
564 : CURLMcode mres;
565 :
566 0 : FD_ZERO(&read_fd_set);
567 0 : FD_ZERO(&write_fd_set);
568 0 : FD_ZERO(&exc_fd_set);
569 :
570 0 : mres = curl_multi_fdset(m_multi, &read_fd_set,
571 0 : &write_fd_set, &exc_fd_set, &max_fd);
572 0 : if (mres != CURLM_OK) {
573 0 : errorstream<<"curl_multi_fdset"
574 0 : <<" returned error code "<<mres
575 0 : <<std::endl;
576 0 : select_timeout = 0;
577 : }
578 :
579 0 : mres = curl_multi_timeout(m_multi, &select_timeout);
580 0 : if (mres != CURLM_OK) {
581 0 : errorstream<<"curl_multi_timeout"
582 0 : <<" returned error code "<<mres
583 0 : <<std::endl;
584 0 : select_timeout = 0;
585 : }
586 :
587 : // Limit timeout so new requests get through
588 0 : if (select_timeout < 0 || select_timeout > timeout)
589 0 : select_timeout = timeout;
590 :
591 0 : if (select_timeout > 0) {
592 : // in Winsock it is forbidden to pass three empty
593 : // fd_sets to select(), so in that case use sleep_ms
594 0 : if (max_fd != -1) {
595 0 : select_tv.tv_sec = select_timeout / 1000;
596 0 : select_tv.tv_usec = (select_timeout % 1000) * 1000;
597 0 : int retval = select(max_fd + 1, &read_fd_set,
598 : &write_fd_set, &exc_fd_set,
599 0 : &select_tv);
600 0 : if (retval == -1) {
601 : #ifdef _WIN32
602 : errorstream<<"select returned error code "
603 : <<WSAGetLastError()<<std::endl;
604 : #else
605 0 : errorstream<<"select returned error code "
606 0 : <<errno<<std::endl;
607 : #endif
608 : }
609 : }
610 : else {
611 0 : sleep_ms(select_timeout);
612 : }
613 : }
614 0 : }
615 :
616 0 : void * Thread()
617 : {
618 0 : ThreadStarted();
619 0 : log_register_thread("CurlFetchThread");
620 0 : DSTACK(__FUNCTION_NAME);
621 :
622 0 : porting::setThreadName("CurlFetchThread");
623 :
624 0 : CurlHandlePool pool;
625 :
626 0 : m_multi = curl_multi_init();
627 0 : if (m_multi == NULL) {
628 0 : errorstream<<"curl_multi_init returned NULL\n";
629 0 : return NULL;
630 : }
631 :
632 0 : FATAL_ERROR_IF(!m_all_ongoing.empty(), "Expected empty");
633 :
634 0 : while (!StopRequested()) {
635 : BEGIN_DEBUG_EXCEPTION_HANDLER
636 :
637 : /*
638 : Handle new async requests
639 : */
640 :
641 0 : while (!m_requests.empty()) {
642 0 : Request req = m_requests.pop_frontNoEx();
643 0 : processRequest(req);
644 : }
645 0 : processQueued(&pool);
646 :
647 : /*
648 : Handle ongoing async requests
649 : */
650 :
651 0 : int still_ongoing = 0;
652 0 : while (curl_multi_perform(m_multi, &still_ongoing) ==
653 : CURLM_CALL_MULTI_PERFORM)
654 : /* noop */;
655 :
656 : /*
657 : Handle completed async requests
658 : */
659 0 : if (still_ongoing < (int) m_all_ongoing.size()) {
660 : CURLMsg *msg;
661 : int msgs_in_queue;
662 0 : msg = curl_multi_info_read(m_multi, &msgs_in_queue);
663 0 : while (msg != NULL) {
664 0 : processCurlMessage(msg);
665 0 : msg = curl_multi_info_read(m_multi, &msgs_in_queue);
666 : }
667 : }
668 :
669 : /*
670 : If there are ongoing requests, wait for data
671 : (with a timeout of 100ms so that new requests
672 : can be processed).
673 :
674 : If no ongoing requests, wait for a new request.
675 : (Possibly an empty request that signals
676 : that the thread should be stopped.)
677 : */
678 0 : if (m_all_ongoing.empty())
679 0 : waitForRequest(100000000);
680 : else
681 0 : waitForIO(100);
682 :
683 0 : END_DEBUG_EXCEPTION_HANDLER(errorstream)
684 : }
685 :
686 : // Call curl_multi_remove_handle and cleanup easy handles
687 0 : for (size_t i = 0; i < m_all_ongoing.size(); ++i) {
688 0 : delete m_all_ongoing[i];
689 : }
690 0 : m_all_ongoing.clear();
691 :
692 0 : m_queued_fetches.clear();
693 :
694 0 : CURLMcode mres = curl_multi_cleanup(m_multi);
695 0 : if (mres != CURLM_OK) {
696 0 : errorstream<<"curl_multi_cleanup"
697 0 : <<" returned error code "<<mres
698 0 : <<std::endl;
699 : }
700 :
701 0 : return NULL;
702 : }
703 : };
704 :
705 : CurlFetchThread *g_httpfetch_thread = NULL;
706 :
707 1 : void httpfetch_init(int parallel_limit)
708 : {
709 1 : verbosestream<<"httpfetch_init: parallel_limit="<<parallel_limit
710 1 : <<std::endl;
711 :
712 1 : CURLcode res = curl_global_init(CURL_GLOBAL_DEFAULT);
713 1 : FATAL_ERROR_IF(res != CURLE_OK, "CURL init failed");
714 :
715 1 : g_httpfetch_thread = new CurlFetchThread(parallel_limit);
716 1 : }
717 :
718 1 : void httpfetch_cleanup()
719 : {
720 1 : verbosestream<<"httpfetch_cleanup: cleaning up"<<std::endl;
721 :
722 1 : g_httpfetch_thread->Stop();
723 1 : g_httpfetch_thread->requestWakeUp();
724 1 : g_httpfetch_thread->Wait();
725 1 : delete g_httpfetch_thread;
726 :
727 1 : curl_global_cleanup();
728 1 : }
729 :
730 0 : void httpfetch_async(const HTTPFetchRequest &fetch_request)
731 : {
732 0 : g_httpfetch_thread->requestFetch(fetch_request);
733 0 : if (!g_httpfetch_thread->IsRunning())
734 0 : g_httpfetch_thread->Start();
735 0 : }
736 :
737 0 : static void httpfetch_request_clear(unsigned long caller)
738 : {
739 0 : if (g_httpfetch_thread->IsRunning()) {
740 0 : Event event;
741 0 : g_httpfetch_thread->requestClear(caller, &event);
742 0 : event.wait();
743 : }
744 : else {
745 0 : g_httpfetch_thread->requestClear(caller, NULL);
746 : }
747 0 : }
748 :
749 0 : void httpfetch_sync(const HTTPFetchRequest &fetch_request,
750 : HTTPFetchResult &fetch_result)
751 : {
752 : // Create ongoing fetch data and make a cURL handle
753 : // Set cURL options based on HTTPFetchRequest
754 0 : CurlHandlePool pool;
755 0 : HTTPFetchOngoing ongoing(fetch_request, &pool);
756 : // Do the fetch (curl_easy_perform)
757 0 : CURLcode res = ongoing.start(NULL);
758 : // Update fetch result
759 0 : fetch_result = *ongoing.complete(res);
760 3 : }
761 :
762 : #else // USE_CURL
763 :
764 : /*
765 : USE_CURL is off:
766 :
767 : Dummy httpfetch implementation that always returns an error.
768 : */
769 :
// Dummy (USE_CURL off): there is nothing to initialize.
void httpfetch_init(int parallel_limit)
{
	// intentionally empty
}
773 :
// Dummy (USE_CURL off): there is nothing to clean up.
void httpfetch_cleanup()
{
	// intentionally empty
}
777 :
778 : void httpfetch_async(const HTTPFetchRequest &fetch_request)
779 : {
780 : errorstream << "httpfetch_async: unable to fetch " << fetch_request.url
781 : << " because USE_CURL=0" << std::endl;
782 :
783 : HTTPFetchResult fetch_result(fetch_request); // sets succeeded = false etc.
784 : httpfetch_deliver_result(fetch_result);
785 : }
786 :
// Dummy (USE_CURL off): this implementation never keeps fetches around, so
// there is nothing to clear.
static void httpfetch_request_clear(unsigned long caller)
{
	// intentionally empty
}
790 :
791 : void httpfetch_sync(const HTTPFetchRequest &fetch_request,
792 : HTTPFetchResult &fetch_result)
793 : {
794 : errorstream << "httpfetch_sync: unable to fetch " << fetch_request.url
795 : << " because USE_CURL=0" << std::endl;
796 :
797 : fetch_result = HTTPFetchResult(fetch_request); // sets succeeded = false etc.
798 : }
799 :
800 : #endif // USE_CURL
|