Line data Source code
1 : /*
2 : Minetest
3 : Copyright (C) 2013 sapier, <sapier AT gmx DOT net>
4 :
5 : This program is free software; you can redistribute it and/or modify
6 : it under the terms of the GNU Lesser General Public License as published by
7 : the Free Software Foundation; either version 2.1 of the License, or
8 : (at your option) any later version.
9 :
10 : This program is distributed in the hope that it will be useful,
11 : but WITHOUT ANY WARRANTY; without even the implied warranty of
12 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 : GNU Lesser General Public License for more details.
14 :
15 : You should have received a copy of the GNU Lesser General Public License along
16 : with this program; if not, write to the Free Software Foundation, Inc.,
17 : 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 : */
19 :
20 : #include <stdio.h>
21 : #include <stdlib.h>
22 :
23 : extern "C" {
24 : #include "lua.h"
25 : #include "lauxlib.h"
26 : #include "lualib.h"
27 : }
28 :
29 : #include "server.h"
30 : #include "s_async.h"
31 : #include "log.h"
32 : #include "filesys.h"
33 : #include "porting.h"
34 : #include "common/c_internal.h"
35 :
36 : /******************************************************************************/
37 1 : AsyncEngine::AsyncEngine() :
38 : initDone(false),
39 1 : jobIdCounter(0)
40 : {
41 1 : }
42 :
43 : /******************************************************************************/
44 2 : AsyncEngine::~AsyncEngine()
45 : {
46 :
47 : // Request all threads to stop
48 15 : for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
49 10 : it != workerThreads.end(); it++) {
50 4 : (*it)->Stop();
51 : }
52 :
53 :
54 : // Wake up all threads
55 15 : for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
56 10 : it != workerThreads.end(); it++) {
57 4 : jobQueueCounter.Post();
58 : }
59 :
60 : // Wait for threads to finish
61 15 : for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
62 10 : it != workerThreads.end(); it++) {
63 4 : (*it)->Wait();
64 : }
65 :
66 : // Force kill all threads
67 15 : for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
68 10 : it != workerThreads.end(); it++) {
69 4 : (*it)->Kill();
70 4 : delete *it;
71 : }
72 :
73 1 : jobQueueMutex.Lock();
74 1 : jobQueue.clear();
75 1 : jobQueueMutex.Unlock();
76 1 : workerThreads.clear();
77 1 : }
78 :
79 : /******************************************************************************/
80 27 : bool AsyncEngine::registerFunction(const char* name, lua_CFunction func)
81 : {
82 27 : if (initDone) {
83 0 : return false;
84 : }
85 27 : functionList[name] = func;
86 27 : return true;
87 : }
88 :
89 : /******************************************************************************/
90 1 : void AsyncEngine::initialize(unsigned int numEngines)
91 : {
92 1 : initDone = true;
93 :
94 5 : for (unsigned int i = 0; i < numEngines; i++) {
95 4 : AsyncWorkerThread *toAdd = new AsyncWorkerThread(this, i);
96 4 : workerThreads.push_back(toAdd);
97 4 : toAdd->Start();
98 : }
99 1 : }
100 :
101 : /******************************************************************************/
102 0 : unsigned int AsyncEngine::queueAsyncJob(std::string func, std::string params)
103 : {
104 0 : jobQueueMutex.Lock();
105 0 : LuaJobInfo toAdd;
106 0 : toAdd.id = jobIdCounter++;
107 0 : toAdd.serializedFunction = func;
108 0 : toAdd.serializedParams = params;
109 :
110 0 : jobQueue.push_back(toAdd);
111 :
112 0 : jobQueueCounter.Post();
113 :
114 0 : jobQueueMutex.Unlock();
115 :
116 0 : return toAdd.id;
117 : }
118 :
119 : /******************************************************************************/
120 4 : LuaJobInfo AsyncEngine::getJob()
121 : {
122 4 : jobQueueCounter.Wait();
123 4 : jobQueueMutex.Lock();
124 :
125 4 : LuaJobInfo retval;
126 4 : retval.valid = false;
127 :
128 4 : if (!jobQueue.empty()) {
129 0 : retval = jobQueue.front();
130 0 : jobQueue.pop_front();
131 0 : retval.valid = true;
132 : }
133 4 : jobQueueMutex.Unlock();
134 :
135 4 : return retval;
136 : }
137 :
138 : /******************************************************************************/
139 0 : void AsyncEngine::putJobResult(LuaJobInfo result)
140 : {
141 0 : resultQueueMutex.Lock();
142 0 : resultQueue.push_back(result);
143 0 : resultQueueMutex.Unlock();
144 0 : }
145 :
146 : /******************************************************************************/
147 91 : void AsyncEngine::step(lua_State *L, int errorhandler)
148 : {
149 91 : lua_getglobal(L, "core");
150 91 : resultQueueMutex.Lock();
151 91 : while (!resultQueue.empty()) {
152 0 : LuaJobInfo jobDone = resultQueue.front();
153 0 : resultQueue.pop_front();
154 :
155 0 : lua_getfield(L, -1, "async_event_handler");
156 :
157 0 : if (lua_isnil(L, -1)) {
158 0 : FATAL_ERROR("Async event handler does not exist!");
159 : }
160 :
161 0 : luaL_checktype(L, -1, LUA_TFUNCTION);
162 :
163 0 : lua_pushinteger(L, jobDone.id);
164 0 : lua_pushlstring(L, jobDone.serializedResult.data(),
165 0 : jobDone.serializedResult.size());
166 :
167 0 : if (lua_pcall(L, 2, 0, errorhandler)) {
168 0 : script_error(L);
169 : }
170 : }
171 91 : resultQueueMutex.Unlock();
172 91 : lua_pop(L, 1); // Pop core
173 91 : }
174 :
175 : /******************************************************************************/
176 0 : void AsyncEngine::pushFinishedJobs(lua_State* L) {
177 : // Result Table
178 0 : resultQueueMutex.Lock();
179 :
180 0 : unsigned int index = 1;
181 0 : lua_createtable(L, resultQueue.size(), 0);
182 0 : int top = lua_gettop(L);
183 :
184 0 : while (!resultQueue.empty()) {
185 0 : LuaJobInfo jobDone = resultQueue.front();
186 0 : resultQueue.pop_front();
187 :
188 0 : lua_createtable(L, 0, 2); // Pre-allocate space for two map fields
189 0 : int top_lvl2 = lua_gettop(L);
190 :
191 0 : lua_pushstring(L, "jobid");
192 0 : lua_pushnumber(L, jobDone.id);
193 0 : lua_settable(L, top_lvl2);
194 :
195 0 : lua_pushstring(L, "retval");
196 0 : lua_pushlstring(L, jobDone.serializedResult.data(),
197 0 : jobDone.serializedResult.size());
198 0 : lua_settable(L, top_lvl2);
199 :
200 0 : lua_rawseti(L, top, index++);
201 : }
202 :
203 0 : resultQueueMutex.Unlock();
204 0 : }
205 :
206 : /******************************************************************************/
207 4 : void AsyncEngine::prepareEnvironment(lua_State* L, int top)
208 : {
209 336 : for (std::map<std::string, lua_CFunction>::iterator it = functionList.begin();
210 224 : it != functionList.end(); it++) {
211 108 : lua_pushstring(L, it->first.c_str());
212 108 : lua_pushcfunction(L, it->second);
213 108 : lua_settable(L, top);
214 : }
215 4 : }
216 :
217 : /******************************************************************************/
218 4 : AsyncWorkerThread::AsyncWorkerThread(AsyncEngine* jobDispatcher,
219 : unsigned int threadNum) :
220 : ScriptApiBase(),
221 : jobDispatcher(jobDispatcher),
222 4 : threadnum(threadNum)
223 : {
224 4 : lua_State *L = getStack();
225 :
226 : // Prepare job lua environment
227 4 : lua_getglobal(L, "core");
228 4 : int top = lua_gettop(L);
229 :
230 : // Push builtin initialization type
231 4 : lua_pushstring(L, "async");
232 4 : lua_setglobal(L, "INIT");
233 :
234 4 : jobDispatcher->prepareEnvironment(L, top);
235 4 : }
236 :
237 : /******************************************************************************/
238 12 : AsyncWorkerThread::~AsyncWorkerThread()
239 : {
240 4 : sanity_check(IsRunning() == false);
241 8 : }
242 :
243 : /******************************************************************************/
244 4 : void* AsyncWorkerThread::Thread()
245 : {
246 4 : ThreadStarted();
247 :
248 : // Register thread for error logging
249 : char number[21];
250 4 : snprintf(number, sizeof(number), "%u", threadnum);
251 4 : log_register_thread(std::string("AsyncWorkerThread_") + number);
252 :
253 4 : porting::setThreadName((std::string("AsyncWorkTh_") + number).c_str());
254 :
255 4 : lua_State *L = getStack();
256 :
257 8 : std::string script = getServer()->getBuiltinLuaPath() + DIR_DELIM + "init.lua";
258 4 : if (!loadScript(script)) {
259 : errorstream
260 0 : << "AsyncWorkderThread execution of async base environment failed!"
261 0 : << std::endl;
262 0 : abort();
263 : }
264 :
265 4 : lua_getglobal(L, "core");
266 4 : if (lua_isnil(L, -1)) {
267 0 : errorstream << "Unable to find core within async environment!";
268 0 : abort();
269 : }
270 :
271 : // Main loop
272 12 : while (!StopRequested()) {
273 : // Wait for job
274 4 : LuaJobInfo toProcess = jobDispatcher->getJob();
275 :
276 4 : if (toProcess.valid == false || StopRequested()) {
277 4 : continue;
278 : }
279 :
280 0 : lua_getfield(L, -1, "job_processor");
281 0 : if (lua_isnil(L, -1)) {
282 0 : errorstream << "Unable to get async job processor!" << std::endl;
283 0 : abort();
284 : }
285 :
286 0 : luaL_checktype(L, -1, LUA_TFUNCTION);
287 :
288 : // Call it
289 0 : lua_pushlstring(L,
290 : toProcess.serializedFunction.data(),
291 0 : toProcess.serializedFunction.size());
292 0 : lua_pushlstring(L,
293 : toProcess.serializedParams.data(),
294 0 : toProcess.serializedParams.size());
295 :
296 0 : if (lua_pcall(L, 2, 1, m_errorhandler)) {
297 0 : scriptError();
298 0 : toProcess.serializedResult = "";
299 : } else {
300 : // Fetch result
301 : size_t length;
302 0 : const char *retval = lua_tolstring(L, -1, &length);
303 0 : toProcess.serializedResult = std::string(retval, length);
304 : }
305 :
306 0 : lua_pop(L, 1); // Pop retval
307 :
308 : // Put job result
309 0 : jobDispatcher->putJobResult(toProcess);
310 : }
311 :
312 4 : lua_pop(L, 1); // Pop core
313 :
314 4 : log_deregister_thread();
315 :
316 8 : return 0;
317 3 : }
318 :
|