Line data Source code
1 : /*
2 : Minetest
3 : Copyright (C) 2010-2013 celeron55, Perttu Ahola <celeron55@gmail.com>
4 :
5 : This program is free software; you can redistribute it and/or modify
6 : it under the terms of the GNU Lesser General Public License as published by
7 : the Free Software Foundation; either version 2.1 of the License, or
8 : (at your option) any later version.
9 :
10 : This program is distributed in the hope that it will be useful,
11 : but WITHOUT ANY WARRANTY; without even the implied warranty of
12 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 : GNU Lesser General Public License for more details.
14 :
15 : You should have received a copy of the GNU Lesser General Public License along
16 : with this program; if not, write to the Free Software Foundation, Inc.,
17 : 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 : */
19 :
20 : #ifndef UTIL_THREAD_HEADER
21 : #define UTIL_THREAD_HEADER
22 :
23 : #include "../irrlichttypes.h"
24 : #include "../jthread/jthread.h"
25 : #include "../jthread/jmutex.h"
26 : #include "../jthread/jmutexautolock.h"
27 : #include "porting.h"
28 : #include "log.h"
29 :
30 : template<typename T>
31 0 : class MutexedVariable
32 : {
33 : public:
34 0 : MutexedVariable(T value):
35 0 : m_value(value)
36 : {
37 0 : }
38 :
39 0 : T get()
40 : {
41 0 : JMutexAutoLock lock(m_mutex);
42 0 : return m_value;
43 : }
44 :
45 0 : void set(T value)
46 : {
47 0 : JMutexAutoLock lock(m_mutex);
48 0 : m_value = value;
49 0 : }
50 :
51 : // You'll want to grab this in a SharedPtr
52 : JMutexAutoLock * getLock()
53 : {
54 : return new JMutexAutoLock(m_mutex);
55 : }
56 :
57 : // You pretty surely want to grab the lock when accessing this
58 : T m_value;
59 :
60 : private:
61 : JMutex m_mutex;
62 : };
63 :
64 : /*
65 : A single worker thread - multiple client threads queue framework.
66 : */
67 : template<typename Key, typename T, typename Caller, typename CallerData>
68 20 : class GetResult
69 : {
70 : public:
71 : Key key;
72 : T item;
73 : std::pair<Caller, CallerData> caller;
74 : };
75 :
76 : template<typename Key, typename T, typename Caller, typename CallerData>
77 2 : class ResultQueue: public MutexedQueue< GetResult<Key, T, Caller, CallerData> >
78 : {
79 : };
80 :
81 : template<typename Caller, typename Data, typename Key, typename T>
82 : class CallerInfo
83 : {
84 : public:
85 : Caller caller;
86 : Data data;
87 : ResultQueue< Key, T, Caller, Data>* dest;
88 : };
89 :
90 : template<typename Key, typename T, typename Caller, typename CallerData>
91 8 : class GetRequest
92 : {
93 : public:
94 2 : GetRequest()
95 2 : {
96 2 : }
97 : GetRequest(Key a_key)
98 : {
99 : key = a_key;
100 : }
101 10 : ~GetRequest()
102 : {
103 10 : }
104 :
105 : Key key;
106 : std::list<CallerInfo<Caller, CallerData, Key, T> > callers;
107 : };
108 :
109 : /**
110 : * Notes for RequestQueue usage
111 : * @param Key unique key to identify a request for a specific resource
112 : * @param T ?
113 : * @param Caller unique id of calling thread
114 : * @param CallerData data passed back to caller
115 : */
116 : template<typename Key, typename T, typename Caller, typename CallerData>
117 6 : class RequestQueue
118 : {
119 : public:
120 2334 : bool empty()
121 : {
122 2334 : return m_queue.empty();
123 : }
124 :
125 2 : void add(Key key, Caller caller, CallerData callerdata,
126 : ResultQueue<Key, T, Caller, CallerData> *dest)
127 : {
128 : {
129 4 : JMutexAutoLock lock(m_queue.getMutex());
130 :
131 : /*
132 : If the caller is already on the list, only update CallerData
133 : */
134 4 : for(typename std::deque< GetRequest<Key, T, Caller, CallerData> >::iterator
135 2 : i = m_queue.getQueue().begin();
136 4 : i != m_queue.getQueue().end(); ++i)
137 : {
138 0 : GetRequest<Key, T, Caller, CallerData> &request = *i;
139 :
140 0 : if(request.key == key)
141 : {
142 0 : for(typename std::list< CallerInfo<Caller, CallerData, Key, T> >::iterator
143 0 : i = request.callers.begin();
144 0 : i != request.callers.end(); ++i)
145 : {
146 0 : CallerInfo<Caller, CallerData, Key, T> &ca = *i;
147 0 : if(ca.caller == caller)
148 : {
149 0 : ca.data = callerdata;
150 0 : return;
151 : }
152 : }
153 : CallerInfo<Caller, CallerData, Key, T> ca;
154 0 : ca.caller = caller;
155 0 : ca.data = callerdata;
156 0 : ca.dest = dest;
157 0 : request.callers.push_back(ca);
158 0 : return;
159 : }
160 : }
161 : }
162 :
163 : /*
164 : Else add a new request to the queue
165 : */
166 :
167 4 : GetRequest<Key, T, Caller, CallerData> request;
168 2 : request.key = key;
169 : CallerInfo<Caller, CallerData, Key, T> ca;
170 2 : ca.caller = caller;
171 2 : ca.data = callerdata;
172 2 : ca.dest = dest;
173 2 : request.callers.push_back(ca);
174 :
175 2 : m_queue.push_back(request);
176 : }
177 :
178 : GetRequest<Key, T, Caller, CallerData> pop(unsigned int timeout_ms)
179 : {
180 : return m_queue.pop_front(timeout_ms);
181 : }
182 :
183 2 : GetRequest<Key, T, Caller, CallerData> pop()
184 : {
185 2 : return m_queue.pop_frontNoEx();
186 : }
187 :
188 2 : void pushResult(GetRequest<Key, T, Caller, CallerData> req,
189 : T res) {
190 :
191 10 : for(typename std::list< CallerInfo<Caller, CallerData, Key, T> >::iterator
192 2 : i = req.callers.begin();
193 8 : i != req.callers.end(); ++i)
194 : {
195 2 : CallerInfo<Caller, CallerData, Key, T> &ca = *i;
196 :
197 4 : GetResult<Key,T,Caller,CallerData> result;
198 :
199 2 : result.key = req.key;
200 2 : result.item = res;
201 2 : result.caller.first = ca.caller;
202 2 : result.caller.second = ca.data;
203 :
204 2 : ca.dest->push_back(result);
205 : }
206 2 : }
207 :
208 : private:
209 : MutexedQueue< GetRequest<Key, T, Caller, CallerData> > m_queue;
210 : };
211 :
212 : class UpdateThread : public JThread
213 : {
214 : private:
215 : JSemaphore m_update_sem;
216 :
217 : protected:
218 : virtual void doUpdate() = 0;
219 : virtual const char *getName() = 0;
220 :
221 : public:
222 2 : UpdateThread()
223 2 : {
224 2 : }
225 1 : ~UpdateThread()
226 1 : {}
227 :
228 5828 : void deferUpdate()
229 : {
230 5828 : m_update_sem.Post();
231 5828 : }
232 :
233 2 : void Stop()
234 : {
235 2 : JThread::Stop();
236 :
237 : // give us a nudge
238 2 : m_update_sem.Post();
239 2 : }
240 :
241 2 : void *Thread()
242 : {
243 2 : ThreadStarted();
244 :
245 2 : const char *thread_name = getName();
246 :
247 2 : log_register_thread(thread_name);
248 :
249 3 : DSTACK(__FUNCTION_NAME);
250 :
251 : BEGIN_DEBUG_EXCEPTION_HANDLER
252 :
253 2 : porting::setThreadName(thread_name);
254 :
255 3018 : while (!StopRequested()) {
256 :
257 1509 : m_update_sem.Wait();
258 :
259 : // Empty the queue, just in case doUpdate() is expensive
260 5542 : while (m_update_sem.GetValue()) m_update_sem.Wait();
261 :
262 1508 : if (StopRequested()) break;
263 :
264 1508 : doUpdate();
265 : }
266 0 : END_DEBUG_EXCEPTION_HANDLER(errorstream)
267 :
268 2 : return NULL;
269 : }
270 : };
271 :
272 : #endif
273 :
|