Line data Source code
1 : /*
2 : Minetest
3 : Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
4 :
5 : This program is free software; you can redistribute it and/or modify
6 : it under the terms of the GNU Lesser General Public License as published by
7 : the Free Software Foundation; either version 2.1 of the License, or
8 : (at your option) any later version.
9 :
10 : This program is distributed in the hope that it will be useful,
11 : but WITHOUT ANY WARRANTY; without even the implied warranty of
12 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 : GNU Lesser General Public License for more details.
14 :
15 : You should have received a copy of the GNU Lesser General Public License along
16 : with this program; if not, write to the Free Software Foundation, Inc.,
17 : 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 : */
19 :
20 : #include <iomanip>
21 : #include <errno.h>
22 : #include "connection.h"
23 : #include "serialization.h"
24 : #include "log.h"
25 : #include "porting.h"
26 : #include "network/networkpacket.h"
27 : #include "util/serialize.h"
28 : #include "util/numeric.h"
29 : #include "util/string.h"
30 : #include "settings.h"
31 : #include "profiler.h"
32 :
33 : namespace con
34 : {
35 :
36 : /******************************************************************************/
37 : /* defines used for debugging and profiling */
38 : /******************************************************************************/
// Debug / profiling helper macros.
// With NDEBUG defined: LOG(a) expands to the bare statement `a`, PROFILE(a)
// expands to nothing, and DEBUG_CONNECTION_KBPS is undefined.
// Without NDEBUG: LOG(a) runs `a` inside a braced scope that holds
// log_message_mutex for the duration of the statement, and PROFILE(a) expands
// to `a`. DEBUG_CONNECTION_KBPS is undefined in both configurations.
// NOTE(review): the non-NDEBUG LOG expands to a `{ ... }` block, so
// `if (x) LOG(y); else ...` would not compile - confirm no call site does that.
39 : #ifdef NDEBUG
40 : #define LOG(a) a
41 : #define PROFILE(a)
42 : #undef DEBUG_CONNECTION_KBPS
43 : #else
44 : /* this mutex is used to achieve log message consistency */
45 : JMutex log_message_mutex;
46 : #define LOG(a) \
47 : { \
48 : JMutexAutoLock loglock(log_message_mutex); \
49 : a; \
50 : }
51 : #define PROFILE(a) a
52 : //#define DEBUG_CONNECTION_KBPS
53 : #undef DEBUG_CONNECTION_KBPS
54 : #endif
55 :
56 :
/**
 * Convert the gap between two millisecond timestamps into seconds, clamped
 * to the range [0.0, 0.1].
 *
 * The subtraction is performed on unsigned values, so a curtime smaller than
 * lasttime wraps around to a large gap and is clamped to 0.1.
 *
 * @param lasttime earlier timestamp in milliseconds
 * @param curtime  later timestamp in milliseconds
 * @return elapsed seconds, never below 0.0 and never above 0.1
 */
static inline float CALC_DTIME(unsigned int lasttime, unsigned int curtime) {
	const float elapsed_s = (curtime - lasttime) / 1000.0;

	// Clamp in double precision, upper bound first and lower bound second.
	double clamped = elapsed_s;
	if (!(clamped < 0.1))
		clamped = 0.1;
	if (!(clamped > 0.0))
		clamped = 0.0;
	return clamped;
}
61 :
62 : /* maximum window size to use. 0xFFFF is the theoretical maximum; don't even
63 : * think about touching it: the closer the window size gets to that maximum,
64 : * the more likely data corruption will occur
65 : */
66 : #define MAX_RELIABLE_WINDOW_SIZE 0x8000
67 : /* starting value for window size */
68 : #define MIN_RELIABLE_WINDOW_SIZE 0x40
69 :
70 : #define MAX_UDP_PEERS 65535
71 :
72 : #define PING_TIMEOUT 5.0
73 :
74 5202 : static u16 readPeerId(u8 *packetdata)
75 : {
76 5202 : return readU16(&packetdata[4]);
77 : }
78 5202 : static u8 readChannel(u8 *packetdata)
79 : {
80 5202 : return readU8(&packetdata[6]);
81 : }
82 :
83 8348 : BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
84 : u32 protocol_id, u16 sender_peer_id, u8 channel)
85 : {
86 8348 : u32 packet_size = datasize + BASE_HEADER_SIZE;
87 8348 : BufferedPacket p(packet_size);
88 8349 : p.address = address;
89 :
90 8349 : writeU32(&p.data[0], protocol_id);
91 8349 : writeU16(&p.data[4], sender_peer_id);
92 8349 : writeU8(&p.data[6], channel);
93 :
94 8349 : memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
95 :
96 8349 : return p;
97 : }
98 :
99 8347 : BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
100 : u32 protocol_id, u16 sender_peer_id, u8 channel)
101 : {
102 : return makePacket(address, *data, data.getSize(),
103 8347 : protocol_id, sender_peer_id, channel);
104 : }
105 :
106 992 : SharedBuffer<u8> makeOriginalPacket(
107 : SharedBuffer<u8> data)
108 : {
109 992 : u32 header_size = 1;
110 992 : u32 packet_size = data.getSize() + header_size;
111 992 : SharedBuffer<u8> b(packet_size);
112 :
113 992 : writeU8(&(b[0]), TYPE_ORIGINAL);
114 992 : if (data.getSize() > 0) {
115 992 : memcpy(&(b[header_size]), *data, data.getSize());
116 : }
117 992 : return b;
118 : }
119 :
120 0 : std::list<SharedBuffer<u8> > makeSplitPacket(
121 : SharedBuffer<u8> data,
122 : u32 chunksize_max,
123 : u16 seqnum)
124 : {
125 : // Chunk packets, containing the TYPE_SPLIT header
126 0 : std::list<SharedBuffer<u8> > chunks;
127 :
128 0 : u32 chunk_header_size = 7;
129 0 : u32 maximum_data_size = chunksize_max - chunk_header_size;
130 0 : u32 start = 0;
131 0 : u32 end = 0;
132 0 : u32 chunk_num = 0;
133 0 : u16 chunk_count = 0;
134 0 : do{
135 0 : end = start + maximum_data_size - 1;
136 0 : if (end > data.getSize() - 1)
137 0 : end = data.getSize() - 1;
138 :
139 0 : u32 payload_size = end - start + 1;
140 0 : u32 packet_size = chunk_header_size + payload_size;
141 :
142 0 : SharedBuffer<u8> chunk(packet_size);
143 :
144 0 : writeU8(&chunk[0], TYPE_SPLIT);
145 0 : writeU16(&chunk[1], seqnum);
146 : // [3] u16 chunk_count is written at next stage
147 0 : writeU16(&chunk[5], chunk_num);
148 0 : memcpy(&chunk[chunk_header_size], &data[start], payload_size);
149 :
150 0 : chunks.push_back(chunk);
151 0 : chunk_count++;
152 :
153 0 : start = end + 1;
154 0 : chunk_num++;
155 : }
156 0 : while(end != data.getSize() - 1);
157 :
158 0 : for(std::list<SharedBuffer<u8> >::iterator i = chunks.begin();
159 0 : i != chunks.end(); ++i)
160 : {
161 : // Write chunk_count
162 0 : writeU16(&((*i)[3]), chunk_count);
163 : }
164 :
165 0 : return chunks;
166 : }
167 :
168 992 : std::list<SharedBuffer<u8> > makeAutoSplitPacket(
169 : SharedBuffer<u8> data,
170 : u32 chunksize_max,
171 : u16 &split_seqnum)
172 : {
173 992 : u32 original_header_size = 1;
174 992 : std::list<SharedBuffer<u8> > list;
175 992 : if (data.getSize() + original_header_size > chunksize_max)
176 : {
177 0 : list = makeSplitPacket(data, chunksize_max, split_seqnum);
178 0 : split_seqnum++;
179 0 : return list;
180 : }
181 : else
182 : {
183 992 : list.push_back(makeOriginalPacket(data));
184 : }
185 992 : return list;
186 : }
187 :
188 802 : SharedBuffer<u8> makeReliablePacket(
189 : SharedBuffer<u8> data,
190 : u16 seqnum)
191 : {
192 802 : u32 header_size = 3;
193 802 : u32 packet_size = data.getSize() + header_size;
194 802 : SharedBuffer<u8> b(packet_size);
195 :
196 802 : writeU8(&b[0], TYPE_RELIABLE);
197 802 : writeU16(&b[1], seqnum);
198 :
199 802 : memcpy(&b[header_size], *data, data.getSize());
200 :
201 802 : return b;
202 : }
203 :
204 : /*
205 : ReliablePacketBuffer
206 : */
207 :
208 12 : ReliablePacketBuffer::ReliablePacketBuffer(): m_list_size(0) {}
209 :
210 0 : void ReliablePacketBuffer::print()
211 : {
212 0 : JMutexAutoLock listlock(m_list_mutex);
213 0 : LOG(dout_con<<"Dump of ReliablePacketBuffer:" << std::endl);
214 0 : unsigned int index = 0;
215 0 : for(std::list<BufferedPacket>::iterator i = m_list.begin();
216 0 : i != m_list.end();
217 : ++i)
218 : {
219 0 : u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
220 0 : LOG(dout_con<<index<< ":" << s << std::endl);
221 0 : index++;
222 : }
223 0 : }
224 802 : bool ReliablePacketBuffer::empty()
225 : {
226 1604 : JMutexAutoLock listlock(m_list_mutex);
227 1604 : return m_list.empty();
228 : }
229 :
230 21914 : u32 ReliablePacketBuffer::size()
231 : {
232 21914 : return m_list_size;
233 : }
234 :
235 0 : bool ReliablePacketBuffer::containsPacket(u16 seqnum)
236 : {
237 0 : return !(findPacket(seqnum) == m_list.end());
238 : }
239 :
240 802 : RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
241 : {
242 802 : std::list<BufferedPacket>::iterator i = m_list.begin();
243 802 : for(; i != m_list.end(); ++i)
244 : {
245 802 : u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
246 : /*dout_con<<"findPacket(): finding seqnum="<<seqnum
247 : <<", comparing to s="<<s<<std::endl;*/
248 802 : if (s == seqnum)
249 802 : break;
250 : }
251 802 : return i;
252 : }
253 1604 : RPBSearchResult ReliablePacketBuffer::notFound()
254 : {
255 1604 : return m_list.end();
256 : }
257 26716 : bool ReliablePacketBuffer::getFirstSeqnum(u16& result)
258 : {
259 53433 : JMutexAutoLock listlock(m_list_mutex);
260 26717 : if (m_list.empty())
261 25245 : return false;
262 2944 : BufferedPacket p = *m_list.begin();
263 1472 : result = readU16(&p.data[BASE_HEADER_SIZE+1]);
264 1472 : return true;
265 : }
266 :
267 407 : BufferedPacket ReliablePacketBuffer::popFirst()
268 : {
269 814 : JMutexAutoLock listlock(m_list_mutex);
270 407 : if (m_list.empty())
271 0 : throw NotFoundException("Buffer is empty");
272 407 : BufferedPacket p = *m_list.begin();
273 407 : m_list.erase(m_list.begin());
274 407 : --m_list_size;
275 :
276 407 : if (m_list_size == 0) {
277 1 : m_oldest_non_answered_ack = 0;
278 : } else {
279 : m_oldest_non_answered_ack =
280 406 : readU16(&(*m_list.begin()).data[BASE_HEADER_SIZE+1]);
281 : }
282 814 : return p;
283 : }
284 802 : BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
285 : {
286 1604 : JMutexAutoLock listlock(m_list_mutex);
287 802 : RPBSearchResult r = findPacket(seqnum);
288 802 : if (r == notFound()) {
289 0 : LOG(dout_con<<"Sequence number: " << seqnum
290 0 : << " not found in reliable buffer"<<std::endl);
291 0 : throw NotFoundException("seqnum not found in buffer");
292 : }
293 802 : BufferedPacket p = *r;
294 :
295 :
296 802 : RPBSearchResult next = r;
297 802 : next++;
298 802 : if (next != notFound()) {
299 122 : u16 s = readU16(&(next->data[BASE_HEADER_SIZE+1]));
300 122 : m_oldest_non_answered_ack = s;
301 : }
302 :
303 802 : m_list.erase(r);
304 802 : --m_list_size;
305 :
306 802 : if (m_list_size == 0)
307 680 : { m_oldest_non_answered_ack = 0; }
308 : else
309 122 : { m_oldest_non_answered_ack = readU16(&(*m_list.begin()).data[BASE_HEADER_SIZE+1]); }
310 1604 : return p;
311 : }
312 1209 : void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected)
313 : {
314 1737 : JMutexAutoLock listlock(m_list_mutex);
315 1209 : FATAL_ERROR_IF(p.data.getSize() < BASE_HEADER_SIZE+3, "Invalid data size");
316 1209 : u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
317 1209 : sanity_check(type == TYPE_RELIABLE);
318 1209 : u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
319 :
320 1209 : sanity_check(seqnum_in_window(seqnum, next_expected, MAX_RELIABLE_WINDOW_SIZE));
321 1209 : sanity_check(seqnum != next_expected);
322 :
323 1209 : ++m_list_size;
324 1209 : sanity_check(m_list_size <= SEQNUM_MAX+1); // FIXME: Handle the error?
325 :
326 : // Find the right place for the packet and insert it there
327 : // If list is empty, just add it
328 1209 : if (m_list.empty())
329 : {
330 681 : m_list.push_back(p);
331 681 : m_oldest_non_answered_ack = seqnum;
332 : // Done.
333 681 : return;
334 : }
335 :
336 : // Otherwise find the right place
337 528 : std::list<BufferedPacket>::iterator i = m_list.begin();
338 : // Find the first packet in the list which has a higher seqnum
339 528 : u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
340 :
341 : /* case seqnum is smaller then next_expected seqnum */
342 : /* this is true e.g. on wrap around */
343 528 : if (seqnum < next_expected) {
344 360 : while(((s < seqnum) || (s >= next_expected)) && (i != m_list.end())) {
345 121 : i++;
346 121 : if (i != m_list.end())
347 3 : s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
348 : }
349 : }
350 : /* non wrap around case (at least for incoming and next_expected */
351 : else
352 : {
353 81046 : while(((s < seqnum) && (s >= next_expected)) && (i != m_list.end())) {
354 40318 : i++;
355 40318 : if (i != m_list.end())
356 40052 : s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
357 : }
358 : }
359 :
360 528 : if (s == seqnum) {
361 0 : if (
362 0 : (readU16(&(i->data[BASE_HEADER_SIZE+1])) != seqnum) ||
363 0 : (i->data.getSize() != p.data.getSize()) ||
364 0 : (i->address != p.address)
365 : )
366 : {
367 : /* if this happens your maximum transfer window may be to big */
368 0 : fprintf(stderr,
369 : "Duplicated seqnum %d non matching packet detected:\n",
370 0 : seqnum);
371 0 : fprintf(stderr, "Old: seqnum: %05d size: %04d, address: %s\n",
372 0 : readU16(&(i->data[BASE_HEADER_SIZE+1])),i->data.getSize(),
373 0 : i->address.serializeString().c_str());
374 0 : fprintf(stderr, "New: seqnum: %05d size: %04u, address: %s\n",
375 0 : readU16(&(p.data[BASE_HEADER_SIZE+1])),p.data.getSize(),
376 0 : p.address.serializeString().c_str());
377 0 : throw IncomingDataCorruption("duplicated packet isn't same as original one");
378 : }
379 :
380 0 : sanity_check(readU16(&(i->data[BASE_HEADER_SIZE+1])) == seqnum);
381 0 : sanity_check(i->data.getSize() == p.data.getSize());
382 0 : sanity_check(i->address == p.address);
383 :
384 : /* nothing to do this seems to be a resent packet */
385 : /* for paranoia reason data should be compared */
386 0 : --m_list_size;
387 : }
388 : /* insert or push back */
389 528 : else if (i != m_list.end()) {
390 144 : m_list.insert(i, p);
391 : }
392 : else {
393 384 : m_list.push_back(p);
394 : }
395 :
396 : /* update last packet number */
397 528 : m_oldest_non_answered_ack = readU16(&(*m_list.begin()).data[BASE_HEADER_SIZE+1]);
398 : }
399 :
400 10152 : void ReliablePacketBuffer::incrementTimeouts(float dtime)
401 : {
402 20304 : JMutexAutoLock listlock(m_list_mutex);
403 30723 : for(std::list<BufferedPacket>::iterator i = m_list.begin();
404 20482 : i != m_list.end(); ++i)
405 : {
406 89 : i->time += dtime;
407 89 : i->totaltime += dtime;
408 : }
409 10152 : }
410 :
411 10152 : std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout,
412 : unsigned int max_packets)
413 : {
414 20304 : JMutexAutoLock listlock(m_list_mutex);
415 10152 : std::list<BufferedPacket> timed_outs;
416 30723 : for(std::list<BufferedPacket>::iterator i = m_list.begin();
417 20482 : i != m_list.end(); ++i)
418 : {
419 89 : if (i->time >= timeout) {
420 0 : timed_outs.push_back(*i);
421 :
422 : //this packet will be sent right afterwards reset timeout here
423 0 : i->time = 0.0;
424 0 : if (timed_outs.size() >= max_packets)
425 0 : break;
426 : }
427 : }
428 20304 : return timed_outs;
429 : }
430 :
431 : /*
432 : IncomingSplitBuffer
433 : */
434 :
435 12 : IncomingSplitBuffer::~IncomingSplitBuffer()
436 : {
437 12 : JMutexAutoLock listlock(m_map_mutex);
438 18 : for(std::map<u16, IncomingSplitPacket*>::iterator i = m_buf.begin();
439 12 : i != m_buf.end(); ++i)
440 : {
441 0 : delete i->second;
442 : }
443 6 : }
444 : /*
445 : This will throw a GotSplitPacketException when a full
446 : split packet is constructed.
447 : */
448 3331 : SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
449 : {
450 6662 : JMutexAutoLock listlock(m_map_mutex);
451 3331 : u32 headersize = BASE_HEADER_SIZE + 7;
452 3331 : FATAL_ERROR_IF(p.data.getSize() < headersize, "Invalid data size");
453 3331 : u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
454 3331 : sanity_check(type == TYPE_SPLIT);
455 3331 : u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
456 3331 : u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
457 3331 : u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
458 :
459 : // Add if doesn't exist
460 3331 : if (m_buf.find(seqnum) == m_buf.end())
461 : {
462 672 : IncomingSplitPacket *sp = new IncomingSplitPacket();
463 672 : sp->chunk_count = chunk_count;
464 672 : sp->reliable = reliable;
465 672 : m_buf[seqnum] = sp;
466 : }
467 :
468 3331 : IncomingSplitPacket *sp = m_buf[seqnum];
469 :
470 : // TODO: These errors should be thrown or something? Dunno.
471 3331 : if (chunk_count != sp->chunk_count)
472 0 : LOG(derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
473 : <<" != sp->chunk_count="<<sp->chunk_count
474 0 : <<std::endl);
475 3331 : if (reliable != sp->reliable)
476 0 : LOG(derr_con<<"Connection: WARNING: reliable="<<reliable
477 : <<" != sp->reliable="<<sp->reliable
478 0 : <<std::endl);
479 :
480 : // If chunk already exists, ignore it.
481 : // Sometimes two identical packets may arrive when there is network
482 : // lag and the server re-sends stuff.
483 3331 : if (sp->chunks.find(chunk_num) != sp->chunks.end())
484 0 : return SharedBuffer<u8>();
485 :
486 : // Cut chunk data out of packet
487 3331 : u32 chunkdatasize = p.data.getSize() - headersize;
488 6662 : SharedBuffer<u8> chunkdata(chunkdatasize);
489 3331 : memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
490 :
491 : // Set chunk data in buffer
492 3331 : sp->chunks[chunk_num] = chunkdata;
493 :
494 : // If not all chunks are received, return empty buffer
495 3331 : if (sp->allReceived() == false)
496 2659 : return SharedBuffer<u8>();
497 :
498 : // Calculate total size
499 672 : u32 totalsize = 0;
500 12009 : for(std::map<u16, SharedBuffer<u8> >::iterator i = sp->chunks.begin();
501 8006 : i != sp->chunks.end(); ++i)
502 : {
503 3331 : totalsize += i->second.getSize();
504 : }
505 :
506 1344 : SharedBuffer<u8> fulldata(totalsize);
507 :
508 : // Copy chunks to data buffer
509 672 : u32 start = 0;
510 4003 : for(u32 chunk_i=0; chunk_i<sp->chunk_count;
511 : chunk_i++)
512 : {
513 6662 : SharedBuffer<u8> buf = sp->chunks[chunk_i];
514 3331 : u16 chunkdatasize = buf.getSize();
515 3331 : memcpy(&fulldata[start], *buf, chunkdatasize);
516 3331 : start += chunkdatasize;;
517 : }
518 :
519 : // Remove sp from buffer
520 672 : m_buf.erase(seqnum);
521 672 : delete sp;
522 :
523 672 : return fulldata;
524 : }
525 10152 : void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
526 : {
527 20304 : std::list<u16> remove_queue;
528 : {
529 20304 : JMutexAutoLock listlock(m_map_mutex);
530 32292 : for(std::map<u16, IncomingSplitPacket*>::iterator i = m_buf.begin();
531 21528 : i != m_buf.end(); ++i)
532 : {
533 612 : IncomingSplitPacket *p = i->second;
534 : // Reliable ones are not removed by timeout
535 612 : if (p->reliable == true)
536 609 : continue;
537 3 : p->time += dtime;
538 3 : if (p->time >= timeout)
539 0 : remove_queue.push_back(i->first);
540 : }
541 : }
542 30456 : for(std::list<u16>::iterator j = remove_queue.begin();
543 20304 : j != remove_queue.end(); ++j)
544 : {
545 0 : JMutexAutoLock listlock(m_map_mutex);
546 0 : LOG(dout_con<<"NOTE: Removing timed out unreliable split packet"<<std::endl);
547 0 : delete m_buf[*j];
548 0 : m_buf.erase(*j);
549 : }
550 10152 : }
551 :
552 : /*
553 : Channel
554 : */
555 :
556 6 : Channel::Channel() :
557 : window_size(MIN_RELIABLE_WINDOW_SIZE),
558 : next_incoming_seqnum(SEQNUM_INITIAL),
559 : next_outgoing_seqnum(SEQNUM_INITIAL),
560 : next_outgoing_split_seqnum(SEQNUM_INITIAL),
561 : current_packet_loss(0),
562 : current_packet_too_late(0),
563 : current_packet_successfull(0),
564 : packet_loss_counter(0),
565 : current_bytes_transfered(0),
566 : current_bytes_received(0),
567 : current_bytes_lost(0),
568 : max_kbps(0.0),
569 : cur_kbps(0.0),
570 : avg_kbps(0.0),
571 : max_incoming_kbps(0.0),
572 : cur_incoming_kbps(0.0),
573 : avg_incoming_kbps(0.0),
574 : max_kbps_lost(0.0),
575 : cur_kbps_lost(0.0),
576 : avg_kbps_lost(0.0),
577 : bpm_counter(0.0),
578 6 : rate_samples(0)
579 : {
580 6 : }
581 :
582 6 : Channel::~Channel()
583 : {
584 6 : }
585 :
586 18825 : u16 Channel::readNextIncomingSeqNum()
587 : {
588 37651 : JMutexAutoLock internal(m_internal_mutex);
589 37652 : return next_incoming_seqnum;
590 : }
591 :
592 3609 : u16 Channel::incNextIncomingSeqNum()
593 : {
594 7218 : JMutexAutoLock internal(m_internal_mutex);
595 3609 : u16 retval = next_incoming_seqnum;
596 3609 : next_incoming_seqnum++;
597 7218 : return retval;
598 : }
599 :
600 793 : u16 Channel::readNextSplitSeqNum()
601 : {
602 1586 : JMutexAutoLock internal(m_internal_mutex);
603 1586 : return next_outgoing_split_seqnum;
604 : }
605 992 : void Channel::setNextSplitSeqNum(u16 seqnum)
606 : {
607 1984 : JMutexAutoLock internal(m_internal_mutex);
608 992 : next_outgoing_split_seqnum = seqnum;
609 992 : }
610 :
611 802 : u16 Channel::getOutgoingSequenceNumber(bool& successful)
612 : {
613 1604 : JMutexAutoLock internal(m_internal_mutex);
614 802 : u16 retval = next_outgoing_seqnum;
615 : u16 lowest_unacked_seqnumber;
616 :
617 : /* shortcut if there ain't any packet in outgoing list */
618 802 : if (outgoing_reliables_sent.empty())
619 : {
620 794 : next_outgoing_seqnum++;
621 794 : return retval;
622 : }
623 :
624 8 : if (outgoing_reliables_sent.getFirstSeqnum(lowest_unacked_seqnumber))
625 : {
626 8 : if (lowest_unacked_seqnumber < next_outgoing_seqnum) {
627 : // ugly cast but this one is required in order to tell compiler we
628 : // know about difference of two unsigned may be negative in general
629 : // but we already made sure it won't happen in this case
630 8 : if (((u16)(next_outgoing_seqnum - lowest_unacked_seqnumber)) > window_size) {
631 0 : successful = false;
632 0 : return 0;
633 : }
634 : }
635 : else {
636 : // ugly cast but this one is required in order to tell compiler we
637 : // know about difference of two unsigned may be negative in general
638 : // but we already made sure it won't happen in this case
639 0 : if ((next_outgoing_seqnum + (u16)(SEQNUM_MAX - lowest_unacked_seqnumber)) >
640 0 : window_size) {
641 0 : successful = false;
642 0 : return 0;
643 : }
644 : }
645 : }
646 :
647 8 : next_outgoing_seqnum++;
648 8 : return retval;
649 : }
650 :
651 802 : u16 Channel::readOutgoingSequenceNumber()
652 : {
653 1604 : JMutexAutoLock internal(m_internal_mutex);
654 1604 : return next_outgoing_seqnum;
655 : }
656 :
657 0 : bool Channel::putBackSequenceNumber(u16 seqnum)
658 : {
659 0 : if (((seqnum + 1) % (SEQNUM_MAX+1)) == next_outgoing_seqnum) {
660 :
661 0 : next_outgoing_seqnum = seqnum;
662 0 : return true;
663 : }
664 0 : return false;
665 : }
666 :
667 802 : void Channel::UpdateBytesSent(unsigned int bytes, unsigned int packets)
668 : {
669 1604 : JMutexAutoLock internal(m_internal_mutex);
670 802 : current_bytes_transfered += bytes;
671 802 : current_packet_successfull += packets;
672 802 : }
673 :
674 4795 : void Channel::UpdateBytesReceived(unsigned int bytes) {
675 9590 : JMutexAutoLock internal(m_internal_mutex);
676 4795 : current_bytes_received += bytes;
677 4795 : }
678 :
679 0 : void Channel::UpdateBytesLost(unsigned int bytes)
680 : {
681 0 : JMutexAutoLock internal(m_internal_mutex);
682 0 : current_bytes_lost += bytes;
683 0 : }
684 :
685 :
686 10152 : void Channel::UpdatePacketLossCounter(unsigned int count)
687 : {
688 20304 : JMutexAutoLock internal(m_internal_mutex);
689 10152 : current_packet_loss += count;
690 10152 : }
691 :
692 0 : void Channel::UpdatePacketTooLateCounter()
693 : {
694 0 : JMutexAutoLock internal(m_internal_mutex);
695 0 : current_packet_too_late++;
696 0 : }
697 :
698 10152 : void Channel::UpdateTimers(float dtime,bool legacy_peer)
699 : {
700 10152 : bpm_counter += dtime;
701 10152 : packet_loss_counter += dtime;
702 :
703 10152 : if (packet_loss_counter > 1.0)
704 : {
705 132 : packet_loss_counter -= 1.0;
706 :
707 132 : unsigned int packet_loss = 11; /* use a neutral value for initialization */
708 132 : unsigned int packets_successfull = 0;
709 : //unsigned int packet_too_late = 0;
710 :
711 132 : bool reasonable_amount_of_data_transmitted = false;
712 :
713 : {
714 264 : JMutexAutoLock internal(m_internal_mutex);
715 132 : packet_loss = current_packet_loss;
716 : //packet_too_late = current_packet_too_late;
717 132 : packets_successfull = current_packet_successfull;
718 :
719 132 : if (current_bytes_transfered > (unsigned int) (window_size*512/2))
720 : {
721 23 : reasonable_amount_of_data_transmitted = true;
722 : }
723 132 : current_packet_loss = 0;
724 132 : current_packet_too_late = 0;
725 132 : current_packet_successfull = 0;
726 : }
727 :
728 : /* dynamic window size is only available for non legacy peers */
729 132 : if (!legacy_peer) {
730 0 : float successfull_to_lost_ratio = 0.0;
731 0 : bool done = false;
732 :
733 0 : if (packets_successfull > 0) {
734 0 : successfull_to_lost_ratio = packet_loss/packets_successfull;
735 : }
736 0 : else if (packet_loss > 0)
737 : {
738 0 : window_size = MYMAX(
739 : (window_size - 10),
740 0 : MIN_RELIABLE_WINDOW_SIZE);
741 0 : done = true;
742 : }
743 :
744 0 : if (!done)
745 : {
746 0 : if ((successfull_to_lost_ratio < 0.01) &&
747 0 : (window_size < MAX_RELIABLE_WINDOW_SIZE))
748 : {
749 : /* don't even think about increasing if we didn't even
750 : * use major parts of our window */
751 0 : if (reasonable_amount_of_data_transmitted)
752 0 : window_size = MYMIN(
753 : (window_size + 100),
754 0 : MAX_RELIABLE_WINDOW_SIZE);
755 : }
756 0 : else if ((successfull_to_lost_ratio < 0.05) &&
757 0 : (window_size < MAX_RELIABLE_WINDOW_SIZE))
758 : {
759 : /* don't even think about increasing if we didn't even
760 : * use major parts of our window */
761 0 : if (reasonable_amount_of_data_transmitted)
762 0 : window_size = MYMIN(
763 : (window_size + 50),
764 0 : MAX_RELIABLE_WINDOW_SIZE);
765 : }
766 0 : else if (successfull_to_lost_ratio > 0.15)
767 : {
768 0 : window_size = MYMAX(
769 : (window_size - 100),
770 0 : MIN_RELIABLE_WINDOW_SIZE);
771 : }
772 0 : else if (successfull_to_lost_ratio > 0.1)
773 : {
774 0 : window_size = MYMAX(
775 : (window_size - 50),
776 0 : MIN_RELIABLE_WINDOW_SIZE);
777 : }
778 : }
779 : }
780 : }
781 :
782 10152 : if (bpm_counter > 10.0)
783 : {
784 : {
785 24 : JMutexAutoLock internal(m_internal_mutex);
786 : cur_kbps =
787 12 : (((float) current_bytes_transfered)/bpm_counter)/1024.0;
788 12 : current_bytes_transfered = 0;
789 : cur_kbps_lost =
790 12 : (((float) current_bytes_lost)/bpm_counter)/1024.0;
791 12 : current_bytes_lost = 0;
792 : cur_incoming_kbps =
793 12 : (((float) current_bytes_received)/bpm_counter)/1024.0;
794 12 : current_bytes_received = 0;
795 12 : bpm_counter = 0;
796 : }
797 :
798 12 : if (cur_kbps > max_kbps)
799 : {
800 6 : max_kbps = cur_kbps;
801 : }
802 :
803 12 : if (cur_kbps_lost > max_kbps_lost)
804 : {
805 0 : max_kbps_lost = cur_kbps_lost;
806 : }
807 :
808 12 : if (cur_incoming_kbps > max_incoming_kbps) {
809 6 : max_incoming_kbps = cur_incoming_kbps;
810 : }
811 :
812 12 : rate_samples = MYMIN(rate_samples+1,10);
813 12 : float old_fraction = ((float) (rate_samples-1) )/( (float) rate_samples);
814 24 : avg_kbps = avg_kbps * old_fraction +
815 24 : cur_kbps * (1.0 - old_fraction);
816 24 : avg_kbps_lost = avg_kbps_lost * old_fraction +
817 24 : cur_kbps_lost * (1.0 - old_fraction);
818 24 : avg_incoming_kbps = avg_incoming_kbps * old_fraction +
819 24 : cur_incoming_kbps * (1.0 - old_fraction);
820 : }
821 10152 : }
822 :
823 :
824 : /*
825 : Peer
826 : */
827 :
828 0 : PeerHelper::PeerHelper() :
829 0 : m_peer(0)
830 0 : {}
831 :
832 29929 : PeerHelper::PeerHelper(Peer* peer) :
833 29929 : m_peer(peer)
834 : {
835 29929 : if (peer != NULL)
836 : {
837 29928 : if (!peer->IncUseCount())
838 : {
839 0 : m_peer = 0;
840 : }
841 : }
842 29929 : }
843 :
844 59851 : PeerHelper::~PeerHelper()
845 : {
846 29923 : if (m_peer != 0)
847 29923 : m_peer->DecUseCount();
848 :
849 29928 : m_peer = 0;
850 29928 : }
851 :
852 0 : PeerHelper& PeerHelper::operator=(Peer* peer)
853 : {
854 0 : m_peer = peer;
855 0 : if (peer != NULL)
856 : {
857 0 : if (!peer->IncUseCount())
858 : {
859 0 : m_peer = 0;
860 : }
861 : }
862 0 : return *this;
863 : }
864 :
865 57434 : Peer* PeerHelper::operator->() const
866 : {
867 57434 : return m_peer;
868 : }
869 :
870 155227 : Peer* PeerHelper::operator&() const
871 : {
872 155227 : return m_peer;
873 : }
874 :
875 29928 : bool PeerHelper::operator!() {
876 29928 : return ! m_peer;
877 : }
878 :
879 1 : bool PeerHelper::operator!=(void* ptr)
880 : {
881 1 : return ((void*) m_peer != ptr);
882 : }
883 :
884 29928 : bool Peer::IncUseCount()
885 : {
886 59856 : JMutexAutoLock lock(m_exclusive_access_mutex);
887 :
888 29928 : if (!m_pending_deletion)
889 : {
890 29928 : this->m_usage++;
891 29928 : return true;
892 : }
893 :
894 0 : return false;
895 : }
896 :
897 29923 : void Peer::DecUseCount()
898 : {
899 : {
900 29923 : JMutexAutoLock lock(m_exclusive_access_mutex);
901 29928 : sanity_check(m_usage > 0);
902 29928 : m_usage--;
903 :
904 29928 : if (!((m_pending_deletion) && (m_usage == 0)))
905 29928 : return;
906 : }
907 0 : delete this;
908 : }
909 :
910 510 : void Peer::RTTStatistics(float rtt, std::string profiler_id,
911 : unsigned int num_samples) {
912 :
913 510 : if (m_last_rtt > 0) {
914 : /* set min max values */
915 509 : if (rtt < m_rtt.min_rtt)
916 1 : m_rtt.min_rtt = rtt;
917 509 : if (rtt >= m_rtt.max_rtt)
918 395 : m_rtt.max_rtt = rtt;
919 :
920 : /* do average calculation */
921 509 : if (m_rtt.avg_rtt < 0.0)
922 1 : m_rtt.avg_rtt = rtt;
923 : else
924 1016 : m_rtt.avg_rtt = m_rtt.avg_rtt * (num_samples/(num_samples-1)) +
925 1016 : rtt * (1/num_samples);
926 :
927 : /* do jitter calculation */
928 :
929 : //just use some neutral value at beginning
930 509 : float jitter = m_rtt.jitter_min;
931 :
932 509 : if (rtt > m_last_rtt)
933 4 : jitter = rtt-m_last_rtt;
934 :
935 509 : if (rtt <= m_last_rtt)
936 505 : jitter = m_last_rtt - rtt;
937 :
938 509 : if (jitter < m_rtt.jitter_min)
939 1 : m_rtt.jitter_min = jitter;
940 509 : if (jitter >= m_rtt.jitter_max)
941 397 : m_rtt.jitter_max = jitter;
942 :
943 509 : if (m_rtt.jitter_avg < 0.0)
944 1 : m_rtt.jitter_avg = jitter;
945 : else
946 1016 : m_rtt.jitter_avg = m_rtt.jitter_avg * (num_samples/(num_samples-1)) +
947 1016 : jitter * (1/num_samples);
948 :
949 509 : if (profiler_id != "")
950 : {
951 509 : g_profiler->graphAdd(profiler_id + "_rtt", rtt);
952 509 : g_profiler->graphAdd(profiler_id + "_jitter", jitter);
953 : }
954 : }
955 : /* save values required for next loop */
956 510 : m_last_rtt = rtt;
957 510 : }
958 :
959 3384 : bool Peer::isTimedOut(float timeout)
960 : {
961 6768 : JMutexAutoLock lock(m_exclusive_access_mutex);
962 3384 : u32 current_time = porting::getTimeMs();
963 :
964 3384 : float dtime = CALC_DTIME(m_last_timeout_check,current_time);
965 3384 : m_last_timeout_check = current_time;
966 :
967 3384 : m_timeout_counter += dtime;
968 :
969 6768 : return m_timeout_counter > timeout;
970 : }
971 :
972 0 : void Peer::Drop()
973 : {
974 : {
975 0 : JMutexAutoLock usage_lock(m_exclusive_access_mutex);
976 0 : m_pending_deletion = true;
977 0 : if (m_usage != 0)
978 0 : return;
979 : }
980 :
981 : PROFILE(std::stringstream peerIdentifier1);
982 : PROFILE(peerIdentifier1 << "runTimeouts[" << m_connection->getDesc()
983 : << ";" << id << ";RELIABLE]");
984 : PROFILE(g_profiler->remove(peerIdentifier1.str()));
985 : PROFILE(std::stringstream peerIdentifier2);
986 : PROFILE(peerIdentifier2 << "sendPackets[" << m_connection->getDesc()
987 : << ";" << id << ";RELIABLE]");
988 : PROFILE(ScopeProfiler peerprofiler(g_profiler, peerIdentifier2.str(), SPT_AVG));
989 :
990 0 : delete this;
991 : }
992 :
993 2 : UDPPeer::UDPPeer(u16 a_id, Address a_address, Connection* connection) :
994 : Peer(a_address,a_id,connection),
995 : m_pending_disconnect(false),
996 : resend_timeout(0.5),
997 2 : m_legacy_peer(true)
998 : {
999 2 : }
1000 :
1001 12350 : bool UDPPeer::getAddress(MTProtocols type,Address& toset)
1002 : {
1003 12350 : if ((type == MTP_UDP) || (type == MTP_MINETEST_RELIABLE_UDP) || (type == MTP_PRIMARY))
1004 : {
1005 12350 : toset = address;
1006 12350 : return true;
1007 : }
1008 :
1009 0 : return false;
1010 : }
1011 :
1012 0 : void UDPPeer::setNonLegacyPeer()
1013 : {
1014 0 : m_legacy_peer = false;
1015 0 : for(unsigned int i=0; i< CHANNEL_COUNT; i++)
1016 : {
1017 0 : channels->setWindowSize(g_settings->getU16("max_packets_per_iteration"));
1018 : }
1019 0 : }
1020 :
1021 510 : void UDPPeer::reportRTT(float rtt)
1022 : {
1023 510 : if (rtt < 0.0) {
1024 0 : return;
1025 : }
1026 510 : RTTStatistics(rtt,"rudp",MAX_RELIABLE_WINDOW_SIZE*10);
1027 :
1028 510 : float timeout = getStat(AVG_RTT) * RESEND_TIMEOUT_FACTOR;
1029 510 : if (timeout < RESEND_TIMEOUT_MIN)
1030 510 : timeout = RESEND_TIMEOUT_MIN;
1031 510 : if (timeout > RESEND_TIMEOUT_MAX)
1032 0 : timeout = RESEND_TIMEOUT_MAX;
1033 :
1034 1020 : JMutexAutoLock usage_lock(m_exclusive_access_mutex);
1035 510 : resend_timeout = timeout;
1036 : }
1037 :
1038 3384 : bool UDPPeer::Ping(float dtime,SharedBuffer<u8>& data)
1039 : {
1040 3384 : m_ping_timer += dtime;
1041 3384 : if (m_ping_timer >= PING_TIMEOUT)
1042 : {
1043 : // Create and send PING packet
1044 8 : writeU8(&data[0], TYPE_CONTROL);
1045 8 : writeU8(&data[1], CONTROLTYPE_PING);
1046 8 : m_ping_timer = 0.0;
1047 8 : return true;
1048 : }
1049 3376 : return false;
1050 : }
1051 :
1052 793 : void UDPPeer::PutReliableSendCommand(ConnectionCommand &c,
1053 : unsigned int max_packet_size)
1054 : {
1055 793 : if (m_pending_disconnect)
1056 0 : return;
1057 :
1058 1586 : if ( channels[c.channelnum].queued_commands.empty() &&
1059 : /* don't queue more packets then window size */
1060 793 : (channels[c.channelnum].queued_reliables.size()
1061 793 : < (channels[c.channelnum].getWindowSize()/2))) {
1062 1560 : LOG(dout_con<<m_connection->getDesc()
1063 : <<" processing reliable command for peer id: " << c.peer_id
1064 780 : <<" data size: " << c.data.getSize() << std::endl);
1065 780 : if (!processReliableSendCommand(c,max_packet_size)) {
1066 0 : channels[c.channelnum].queued_commands.push_back(c);
1067 : }
1068 : }
1069 : else {
1070 26 : LOG(dout_con<<m_connection->getDesc()
1071 : <<" Queueing reliable command for peer id: " << c.peer_id
1072 13 : <<" data size: " << c.data.getSize() <<std::endl);
1073 13 : channels[c.channelnum].queued_commands.push_back(c);
1074 : }
1075 : }
1076 :
1077 793 : bool UDPPeer::processReliableSendCommand(
1078 : ConnectionCommand &c,
1079 : unsigned int max_packet_size)
1080 : {
1081 793 : if (m_pending_disconnect)
1082 0 : return true;
1083 :
1084 : u32 chunksize_max = max_packet_size
1085 : - BASE_HEADER_SIZE
1086 793 : - RELIABLE_HEADER_SIZE;
1087 :
1088 793 : sanity_check(c.data.getSize() < MAX_RELIABLE_WINDOW_SIZE*512);
1089 :
1090 1586 : std::list<SharedBuffer<u8> > originals;
1091 793 : u16 split_sequence_number = channels[c.channelnum].readNextSplitSeqNum();
1092 :
1093 793 : if (c.raw)
1094 : {
1095 0 : originals.push_back(c.data);
1096 : }
1097 : else {
1098 793 : originals = makeAutoSplitPacket(c.data, chunksize_max,split_sequence_number);
1099 793 : channels[c.channelnum].setNextSplitSeqNum(split_sequence_number);
1100 : }
1101 :
1102 793 : bool have_sequence_number = true;
1103 793 : bool have_initial_sequence_number = false;
1104 1586 : std::queue<BufferedPacket> toadd;
1105 793 : volatile u16 initial_sequence_number = 0;
1106 :
1107 4758 : for(std::list<SharedBuffer<u8> >::iterator i = originals.begin();
1108 3172 : i != originals.end(); ++i)
1109 : {
1110 793 : u16 seqnum = channels[c.channelnum].getOutgoingSequenceNumber(have_sequence_number);
1111 :
1112 : /* oops, we don't have enough sequence numbers to send this packet */
1113 793 : if (!have_sequence_number)
1114 0 : break;
1115 :
1116 793 : if (!have_initial_sequence_number)
1117 : {
1118 793 : initial_sequence_number = seqnum;
1119 793 : have_initial_sequence_number = true;
1120 : }
1121 :
1122 1586 : SharedBuffer<u8> reliable = makeReliablePacket(*i, seqnum);
1123 :
1124 : // Add base headers and make a packet
1125 : BufferedPacket p = con::makePacket(address, reliable,
1126 1586 : m_connection->GetProtocolID(), m_connection->GetPeerID(),
1127 3172 : c.channelnum);
1128 :
1129 793 : toadd.push(p);
1130 : }
1131 :
1132 793 : if (have_sequence_number) {
1133 793 : volatile u16 pcount = 0;
1134 2379 : while(toadd.size() > 0) {
1135 1586 : BufferedPacket p = toadd.front();
1136 793 : toadd.pop();
1137 : // LOG(dout_con<<connection->getDesc()
1138 : // << " queuing reliable packet for peer_id: " << c.peer_id
1139 : // << " channel: " << (c.channelnum&0xFF)
1140 : // << " seqnum: " << readU16(&p.data[BASE_HEADER_SIZE+1])
1141 : // << std::endl)
1142 793 : channels[c.channelnum].queued_reliables.push(p);
1143 793 : pcount++;
1144 : }
1145 793 : sanity_check(channels[c.channelnum].queued_reliables.size() < 0xFFFF);
1146 793 : return true;
1147 : }
1148 : else {
1149 0 : volatile u16 packets_available = toadd.size();
1150 : /* we didn't get a single sequence number no need to fill queue */
1151 0 : if (!have_initial_sequence_number)
1152 : {
1153 0 : return false;
1154 : }
1155 0 : while(toadd.size() > 0) {
1156 : /* remove packet */
1157 0 : toadd.pop();
1158 :
1159 : bool successfully_put_back_sequence_number
1160 0 : = channels[c.channelnum].putBackSequenceNumber(
1161 0 : (initial_sequence_number+toadd.size() % (SEQNUM_MAX+1)));
1162 :
1163 0 : FATAL_ERROR_IF(!successfully_put_back_sequence_number, "error");
1164 : }
1165 0 : LOG(dout_con<<m_connection->getDesc()
1166 : << " Windowsize exceeded on reliable sending "
1167 : << c.data.getSize() << " bytes"
1168 : << std::endl << "\t\tinitial_sequence_number: "
1169 : << initial_sequence_number
1170 : << std::endl << "\t\tgot at most : "
1171 : << packets_available << " packets"
1172 : << std::endl << "\t\tpackets queued : "
1173 : << channels[c.channelnum].outgoing_reliables_sent.size()
1174 0 : << std::endl);
1175 0 : return false;
1176 : }
1177 : }
1178 :
1179 3384 : void UDPPeer::RunCommandQueues(
1180 : unsigned int max_packet_size,
1181 : unsigned int maxcommands,
1182 : unsigned int maxtransfer)
1183 : {
1184 :
1185 13536 : for (unsigned int i = 0; i < CHANNEL_COUNT; i++) {
1186 10152 : unsigned int commands_processed = 0;
1187 :
1188 20317 : if ((channels[i].queued_commands.size() > 0) &&
1189 10165 : (channels[i].queued_reliables.size() < maxtransfer) &&
1190 : (commands_processed < maxcommands)) {
1191 : try {
1192 26 : ConnectionCommand c = channels[i].queued_commands.front();
1193 :
1194 26 : LOG(dout_con << m_connection->getDesc()
1195 13 : << " processing queued reliable command " << std::endl);
1196 :
1197 : // Packet is processed, remove it from queue
1198 13 : if (processReliableSendCommand(c,max_packet_size)) {
1199 13 : channels[i].queued_commands.pop_front();
1200 : } else {
1201 0 : LOG(dout_con << m_connection->getDesc()
1202 : << " Failed to queue packets for peer_id: " << c.peer_id
1203 : << ", delaying sending of " << c.data.getSize()
1204 0 : << " bytes" << std::endl);
1205 : }
1206 : }
1207 0 : catch (ItemNotFoundException &e) {
1208 : // intentionally empty
1209 : }
1210 : }
1211 : }
1212 3384 : }
1213 :
1214 199 : u16 UDPPeer::getNextSplitSequenceNumber(u8 channel)
1215 : {
1216 : assert(channel < CHANNEL_COUNT); // Pre-condition
1217 199 : return channels[channel].readNextIncomingSeqNum();
1218 : }
1219 :
1220 199 : void UDPPeer::setNextSplitSequenceNumber(u8 channel, u16 seqnum)
1221 : {
1222 : assert(channel < CHANNEL_COUNT); // Pre-condition
1223 199 : channels[channel].setNextSplitSeqNum(seqnum);
1224 199 : }
1225 :
1226 3331 : SharedBuffer<u8> UDPPeer::addSpiltPacket(u8 channel,
1227 : BufferedPacket toadd,
1228 : bool reliable)
1229 : {
1230 : assert(channel < CHANNEL_COUNT); // Pre-condition
1231 3331 : return channels[channel].incoming_splits.insert(toadd,reliable);
1232 : }
1233 :
1234 : /******************************************************************************/
1235 : /* Connection Threads */
1236 : /******************************************************************************/
1237 :
1238 1 : ConnectionSendThread::ConnectionSendThread( unsigned int max_packet_size,
1239 : float timeout) :
1240 : m_connection(NULL),
1241 : m_max_packet_size(max_packet_size),
1242 : m_timeout(timeout),
1243 : m_max_commands_per_iteration(1),
1244 2 : m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration")),
1245 3 : m_max_packets_requeued(256)
1246 : {
1247 1 : }
1248 :
1249 1 : void * ConnectionSendThread::Thread()
1250 : {
1251 : assert(m_connection != NULL);
1252 1 : ThreadStarted();
1253 1 : log_register_thread("ConnectionSend");
1254 :
1255 2 : LOG(dout_con<<m_connection->getDesc()
1256 1 : <<"ConnectionSend thread started"<<std::endl);
1257 :
1258 1 : u32 curtime = porting::getTimeMs();
1259 1 : u32 lasttime = curtime;
1260 :
1261 : PROFILE(std::stringstream ThreadIdentifier);
1262 : PROFILE(ThreadIdentifier << "ConnectionSend: [" << m_connection->getDesc() << "]");
1263 :
1264 1 : porting::setThreadName("ConnectionSend");
1265 :
1266 : /* if stop is requested don't stop immediately but try to send all */
1267 : /* packets first */
1268 6771 : while(!StopRequested() || packetsQueued()) {
1269 : BEGIN_DEBUG_EXCEPTION_HANDLER
1270 : PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
1271 :
1272 3385 : m_iteration_packets_avaialble = m_max_data_packets_per_iteration;
1273 :
1274 : /* wait for trigger or timeout */
1275 3385 : m_send_sleep_semaphore.Wait(50);
1276 :
1277 : /* remove all triggers */
1278 9286 : while(m_send_sleep_semaphore.Wait(0)) {}
1279 :
1280 3385 : lasttime = curtime;
1281 3385 : curtime = porting::getTimeMs();
1282 3385 : float dtime = CALC_DTIME(lasttime,curtime);
1283 :
1284 : /* first do all the reliable stuff */
1285 3385 : runTimeouts(dtime);
1286 :
1287 : /* translate commands to packets */
1288 6770 : ConnectionCommand c = m_connection->m_command_queue.pop_frontNoEx(0);
1289 12593 : while(c.type != CONNCMD_NONE)
1290 : {
1291 4604 : if (c.reliable)
1292 794 : processReliableCommand(c);
1293 : else
1294 3810 : processNonReliableCommand(c);
1295 :
1296 4604 : c = m_connection->m_command_queue.pop_frontNoEx(0);
1297 : }
1298 :
1299 : /* send non reliable packets */
1300 3385 : sendPackets(dtime);
1301 :
1302 0 : END_DEBUG_EXCEPTION_HANDLER(errorstream);
1303 : }
1304 :
1305 : PROFILE(g_profiler->remove(ThreadIdentifier.str()));
1306 1 : return NULL;
1307 : }
1308 :
1309 8893 : void ConnectionSendThread::Trigger()
1310 : {
1311 8893 : m_send_sleep_semaphore.Post();
1312 8893 : }
1313 :
1314 1 : bool ConnectionSendThread::packetsQueued()
1315 : {
1316 2 : std::list<u16> peerIds = m_connection->getPeerIDs();
1317 :
1318 1 : if (!m_outgoing_queue.empty() && !peerIds.empty())
1319 0 : return true;
1320 :
1321 6 : for(std::list<u16>::iterator j = peerIds.begin();
1322 4 : j != peerIds.end(); ++j)
1323 : {
1324 2 : PeerHelper peer = m_connection->getPeerNoEx(*j);
1325 :
1326 1 : if (!peer)
1327 0 : continue;
1328 :
1329 1 : if (dynamic_cast<UDPPeer*>(&peer) == 0)
1330 0 : continue;
1331 :
1332 4 : for(u16 i=0; i < CHANNEL_COUNT; i++) {
1333 3 : Channel *channel = &(dynamic_cast<UDPPeer*>(&peer))->channels[i];
1334 :
1335 3 : if (channel->queued_commands.size() > 0) {
1336 0 : return true;
1337 : }
1338 : }
1339 : }
1340 :
1341 :
1342 1 : return false;
1343 : }
1344 :
1345 3385 : void ConnectionSendThread::runTimeouts(float dtime)
1346 : {
1347 6770 : std::list<u16> timeouted_peers;
1348 6770 : std::list<u16> peerIds = m_connection->getPeerIDs();
1349 :
1350 20307 : for(std::list<u16>::iterator j = peerIds.begin();
1351 13538 : j != peerIds.end(); ++j)
1352 : {
1353 6768 : PeerHelper peer = m_connection->getPeerNoEx(*j);
1354 :
1355 3384 : if (!peer)
1356 0 : continue;
1357 :
1358 3384 : if (dynamic_cast<UDPPeer*>(&peer) == 0)
1359 0 : continue;
1360 :
1361 : PROFILE(std::stringstream peerIdentifier);
1362 : PROFILE(peerIdentifier << "runTimeouts[" << m_connection->getDesc()
1363 : << ";" << *j << ";RELIABLE]");
1364 : PROFILE(ScopeProfiler peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
1365 :
1366 6768 : SharedBuffer<u8> data(2); // data for sending ping, required here because of goto
1367 :
1368 : /*
1369 : Check peer timeout
1370 : */
1371 3384 : if (peer->isTimedOut(m_timeout))
1372 : {
1373 0 : infostream<<m_connection->getDesc()
1374 0 : <<"RunTimeouts(): Peer "<<peer->id
1375 0 : <<" has timed out."
1376 0 : <<" (source=peer->timeout_counter)"
1377 0 : <<std::endl;
1378 : // Add peer to the list
1379 0 : timeouted_peers.push_back(peer->id);
1380 : // Don't bother going through the buffers of this one
1381 0 : continue;
1382 : }
1383 :
1384 3384 : float resend_timeout = dynamic_cast<UDPPeer*>(&peer)->getResendTimeout();
1385 13536 : for(u16 i=0; i<CHANNEL_COUNT; i++)
1386 : {
1387 20304 : std::list<BufferedPacket> timed_outs;
1388 10152 : Channel *channel = &(dynamic_cast<UDPPeer*>(&peer))->channels[i];
1389 :
1390 10152 : if (dynamic_cast<UDPPeer*>(&peer)->getLegacyPeer())
1391 10152 : channel->setWindowSize(g_settings->getU16("workaround_window_size"));
1392 :
1393 : // Remove timed out incomplete unreliable split packets
1394 10152 : channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
1395 :
1396 : // Increment reliable packet times
1397 10152 : channel->outgoing_reliables_sent.incrementTimeouts(dtime);
1398 :
1399 10152 : unsigned int numpeers = m_connection->m_peers.size();
1400 :
1401 10152 : if (numpeers == 0)
1402 0 : return;
1403 :
1404 : // Re-send timed out outgoing reliables
1405 : timed_outs = channel->
1406 20304 : outgoing_reliables_sent.getTimedOuts(resend_timeout,
1407 20304 : (m_max_data_packets_per_iteration/numpeers));
1408 :
1409 10152 : channel->UpdatePacketLossCounter(timed_outs.size());
1410 10152 : g_profiler->graphAdd("packets_lost", timed_outs.size());
1411 :
1412 10152 : m_iteration_packets_avaialble -= timed_outs.size();
1413 :
1414 30456 : for(std::list<BufferedPacket>::iterator k = timed_outs.begin();
1415 20304 : k != timed_outs.end(); ++k)
1416 : {
1417 0 : u16 peer_id = readPeerId(*(k->data));
1418 0 : u8 channelnum = readChannel(*(k->data));
1419 0 : u16 seqnum = readU16(&(k->data[BASE_HEADER_SIZE+1]));
1420 :
1421 0 : channel->UpdateBytesLost(k->data.getSize());
1422 0 : k->resend_count++;
1423 :
1424 0 : LOG(derr_con<<m_connection->getDesc()
1425 : <<"RE-SENDING timed-out RELIABLE to "
1426 : << k->address.serializeString()
1427 : << "(t/o="<<resend_timeout<<"): "
1428 : <<"from_peer_id="<<peer_id
1429 : <<", channel="<<((int)channelnum&0xff)
1430 : <<", seqnum="<<seqnum
1431 0 : <<std::endl);
1432 :
1433 0 : rawSend(*k);
1434 :
1435 : // do not handle rtt here as we can't decide if this packet was
1436 : // lost or really takes more time to transmit
1437 : }
1438 10152 : channel->UpdateTimers(dtime,dynamic_cast<UDPPeer*>(&peer)->getLegacyPeer());
1439 : }
1440 :
1441 : /* send ping if necessary */
1442 3384 : if (dynamic_cast<UDPPeer*>(&peer)->Ping(dtime,data)) {
1443 16 : LOG(dout_con<<m_connection->getDesc()
1444 : <<"Sending ping for peer_id: "
1445 8 : << dynamic_cast<UDPPeer*>(&peer)->id <<std::endl);
1446 : /* this may fail if there ain't a sequence number left */
1447 8 : if (!rawSendAsPacket(dynamic_cast<UDPPeer*>(&peer)->id, 0, data, true))
1448 : {
1449 : //retrigger with reduced ping interval
1450 0 : dynamic_cast<UDPPeer*>(&peer)->Ping(4.0,data);
1451 : }
1452 : }
1453 :
1454 3384 : dynamic_cast<UDPPeer*>(&peer)->RunCommandQueues(m_max_packet_size,
1455 : m_max_commands_per_iteration,
1456 3384 : m_max_packets_requeued);
1457 : }
1458 :
1459 : // Remove timed out peers
1460 10155 : for(std::list<u16>::iterator i = timeouted_peers.begin();
1461 6770 : i != timeouted_peers.end(); ++i)
1462 : {
1463 0 : LOG(derr_con<<m_connection->getDesc()
1464 0 : <<"RunTimeouts(): Removing peer "<<(*i)<<std::endl);
1465 0 : m_connection->deletePeer(*i, true);
1466 : }
1467 : }
1468 :
1469 4611 : void ConnectionSendThread::rawSend(const BufferedPacket &packet)
1470 : {
1471 : try{
1472 4611 : m_connection->m_udpSocket.Send(packet.address, *packet.data,
1473 9222 : packet.data.getSize());
1474 9222 : LOG(dout_con <<m_connection->getDesc()
1475 : << " rawSend: " << packet.data.getSize()
1476 4611 : << " bytes sent" << std::endl);
1477 0 : } catch(SendFailedException &e) {
1478 0 : LOG(derr_con<<m_connection->getDesc()
1479 : <<"Connection::rawSend(): SendFailedException: "
1480 0 : <<packet.address.serializeString()<<std::endl);
1481 : }
1482 4611 : }
1483 :
1484 802 : void ConnectionSendThread::sendAsPacketReliable(BufferedPacket& p, Channel* channel)
1485 : {
1486 : try{
1487 802 : p.absolute_send_time = porting::getTimeMs();
1488 : // Buffer the packet
1489 802 : channel->outgoing_reliables_sent.insert(p,
1490 802 : (channel->readOutgoingSequenceNumber() - MAX_RELIABLE_WINDOW_SIZE)
1491 1604 : % (MAX_RELIABLE_WINDOW_SIZE+1));
1492 : }
1493 0 : catch(AlreadyExistsException &e)
1494 : {
1495 0 : LOG(derr_con<<m_connection->getDesc()
1496 : <<"WARNING: Going to send a reliable packet"
1497 0 : <<" in outgoing buffer" <<std::endl);
1498 : }
1499 :
1500 : // Send the packet
1501 802 : rawSend(p);
1502 802 : }
1503 :
1504 3818 : bool ConnectionSendThread::rawSendAsPacket(u16 peer_id, u8 channelnum,
1505 : SharedBuffer<u8> data, bool reliable)
1506 : {
1507 7636 : PeerHelper peer = m_connection->getPeerNoEx(peer_id);
1508 3818 : if (!peer) {
1509 0 : LOG(dout_con<<m_connection->getDesc()
1510 : <<" INFO: dropped packet for non existent peer_id: "
1511 0 : << peer_id << std::endl);
1512 0 : FATAL_ERROR_IF(!reliable, "Trying to send raw packet reliable but no peer found!");
1513 0 : return false;
1514 : }
1515 3818 : Channel *channel = &(dynamic_cast<UDPPeer*>(&peer)->channels[channelnum]);
1516 :
1517 3818 : if (reliable)
1518 : {
1519 9 : bool have_sequence_number_for_raw_packet = true;
1520 : u16 seqnum =
1521 9 : channel->getOutgoingSequenceNumber(have_sequence_number_for_raw_packet);
1522 :
1523 9 : if (!have_sequence_number_for_raw_packet)
1524 0 : return false;
1525 :
1526 18 : SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1527 9 : Address peer_address;
1528 9 : peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
1529 :
1530 : // Add base headers and make a packet
1531 : BufferedPacket p = con::makePacket(peer_address, reliable,
1532 18 : m_connection->GetProtocolID(), m_connection->GetPeerID(),
1533 36 : channelnum);
1534 :
1535 : // first check if our send window is already maxed out
1536 18 : if (channel->outgoing_reliables_sent.size()
1537 9 : < channel->getWindowSize()) {
1538 18 : LOG(dout_con<<m_connection->getDesc()
1539 : <<" INFO: sending a reliable packet to peer_id " << peer_id
1540 : <<" channel: " << channelnum
1541 9 : <<" seqnum: " << seqnum << std::endl);
1542 9 : sendAsPacketReliable(p,channel);
1543 9 : return true;
1544 : }
1545 : else {
1546 0 : LOG(dout_con<<m_connection->getDesc()
1547 : <<" INFO: queueing reliable packet for peer_id: " << peer_id
1548 : <<" channel: " << channelnum
1549 0 : <<" seqnum: " << seqnum << std::endl);
1550 0 : channel->queued_reliables.push(p);
1551 0 : return false;
1552 : }
1553 : }
1554 : else
1555 : {
1556 3809 : Address peer_address;
1557 :
1558 3809 : if (peer->getAddress(MTP_UDP, peer_address))
1559 : {
1560 : // Add base headers and make a packet
1561 : BufferedPacket p = con::makePacket(peer_address, data,
1562 7618 : m_connection->GetProtocolID(), m_connection->GetPeerID(),
1563 15236 : channelnum);
1564 :
1565 : // Send the packet
1566 3809 : rawSend(p);
1567 3809 : return true;
1568 : }
1569 : else {
1570 0 : LOG(dout_con<<m_connection->getDesc()
1571 : <<" INFO: dropped unreliable packet for peer_id: " << peer_id
1572 0 : <<" because of (yet) missing udp address" << std::endl);
1573 0 : return false;
1574 : }
1575 : }
1576 :
1577 : //never reached
1578 : return false;
1579 : }
1580 :
1581 794 : void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
1582 : {
1583 : assert(c.reliable); // Pre-condition
1584 :
1585 794 : switch(c.type) {
1586 : case CONNCMD_NONE:
1587 0 : LOG(dout_con<<m_connection->getDesc()
1588 0 : <<"UDP processing reliable CONNCMD_NONE"<<std::endl);
1589 0 : return;
1590 :
1591 : case CONNCMD_SEND:
1592 1586 : LOG(dout_con<<m_connection->getDesc()
1593 793 : <<"UDP processing reliable CONNCMD_SEND"<<std::endl);
1594 793 : sendReliable(c);
1595 793 : return;
1596 :
1597 : case CONNCMD_SEND_TO_ALL:
1598 0 : LOG(dout_con<<m_connection->getDesc()
1599 0 : <<"UDP processing CONNCMD_SEND_TO_ALL"<<std::endl);
1600 0 : sendToAllReliable(c);
1601 0 : return;
1602 :
1603 : case CONCMD_CREATE_PEER:
1604 0 : LOG(dout_con<<m_connection->getDesc()
1605 0 : <<"UDP processing reliable CONCMD_CREATE_PEER"<<std::endl);
1606 0 : if (!rawSendAsPacket(c.peer_id,c.channelnum,c.data,c.reliable))
1607 : {
1608 : /* put to queue if we couldn't send it immediately */
1609 0 : sendReliable(c);
1610 : }
1611 0 : return;
1612 :
1613 : case CONCMD_DISABLE_LEGACY:
1614 2 : LOG(dout_con<<m_connection->getDesc()
1615 1 : <<"UDP processing reliable CONCMD_DISABLE_LEGACY"<<std::endl);
1616 1 : if (!rawSendAsPacket(c.peer_id,c.channelnum,c.data,c.reliable))
1617 : {
1618 : /* put to queue if we couldn't send it immediately */
1619 0 : sendReliable(c);
1620 : }
1621 1 : return;
1622 :
1623 : case CONNCMD_SERVE:
1624 : case CONNCMD_CONNECT:
1625 : case CONNCMD_DISCONNECT:
1626 : case CONCMD_ACK:
1627 0 : FATAL_ERROR("Got command that shouldn't be reliable as reliable command");
1628 : default:
1629 0 : LOG(dout_con<<m_connection->getDesc()
1630 0 : <<" Invalid reliable command type: " << c.type <<std::endl);
1631 : }
1632 : }
1633 :
1634 :
1635 3810 : void ConnectionSendThread::processNonReliableCommand(ConnectionCommand &c)
1636 : {
1637 : assert(!c.reliable); // Pre-condition
1638 :
1639 3810 : switch(c.type) {
1640 : case CONNCMD_NONE:
1641 0 : LOG(dout_con<<m_connection->getDesc()
1642 0 : <<" UDP processing CONNCMD_NONE"<<std::endl);
1643 0 : return;
1644 : case CONNCMD_SERVE:
1645 0 : LOG(dout_con<<m_connection->getDesc()
1646 : <<" UDP processing CONNCMD_SERVE port="
1647 0 : <<c.address.serializeString()<<std::endl);
1648 0 : serve(c.address);
1649 0 : return;
1650 : case CONNCMD_CONNECT:
1651 2 : LOG(dout_con<<m_connection->getDesc()
1652 1 : <<" UDP processing CONNCMD_CONNECT"<<std::endl);
1653 1 : connect(c.address);
1654 1 : return;
1655 : case CONNCMD_DISCONNECT:
1656 2 : LOG(dout_con<<m_connection->getDesc()
1657 1 : <<" UDP processing CONNCMD_DISCONNECT"<<std::endl);
1658 1 : disconnect();
1659 1 : return;
1660 : case CONNCMD_DISCONNECT_PEER:
1661 0 : LOG(dout_con<<m_connection->getDesc()
1662 0 : <<" UDP processing CONNCMD_DISCONNECT_PEER"<<std::endl);
1663 0 : disconnect_peer(c.peer_id);
1664 0 : return;
1665 : case CONNCMD_SEND:
1666 398 : LOG(dout_con<<m_connection->getDesc()
1667 199 : <<" UDP processing CONNCMD_SEND"<<std::endl);
1668 199 : send(c.peer_id, c.channelnum, c.data);
1669 199 : return;
1670 : case CONNCMD_SEND_TO_ALL:
1671 0 : LOG(dout_con<<m_connection->getDesc()
1672 0 : <<" UDP processing CONNCMD_SEND_TO_ALL"<<std::endl);
1673 0 : sendToAll(c.channelnum, c.data);
1674 0 : return;
1675 : case CONCMD_ACK:
1676 7218 : LOG(dout_con<<m_connection->getDesc()
1677 3609 : <<" UDP processing CONCMD_ACK"<<std::endl);
1678 3609 : sendAsPacket(c.peer_id,c.channelnum,c.data,true);
1679 3609 : return;
1680 : case CONCMD_CREATE_PEER:
1681 0 : FATAL_ERROR("Got command that should be reliable as unreliable command");
1682 : default:
1683 0 : LOG(dout_con<<m_connection->getDesc()
1684 0 : <<" Invalid command type: " << c.type <<std::endl);
1685 : }
1686 : }
1687 :
1688 0 : void ConnectionSendThread::serve(Address bind_address)
1689 : {
1690 0 : LOG(dout_con<<m_connection->getDesc()
1691 0 : <<"UDP serving at port " << bind_address.serializeString() <<std::endl);
1692 : try{
1693 0 : m_connection->m_udpSocket.Bind(bind_address);
1694 0 : m_connection->SetPeerID(PEER_ID_SERVER);
1695 : }
1696 0 : catch(SocketException &e) {
1697 : // Create event
1698 0 : ConnectionEvent ce;
1699 0 : ce.bindFailed();
1700 0 : m_connection->putEvent(ce);
1701 : }
1702 0 : }
1703 :
1704 1 : void ConnectionSendThread::connect(Address address)
1705 : {
1706 2 : LOG(dout_con<<m_connection->getDesc()<<" connecting to "<<address.serializeString()
1707 1 : <<":"<<address.getPort()<<std::endl);
1708 :
1709 1 : UDPPeer *peer = m_connection->createServerPeer(address);
1710 :
1711 : // Create event
1712 2 : ConnectionEvent e;
1713 1 : e.peerAdded(peer->id, peer->address);
1714 1 : m_connection->putEvent(e);
1715 :
1716 1 : Address bind_addr;
1717 :
1718 1 : if (address.isIPv6())
1719 0 : bind_addr.setAddress((IPv6AddressBytes*) NULL);
1720 : else
1721 1 : bind_addr.setAddress(0,0,0,0);
1722 :
1723 1 : m_connection->m_udpSocket.Bind(bind_addr);
1724 :
1725 : // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
1726 1 : m_connection->SetPeerID(PEER_ID_INEXISTENT);
1727 2 : NetworkPacket pkt(0,0);
1728 1 : m_connection->Send(PEER_ID_SERVER, 0, &pkt, true);
1729 1 : }
1730 :
1731 1 : void ConnectionSendThread::disconnect()
1732 : {
1733 1 : LOG(dout_con<<m_connection->getDesc()<<" disconnecting"<<std::endl);
1734 :
1735 : // Create and send DISCO packet
1736 2 : SharedBuffer<u8> data(2);
1737 1 : writeU8(&data[0], TYPE_CONTROL);
1738 1 : writeU8(&data[1], CONTROLTYPE_DISCO);
1739 :
1740 :
1741 : // Send to all
1742 2 : std::list<u16> peerids = m_connection->getPeerIDs();
1743 :
1744 6 : for (std::list<u16>::iterator i = peerids.begin();
1745 4 : i != peerids.end();
1746 : i++)
1747 : {
1748 1 : sendAsPacket(*i, 0,data,false);
1749 : }
1750 1 : }
1751 :
1752 0 : void ConnectionSendThread::disconnect_peer(u16 peer_id)
1753 : {
1754 0 : LOG(dout_con<<m_connection->getDesc()<<" disconnecting peer"<<std::endl);
1755 :
1756 : // Create and send DISCO packet
1757 0 : SharedBuffer<u8> data(2);
1758 0 : writeU8(&data[0], TYPE_CONTROL);
1759 0 : writeU8(&data[1], CONTROLTYPE_DISCO);
1760 0 : sendAsPacket(peer_id, 0,data,false);
1761 :
1762 0 : PeerHelper peer = m_connection->getPeerNoEx(peer_id);
1763 :
1764 0 : if (!peer)
1765 0 : return;
1766 :
1767 0 : if (dynamic_cast<UDPPeer*>(&peer) == 0)
1768 : {
1769 0 : return;
1770 : }
1771 :
1772 0 : dynamic_cast<UDPPeer*>(&peer)->m_pending_disconnect = true;
1773 : }
1774 :
1775 199 : void ConnectionSendThread::send(u16 peer_id, u8 channelnum,
1776 : SharedBuffer<u8> data)
1777 : {
1778 : assert(channelnum < CHANNEL_COUNT); // Pre-condition
1779 :
1780 398 : PeerHelper peer = m_connection->getPeerNoEx(peer_id);
1781 199 : if (!peer)
1782 : {
1783 0 : LOG(dout_con<<m_connection->getDesc()<<" peer: peer_id="<<peer_id
1784 : << ">>>NOT<<< found on sending packet"
1785 : << ", channel " << (channelnum % 0xFF)
1786 0 : << ", size: " << data.getSize() <<std::endl);
1787 0 : return;
1788 : }
1789 :
1790 398 : LOG(dout_con<<m_connection->getDesc()<<" sending to peer_id="<<peer_id
1791 : << ", channel " << (channelnum % 0xFF)
1792 199 : << ", size: " << data.getSize() <<std::endl);
1793 :
1794 199 : u16 split_sequence_number = peer->getNextSplitSequenceNumber(channelnum);
1795 :
1796 199 : u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1797 398 : std::list<SharedBuffer<u8> > originals;
1798 :
1799 199 : originals = makeAutoSplitPacket(data, chunksize_max,split_sequence_number);
1800 :
1801 199 : peer->setNextSplitSequenceNumber(channelnum,split_sequence_number);
1802 :
1803 1194 : for(std::list<SharedBuffer<u8> >::iterator i = originals.begin();
1804 796 : i != originals.end(); ++i)
1805 : {
1806 398 : SharedBuffer<u8> original = *i;
1807 199 : sendAsPacket(peer_id, channelnum, original);
1808 : }
1809 : }
1810 :
1811 793 : void ConnectionSendThread::sendReliable(ConnectionCommand &c)
1812 : {
1813 1586 : PeerHelper peer = m_connection->getPeerNoEx(c.peer_id);
1814 793 : if (!peer)
1815 0 : return;
1816 :
1817 793 : peer->PutReliableSendCommand(c,m_max_packet_size);
1818 : }
1819 :
1820 0 : void ConnectionSendThread::sendToAll(u8 channelnum, SharedBuffer<u8> data)
1821 : {
1822 0 : std::list<u16> peerids = m_connection->getPeerIDs();
1823 :
1824 0 : for (std::list<u16>::iterator i = peerids.begin();
1825 0 : i != peerids.end();
1826 : i++)
1827 : {
1828 0 : send(*i, channelnum, data);
1829 : }
1830 0 : }
1831 :
1832 0 : void ConnectionSendThread::sendToAllReliable(ConnectionCommand &c)
1833 : {
1834 0 : std::list<u16> peerids = m_connection->getPeerIDs();
1835 :
1836 0 : for (std::list<u16>::iterator i = peerids.begin();
1837 0 : i != peerids.end();
1838 : i++)
1839 : {
1840 0 : PeerHelper peer = m_connection->getPeerNoEx(*i);
1841 :
1842 0 : if (!peer)
1843 0 : continue;
1844 :
1845 0 : peer->PutReliableSendCommand(c,m_max_packet_size);
1846 : }
1847 0 : }
1848 :
1849 3385 : void ConnectionSendThread::sendPackets(float dtime)
1850 : {
1851 6770 : std::list<u16> peerIds = m_connection->getPeerIDs();
1852 6770 : std::list<u16> pendingDisconnect;
1853 6770 : std::map<u16,bool> pending_unreliable;
1854 :
1855 16925 : for(std::list<u16>::iterator
1856 3385 : j = peerIds.begin();
1857 13540 : j != peerIds.end(); ++j)
1858 : {
1859 6770 : PeerHelper peer = m_connection->getPeerNoEx(*j);
1860 : //peer may have been removed
1861 3385 : if (!peer) {
1862 0 : LOG(dout_con<<m_connection->getDesc()<< " Peer not found: peer_id=" << *j << std::endl);
1863 0 : continue;
1864 : }
1865 3385 : peer->m_increment_packets_remaining = m_iteration_packets_avaialble/m_connection->m_peers.size();
1866 :
1867 3385 : if (dynamic_cast<UDPPeer*>(&peer) == 0)
1868 : {
1869 0 : continue;
1870 : }
1871 :
1872 3385 : if (dynamic_cast<UDPPeer*>(&peer)->m_pending_disconnect)
1873 : {
1874 0 : pendingDisconnect.push_back(*j);
1875 : }
1876 :
1877 : PROFILE(std::stringstream peerIdentifier);
1878 : PROFILE(peerIdentifier << "sendPackets[" << m_connection->getDesc() << ";" << *j << ";RELIABLE]");
1879 : PROFILE(ScopeProfiler peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
1880 :
1881 6770 : LOG(dout_con<<m_connection->getDesc()
1882 : << " Handle per peer queues: peer_id=" << *j
1883 3385 : << " packet quota: " << peer->m_increment_packets_remaining << std::endl);
1884 : // first send queued reliable packets for all peers (if possible)
1885 13540 : for (unsigned int i=0; i < CHANNEL_COUNT; i++)
1886 : {
1887 10155 : u16 next_to_ack = 0;
1888 10155 : dynamic_cast<UDPPeer*>(&peer)->channels[i].outgoing_reliables_sent.getFirstSeqnum(next_to_ack);
1889 10155 : u16 next_to_receive = 0;
1890 10155 : dynamic_cast<UDPPeer*>(&peer)->channels[i].incoming_reliables.getFirstSeqnum(next_to_receive);
1891 :
1892 20310 : LOG(dout_con<<m_connection->getDesc()<< "\t channel: "
1893 : << i << ", peer quota:"
1894 : << peer->m_increment_packets_remaining
1895 : << std::endl
1896 : << "\t\t\treliables on wire: "
1897 : << dynamic_cast<UDPPeer*>(&peer)->channels[i].outgoing_reliables_sent.size()
1898 : << ", waiting for ack for " << next_to_ack
1899 : << std::endl
1900 : << "\t\t\tincoming_reliables: "
1901 : << dynamic_cast<UDPPeer*>(&peer)->channels[i].incoming_reliables.size()
1902 : << ", next reliable packet: "
1903 : << dynamic_cast<UDPPeer*>(&peer)->channels[i].readNextIncomingSeqNum()
1904 : << ", next queued: " << next_to_receive
1905 : << std::endl
1906 : << "\t\t\treliables queued : "
1907 : << dynamic_cast<UDPPeer*>(&peer)->channels[i].queued_reliables.size()
1908 : << std::endl
1909 : << "\t\t\tqueued commands : "
1910 : << dynamic_cast<UDPPeer*>(&peer)->channels[i].queued_commands.size()
1911 10155 : << std::endl);
1912 :
1913 23482 : while ((dynamic_cast<UDPPeer*>(&peer)->channels[i].queued_reliables.size() > 0) &&
1914 793 : (dynamic_cast<UDPPeer*>(&peer)->channels[i].outgoing_reliables_sent.size()
1915 12534 : < dynamic_cast<UDPPeer*>(&peer)->channels[i].getWindowSize())&&
1916 793 : (peer->m_increment_packets_remaining > 0))
1917 : {
1918 1586 : BufferedPacket p = dynamic_cast<UDPPeer*>(&peer)->channels[i].queued_reliables.front();
1919 793 : dynamic_cast<UDPPeer*>(&peer)->channels[i].queued_reliables.pop();
1920 793 : Channel* channel = &(dynamic_cast<UDPPeer*>(&peer)->channels[i]);
1921 1586 : LOG(dout_con<<m_connection->getDesc()
1922 : <<" INFO: sending a queued reliable packet "
1923 : <<" channel: " << i
1924 : <<", seqnum: " << readU16(&p.data[BASE_HEADER_SIZE+1])
1925 793 : << std::endl);
1926 793 : sendAsPacketReliable(p,channel);
1927 793 : peer->m_increment_packets_remaining--;
1928 : }
1929 : }
1930 : }
1931 :
1932 3385 : if (m_outgoing_queue.size())
1933 : {
1934 2900 : LOG(dout_con<<m_connection->getDesc()
1935 : << " Handle non reliable queue ("
1936 1450 : << m_outgoing_queue.size() << " pkts)" << std::endl);
1937 : }
1938 :
1939 3385 : unsigned int initial_queuesize = m_outgoing_queue.size();
1940 : /* send non reliable packets*/
1941 7194 : for(unsigned int i=0;i < initial_queuesize;i++) {
1942 7618 : OutgoingPacket packet = m_outgoing_queue.front();
1943 3809 : m_outgoing_queue.pop();
1944 :
1945 3809 : if (packet.reliable)
1946 0 : continue;
1947 :
1948 7618 : PeerHelper peer = m_connection->getPeerNoEx(packet.peer_id);
1949 3809 : if (!peer) {
1950 0 : LOG(dout_con<<m_connection->getDesc()
1951 : <<" Outgoing queue: peer_id="<<packet.peer_id
1952 : << ">>>NOT<<< found on sending packet"
1953 : << ", channel " << (packet.channelnum % 0xFF)
1954 0 : << ", size: " << packet.data.getSize() <<std::endl);
1955 0 : continue;
1956 : }
1957 : /* send acks immediately */
1958 3809 : else if (packet.ack)
1959 : {
1960 7218 : rawSendAsPacket(packet.peer_id, packet.channelnum,
1961 7218 : packet.data, packet.reliable);
1962 3609 : peer->m_increment_packets_remaining =
1963 3609 : MYMIN(0,peer->m_increment_packets_remaining--);
1964 : }
1965 200 : else if (
1966 200 : ( peer->m_increment_packets_remaining > 0) ||
1967 0 : (StopRequested())) {
1968 400 : rawSendAsPacket(packet.peer_id, packet.channelnum,
1969 400 : packet.data, packet.reliable);
1970 200 : peer->m_increment_packets_remaining--;
1971 : }
1972 : else {
1973 0 : m_outgoing_queue.push(packet);
1974 0 : pending_unreliable[packet.peer_id] = true;
1975 : }
1976 : }
1977 :
1978 6770 : for(std::list<u16>::iterator
1979 3385 : k = pendingDisconnect.begin();
1980 6770 : k != pendingDisconnect.end(); ++k)
1981 : {
1982 0 : if (!pending_unreliable[*k])
1983 : {
1984 0 : m_connection->deletePeer(*k,false);
1985 : }
1986 : }
1987 3385 : }
1988 :
1989 3809 : void ConnectionSendThread::sendAsPacket(u16 peer_id, u8 channelnum,
1990 : SharedBuffer<u8> data, bool ack)
1991 : {
1992 7618 : OutgoingPacket packet(peer_id, channelnum, data, false, ack);
1993 3809 : m_outgoing_queue.push(packet);
1994 3809 : }
1995 :
1996 1 : ConnectionReceiveThread::ConnectionReceiveThread(unsigned int max_packet_size) :
1997 1 : m_connection(NULL)
1998 : {
1999 1 : }
2000 :
// Main loop of the receive thread.
// Registers the thread under the name "ConnectionReceive", then calls
// receive() repeatedly until StopRequested() becomes true.
// Pre-condition: m_connection must have been set (asserted below).
// Always returns NULL.
2001 1 : void * ConnectionReceiveThread::Thread()
2002 : {
2003 : assert(m_connection != NULL);
2004 1 : ThreadStarted();
2005 1 : log_register_thread("ConnectionReceive");
2006 :
2007 2 : LOG(dout_con<<m_connection->getDesc()
2008 1 : <<"ConnectionReceive thread started"<<std::endl);
2009 :
// PROFILE() expands to nothing when NDEBUG is defined (see the macro
// definitions at the top of this file), so ThreadIdentifier only exists in
// non-NDEBUG builds.
2010 : PROFILE(std::stringstream ThreadIdentifier);
2011 : PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]");
2012 :
2013 1 : porting::setThreadName("ConnectionReceive");
2014 :
// Timing state for the optional rate dump further down.
// DEBUG_CONNECTION_KBPS is #undef'd in both branches of the debug defines at
// the top of this file, so these sections are normally compiled out.
2015 : #ifdef DEBUG_CONNECTION_KBPS
2016 : u32 curtime = porting::getTimeMs();
2017 : u32 lasttime = curtime;
2018 : float debug_print_timer = 0.0;
2019 : #endif
2020 :
2021 1667 : while(!StopRequested()) {
// NOTE(review): BEGIN/END_DEBUG_EXCEPTION_HANDLER are defined elsewhere;
// presumably they wrap the loop body in a try/catch that reports to the
// stream passed to END_DEBUG_EXCEPTION_HANDLER -- confirm.
2022 : BEGIN_DEBUG_EXCEPTION_HANDLER
2023 : PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
2024 :
2025 : #ifdef DEBUG_CONNECTION_KBPS
2026 : lasttime = curtime;
2027 : curtime = porting::getTimeMs();
2028 : float dtime = CALC_DTIME(lasttime,curtime);
2029 : #endif
2030 :
2031 : /* receive packets */
2032 833 : receive();
2033 :
// Optional statistics dump: once more than 20.0 units of dtime have
// accumulated, print per-peer sums and per-channel rates to stderr.
// NOTE(review): the header line says "OUT to Peer" although the values come
// from the get*DownloadRateKB()/get*LossRateKB() accessors -- confirm label.
// NOTE(review): this block reads peer->channels directly, whereas the enabled
// code in this file reaches channels via dynamic_cast<UDPPeer*>(&peer);
// verify it still compiles before re-enabling DEBUG_CONNECTION_KBPS.
2034 : #ifdef DEBUG_CONNECTION_KBPS
2035 : debug_print_timer += dtime;
2036 : if (debug_print_timer > 20.0) {
2037 : debug_print_timer -= 20.0;
2038 :
2039 : std::list<u16> peerids = m_connection->getPeerIDs();
2040 :
2041 : for (std::list<u16>::iterator i = peerids.begin();
2042 : i != peerids.end();
2043 : i++)
2044 : {
2045 : PeerHelper peer = m_connection->getPeerNoEx(*i);
2046 : if (!peer)
2047 : continue;
2048 :
2049 : float peer_current = 0.0;
2050 : float peer_loss = 0.0;
2051 : float avg_rate = 0.0;
2052 : float avg_loss = 0.0;
2053 :
// Sum the four rate figures over all channels of this peer
2054 : for(u16 j=0; j<CHANNEL_COUNT; j++)
2055 : {
2056 : peer_current +=peer->channels[j].getCurrentDownloadRateKB();
2057 : peer_loss += peer->channels[j].getCurrentLossRateKB();
2058 : avg_rate += peer->channels[j].getAvgDownloadRateKB();
2059 : avg_loss += peer->channels[j].getAvgLossRateKB();
2060 : }
2061 :
2062 : std::stringstream output;
2063 : output << std::fixed << std::setprecision(1);
2064 : output << "OUT to Peer " << *i << " RATES (good / loss) " << std::endl;
2065 : output << "\tcurrent (sum): " << peer_current << "kb/s "<< peer_loss << "kb/s" << std::endl;
2066 : output << "\taverage (sum): " << avg_rate << "kb/s "<< avg_loss << "kb/s" << std::endl;
2067 : output << std::setfill(' ');
// One line per channel: current/average/maximum download rate, the same
// three figures for loss, and the channel's window size
2068 : for(u16 j=0; j<CHANNEL_COUNT; j++)
2069 : {
2070 : output << "\tcha " << j << ":"
2071 : << " CUR: " << std::setw(6) << peer->channels[j].getCurrentDownloadRateKB() <<"kb/s"
2072 : << " AVG: " << std::setw(6) << peer->channels[j].getAvgDownloadRateKB() <<"kb/s"
2073 : << " MAX: " << std::setw(6) << peer->channels[j].getMaxDownloadRateKB() <<"kb/s"
2074 : << " /"
2075 : << " CUR: " << std::setw(6) << peer->channels[j].getCurrentLossRateKB() <<"kb/s"
2076 : << " AVG: " << std::setw(6) << peer->channels[j].getAvgLossRateKB() <<"kb/s"
2077 : << " MAX: " << std::setw(6) << peer->channels[j].getMaxLossRateKB() <<"kb/s"
2078 : << " / WS: " << peer->channels[j].getWindowSize()
2079 : << std::endl;
2080 : }
2081 :
2082 : fprintf(stderr,"%s\n",output.str().c_str());
2083 : }
2084 : }
2085 : #endif
2086 0 : END_DEBUG_EXCEPTION_HANDLER(errorstream);
2087 : }
// Loop finished: drop this thread's profiler entry (non-NDEBUG builds only)
2088 : PROFILE(g_profiler->remove(ThreadIdentifier.str()));
2089 1 : return NULL;
2090 : }
2091 :
2092 : // Receive packets from the network and buffers and create ConnectionEvents
2093 833 : void ConnectionReceiveThread::receive()
2094 : {
2095 : // use IPv6 minimum allowed MTU as receive buffer size as this is
2096 : // theoretical reliable upper boundary of a udp packet for all IPv6 enabled
2097 : // infrastructure
2098 833 : unsigned int packet_maxsize = 1500;
2099 1666 : SharedBuffer<u8> packetdata(packet_maxsize);
2100 :
2101 833 : bool packet_queued = true;
2102 :
2103 833 : unsigned int loop_count = 0;
2104 :
2105 : /* first of all read packets from socket */
2106 : /* check for incoming data available */
2107 15630 : while( (loop_count < 10) &&
2108 5207 : (m_connection->m_udpSocket.WaitData(50))) {
2109 4795 : loop_count++;
2110 : try {
2111 4795 : if (packet_queued) {
2112 930 : bool data_left = true;
2113 : u16 peer_id;
2114 1860 : SharedBuffer<u8> resultdata;
2115 3604 : while(data_left) {
2116 : try {
2117 1337 : data_left = getFromBuffers(peer_id, resultdata);
2118 942 : if (data_left) {
2119 24 : ConnectionEvent e;
2120 12 : e.dataReceived(peer_id, resultdata);
2121 12 : m_connection->putEvent(e);
2122 : }
2123 : }
2124 395 : catch(ProcessedSilentlyException &e) {
2125 : /* try reading again */
2126 : }
2127 : }
2128 930 : packet_queued = false;
2129 : }
2130 :
2131 4795 : Address sender;
2132 4795 : s32 received_size = m_connection->m_udpSocket.Receive(sender, *packetdata, packet_maxsize);
2133 :
2134 9590 : if ((received_size < BASE_HEADER_SIZE) ||
2135 4795 : (readU32(&packetdata[0]) != m_connection->GetProtocolID()))
2136 : {
2137 0 : LOG(derr_con<<m_connection->getDesc()
2138 : <<"Receive(): Invalid incoming packet, "
2139 : <<"size: " << received_size
2140 : <<", protocol: "
2141 : << ((received_size >= 4) ? readU32(&packetdata[0]) : -1)
2142 0 : << std::endl);
2143 0 : continue;
2144 : }
2145 :
2146 4795 : u16 peer_id = readPeerId(*packetdata);
2147 4795 : u8 channelnum = readChannel(*packetdata);
2148 :
2149 4795 : if (channelnum > CHANNEL_COUNT-1) {
2150 0 : LOG(derr_con<<m_connection->getDesc()
2151 0 : <<"Receive(): Invalid channel "<<channelnum<<std::endl);
2152 0 : throw InvalidIncomingDataException("Channel doesn't exist");
2153 : }
2154 :
2155 : /* preserve original peer_id for later usage */
2156 4795 : u16 packet_peer_id = peer_id;
2157 :
2158 : /* Try to identify peer by sender address (may happen on join) */
2159 4795 : if (peer_id == PEER_ID_INEXISTENT) {
2160 0 : peer_id = m_connection->lookupPeer(sender);
2161 : }
2162 :
2163 : /* The peer was not found in our lists. Add it. */
2164 4795 : if (peer_id == PEER_ID_INEXISTENT) {
2165 0 : peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0);
2166 : }
2167 :
2168 9590 : PeerHelper peer = m_connection->getPeerNoEx(peer_id);
2169 :
2170 4795 : if (!peer) {
2171 0 : LOG(dout_con<<m_connection->getDesc()
2172 : <<" got packet from unknown peer_id: "
2173 0 : <<peer_id<<" Ignoring."<<std::endl);
2174 0 : continue;
2175 : }
2176 :
2177 : // Validate peer address
2178 :
2179 4795 : Address peer_address;
2180 :
2181 4795 : if (peer->getAddress(MTP_UDP, peer_address)) {
2182 4795 : if (peer_address != sender) {
2183 0 : LOG(derr_con<<m_connection->getDesc()
2184 : <<m_connection->getDesc()
2185 : <<" Peer "<<peer_id<<" sending from different address."
2186 0 : " Ignoring."<<std::endl);
2187 0 : continue;
2188 : }
2189 : }
2190 : else {
2191 :
2192 0 : bool invalid_address = true;
2193 0 : if (invalid_address) {
2194 0 : LOG(derr_con<<m_connection->getDesc()
2195 : <<m_connection->getDesc()
2196 : <<" Peer "<<peer_id<<" unknown."
2197 0 : " Ignoring."<<std::endl);
2198 0 : continue;
2199 : }
2200 : }
2201 :
2202 :
2203 : /* mark peer as seen with id */
2204 4795 : if (!(packet_peer_id == PEER_ID_INEXISTENT))
2205 4795 : peer->setSentWithID();
2206 :
2207 4795 : peer->ResetTimeout();
2208 :
2209 4795 : Channel *channel = 0;
2210 :
2211 4795 : if (dynamic_cast<UDPPeer*>(&peer) != 0)
2212 : {
2213 4795 : channel = &(dynamic_cast<UDPPeer*>(&peer)->channels[channelnum]);
2214 : }
2215 :
2216 4795 : if (channel != 0) {
2217 4795 : channel->UpdateBytesReceived(received_size);
2218 : }
2219 :
2220 : // Throw the received packet to channel->processPacket()
2221 :
2222 : // Make a new SharedBuffer from the data without the base headers
2223 9590 : SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
2224 4795 : memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
2225 9590 : strippeddata.getSize());
2226 :
2227 : try{
2228 : // Process it (the result is some data with no headers made by us)
2229 : SharedBuffer<u8> resultdata = processPacket
2230 9590 : (channel, strippeddata, peer_id, channelnum, false);
2231 :
2232 2626 : LOG(dout_con<<m_connection->getDesc()
2233 : <<" ProcessPacket from peer_id: " << peer_id
2234 : << ",channel: " << (channelnum & 0xFF) << ", returned "
2235 1313 : << resultdata.getSize() << " bytes" <<std::endl);
2236 :
2237 2626 : ConnectionEvent e;
2238 1313 : e.dataReceived(peer_id, resultdata);
2239 1313 : m_connection->putEvent(e);
2240 : }
2241 3075 : catch(ProcessedSilentlyException &e) {
2242 : }
2243 814 : catch(ProcessedQueued &e) {
2244 407 : packet_queued = true;
2245 : }
2246 : }
2247 0 : catch(InvalidIncomingDataException &e) {
2248 : }
2249 0 : catch(ProcessedSilentlyException &e) {
2250 : }
2251 : }
2252 833 : }
2253 :
2254 1337 : bool ConnectionReceiveThread::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst)
2255 : {
2256 2674 : std::list<u16> peerids = m_connection->getPeerIDs();
2257 :
2258 6801 : for(std::list<u16>::iterator j = peerids.begin();
2259 4534 : j != peerids.end(); ++j)
2260 : {
2261 2662 : PeerHelper peer = m_connection->getPeerNoEx(*j);
2262 1337 : if (!peer)
2263 0 : continue;
2264 :
2265 1337 : if (dynamic_cast<UDPPeer*>(&peer) == 0)
2266 0 : continue;
2267 :
2268 4127 : for(u16 i=0; i<CHANNEL_COUNT; i++)
2269 : {
2270 3197 : Channel *channel = &(dynamic_cast<UDPPeer*>(&peer))->channels[i];
2271 :
2272 3197 : if (checkIncomingBuffers(channel, peer_id, dst)) {
2273 12 : return true;
2274 : }
2275 : }
2276 : }
2277 930 : return false;
2278 : }
2279 :
2280 3197 : bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel,
2281 : u16 &peer_id, SharedBuffer<u8> &dst)
2282 : {
2283 3197 : u16 firstseqnum = 0;
2284 3197 : if (channel->incoming_reliables.getFirstSeqnum(firstseqnum))
2285 : {
2286 847 : if (firstseqnum == channel->readNextIncomingSeqNum())
2287 : {
2288 814 : BufferedPacket p = channel->incoming_reliables.popFirst();
2289 407 : peer_id = readPeerId(*p.data);
2290 407 : u8 channelnum = readChannel(*p.data);
2291 407 : u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
2292 :
2293 814 : LOG(dout_con<<m_connection->getDesc()
2294 : <<"UNBUFFERING TYPE_RELIABLE"
2295 : <<" seqnum="<<seqnum
2296 : <<" peer_id="<<peer_id
2297 : <<" channel="<<((int)channelnum&0xff)
2298 407 : <<std::endl);
2299 :
2300 407 : channel->incNextIncomingSeqNum();
2301 :
2302 407 : u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
2303 : // Get out the inside packet and re-process it
2304 814 : SharedBuffer<u8> payload(p.data.getSize() - headers_size);
2305 407 : memcpy(*payload, &p.data[headers_size], payload.getSize());
2306 :
2307 407 : dst = processPacket(channel, payload, peer_id, channelnum, true);
2308 12 : return true;
2309 : }
2310 : }
2311 2790 : return false;
2312 : }
2313 :
// Decode one packet (base header already removed by the caller) and act on
// its type byte, packetdata[0].
//
// Parameters:
//   channel    - channel the packet arrived on; must be non-null for ACK
//                control packets and for TYPE_RELIABLE (FATAL_ERROR_IF below)
//   packetdata - packet content starting at the type byte
//   peer_id    - id of the sending peer, looked up via getPeerNoEx()
//   channelnum - channel number, used for logging, acks and re-buffering
//   reliable   - true when packetdata was unwrapped from a TYPE_RELIABLE
//                packet; a nested TYPE_RELIABLE is then rejected
//
// Returns the user payload for TYPE_ORIGINAL, the reassembled data for a
// completed TYPE_SPLIT, or the result of recursing into the inner packet of
// an in-order TYPE_RELIABLE.
//
// Throws:
//   ProcessedSilentlyException   - packet fully handled here (control
//                                  packets, buffered split chunk, out-of-window
//                                  reliable) or the peer no longer exists
//   ProcessedQueued              - reliable packet buffered for later delivery
//   InvalidIncomingDataException - packet too short, unknown type/controltype,
//                                  or nested reliable packet
2314 8404 : SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
2315 : SharedBuffer<u8> packetdata, u16 peer_id, u8 channelnum, bool reliable)
2316 : {
2317 16808 : PeerHelper peer = m_connection->getPeerNoEx(peer_id);
2318 :
2319 8404 : if (!peer) {
2320 0 : errorstream << "Peer not found (possible timeout)" << std::endl;
2321 0 : throw ProcessedSilentlyException("Peer not found (possible timeout)");
2322 : }
2323 :
2324 8404 : if (packetdata.getSize() < 1)
2325 0 : throw InvalidIncomingDataException("packetdata.getSize() < 1");
2326 :
2327 8404 : u8 type = readU8(&(packetdata[0]));
2328 :
// Sanity check on the peer id; the first operand only enables the check when
// MAX_UDP_PEERS fits the u16 range
2329 8404 : if (MAX_UDP_PEERS <= 65535 && peer_id >= MAX_UDP_PEERS) {
2330 0 : errorstream << "Something is wrong with peer_id" << std::endl;
2331 0 : FATAL_ERROR("");
2332 : }
2333 :
// ---- TYPE_CONTROL: every branch below ends by throwing ----
2334 8404 : if (type == TYPE_CONTROL)
2335 : {
2336 811 : if (packetdata.getSize() < 2)
2337 0 : throw InvalidIncomingDataException("packetdata.getSize() < 2");
2338 :
2339 811 : u8 controltype = readU8(&(packetdata[1]));
2340 :
2341 811 : if (controltype == CONTROLTYPE_ACK)
2342 : {
2343 802 : FATAL_ERROR_IF(channel == 0, "Invalid channel (0)");
2344 802 : if (packetdata.getSize() < 4)
2345 : throw InvalidIncomingDataException
2346 0 : ("packetdata.getSize() < 4 (ACK header size)");
2347 :
2348 802 : u16 seqnum = readU16(&packetdata[2]);
2349 1604 : LOG(dout_con<<m_connection->getDesc()
2350 : <<" [ CONTROLTYPE_ACK: channelnum="
2351 : <<((int)channelnum&0xff)<<", peer_id="<<peer_id
2352 802 : <<", seqnum="<<seqnum<< " ]"<<std::endl);
2353 :
2354 : try{
// Remove the acknowledged packet from the sent list; popSeqnum() throwing
// NotFoundException is handled by the catch below
2355 : BufferedPacket p =
2356 1604 : channel->outgoing_reliables_sent.popSeqnum(seqnum);
2357 :
2358 : // only calculate rtt from straight sent packets
2359 802 : if (p.resend_count == 0) {
2360 : // Get round trip time
2361 802 : unsigned int current_time = porting::getTimeMs();
2362 :
2363 : // an overflow is quite unlikely but as it'd result in major
2364 : // rtt miscalculation we handle it here
2365 802 : if (current_time > p.absolute_send_time)
2366 : {
2367 497 : float rtt = (current_time - p.absolute_send_time) / 1000.0;
2368 :
2369 : // Let peer calculate stuff according to it
2370 : // (avg_rtt and resend_timeout)
2371 497 : dynamic_cast<UDPPeer*>(&peer)->reportRTT(rtt);
2372 : }
// Fallback when the timestamps cannot be subtracted (current_time is not
// greater than the send time): use the packet's own totaltime if positive
2373 305 : else if (p.totaltime > 0)
2374 : {
2375 13 : float rtt = p.totaltime;
2376 :
2377 : // Let peer calculate stuff according to it
2378 : // (avg_rtt and resend_timeout)
2379 13 : dynamic_cast<UDPPeer*>(&peer)->reportRTT(rtt);
2380 : }
2381 : }
2382 : //put bytes for max bandwidth calculation
2383 802 : channel->UpdateBytesSent(p.data.getSize(),1);
// Nothing left waiting for an ack on this channel: wake the send side
2384 802 : if (channel->outgoing_reliables_sent.size() == 0)
2385 : {
2386 680 : m_connection->TriggerSend();
2387 : }
2388 : }
2389 0 : catch(NotFoundException &e) {
2390 0 : LOG(derr_con<<m_connection->getDesc()
2391 : <<"WARNING: ACKed packet not "
2392 : "in outgoing queue"
2393 0 : <<std::endl);
2394 0 : channel->UpdatePacketTooLateCounter();
2395 : }
2396 802 : throw ProcessedSilentlyException("Got an ACK");
2397 : }
2398 9 : else if (controltype == CONTROLTYPE_SET_PEER_ID) {
2399 : // Got a packet to set our peer id
2400 1 : if (packetdata.getSize() < 4)
2401 : throw InvalidIncomingDataException
2402 0 : ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
2403 1 : u16 peer_id_new = readU16(&packetdata[2]);
2404 2 : LOG(dout_con<<m_connection->getDesc()
2405 1 : <<"Got new peer id: "<<peer_id_new<<"... "<<std::endl);
2406 :
// An id that has already been assigned is never overwritten
2407 1 : if (m_connection->GetPeerID() != PEER_ID_INEXISTENT)
2408 : {
2409 0 : LOG(derr_con<<m_connection->getDesc()
2410 : <<"WARNING: Not changing"
2411 0 : " existing peer id."<<std::endl);
2412 : }
2413 : else
2414 : {
2415 1 : LOG(dout_con<<m_connection->getDesc()<<"changing own peer id"<<std::endl);
2416 1 : m_connection->SetPeerID(peer_id_new);
2417 : }
2418 :
// Queue a two-byte control packet (ENABLE_BIG_SEND_WINDOW) addressed to
// PEER_ID_SERVER. This happens in both branches above.
// NOTE(review): what disableLegacy() does with it is defined elsewhere.
2419 2 : ConnectionCommand cmd;
2420 :
2421 2 : SharedBuffer<u8> reply(2);
2422 1 : writeU8(&reply[0], TYPE_CONTROL);
2423 1 : writeU8(&reply[1], CONTROLTYPE_ENABLE_BIG_SEND_WINDOW);
2424 1 : cmd.disableLegacy(PEER_ID_SERVER,reply);
2425 1 : m_connection->putCommand(cmd);
2426 :
2427 1 : throw ProcessedSilentlyException("Got a SET_PEER_ID");
2428 : }
2429 8 : else if (controltype == CONTROLTYPE_PING)
2430 : {
2431 : // Just ignore it, the incoming data already reset
2432 : // the timeout counter
2433 8 : LOG(dout_con<<m_connection->getDesc()<<"PING"<<std::endl);
2434 8 : throw ProcessedSilentlyException("Got a PING");
2435 : }
2436 0 : else if (controltype == CONTROLTYPE_DISCO)
2437 : {
2438 : // Peer announced a disconnect: remove it from the peer list
2439 : // (deletePeer is called with timeout == false)
2440 0 : LOG(dout_con<<m_connection->getDesc()
2441 0 : <<"DISCO: Removing peer "<<(peer_id)<<std::endl);
2442 :
2443 0 : if (m_connection->deletePeer(peer_id, false) == false)
2444 : {
2445 0 : derr_con<<m_connection->getDesc()
2446 0 : <<"DISCO: Peer not found"<<std::endl;
2447 : }
2448 :
2449 0 : throw ProcessedSilentlyException("Got a DISCO");
2450 : }
2451 0 : else if (controltype == CONTROLTYPE_ENABLE_BIG_SEND_WINDOW)
2452 : {
// NOTE(review): the dynamic_cast result is dereferenced without a null
// check; presumably only UDPPeer instances reach this point -- confirm
2453 0 : dynamic_cast<UDPPeer*>(&peer)->setNonLegacyPeer();
2454 0 : throw ProcessedSilentlyException("Got non legacy control");
2455 : }
2456 : else{
2457 0 : LOG(derr_con<<m_connection->getDesc()
2458 : <<"INVALID TYPE_CONTROL: invalid controltype="
2459 0 : <<((int)controltype&0xff)<<std::endl);
2460 0 : throw InvalidIncomingDataException("Invalid control type");
2461 : }
2462 : }
// ---- TYPE_ORIGINAL: plain user data; an empty payload is rejected ----
2463 7593 : else if (type == TYPE_ORIGINAL)
2464 : {
2465 653 : if (packetdata.getSize() <= ORIGINAL_HEADER_SIZE)
2466 : throw InvalidIncomingDataException
2467 0 : ("packetdata.getSize() <= ORIGINAL_HEADER_SIZE");
2468 1306 : LOG(dout_con<<m_connection->getDesc()
2469 : <<"RETURNING TYPE_ORIGINAL to user"
2470 653 : <<std::endl);
2471 : // Get the inside packet out and return it
2472 1306 : SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
2473 653 : memcpy(*payload, &(packetdata[ORIGINAL_HEADER_SIZE]), payload.getSize());
2474 653 : return payload;
2475 : }
// ---- TYPE_SPLIT: one chunk of a larger message ----
2476 6940 : else if (type == TYPE_SPLIT)
2477 : {
2478 3331 : Address peer_address;
2479 :
2480 3331 : if (peer->getAddress(MTP_UDP, peer_address)) {
2481 :
2482 : // We have to create a packet again for buffering
2483 : // This isn't actually too bad an idea.
2484 : BufferedPacket packet = makePacket(
2485 : peer_address,
2486 : packetdata,
2487 3331 : m_connection->GetProtocolID(),
2488 : peer_id,
2489 9993 : channelnum);
2490 :
2491 : // Buffer the packet
2492 : SharedBuffer<u8> data =
2493 6662 : peer->addSpiltPacket(channelnum,packet,reliable);
2494 :
// A non-empty result is treated as the complete reassembled message
2495 3331 : if (data.getSize() != 0)
2496 : {
2497 1344 : LOG(dout_con<<m_connection->getDesc()
2498 : <<"RETURNING TYPE_SPLIT: Constructed full data, "
2499 672 : <<"size="<<data.getSize()<<std::endl);
2500 672 : return data;
2501 : }
2502 2659 : LOG(dout_con<<m_connection->getDesc()<<"BUFFERED TYPE_SPLIT"<<std::endl);
2503 2659 : throw ProcessedSilentlyException("Buffered a split packet chunk");
2504 : }
2505 : else {
2506 : //TODO throw some error
// NOTE(review): with no MTP_UDP address this branch does nothing, so control
// reaches the FATAL_ERROR("Invalid execution point") at the end of the
// function. Consider throwing an exception here instead.
2507 : }
2508 : }
// ---- TYPE_RELIABLE: acknowledge, then deliver in order or buffer ----
2509 3609 : else if (type == TYPE_RELIABLE)
2510 : {
2511 3609 : FATAL_ERROR_IF(channel == 0, "Invalid channel (0)");
2512 : // Recursive reliable packets not allowed
2513 3609 : if (reliable)
2514 0 : throw InvalidIncomingDataException("Found nested reliable packets");
2515 :
// A packet consisting of exactly the reliable header passes this check; the
// recursive call at the end then rejects the empty inner packet
2516 3609 : if (packetdata.getSize() < RELIABLE_HEADER_SIZE)
2517 : throw InvalidIncomingDataException
2518 0 : ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
2519 :
2520 3609 : u16 seqnum = readU16(&packetdata[1]);
2521 3609 : bool is_future_packet = false;
2522 3609 : bool is_old_packet = false;
2523 :
2524 : /* packet is within our receive window send ack */
2525 3609 : if (seqnum_in_window(seqnum, channel->readNextIncomingSeqNum(),MAX_RELIABLE_WINDOW_SIZE))
2526 : {
2527 3609 : m_connection->sendAck(peer_id,channelnum,seqnum);
2528 : }
2529 : else {
2530 0 : is_future_packet = seqnum_higher(seqnum, channel->readNextIncomingSeqNum());
2531 0 : is_old_packet = seqnum_higher(channel->readNextIncomingSeqNum(), seqnum);
2532 :
2533 :
2534 : /* packet is not within receive window, don't send ack. *
2535 : * if this was a valid packet it's gonna be retransmitted */
2536 0 : if (is_future_packet)
2537 : {
2538 0 : throw ProcessedSilentlyException("Received packet newer then expected, not sending ack");
2539 : }
2540 :
2541 : /* seems like our ack was lost, send another one for a old packet */
2542 0 : if (is_old_packet)
2543 : {
2544 0 : LOG(dout_con<<m_connection->getDesc()
2545 : << "RE-SENDING ACK: peer_id: " << peer_id
2546 : << ", channel: " << (channelnum&0xFF)
2547 : << ", seqnum: " << seqnum << std::endl;)
2548 0 : m_connection->sendAck(peer_id,channelnum,seqnum);
2549 :
2550 : // we already have this packet so this one was on wire at least
2551 : // the current timeout
2552 : // we don't know how long this packet was on wire don't do silly guessing
2553 : // dynamic_cast<UDPPeer*>(&peer)->reportRTT(dynamic_cast<UDPPeer*>(&peer)->getResendTimeout());
2554 :
2555 0 : throw ProcessedSilentlyException("Retransmitting ack for old packet");
2556 : }
2557 : }
2558 :
// In the window but not the next expected packet: buffer it for later
2559 3609 : if (seqnum != channel->readNextIncomingSeqNum())
2560 : {
2561 407 : Address peer_address;
2562 :
2563 : // this is a reliable packet so we have a udp address for sure
// (the return value of getAddress() is not checked here)
2564 407 : peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
2565 : // This one comes later, buffer it.
2566 : // Actually we have to make a packet to buffer one.
2567 : // Well, we have all the ingredients, so just do it.
2568 : BufferedPacket packet = con::makePacket(
2569 : peer_address,
2570 : packetdata,
2571 407 : m_connection->GetProtocolID(),
2572 : peer_id,
2573 1221 : channelnum);
2574 : try{
2575 407 : channel->incoming_reliables.insert(packet,channel->readNextIncomingSeqNum());
2576 :
2577 407 : LOG(dout_con<<m_connection->getDesc()
2578 : << "BUFFERING, TYPE_RELIABLE peer_id: " << peer_id
2579 : << ", channel: " << (channelnum&0xFF)
2580 : << ", seqnum: " << seqnum << std::endl;)
2581 :
// Normal exit of this branch: ProcessedQueued is not caught by either
// handler below and propagates to the caller
2582 407 : throw ProcessedQueued("Buffered future reliable packet");
2583 : }
// NOTE(review): neither handler below rethrows or returns, so after an
// AlreadyExistsException or IncomingDataCorruption execution continues with
// the "process right now" path, which advances the expected sequence number
// although seqnum is not the expected one. Confirm whether this is intended.
2584 0 : catch(AlreadyExistsException &e)
2585 : {
2586 : }
2587 0 : catch(IncomingDataCorruption &e)
2588 : {
// Corrupt data: queue a command that disconnects this peer
2589 0 : ConnectionCommand discon;
2590 0 : discon.disconnect_peer(peer_id);
2591 0 : m_connection->putCommand(discon);
2592 :
2593 0 : LOG(derr_con<<m_connection->getDesc()
2594 : << "INVALID, TYPE_RELIABLE peer_id: " << peer_id
2595 : << ", channel: " << (channelnum&0xFF)
2596 : << ", seqnum: " << seqnum
2597 : << "DROPPING CLIENT!" << std::endl;)
2598 : }
2599 : }
2600 :
2601 : /* we got a packet to process right now */
2602 3202 : LOG(dout_con<<m_connection->getDesc()
2603 : << "RECURSIVE, TYPE_RELIABLE peer_id: " << peer_id
2604 : << ", channel: " << (channelnum&0xFF)
2605 : << ", seqnum: " << seqnum << std::endl;)
2606 :
2607 :
2608 : /* check for resend case */
// If the first buffered packet carries the same seqnum, it is popped and
// discarded in favour of the copy being processed now
2609 3202 : u16 queued_seqnum = 0;
2610 3202 : if (channel->incoming_reliables.getFirstSeqnum(queued_seqnum))
2611 : {
2612 317 : if (queued_seqnum == seqnum)
2613 : {
2614 0 : BufferedPacket queued_packet = channel->incoming_reliables.popFirst();
2615 : /** TODO find a way to verify the new against the old packet */
2616 : }
2617 : }
2618 :
2619 3202 : channel->incNextIncomingSeqNum();
2620 :
2621 : // Get out the inside packet and re-process it
2622 6404 : SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
2623 3202 : memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
2624 :
// reliable == true makes the recursive call reject a nested TYPE_RELIABLE
2625 5369 : return processPacket(channel, payload, peer_id, channelnum, true);
2626 : }
2627 : else
2628 : {
2629 0 : derr_con<<m_connection->getDesc()
2630 0 : <<"Got invalid type="<<((int)type&0xff)<<std::endl;
2631 0 : throw InvalidIncomingDataException("Invalid packet type");
2632 : }
2633 :
2634 : // We should never get here.
// (the TYPE_SPLIT branch without an address does get here, see above)
2635 0 : FATAL_ERROR("Invalid execution point");
2636 : }
2637 :
2638 : /*
2639 : Connection
2640 : */
2641 :
2642 0 : Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
2643 : bool ipv6) :
2644 : m_udpSocket(ipv6),
2645 : m_command_queue(),
2646 : m_event_queue(),
2647 : m_peer_id(0),
2648 : m_protocol_id(protocol_id),
2649 : m_sendThread(max_packet_size, timeout),
2650 : m_receiveThread(max_packet_size),
2651 : m_info_mutex(),
2652 : m_bc_peerhandler(0),
2653 : m_bc_receive_timeout(0),
2654 : m_shutting_down(false),
2655 0 : m_next_remote_peer_id(2)
2656 : {
2657 0 : m_udpSocket.setTimeoutMs(5);
2658 :
2659 0 : m_sendThread.setParent(this);
2660 0 : m_receiveThread.setParent(this);
2661 :
2662 0 : m_sendThread.Start();
2663 0 : m_receiveThread.Start();
2664 0 : }
2665 :
2666 1 : Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
2667 : bool ipv6, PeerHandler *peerhandler) :
2668 : m_udpSocket(ipv6),
2669 : m_command_queue(),
2670 : m_event_queue(),
2671 : m_peer_id(0),
2672 : m_protocol_id(protocol_id),
2673 : m_sendThread(max_packet_size, timeout),
2674 : m_receiveThread(max_packet_size),
2675 : m_info_mutex(),
2676 : m_bc_peerhandler(peerhandler),
2677 : m_bc_receive_timeout(0),
2678 : m_shutting_down(false),
2679 1 : m_next_remote_peer_id(2)
2680 :
2681 : {
2682 1 : m_udpSocket.setTimeoutMs(5);
2683 :
2684 1 : m_sendThread.setParent(this);
2685 1 : m_receiveThread.setParent(this);
2686 :
2687 1 : m_sendThread.Start();
2688 1 : m_receiveThread.Start();
2689 :
2690 1 : }
2691 :
2692 :
2693 2 : Connection::~Connection()
2694 : {
2695 1 : m_shutting_down = true;
2696 : // request threads to stop
2697 1 : m_sendThread.Stop();
2698 1 : m_receiveThread.Stop();
2699 :
2700 : //TODO for some unkonwn reason send/receive threads do not exit as they're
2701 : // supposed to be but wait on peer timeout. To speed up shutdown we reduce
2702 : // timeout to half a second.
2703 1 : m_sendThread.setPeerTimeout(0.5);
2704 :
2705 : // wait for threads to finish
2706 1 : m_sendThread.Wait();
2707 1 : m_receiveThread.Wait();
2708 :
2709 : // Delete peers
2710 5 : for(std::map<u16, Peer*>::iterator
2711 1 : j = m_peers.begin();
2712 4 : j != m_peers.end(); ++j)
2713 : {
2714 1 : delete j->second;
2715 : }
2716 1 : }
2717 :
2718 : /* Internal stuff */
2719 1326 : void Connection::putEvent(ConnectionEvent &e)
2720 : {
2721 : assert(e.type != CONNEVENT_NONE); // Pre-condition
2722 1326 : m_event_queue.push_back(e);
2723 1326 : }
2724 :
2725 0 : PeerHelper Connection::getPeer(u16 peer_id)
2726 : {
2727 0 : JMutexAutoLock peerlock(m_peers_mutex);
2728 0 : std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
2729 :
2730 0 : if (node == m_peers.end()) {
2731 0 : throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
2732 : }
2733 :
2734 : // Error checking
2735 0 : FATAL_ERROR_IF(node->second->id != peer_id, "Invalid peer id");
2736 :
2737 0 : return PeerHelper(node->second);
2738 : }
2739 :
2740 29927 : PeerHelper Connection::getPeerNoEx(u16 peer_id)
2741 : {
2742 59856 : JMutexAutoLock peerlock(m_peers_mutex);
2743 29929 : std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
2744 :
2745 29929 : if (node == m_peers.end()) {
2746 1 : return PeerHelper(NULL);
2747 : }
2748 :
2749 : // Error checking
2750 29928 : FATAL_ERROR_IF(node->second->id != peer_id, "Invalid peer id");
2751 :
2752 29928 : return PeerHelper(node->second);
2753 : }
2754 :
2755 : /* find peer_id for address */
2756 0 : u16 Connection::lookupPeer(Address& sender)
2757 : {
2758 0 : JMutexAutoLock peerlock(m_peers_mutex);
2759 0 : std::map<u16, Peer*>::iterator j;
2760 0 : j = m_peers.begin();
2761 0 : for(; j != m_peers.end(); ++j)
2762 : {
2763 0 : Peer *peer = j->second;
2764 0 : if (peer->isActive())
2765 0 : continue;
2766 :
2767 0 : Address tocheck;
2768 :
2769 0 : if ((peer->getAddress(MTP_MINETEST_RELIABLE_UDP, tocheck)) && (tocheck == sender))
2770 0 : return peer->id;
2771 :
2772 0 : if ((peer->getAddress(MTP_UDP, tocheck)) && (tocheck == sender))
2773 0 : return peer->id;
2774 : }
2775 :
2776 0 : return PEER_ID_INEXISTENT;
2777 : }
2778 :
2779 0 : std::list<Peer*> Connection::getPeers()
2780 : {
2781 0 : std::list<Peer*> list;
2782 0 : for(std::map<u16, Peer*>::iterator j = m_peers.begin();
2783 0 : j != m_peers.end(); ++j)
2784 : {
2785 0 : Peer *peer = j->second;
2786 0 : list.push_back(peer);
2787 : }
2788 0 : return list;
2789 : }
2790 :
2791 0 : bool Connection::deletePeer(u16 peer_id, bool timeout)
2792 : {
2793 0 : Peer *peer = 0;
2794 :
2795 : /* lock list as short as possible */
2796 : {
2797 0 : JMutexAutoLock peerlock(m_peers_mutex);
2798 0 : if (m_peers.find(peer_id) == m_peers.end())
2799 0 : return false;
2800 0 : peer = m_peers[peer_id];
2801 0 : m_peers.erase(peer_id);
2802 0 : m_peer_ids.remove(peer_id);
2803 : }
2804 :
2805 0 : Address peer_address;
2806 : //any peer has a primary address this never fails!
2807 0 : peer->getAddress(MTP_PRIMARY, peer_address);
2808 : // Create event
2809 0 : ConnectionEvent e;
2810 0 : e.peerRemoved(peer_id, timeout, peer_address);
2811 0 : putEvent(e);
2812 :
2813 :
2814 0 : peer->Drop();
2815 0 : return true;
2816 : }
2817 :
2818 : /* Interface */
2819 :
2820 0 : ConnectionEvent Connection::getEvent()
2821 : {
2822 0 : if (m_event_queue.empty()) {
2823 0 : ConnectionEvent e;
2824 0 : e.type = CONNEVENT_NONE;
2825 0 : return e;
2826 : }
2827 0 : return m_event_queue.pop_frontNoEx();
2828 : }
2829 :
2830 2512 : ConnectionEvent Connection::waitEvent(u32 timeout_ms)
2831 : {
2832 : try {
2833 2512 : return m_event_queue.pop_front(timeout_ms);
2834 2380 : } catch(ItemNotFoundException &ex) {
2835 2380 : ConnectionEvent e;
2836 1190 : e.type = CONNEVENT_NONE;
2837 1190 : return e;
2838 : }
2839 : }
2840 :
2841 4604 : void Connection::putCommand(ConnectionCommand &c)
2842 : {
2843 4604 : if (!m_shutting_down) {
2844 4604 : m_command_queue.push_back(c);
2845 4604 : m_sendThread.Trigger();
2846 : }
2847 4604 : }
2848 :
2849 0 : void Connection::Serve(Address bind_addr)
2850 : {
2851 0 : ConnectionCommand c;
2852 0 : c.serve(bind_addr);
2853 0 : putCommand(c);
2854 0 : }
2855 :
2856 1 : void Connection::Connect(Address address)
2857 : {
2858 2 : ConnectionCommand c;
2859 1 : c.connect(address);
2860 1 : putCommand(c);
2861 1 : }
2862 :
2863 0 : bool Connection::Connected()
2864 : {
2865 0 : JMutexAutoLock peerlock(m_peers_mutex);
2866 :
2867 0 : if (m_peers.size() != 1)
2868 0 : return false;
2869 :
2870 0 : std::map<u16, Peer*>::iterator node = m_peers.find(PEER_ID_SERVER);
2871 0 : if (node == m_peers.end())
2872 0 : return false;
2873 :
2874 0 : if (m_peer_id == PEER_ID_INEXISTENT)
2875 0 : return false;
2876 :
2877 0 : return true;
2878 : }
2879 :
2880 1 : void Connection::Disconnect()
2881 : {
2882 2 : ConnectionCommand c;
2883 1 : c.disconnect();
2884 1 : putCommand(c);
2885 1 : }
2886 :
2887 2512 : void Connection::Receive(NetworkPacket* pkt)
2888 : {
2889 1 : for(;;) {
2890 3702 : ConnectionEvent e = waitEvent(m_bc_receive_timeout);
2891 2512 : if (e.type != CONNEVENT_NONE)
2892 2644 : LOG(dout_con << getDesc() << ": Receive: got event: "
2893 1322 : << e.describe() << std::endl);
2894 2512 : switch(e.type) {
2895 : case CONNEVENT_NONE:
2896 1190 : throw NoIncomingDataException("No incoming data");
2897 : case CONNEVENT_DATA_RECEIVED:
2898 : // Data size is lesser than command size, ignoring packet
2899 1321 : if (e.data.getSize() < 2) {
2900 0 : continue;
2901 : }
2902 :
2903 1321 : pkt->putRawPacket(*e.data, e.data.getSize(), e.peer_id);
2904 2642 : return;
2905 : case CONNEVENT_PEER_ADDED: {
2906 2 : UDPPeer tmp(e.peer_id, e.address, this);
2907 1 : if (m_bc_peerhandler)
2908 1 : m_bc_peerhandler->peerAdded(&tmp);
2909 1 : continue;
2910 : }
2911 : case CONNEVENT_PEER_REMOVED: {
2912 0 : UDPPeer tmp(e.peer_id, e.address, this);
2913 0 : if (m_bc_peerhandler)
2914 0 : m_bc_peerhandler->deletingPeer(&tmp, e.timeout);
2915 0 : continue;
2916 : }
2917 : case CONNEVENT_BIND_FAILED:
2918 : throw ConnectionBindFailed("Failed to bind socket "
2919 0 : "(port already in use?)");
2920 : }
2921 : }
2922 : throw NoIncomingDataException("No incoming data");
2923 : }
2924 :
2925 992 : void Connection::Send(u16 peer_id, u8 channelnum,
2926 : NetworkPacket* pkt, bool reliable)
2927 : {
2928 : assert(channelnum < CHANNEL_COUNT); // Pre-condition
2929 :
2930 1984 : ConnectionCommand c;
2931 :
2932 992 : c.send(peer_id, channelnum, pkt, reliable);
2933 992 : putCommand(c);
2934 992 : }
2935 :
2936 0 : Address Connection::GetPeerAddress(u16 peer_id)
2937 : {
2938 0 : PeerHelper peer = getPeerNoEx(peer_id);
2939 :
2940 0 : if (!peer)
2941 0 : throw PeerNotFoundException("No address for peer found!");
2942 0 : Address peer_address;
2943 0 : peer->getAddress(MTP_PRIMARY, peer_address);
2944 0 : return peer_address;
2945 : }
2946 :
2947 3 : float Connection::getPeerStat(u16 peer_id, rtt_stat_type type)
2948 : {
2949 6 : PeerHelper peer = getPeerNoEx(peer_id);
2950 3 : if (!peer) return -1;
2951 3 : return peer->getStat(type);
2952 : }
2953 :
2954 0 : float Connection::getLocalStat(rate_stat_type type)
2955 : {
2956 0 : PeerHelper peer = getPeerNoEx(PEER_ID_SERVER);
2957 :
2958 0 : FATAL_ERROR_IF(!peer, "Connection::getLocalStat we couldn't get our own peer? are you serious???");
2959 :
2960 0 : float retval = 0.0;
2961 :
2962 0 : for (u16 j=0; j<CHANNEL_COUNT; j++) {
2963 0 : switch(type) {
2964 : case CUR_DL_RATE:
2965 0 : retval += dynamic_cast<UDPPeer*>(&peer)->channels[j].getCurrentDownloadRateKB();
2966 0 : break;
2967 : case AVG_DL_RATE:
2968 0 : retval += dynamic_cast<UDPPeer*>(&peer)->channels[j].getAvgDownloadRateKB();
2969 0 : break;
2970 : case CUR_INC_RATE:
2971 0 : retval += dynamic_cast<UDPPeer*>(&peer)->channels[j].getCurrentIncomingRateKB();
2972 0 : break;
2973 : case AVG_INC_RATE:
2974 0 : retval += dynamic_cast<UDPPeer*>(&peer)->channels[j].getAvgIncomingRateKB();
2975 0 : break;
2976 : case AVG_LOSS_RATE:
2977 0 : retval += dynamic_cast<UDPPeer*>(&peer)->channels[j].getAvgLossRateKB();
2978 0 : break;
2979 : case CUR_LOSS_RATE:
2980 0 : retval += dynamic_cast<UDPPeer*>(&peer)->channels[j].getCurrentLossRateKB();
2981 0 : break;
2982 : default:
2983 0 : FATAL_ERROR("Connection::getLocalStat Invalid stat type");
2984 : }
2985 : }
2986 0 : return retval;
2987 : }
2988 :
2989 0 : u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd)
2990 : {
2991 : // Somebody wants to make a new connection
2992 :
2993 : // Get a unique peer id (2 or higher)
2994 0 : u16 peer_id_new = m_next_remote_peer_id;
2995 0 : u16 overflow = MAX_UDP_PEERS;
2996 :
2997 : /*
2998 : Find an unused peer id
2999 : */
3000 0 : JMutexAutoLock lock(m_peers_mutex);
3001 0 : bool out_of_ids = false;
3002 0 : for(;;) {
3003 : // Check if exists
3004 0 : if (m_peers.find(peer_id_new) == m_peers.end())
3005 :
3006 0 : break;
3007 : // Check for overflow
3008 0 : if (peer_id_new == overflow) {
3009 0 : out_of_ids = true;
3010 0 : break;
3011 : }
3012 0 : peer_id_new++;
3013 : }
3014 :
3015 0 : if (out_of_ids) {
3016 0 : errorstream << getDesc() << " ran out of peer ids" << std::endl;
3017 0 : return PEER_ID_INEXISTENT;
3018 : }
3019 :
3020 : // Create a peer
3021 0 : Peer *peer = 0;
3022 0 : peer = new UDPPeer(peer_id_new, sender, this);
3023 :
3024 0 : m_peers[peer->id] = peer;
3025 0 : m_peer_ids.push_back(peer->id);
3026 :
3027 0 : m_next_remote_peer_id = (peer_id_new +1 ) % MAX_UDP_PEERS;
3028 :
3029 0 : LOG(dout_con << getDesc()
3030 0 : << "createPeer(): giving peer_id=" << peer_id_new << std::endl);
3031 :
3032 0 : ConnectionCommand cmd;
3033 0 : SharedBuffer<u8> reply(4);
3034 0 : writeU8(&reply[0], TYPE_CONTROL);
3035 0 : writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
3036 0 : writeU16(&reply[2], peer_id_new);
3037 0 : cmd.createPeer(peer_id_new,reply);
3038 0 : putCommand(cmd);
3039 :
3040 : // Create peer addition event
3041 0 : ConnectionEvent e;
3042 0 : e.peerAdded(peer_id_new, sender);
3043 0 : putEvent(e);
3044 :
3045 : // We're now talking to a valid peer_id
3046 0 : return peer_id_new;
3047 : }
3048 :
3049 0 : void Connection::PrintInfo(std::ostream &out)
3050 : {
3051 0 : m_info_mutex.Lock();
3052 0 : out<<getDesc()<<": ";
3053 0 : m_info_mutex.Unlock();
3054 0 : }
3055 :
3056 0 : void Connection::PrintInfo()
3057 : {
3058 0 : PrintInfo(dout_con);
3059 0 : }
3060 :
3061 41076 : const std::string Connection::getDesc()
3062 : {
3063 82158 : return std::string("con(")+
3064 123231 : itos(m_udpSocket.GetHandle())+"/"+itos(m_peer_id)+")";
3065 : }
3066 :
3067 0 : void Connection::DisconnectPeer(u16 peer_id)
3068 : {
3069 0 : ConnectionCommand discon;
3070 0 : discon.disconnect_peer(peer_id);
3071 0 : putCommand(discon);
3072 0 : }
3073 :
3074 3609 : void Connection::sendAck(u16 peer_id, u8 channelnum, u16 seqnum)
3075 : {
3076 : assert(channelnum < CHANNEL_COUNT); // Pre-condition
3077 :
3078 7218 : LOG(dout_con<<getDesc()
3079 : <<" Queuing ACK command to peer_id: " << peer_id <<
3080 : " channel: " << (channelnum & 0xFF) <<
3081 3609 : " seqnum: " << seqnum << std::endl);
3082 :
3083 7218 : ConnectionCommand c;
3084 7218 : SharedBuffer<u8> ack(4);
3085 3609 : writeU8(&ack[0], TYPE_CONTROL);
3086 3609 : writeU8(&ack[1], CONTROLTYPE_ACK);
3087 3609 : writeU16(&ack[2], seqnum);
3088 :
3089 3609 : c.ack(peer_id, channelnum, ack);
3090 3609 : putCommand(c);
3091 3609 : m_sendThread.Trigger();
3092 3609 : }
3093 :
3094 1 : UDPPeer* Connection::createServerPeer(Address& address)
3095 : {
3096 1 : if (getPeerNoEx(PEER_ID_SERVER) != 0)
3097 : {
3098 0 : throw ConnectionException("Already connected to a server");
3099 : }
3100 :
3101 1 : UDPPeer *peer = new UDPPeer(PEER_ID_SERVER, address, this);
3102 :
3103 : {
3104 2 : JMutexAutoLock lock(m_peers_mutex);
3105 1 : m_peers[peer->id] = peer;
3106 1 : m_peer_ids.push_back(peer->id);
3107 : }
3108 :
3109 1 : return peer;
3110 : }
3111 :
3112 3 : } // namespace
|