LCOV - code coverage report
Current view: top level - src/script/cpp_api - s_async.cpp (source / functions) Hit Total Coverage
Test: report Lines: 85 156 54.5 %
Date: 2015-07-11 18:23:49 Functions: 13 16 81.2 %

          Line data    Source code
       1             : /*
       2             : Minetest
       3             : Copyright (C) 2013 sapier, <sapier AT gmx DOT net>
       4             : 
       5             : This program is free software; you can redistribute it and/or modify
       6             : it under the terms of the GNU Lesser General Public License as published by
       7             : the Free Software Foundation; either version 2.1 of the License, or
       8             : (at your option) any later version.
       9             : 
      10             : This program is distributed in the hope that it will be useful,
      11             : but WITHOUT ANY WARRANTY; without even the implied warranty of
      12             : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      13             : GNU Lesser General Public License for more details.
      14             : 
      15             : You should have received a copy of the GNU Lesser General Public License along
      16             : with this program; if not, write to the Free Software Foundation, Inc.,
      17             : 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      18             : */
      19             : 
      20             : #include <stdio.h>
      21             : #include <stdlib.h>
      22             : 
      23             : extern "C" {
      24             : #include "lua.h"
      25             : #include "lauxlib.h"
      26             : #include "lualib.h"
      27             : }
      28             : 
      29             : #include "server.h"
      30             : #include "s_async.h"
      31             : #include "log.h"
      32             : #include "filesys.h"
      33             : #include "porting.h"
      34             : #include "common/c_internal.h"
      35             : 
      36             : /******************************************************************************/
      37           1 : AsyncEngine::AsyncEngine() :
      38             :         initDone(false),
      39           1 :         jobIdCounter(0)
      40             : {
      41           1 : }
      42             : 
      43             : /******************************************************************************/
      44           2 : AsyncEngine::~AsyncEngine()
      45             : {
      46             : 
      47             :         // Request all threads to stop
      48          15 :         for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
      49          10 :                         it != workerThreads.end(); it++) {
      50           4 :                 (*it)->Stop();
      51             :         }
      52             : 
      53             : 
      54             :         // Wake up all threads
      55          15 :         for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
      56          10 :                         it != workerThreads.end(); it++) {
      57           4 :                 jobQueueCounter.Post();
      58             :         }
      59             : 
      60             :         // Wait for threads to finish
      61          15 :         for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
      62          10 :                         it != workerThreads.end(); it++) {
      63           4 :                 (*it)->Wait();
      64             :         }
      65             : 
      66             :         // Force kill all threads
      67          15 :         for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
      68          10 :                         it != workerThreads.end(); it++) {
      69           4 :                 (*it)->Kill();
      70           4 :                 delete *it;
      71             :         }
      72             : 
      73           1 :         jobQueueMutex.Lock();
      74           1 :         jobQueue.clear();
      75           1 :         jobQueueMutex.Unlock();
      76           1 :         workerThreads.clear();
      77           1 : }
      78             : 
      79             : /******************************************************************************/
      80          27 : bool AsyncEngine::registerFunction(const char* name, lua_CFunction func)
      81             : {
      82          27 :         if (initDone) {
      83           0 :                 return false;
      84             :         }
      85          27 :         functionList[name] = func;
      86          27 :         return true;
      87             : }
      88             : 
      89             : /******************************************************************************/
      90           1 : void AsyncEngine::initialize(unsigned int numEngines)
      91             : {
      92           1 :         initDone = true;
      93             : 
      94           5 :         for (unsigned int i = 0; i < numEngines; i++) {
      95           4 :                 AsyncWorkerThread *toAdd = new AsyncWorkerThread(this, i);
      96           4 :                 workerThreads.push_back(toAdd);
      97           4 :                 toAdd->Start();
      98             :         }
      99           1 : }
     100             : 
     101             : /******************************************************************************/
     102           0 : unsigned int AsyncEngine::queueAsyncJob(std::string func, std::string params)
     103             : {
     104           0 :         jobQueueMutex.Lock();
     105           0 :         LuaJobInfo toAdd;
     106           0 :         toAdd.id = jobIdCounter++;
     107           0 :         toAdd.serializedFunction = func;
     108           0 :         toAdd.serializedParams = params;
     109             : 
     110           0 :         jobQueue.push_back(toAdd);
     111             : 
     112           0 :         jobQueueCounter.Post();
     113             : 
     114           0 :         jobQueueMutex.Unlock();
     115             : 
     116           0 :         return toAdd.id;
     117             : }
     118             : 
     119             : /******************************************************************************/
     120           4 : LuaJobInfo AsyncEngine::getJob()
     121             : {
     122           4 :         jobQueueCounter.Wait();
     123           4 :         jobQueueMutex.Lock();
     124             : 
     125           4 :         LuaJobInfo retval;
     126           4 :         retval.valid = false;
     127             : 
     128           4 :         if (!jobQueue.empty()) {
     129           0 :                 retval = jobQueue.front();
     130           0 :                 jobQueue.pop_front();
     131           0 :                 retval.valid = true;
     132             :         }
     133           4 :         jobQueueMutex.Unlock();
     134             : 
     135           4 :         return retval;
     136             : }
     137             : 
     138             : /******************************************************************************/
     139           0 : void AsyncEngine::putJobResult(LuaJobInfo result)
     140             : {
     141           0 :         resultQueueMutex.Lock();
     142           0 :         resultQueue.push_back(result);
     143           0 :         resultQueueMutex.Unlock();
     144           0 : }
     145             : 
     146             : /******************************************************************************/
     147          91 : void AsyncEngine::step(lua_State *L, int errorhandler)
     148             : {
     149          91 :         lua_getglobal(L, "core");
     150          91 :         resultQueueMutex.Lock();
     151          91 :         while (!resultQueue.empty()) {
     152           0 :                 LuaJobInfo jobDone = resultQueue.front();
     153           0 :                 resultQueue.pop_front();
     154             : 
     155           0 :                 lua_getfield(L, -1, "async_event_handler");
     156             : 
     157           0 :                 if (lua_isnil(L, -1)) {
     158           0 :                         FATAL_ERROR("Async event handler does not exist!");
     159             :                 }
     160             : 
     161           0 :                 luaL_checktype(L, -1, LUA_TFUNCTION);
     162             : 
     163           0 :                 lua_pushinteger(L, jobDone.id);
     164           0 :                 lua_pushlstring(L, jobDone.serializedResult.data(),
     165           0 :                                 jobDone.serializedResult.size());
     166             : 
     167           0 :                 if (lua_pcall(L, 2, 0, errorhandler)) {
     168           0 :                         script_error(L);
     169             :                 }
     170             :         }
     171          91 :         resultQueueMutex.Unlock();
     172          91 :         lua_pop(L, 1); // Pop core
     173          91 : }
     174             : 
     175             : /******************************************************************************/
     176           0 : void AsyncEngine::pushFinishedJobs(lua_State* L) {
     177             :         // Result Table
     178           0 :         resultQueueMutex.Lock();
     179             : 
     180           0 :         unsigned int index = 1;
     181           0 :         lua_createtable(L, resultQueue.size(), 0);
     182           0 :         int top = lua_gettop(L);
     183             : 
     184           0 :         while (!resultQueue.empty()) {
     185           0 :                 LuaJobInfo jobDone = resultQueue.front();
     186           0 :                 resultQueue.pop_front();
     187             : 
     188           0 :                 lua_createtable(L, 0, 2);  // Pre-allocate space for two map fields
     189           0 :                 int top_lvl2 = lua_gettop(L);
     190             : 
     191           0 :                 lua_pushstring(L, "jobid");
     192           0 :                 lua_pushnumber(L, jobDone.id);
     193           0 :                 lua_settable(L, top_lvl2);
     194             : 
     195           0 :                 lua_pushstring(L, "retval");
     196           0 :                 lua_pushlstring(L, jobDone.serializedResult.data(),
     197           0 :                         jobDone.serializedResult.size());
     198           0 :                 lua_settable(L, top_lvl2);
     199             : 
     200           0 :                 lua_rawseti(L, top, index++);
     201             :         }
     202             : 
     203           0 :         resultQueueMutex.Unlock();
     204           0 : }
     205             : 
     206             : /******************************************************************************/
     207           4 : void AsyncEngine::prepareEnvironment(lua_State* L, int top)
     208             : {
     209         336 :         for (std::map<std::string, lua_CFunction>::iterator it = functionList.begin();
     210         224 :                         it != functionList.end(); it++) {
     211         108 :                 lua_pushstring(L, it->first.c_str());
     212         108 :                 lua_pushcfunction(L, it->second);
     213         108 :                 lua_settable(L, top);
     214             :         }
     215           4 : }
     216             : 
     217             : /******************************************************************************/
     218           4 : AsyncWorkerThread::AsyncWorkerThread(AsyncEngine* jobDispatcher,
     219             :                 unsigned int threadNum) :
     220             :         ScriptApiBase(),
     221             :         jobDispatcher(jobDispatcher),
     222           4 :         threadnum(threadNum)
     223             : {
     224           4 :         lua_State *L = getStack();
     225             : 
     226             :         // Prepare job lua environment
     227           4 :         lua_getglobal(L, "core");
     228           4 :         int top = lua_gettop(L);
     229             : 
     230             :         // Push builtin initialization type
     231           4 :         lua_pushstring(L, "async");
     232           4 :         lua_setglobal(L, "INIT");
     233             : 
     234           4 :         jobDispatcher->prepareEnvironment(L, top);
     235           4 : }
     236             : 
     237             : /******************************************************************************/
     238          12 : AsyncWorkerThread::~AsyncWorkerThread()
     239             : {
     240           4 :         sanity_check(IsRunning() == false);
     241           8 : }
     242             : 
     243             : /******************************************************************************/
     244           4 : void* AsyncWorkerThread::Thread()
     245             : {
     246           4 :         ThreadStarted();
     247             : 
     248             :         // Register thread for error logging
     249             :         char number[21];
     250           4 :         snprintf(number, sizeof(number), "%u", threadnum);
     251           4 :         log_register_thread(std::string("AsyncWorkerThread_") + number);
     252             : 
     253           4 :         porting::setThreadName((std::string("AsyncWorkTh_") + number).c_str());
     254             : 
     255           4 :         lua_State *L = getStack();
     256             : 
     257           8 :         std::string script = getServer()->getBuiltinLuaPath() + DIR_DELIM + "init.lua";
     258           4 :         if (!loadScript(script)) {
     259             :                 errorstream
     260           0 :                         << "AsyncWorkderThread execution of async base environment failed!"
     261           0 :                         << std::endl;
     262           0 :                 abort();
     263             :         }
     264             : 
     265           4 :         lua_getglobal(L, "core");
     266           4 :         if (lua_isnil(L, -1)) {
     267           0 :                 errorstream << "Unable to find core within async environment!";
     268           0 :                 abort();
     269             :         }
     270             : 
     271             :         // Main loop
     272          12 :         while (!StopRequested()) {
     273             :                 // Wait for job
     274           4 :                 LuaJobInfo toProcess = jobDispatcher->getJob();
     275             : 
     276           4 :                 if (toProcess.valid == false || StopRequested()) {
     277           4 :                         continue;
     278             :                 }
     279             : 
     280           0 :                 lua_getfield(L, -1, "job_processor");
     281           0 :                 if (lua_isnil(L, -1)) {
     282           0 :                         errorstream << "Unable to get async job processor!" << std::endl;
     283           0 :                         abort();
     284             :                 }
     285             : 
     286           0 :                 luaL_checktype(L, -1, LUA_TFUNCTION);
     287             : 
     288             :                 // Call it
     289           0 :                 lua_pushlstring(L,
     290             :                                 toProcess.serializedFunction.data(),
     291           0 :                                 toProcess.serializedFunction.size());
     292           0 :                 lua_pushlstring(L,
     293             :                                 toProcess.serializedParams.data(),
     294           0 :                                 toProcess.serializedParams.size());
     295             : 
     296           0 :                 if (lua_pcall(L, 2, 1, m_errorhandler)) {
     297           0 :                         scriptError();
     298           0 :                         toProcess.serializedResult = "";
     299             :                 } else {
     300             :                         // Fetch result
     301             :                         size_t length;
     302           0 :                         const char *retval = lua_tolstring(L, -1, &length);
     303           0 :                         toProcess.serializedResult = std::string(retval, length);
     304             :                 }
     305             : 
     306           0 :                 lua_pop(L, 1);  // Pop retval
     307             : 
     308             :                 // Put job result
     309           0 :                 jobDispatcher->putJobResult(toProcess);
     310             :         }
     311             : 
     312           4 :         lua_pop(L, 1);  // Pop core
     313             : 
     314           4 :         log_deregister_thread();
     315             : 
     316           8 :         return 0;
     317           3 : }
     318             : 

Generated by: LCOV version 1.11