LCOV - code coverage report
Current view: top level - src - httpfetch.cpp (source / functions) Hit Total Coverage
Test: report Lines: 40 370 10.8 %
Date: 2015-07-11 18:23:49 Functions: 12 39 30.8 %

          Line data    Source code
       1             : /*
       2             : Minetest
       3             : Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
       4             : 
       5             : This program is free software; you can redistribute it and/or modify
       6             : it under the terms of the GNU Lesser General Public License as published by
       7             : the Free Software Foundation; either version 2.1 of the License, or
       8             : (at your option) any later version.
       9             : 
      10             : This program is distributed in the hope that it will be useful,
      11             : but WITHOUT ANY WARRANTY; without even the implied warranty of
      12             : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      13             : GNU Lesser General Public License for more details.
      14             : 
      15             : You should have received a copy of the GNU Lesser General Public License along
      16             : with this program; if not, write to the Free Software Foundation, Inc.,
      17             : 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      18             : */
      19             : 
      20             : #include "socket.h" // for select()
      21             : #include "porting.h" // for sleep_ms(), get_sysinfo()
      22             : #include "httpfetch.h"
      23             : #include <iostream>
      24             : #include <sstream>
      25             : #include <list>
      26             : #include <map>
      27             : #include <errno.h>
      28             : #include "jthread/jevent.h"
      29             : #include "config.h"
      30             : #include "exceptions.h"
      31             : #include "debug.h"
      32             : #include "log.h"
      33             : #include "util/container.h"
      34             : #include "util/thread.h"
      35             : #include "version.h"
      36             : #include "settings.h"
      37             : 
      38           1 : JMutex g_httpfetch_mutex;
      39           1 : std::map<unsigned long, std::queue<HTTPFetchResult> > g_httpfetch_results;
      40             : 
      41           1 : HTTPFetchRequest::HTTPFetchRequest()
      42             : {
      43           1 :         url = "";
      44           1 :         caller = HTTPFETCH_DISCARD;
      45           1 :         request_id = 0;
      46           1 :         timeout = g_settings->getS32("curl_timeout");
      47           1 :         connect_timeout = timeout;
      48           1 :         multipart = false;
      49             : 
      50           1 :         useragent = std::string(PROJECT_NAME_C "/") + g_version_hash + " (" + porting::get_sysinfo() + ")";
      51           1 : }
      52             : 
      53             : 
      54           0 : static void httpfetch_deliver_result(const HTTPFetchResult &fetch_result)
      55             : {
      56           0 :         unsigned long caller = fetch_result.caller;
      57           0 :         if (caller != HTTPFETCH_DISCARD) {
      58           0 :                 JMutexAutoLock lock(g_httpfetch_mutex);
      59           0 :                 g_httpfetch_results[caller].push(fetch_result);
      60             :         }
      61           0 : }
      62             : 
      63             : static void httpfetch_request_clear(unsigned long caller);
      64             : 
      65           0 : unsigned long httpfetch_caller_alloc()
      66             : {
      67           0 :         JMutexAutoLock lock(g_httpfetch_mutex);
      68             : 
      69             :         // Check each caller ID except HTTPFETCH_DISCARD
      70           0 :         const unsigned long discard = HTTPFETCH_DISCARD;
      71           0 :         for (unsigned long caller = discard + 1; caller != discard; ++caller) {
      72             :                 std::map<unsigned long, std::queue<HTTPFetchResult> >::iterator
      73           0 :                         it = g_httpfetch_results.find(caller);
      74           0 :                 if (it == g_httpfetch_results.end()) {
      75           0 :                         verbosestream << "httpfetch_caller_alloc: allocating "
      76           0 :                                         << caller << std::endl;
      77             :                         // Access element to create it
      78           0 :                         g_httpfetch_results[caller];
      79           0 :                         return caller;
      80             :                 }
      81             :         }
      82             : 
      83           0 :         FATAL_ERROR("httpfetch_caller_alloc: ran out of caller IDs");
      84             :         return discard;
      85             : }
      86             : 
      87           0 : void httpfetch_caller_free(unsigned long caller)
      88             : {
      89           0 :         verbosestream<<"httpfetch_caller_free: freeing "
      90           0 :                         <<caller<<std::endl;
      91             : 
      92           0 :         httpfetch_request_clear(caller);
      93           0 :         if (caller != HTTPFETCH_DISCARD) {
      94           0 :                 JMutexAutoLock lock(g_httpfetch_mutex);
      95           0 :                 g_httpfetch_results.erase(caller);
      96             :         }
      97           0 : }
      98             : 
      99           0 : bool httpfetch_async_get(unsigned long caller, HTTPFetchResult &fetch_result)
     100             : {
     101           0 :         JMutexAutoLock lock(g_httpfetch_mutex);
     102             : 
     103             :         // Check that caller exists
     104             :         std::map<unsigned long, std::queue<HTTPFetchResult> >::iterator
     105           0 :                 it = g_httpfetch_results.find(caller);
     106           0 :         if (it == g_httpfetch_results.end())
     107           0 :                 return false;
     108             : 
     109             :         // Check that result queue is nonempty
     110           0 :         std::queue<HTTPFetchResult> &caller_results = it->second;
     111           0 :         if (caller_results.empty())
     112           0 :                 return false;
     113             : 
     114             :         // Pop first result
     115           0 :         fetch_result = caller_results.front();
     116           0 :         caller_results.pop();
     117           0 :         return true;
     118             : }
     119             : 
     120             : #if USE_CURL
     121             : #include <curl/curl.h>
     122             : 
     123             : /*
     124             :         USE_CURL is on: use cURL based httpfetch implementation
     125             : */
     126             : 
     127           0 : static size_t httpfetch_writefunction(
     128             :                 char *ptr, size_t size, size_t nmemb, void *userdata)
     129             : {
     130           0 :         std::ostringstream *stream = (std::ostringstream*)userdata;
     131           0 :         size_t count = size * nmemb;
     132           0 :         stream->write(ptr, count);
     133           0 :         return count;
     134             : }
     135             : 
     136           0 : static size_t httpfetch_discardfunction(
     137             :                 char *ptr, size_t size, size_t nmemb, void *userdata)
     138             : {
     139           0 :         return size * nmemb;
     140             : }
     141             : 
     142             : class CurlHandlePool
     143             : {
     144             :         std::list<CURL*> handles;
     145             : 
     146             : public:
     147           0 :         CurlHandlePool() {}
     148           0 :         ~CurlHandlePool()
     149           0 :         {
     150           0 :                 for (std::list<CURL*>::iterator it = handles.begin();
     151           0 :                                 it != handles.end(); ++it) {
     152           0 :                         curl_easy_cleanup(*it);
     153             :                 }
     154           0 :         }
     155           0 :         CURL * alloc()
     156             :         {
     157             :                 CURL *curl;
     158           0 :                 if (handles.empty()) {
     159           0 :                         curl = curl_easy_init();
     160           0 :                         if (curl == NULL) {
     161           0 :                                 errorstream<<"curl_easy_init returned NULL"<<std::endl;
     162             :                         }
     163             :                 }
     164             :                 else {
     165           0 :                         curl = handles.front();
     166           0 :                         handles.pop_front();
     167             :                 }
     168           0 :                 return curl;
     169             :         }
     170           0 :         void free(CURL *handle)
     171             :         {
     172           0 :                 if (handle)
     173           0 :                         handles.push_back(handle);
     174           0 :         }
     175             : };
     176             : 
     177             : class HTTPFetchOngoing
     178             : {
     179             : public:
     180             :         HTTPFetchOngoing(HTTPFetchRequest request, CurlHandlePool *pool);
     181             :         ~HTTPFetchOngoing();
     182             : 
     183             :         CURLcode start(CURLM *multi);
     184             :         const HTTPFetchResult * complete(CURLcode res);
     185             : 
     186           0 :         const HTTPFetchRequest &getRequest()    const { return request; };
     187           0 :         const CURL             *getEasyHandle() const { return curl; };
     188             : 
     189             : private:
     190             :         CurlHandlePool *pool;
     191             :         CURL *curl;
     192             :         CURLM *multi;
     193             :         HTTPFetchRequest request;
     194             :         HTTPFetchResult result;
     195             :         std::ostringstream oss;
     196             :         struct curl_slist *http_header;
     197             :         curl_httppost *post;
     198             : };
     199             : 
     200             : 
     201           0 : HTTPFetchOngoing::HTTPFetchOngoing(HTTPFetchRequest request_, CurlHandlePool *pool_):
     202             :         pool(pool_),
     203             :         curl(NULL),
     204             :         multi(NULL),
     205             :         request(request_),
     206             :         result(request_),
     207             :         oss(std::ios::binary),
     208             :         http_header(NULL),
     209           0 :         post(NULL)
     210             : {
     211           0 :         curl = pool->alloc();
     212           0 :         if (curl == NULL) {
     213           0 :                 return;
     214             :         }
     215             : 
     216             :         // Set static cURL options
     217           0 :         curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
     218           0 :         curl_easy_setopt(curl, CURLOPT_FAILONERROR, 1);
     219           0 :         curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1);
     220           0 :         curl_easy_setopt(curl, CURLOPT_MAXREDIRS, 1);
     221             : 
     222           0 :         std::string bind_address = g_settings->get("bind_address");
     223           0 :         if (!bind_address.empty()) {
     224           0 :                 curl_easy_setopt(curl, CURLOPT_INTERFACE, bind_address.c_str());
     225             :         }
     226             : 
     227             : #if LIBCURL_VERSION_NUM >= 0x071304
     228             :         // Restrict protocols so that curl vulnerabilities in
     229             :         // other protocols don't affect us.
     230             :         // These settings were introduced in curl 7.19.4.
     231             :         long protocols =
     232             :                 CURLPROTO_HTTP |
     233             :                 CURLPROTO_HTTPS |
     234             :                 CURLPROTO_FTP |
     235           0 :                 CURLPROTO_FTPS;
     236           0 :         curl_easy_setopt(curl, CURLOPT_PROTOCOLS, protocols);
     237           0 :         curl_easy_setopt(curl, CURLOPT_REDIR_PROTOCOLS, protocols);
     238             : #endif
     239             : 
     240             :         // Set cURL options based on HTTPFetchRequest
     241           0 :         curl_easy_setopt(curl, CURLOPT_URL,
     242           0 :                         request.url.c_str());
     243           0 :         curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS,
     244           0 :                         request.timeout);
     245           0 :         curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT_MS,
     246           0 :                         request.connect_timeout);
     247             : 
     248           0 :         if (request.useragent != "")
     249           0 :                 curl_easy_setopt(curl, CURLOPT_USERAGENT, request.useragent.c_str());
     250             : 
     251             :         // Set up a write callback that writes to the
     252             :         // ostringstream ongoing->oss, unless the data
     253             :         // is to be discarded
     254           0 :         if (request.caller == HTTPFETCH_DISCARD) {
     255           0 :                 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION,
     256           0 :                                 httpfetch_discardfunction);
     257           0 :                 curl_easy_setopt(curl, CURLOPT_WRITEDATA, NULL);
     258             :         } else {
     259           0 :                 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION,
     260           0 :                                 httpfetch_writefunction);
     261           0 :                 curl_easy_setopt(curl, CURLOPT_WRITEDATA, &oss);
     262             :         }
     263             : 
     264             :         // Set POST (or GET) data
     265           0 :         if (request.post_fields.empty()) {
     266           0 :                 curl_easy_setopt(curl, CURLOPT_HTTPGET, 1);
     267           0 :         } else if (request.multipart) {
     268           0 :                 curl_httppost *last = NULL;
     269           0 :                 for (StringMap::iterator it = request.post_fields.begin();
     270           0 :                                 it != request.post_fields.end(); ++it) {
     271           0 :                         curl_formadd(&post, &last,
     272           0 :                                         CURLFORM_NAMELENGTH, it->first.size(),
     273           0 :                                         CURLFORM_PTRNAME, it->first.c_str(),
     274           0 :                                         CURLFORM_CONTENTSLENGTH, it->second.size(),
     275           0 :                                         CURLFORM_PTRCONTENTS, it->second.c_str(),
     276           0 :                                         CURLFORM_END);
     277             :                 }
     278           0 :                 curl_easy_setopt(curl, CURLOPT_HTTPPOST, post);
     279             :                 // request.post_fields must now *never* be
     280             :                 // modified until CURLOPT_HTTPPOST is cleared
     281           0 :         } else if (request.post_data.empty()) {
     282           0 :                 curl_easy_setopt(curl, CURLOPT_POST, 1);
     283           0 :                 std::string str;
     284           0 :                 for (StringMap::iterator it = request.post_fields.begin();
     285           0 :                                 it != request.post_fields.end(); ++it) {
     286           0 :                         if (str != "")
     287           0 :                                 str += "&";
     288           0 :                         str += urlencode(it->first);
     289           0 :                         str += "=";
     290           0 :                         str += urlencode(it->second);
     291             :                 }
     292           0 :                 curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE,
     293           0 :                                 str.size());
     294           0 :                 curl_easy_setopt(curl, CURLOPT_COPYPOSTFIELDS,
     295           0 :                                 str.c_str());
     296             :         } else {
     297           0 :                 curl_easy_setopt(curl, CURLOPT_POST, 1);
     298           0 :                 curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE,
     299           0 :                                 request.post_data.size());
     300           0 :                 curl_easy_setopt(curl, CURLOPT_POSTFIELDS,
     301           0 :                                 request.post_data.c_str());
     302             :                 // request.post_data must now *never* be
     303             :                 // modified until CURLOPT_POSTFIELDS is cleared
     304             :         }
     305             :         // Set additional HTTP headers
     306           0 :         for (std::vector<std::string>::iterator it = request.extra_headers.begin();
     307           0 :                         it != request.extra_headers.end(); ++it) {
     308           0 :                 http_header = curl_slist_append(http_header, it->c_str());
     309             :         }
     310           0 :         curl_easy_setopt(curl, CURLOPT_HTTPHEADER, http_header);
     311             : 
     312           0 :         if (!g_settings->getBool("curl_verify_cert")) {
     313           0 :                 curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, false);
     314             :         }
     315             : }
     316             : 
     317           0 : CURLcode HTTPFetchOngoing::start(CURLM *multi_)
     318             : {
     319           0 :         if (!curl)
     320           0 :                 return CURLE_FAILED_INIT;
     321             : 
     322           0 :         if (!multi_) {
     323             :                 // Easy interface (sync)
     324           0 :                 return curl_easy_perform(curl);
     325             :         }
     326             : 
     327             :         // Multi interface (async)
     328           0 :         CURLMcode mres = curl_multi_add_handle(multi_, curl);
     329           0 :         if (mres != CURLM_OK) {
     330           0 :                 errorstream << "curl_multi_add_handle"
     331           0 :                         << " returned error code " << mres
     332           0 :                         << std::endl;
     333           0 :                 return CURLE_FAILED_INIT;
     334             :         }
     335           0 :         multi = multi_; // store for curl_multi_remove_handle
     336           0 :         return CURLE_OK;
     337             : }
     338             : 
     339           0 : const HTTPFetchResult * HTTPFetchOngoing::complete(CURLcode res)
     340             : {
     341           0 :         result.succeeded = (res == CURLE_OK);
     342           0 :         result.timeout = (res == CURLE_OPERATION_TIMEDOUT);
     343           0 :         result.data = oss.str();
     344             : 
     345             :         // Get HTTP/FTP response code
     346           0 :         result.response_code = 0;
     347           0 :         if (curl && (curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE,
     348             :                                 &result.response_code) != CURLE_OK)) {
     349             :                 // We failed to get a return code, make sure it is still 0
     350           0 :                 result.response_code = 0;
     351             :         }
     352             : 
     353           0 :         if (res != CURLE_OK) {
     354           0 :                 errorstream << request.url << " not found ("
     355           0 :                         << curl_easy_strerror(res) << ")"
     356           0 :                         << " (response code " << result.response_code << ")"
     357           0 :                         << std::endl;
     358             :         }
     359             : 
     360           0 :         return &result;
     361             : }
     362             : 
     363           0 : HTTPFetchOngoing::~HTTPFetchOngoing()
     364             : {
     365           0 :         if (multi) {
     366           0 :                 CURLMcode mres = curl_multi_remove_handle(multi, curl);
     367           0 :                 if (mres != CURLM_OK) {
     368           0 :                         errorstream << "curl_multi_remove_handle"
     369           0 :                                 << " returned error code " << mres
     370           0 :                                 << std::endl;
     371             :                 }
     372             :         }
     373             : 
     374             :         // Set safe options for the reusable cURL handle
     375           0 :         curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION,
     376           0 :                         httpfetch_discardfunction);
     377           0 :         curl_easy_setopt(curl, CURLOPT_WRITEDATA, NULL);
     378           0 :         curl_easy_setopt(curl, CURLOPT_POSTFIELDS, NULL);
     379           0 :         if (http_header) {
     380           0 :                 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, NULL);
     381           0 :                 curl_slist_free_all(http_header);
     382             :         }
     383           0 :         if (post) {
     384           0 :                 curl_easy_setopt(curl, CURLOPT_HTTPPOST, NULL);
     385           0 :                 curl_formfree(post);
     386             :         }
     387             : 
     388             :         // Store the cURL handle for reuse
     389           0 :         pool->free(curl);
     390           0 : }
     391             : 
     392             : 
     393           2 : class CurlFetchThread : public JThread
     394             : {
     395             : protected:
     396             :         enum RequestType {
     397             :                 RT_FETCH,
     398             :                 RT_CLEAR,
     399             :                 RT_WAKEUP,
     400             :         };
     401             : 
     402           6 :         struct Request {
     403             :                 RequestType type;
     404             :                 HTTPFetchRequest fetch_request;
     405             :                 Event *event;
     406             :         };
     407             : 
     408             :         CURLM *m_multi;
     409             :         MutexedQueue<Request> m_requests;
     410             :         size_t m_parallel_limit;
     411             : 
     412             :         // Variables exclusively used within thread
     413             :         std::vector<HTTPFetchOngoing*> m_all_ongoing;
     414             :         std::list<HTTPFetchRequest> m_queued_fetches;
     415             : 
     416             : public:
     417           1 :         CurlFetchThread(int parallel_limit)
     418           1 :         {
     419           1 :                 if (parallel_limit >= 1)
     420           1 :                         m_parallel_limit = parallel_limit;
     421             :                 else
     422           0 :                         m_parallel_limit = 1;
     423           1 :         }
     424             : 
     425           0 :         void requestFetch(const HTTPFetchRequest &fetch_request)
     426             :         {
     427           0 :                 Request req;
     428           0 :                 req.type = RT_FETCH;
     429           0 :                 req.fetch_request = fetch_request;
     430           0 :                 req.event = NULL;
     431           0 :                 m_requests.push_back(req);
     432           0 :         }
     433             : 
     434           0 :         void requestClear(unsigned long caller, Event *event)
     435             :         {
     436           0 :                 Request req;
     437           0 :                 req.type = RT_CLEAR;
     438           0 :                 req.fetch_request.caller = caller;
     439           0 :                 req.event = event;
     440           0 :                 m_requests.push_back(req);
     441           0 :         }
     442             : 
     443           1 :         void requestWakeUp()
     444             :         {
     445           2 :                 Request req;
     446           1 :                 req.type = RT_WAKEUP;
     447           1 :                 req.event = NULL;
     448           1 :                 m_requests.push_back(req);
     449           1 :         }
     450             : 
     451             : protected:
     452             :         // Handle a request from some other thread
     453             :         // E.g. new fetch; clear fetches for one caller; wake up
     454           0 :         void processRequest(const Request &req)
     455             :         {
     456           0 :                 if (req.type == RT_FETCH) {
     457             :                         // New fetch, queue until there are less
     458             :                         // than m_parallel_limit ongoing fetches
     459           0 :                         m_queued_fetches.push_back(req.fetch_request);
     460             : 
     461             :                         // see processQueued() for what happens next
     462             : 
     463             :                 }
     464           0 :                 else if (req.type == RT_CLEAR) {
     465           0 :                         unsigned long caller = req.fetch_request.caller;
     466             : 
     467             :                         // Abort all ongoing fetches for the caller
     468           0 :                         for (std::vector<HTTPFetchOngoing*>::iterator
     469           0 :                                         it = m_all_ongoing.begin();
     470           0 :                                         it != m_all_ongoing.end();) {
     471           0 :                                 if ((*it)->getRequest().caller == caller) {
     472           0 :                                         delete (*it);
     473           0 :                                         it = m_all_ongoing.erase(it);
     474             :                                 } else {
     475           0 :                                         ++it;
     476             :                                 }
     477             :                         }
     478             : 
     479             :                         // Also abort all queued fetches for the caller
     480           0 :                         for (std::list<HTTPFetchRequest>::iterator
     481           0 :                                         it = m_queued_fetches.begin();
     482           0 :                                         it != m_queued_fetches.end();) {
     483           0 :                                 if ((*it).caller == caller)
     484           0 :                                         it = m_queued_fetches.erase(it);
     485             :                                 else
     486           0 :                                         ++it;
     487             :                         }
     488             :                 }
     489           0 :                 else if (req.type == RT_WAKEUP) {
     490             :                         // Wakeup: Nothing to do, thread is awake at this point
     491             :                 }
     492             : 
     493           0 :                 if (req.event != NULL)
     494           0 :                         req.event->signal();
     495           0 :         }
     496             : 
     497             :         // Start new ongoing fetches if m_parallel_limit allows
     498           0 :         void processQueued(CurlHandlePool *pool)
     499             :         {
     500           0 :                 while (m_all_ongoing.size() < m_parallel_limit &&
     501           0 :                                 !m_queued_fetches.empty()) {
     502           0 :                         HTTPFetchRequest request = m_queued_fetches.front();
     503           0 :                         m_queued_fetches.pop_front();
     504             : 
     505             :                         // Create ongoing fetch data and make a cURL handle
     506             :                         // Set cURL options based on HTTPFetchRequest
     507             :                         HTTPFetchOngoing *ongoing =
     508           0 :                                 new HTTPFetchOngoing(request, pool);
     509             : 
     510             :                         // Initiate the connection (curl_multi_add_handle)
     511           0 :                         CURLcode res = ongoing->start(m_multi);
     512           0 :                         if (res == CURLE_OK) {
     513           0 :                                 m_all_ongoing.push_back(ongoing);
     514             :                         }
     515             :                         else {
     516           0 :                                 httpfetch_deliver_result(*ongoing->complete(res));
     517           0 :                                 delete ongoing;
     518             :                         }
     519             :                 }
     520           0 :         }
     521             : 
     522             :         // Process CURLMsg (indicates completion of a fetch)
     523           0 :         void processCurlMessage(CURLMsg *msg)
     524             :         {
     525             :                 // Determine which ongoing fetch the message pertains to
     526           0 :                 size_t i = 0;
     527           0 :                 bool found = false;
     528           0 :                 for (i = 0; i < m_all_ongoing.size(); ++i) {
     529           0 :                         if (m_all_ongoing[i]->getEasyHandle() == msg->easy_handle) {
     530           0 :                                 found = true;
     531           0 :                                 break;
     532             :                         }
     533             :                 }
     534           0 :                 if (msg->msg == CURLMSG_DONE && found) {
     535             :                         // m_all_ongoing[i] succeeded or failed.
     536           0 :                         HTTPFetchOngoing *ongoing = m_all_ongoing[i];
     537           0 :                         httpfetch_deliver_result(*ongoing->complete(msg->data.result));
     538           0 :                         delete ongoing;
     539           0 :                         m_all_ongoing.erase(m_all_ongoing.begin() + i);
     540             :                 }
     541           0 :         }
     542             : 
     543             :         // Wait for a request from another thread, or timeout elapses
     544           0 :         void waitForRequest(long timeout)
     545             :         {
     546           0 :                 if (m_queued_fetches.empty()) {
     547             :                         try {
     548           0 :                                 Request req = m_requests.pop_front(timeout);
     549           0 :                                 processRequest(req);
     550             :                         }
     551           0 :                         catch (ItemNotFoundException &e) {}
     552             :                 }
     553           0 :         }
     554             : 
     555             :         // Wait until some IO happens, or timeout elapses
     556           0 :         void waitForIO(long timeout)
     557             :         {
     558             :                 fd_set read_fd_set;
     559             :                 fd_set write_fd_set;
     560             :                 fd_set exc_fd_set;
     561             :                 int max_fd;
     562           0 :                 long select_timeout = -1;
     563             :                 struct timeval select_tv;
     564             :                 CURLMcode mres;
     565             : 
     566           0 :                 FD_ZERO(&read_fd_set);
     567           0 :                 FD_ZERO(&write_fd_set);
     568           0 :                 FD_ZERO(&exc_fd_set);
     569             : 
     570           0 :                 mres = curl_multi_fdset(m_multi, &read_fd_set,
     571           0 :                                 &write_fd_set, &exc_fd_set, &max_fd);
     572           0 :                 if (mres != CURLM_OK) {
     573           0 :                         errorstream<<"curl_multi_fdset"
     574           0 :                                 <<" returned error code "<<mres
     575           0 :                                 <<std::endl;
     576           0 :                         select_timeout = 0;
     577             :                 }
     578             : 
     579           0 :                 mres = curl_multi_timeout(m_multi, &select_timeout);
     580           0 :                 if (mres != CURLM_OK) {
     581           0 :                         errorstream<<"curl_multi_timeout"
     582           0 :                                 <<" returned error code "<<mres
     583           0 :                                 <<std::endl;
     584           0 :                         select_timeout = 0;
     585             :                 }
     586             : 
     587             :                 // Limit timeout so new requests get through
     588           0 :                 if (select_timeout < 0 || select_timeout > timeout)
     589           0 :                         select_timeout = timeout;
     590             : 
     591           0 :                 if (select_timeout > 0) {
     592             :                         // in Winsock it is forbidden to pass three empty
     593             :                         // fd_sets to select(), so in that case use sleep_ms
     594           0 :                         if (max_fd != -1) {
     595           0 :                                 select_tv.tv_sec = select_timeout / 1000;
     596           0 :                                 select_tv.tv_usec = (select_timeout % 1000) * 1000;
     597           0 :                                 int retval = select(max_fd + 1, &read_fd_set,
     598             :                                                 &write_fd_set, &exc_fd_set,
     599           0 :                                                 &select_tv);
     600           0 :                                 if (retval == -1) {
     601             :                                         #ifdef _WIN32
     602             :                                         errorstream<<"select returned error code "
     603             :                                                 <<WSAGetLastError()<<std::endl;
     604             :                                         #else
     605           0 :                                         errorstream<<"select returned error code "
     606           0 :                                                 <<errno<<std::endl;
     607             :                                         #endif
     608             :                                 }
     609             :                         }
     610             :                         else {
     611           0 :                                 sleep_ms(select_timeout);
     612             :                         }
     613             :                 }
     614           0 :         }
     615             : 
     616           0 :         void * Thread()
     617             :         {
     618           0 :                 ThreadStarted();
     619           0 :                 log_register_thread("CurlFetchThread");
     620           0 :                 DSTACK(__FUNCTION_NAME);
     621             : 
     622           0 :                 porting::setThreadName("CurlFetchThread");
     623             : 
     624           0 :                 CurlHandlePool pool;
     625             : 
     626           0 :                 m_multi = curl_multi_init();
     627           0 :                 if (m_multi == NULL) {
     628           0 :                         errorstream<<"curl_multi_init returned NULL\n";
     629           0 :                         return NULL;
     630             :                 }
     631             : 
     632           0 :                 FATAL_ERROR_IF(!m_all_ongoing.empty(), "Expected empty");
     633             : 
     634           0 :                 while (!StopRequested()) {
     635             :                         BEGIN_DEBUG_EXCEPTION_HANDLER
     636             : 
     637             :                         /*
     638             :                                 Handle new async requests
     639             :                         */
     640             : 
     641           0 :                         while (!m_requests.empty()) {
     642           0 :                                 Request req = m_requests.pop_frontNoEx();
     643           0 :                                 processRequest(req);
     644             :                         }
     645           0 :                         processQueued(&pool);
     646             : 
     647             :                         /*
     648             :                                 Handle ongoing async requests
     649             :                         */
     650             : 
     651           0 :                         int still_ongoing = 0;
     652           0 :                         while (curl_multi_perform(m_multi, &still_ongoing) ==
     653             :                                         CURLM_CALL_MULTI_PERFORM)
     654             :                                 /* noop */;
     655             : 
     656             :                         /*
     657             :                                 Handle completed async requests
     658             :                         */
     659           0 :                         if (still_ongoing < (int) m_all_ongoing.size()) {
     660             :                                 CURLMsg *msg;
     661             :                                 int msgs_in_queue;
     662           0 :                                 msg = curl_multi_info_read(m_multi, &msgs_in_queue);
     663           0 :                                 while (msg != NULL) {
     664           0 :                                         processCurlMessage(msg);
     665           0 :                                         msg = curl_multi_info_read(m_multi, &msgs_in_queue);
     666             :                                 }
     667             :                         }
     668             : 
     669             :                         /*
     670             :                                 If there are ongoing requests, wait for data
     671             :                                 (with a timeout of 100ms so that new requests
     672             :                                 can be processed).
     673             : 
     674             :                                 If no ongoing requests, wait for a new request.
     675             :                                 (Possibly an empty request that signals
     676             :                                 that the thread should be stopped.)
     677             :                         */
     678           0 :                         if (m_all_ongoing.empty())
     679           0 :                                 waitForRequest(100000000);
     680             :                         else
     681           0 :                                 waitForIO(100);
     682             : 
     683           0 :                         END_DEBUG_EXCEPTION_HANDLER(errorstream)
     684             :                 }
     685             : 
     686             :                 // Call curl_multi_remove_handle and cleanup easy handles
     687           0 :                 for (size_t i = 0; i < m_all_ongoing.size(); ++i) {
     688           0 :                         delete m_all_ongoing[i];
     689             :                 }
     690           0 :                 m_all_ongoing.clear();
     691             : 
     692           0 :                 m_queued_fetches.clear();
     693             : 
     694           0 :                 CURLMcode mres = curl_multi_cleanup(m_multi);
     695           0 :                 if (mres != CURLM_OK) {
     696           0 :                         errorstream<<"curl_multi_cleanup"
     697           0 :                                 <<" returned error code "<<mres
     698           0 :                                 <<std::endl;
     699             :                 }
     700             : 
     701           0 :                 return NULL;
     702             :         }
     703             : };
     704             : 
     705             : CurlFetchThread *g_httpfetch_thread = NULL;
     706             : 
     707           1 : void httpfetch_init(int parallel_limit)
     708             : {
     709           1 :         verbosestream<<"httpfetch_init: parallel_limit="<<parallel_limit
     710           1 :                         <<std::endl;
     711             : 
     712           1 :         CURLcode res = curl_global_init(CURL_GLOBAL_DEFAULT);
     713           1 :         FATAL_ERROR_IF(res != CURLE_OK, "CURL init failed");
     714             : 
     715           1 :         g_httpfetch_thread = new CurlFetchThread(parallel_limit);
     716           1 : }
     717             : 
     718           1 : void httpfetch_cleanup()
     719             : {
     720           1 :         verbosestream<<"httpfetch_cleanup: cleaning up"<<std::endl;
     721             : 
     722           1 :         g_httpfetch_thread->Stop();
     723           1 :         g_httpfetch_thread->requestWakeUp();
     724           1 :         g_httpfetch_thread->Wait();
     725           1 :         delete g_httpfetch_thread;
     726             : 
     727           1 :         curl_global_cleanup();
     728           1 : }
     729             : 
     730           0 : void httpfetch_async(const HTTPFetchRequest &fetch_request)
     731             : {
     732           0 :         g_httpfetch_thread->requestFetch(fetch_request);
     733           0 :         if (!g_httpfetch_thread->IsRunning())
     734           0 :                 g_httpfetch_thread->Start();
     735           0 : }
     736             : 
     737           0 : static void httpfetch_request_clear(unsigned long caller)
     738             : {
     739           0 :         if (g_httpfetch_thread->IsRunning()) {
     740           0 :                 Event event;
     741           0 :                 g_httpfetch_thread->requestClear(caller, &event);
     742           0 :                 event.wait();
     743             :         }
     744             :         else {
     745           0 :                 g_httpfetch_thread->requestClear(caller, NULL);
     746             :         }
     747           0 : }
     748             : 
     749           0 : void httpfetch_sync(const HTTPFetchRequest &fetch_request,
     750             :                 HTTPFetchResult &fetch_result)
     751             : {
     752             :         // Create ongoing fetch data and make a cURL handle
     753             :         // Set cURL options based on HTTPFetchRequest
     754           0 :         CurlHandlePool pool;
     755           0 :         HTTPFetchOngoing ongoing(fetch_request, &pool);
     756             :         // Do the fetch (curl_easy_perform)
     757           0 :         CURLcode res = ongoing.start(NULL);
     758             :         // Update fetch result
     759           0 :         fetch_result = *ongoing.complete(res);
     760           3 : }
     761             : 
     762             : #else  // USE_CURL
     763             : 
     764             : /*
     765             :         USE_CURL is off:
     766             : 
     767             :         Dummy httpfetch implementation that always returns an error.
     768             : */
     769             : 
     // Dummy (USE_CURL off): nothing to initialise.
     void httpfetch_init(int parallel_limit)
     {
             // Intentionally empty; parallel_limit is unused
     }
     773             : 
     // Dummy (USE_CURL off): nothing to clean up.
     void httpfetch_cleanup()
     {
             // Intentionally empty
     }
     777             : 
     778             : void httpfetch_async(const HTTPFetchRequest &fetch_request)
     779             : {
     780             :         errorstream << "httpfetch_async: unable to fetch " << fetch_request.url
     781             :                         << " because USE_CURL=0" << std::endl;
     782             : 
     783             :         HTTPFetchResult fetch_result(fetch_request); // sets succeeded = false etc.
     784             :         httpfetch_deliver_result(fetch_result);
     785             : }
     786             : 
     // Dummy (USE_CURL off): there is no fetch thread to clear.
     static void httpfetch_request_clear(unsigned long caller)
     {
             // Intentionally empty; caller is unused
     }
     790             : 
     791             : void httpfetch_sync(const HTTPFetchRequest &fetch_request,
     792             :                 HTTPFetchResult &fetch_result)
     793             : {
     794             :         errorstream << "httpfetch_sync: unable to fetch " << fetch_request.url
     795             :                         << " because USE_CURL=0" << std::endl;
     796             : 
     797             :         fetch_result = HTTPFetchResult(fetch_request); // sets succeeded = false etc.
     798             : }
     799             : 
     800             : #endif  // USE_CURL

Generated by: LCOV version 1.11