Line data Source code
1 : /*
2 : Minetest
3 : Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
4 :
5 : This program is free software; you can redistribute it and/or modify
6 : it under the terms of the GNU Lesser General Public License as published by
7 : the Free Software Foundation; either version 2.1 of the License, or
8 : (at your option) any later version.
9 :
10 : This program is distributed in the hope that it will be useful,
11 : but WITHOUT ANY WARRANTY; without even the implied warranty of
12 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 : GNU Lesser General Public License for more details.
14 :
15 : You should have received a copy of the GNU Lesser General Public License along
16 : with this program; if not, write to the Free Software Foundation, Inc.,
17 : 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 : */
19 :
20 : #ifndef CONNECTION_HEADER
21 : #define CONNECTION_HEADER
22 :
23 : #include "irrlichttypes_bloated.h"
24 : #include "socket.h"
25 : #include "exceptions.h"
26 : #include "constants.h"
27 : #include "network/networkpacket.h"
28 : #include "util/pointer.h"
29 : #include "util/container.h"
30 : #include "util/thread.h"
31 : #include "util/numeric.h"
32 : #include <iostream>
33 : #include <fstream>
34 : #include <list>
35 : #include <map>
36 :
37 : class NetworkPacket;
38 :
39 : namespace con
40 : {
41 :
42 : /*
43 : Exceptions
44 : */
45 0 : class NotFoundException : public BaseException
46 : {
47 : public:
48 0 : NotFoundException(const char *s):
49 0 : BaseException(s)
50 0 : {}
51 : };
52 :
53 0 : class PeerNotFoundException : public BaseException
54 : {
55 : public:
56 0 : PeerNotFoundException(const char *s):
57 0 : BaseException(s)
58 0 : {}
59 : };
60 :
61 0 : class ConnectionException : public BaseException
62 : {
63 : public:
64 0 : ConnectionException(const char *s):
65 0 : BaseException(s)
66 0 : {}
67 : };
68 :
69 0 : class ConnectionBindFailed : public BaseException
70 : {
71 : public:
72 0 : ConnectionBindFailed(const char *s):
73 0 : BaseException(s)
74 0 : {}
75 : };
76 :
77 0 : class InvalidIncomingDataException : public BaseException
78 : {
79 : public:
80 0 : InvalidIncomingDataException(const char *s):
81 0 : BaseException(s)
82 0 : {}
83 : };
84 :
85 : class InvalidOutgoingDataException : public BaseException
86 : {
87 : public:
88 : InvalidOutgoingDataException(const char *s):
89 : BaseException(s)
90 : {}
91 : };
92 :
93 1190 : class NoIncomingDataException : public BaseException
94 : {
95 : public:
96 1190 : NoIncomingDataException(const char *s):
97 1190 : BaseException(s)
98 1190 : {}
99 : };
100 :
101 3470 : class ProcessedSilentlyException : public BaseException
102 : {
103 : public:
104 3470 : ProcessedSilentlyException(const char *s):
105 3470 : BaseException(s)
106 3470 : {}
107 : };
108 :
109 407 : class ProcessedQueued : public BaseException
110 : {
111 : public:
112 407 : ProcessedQueued(const char *s):
113 407 : BaseException(s)
114 407 : {}
115 : };
116 :
117 0 : class IncomingDataCorruption : public BaseException
118 : {
119 : public:
120 0 : IncomingDataCorruption(const char *s):
121 0 : BaseException(s)
122 0 : {}
123 : };
124 :
// Protocol selector passed to Peer::getAddress() and Connection::createPeer().
// Enumerator values are the implicit 0, 1, 2.
enum MTProtocols {
	MTP_PRIMARY,
	MTP_UDP,
	MTP_MINETEST_RELIABLE_UDP
};
130 :
131 : #define SEQNUM_MAX 65535
132 0 : inline bool seqnum_higher(u16 totest, u16 base)
133 : {
134 0 : if (totest > base)
135 : {
136 0 : if ((totest - base) > (SEQNUM_MAX/2))
137 0 : return false;
138 : else
139 0 : return true;
140 : }
141 : else
142 : {
143 0 : if ((base - totest) > (SEQNUM_MAX/2))
144 0 : return true;
145 : else
146 0 : return false;
147 : }
148 : }
149 :
150 4818 : inline bool seqnum_in_window(u16 seqnum, u16 next,u16 window_size)
151 : {
152 4818 : u16 window_start = next;
153 4818 : u16 window_end = ( next + window_size ) % (SEQNUM_MAX+1);
154 :
155 4818 : if (window_start < window_end)
156 : {
157 4011 : return ((seqnum >= window_start) && (seqnum < window_end));
158 : }
159 : else
160 : {
161 807 : return ((seqnum < window_end) || (seqnum >= window_start));
162 : }
163 : }
164 :
165 29135 : struct BufferedPacket
166 : {
167 : BufferedPacket(u8 *a_data, u32 a_size):
168 : data(a_data, a_size), time(0.0), totaltime(0.0), absolute_send_time(-1),
169 : resend_count(0)
170 : {}
171 8348 : BufferedPacket(u32 a_size):
172 : data(a_size), time(0.0), totaltime(0.0), absolute_send_time(-1),
173 8348 : resend_count(0)
174 8349 : {}
175 : SharedBuffer<u8> data; // Data of the packet, including headers
176 : float time; // Seconds from buffering the packet or re-sending
177 : float totaltime; // Seconds from buffering the packet
178 : unsigned int absolute_send_time;
179 : Address address; // Sender or destination
180 : unsigned int resend_count;
181 : };
182 :
183 : // This adds the base headers to the data and makes a packet out of it
184 : BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
185 : u32 protocol_id, u16 sender_peer_id, u8 channel);
186 : BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
187 : u32 protocol_id, u16 sender_peer_id, u8 channel);
188 :
189 : // Add the TYPE_ORIGINAL header to the data
190 : SharedBuffer<u8> makeOriginalPacket(
191 : SharedBuffer<u8> data);
192 :
193 : // Split data in chunks and add TYPE_SPLIT headers to them
194 : std::list<SharedBuffer<u8> > makeSplitPacket(
195 : SharedBuffer<u8> data,
196 : u32 chunksize_max,
197 : u16 seqnum);
198 :
199 : // Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet
200 : // Increments split_seqnum if a split packet is made
201 : std::list<SharedBuffer<u8> > makeAutoSplitPacket(
202 : SharedBuffer<u8> data,
203 : u32 chunksize_max,
204 : u16 &split_seqnum);
205 :
206 : // Add the TYPE_RELIABLE header to the data
207 : SharedBuffer<u8> makeReliablePacket(
208 : SharedBuffer<u8> data,
209 : u16 seqnum);
210 :
211 672 : struct IncomingSplitPacket
212 : {
213 672 : IncomingSplitPacket()
214 672 : {
215 672 : time = 0.0;
216 672 : reliable = false;
217 672 : }
218 : // Key is chunk number, value is data without headers
219 : std::map<u16, SharedBuffer<u8> > chunks;
220 : u32 chunk_count;
221 : float time; // Seconds from adding
222 : bool reliable; // If true, isn't deleted on timeout
223 :
224 3331 : bool allReceived()
225 : {
226 3331 : return (chunks.size() == chunk_count);
227 : }
228 : };
229 :
230 : /*
231 : === NOTES ===
232 :
233 : A packet is sent through a channel to a peer with a basic header:
234 : TODO: Should we have a receiver_peer_id also?
235 : Header (7 bytes):
236 : [0] u32 protocol_id
237 : [4] u16 sender_peer_id
238 : [6] u8 channel
239 : sender_peer_id:
240 : Unique to each peer.
241 : value 0 (PEER_ID_INEXISTENT) is reserved for making new connections
242 : value 1 (PEER_ID_SERVER) is reserved for server
243 : these constants are defined in constants.h
244 : channel:
245 : The lower the number, the higher the priority is.
246 : Only channels 0, 1 and 2 exist.
247 : */
// Size in bytes of the base header: u32 protocol_id + u16 sender_peer_id +
// u8 channel.
#define BASE_HEADER_SIZE 7
// Number of channels per peer (channels 0, 1 and 2).
#define CHANNEL_COUNT 3
250 : /*
251 : Packet types:
252 :
253 : CONTROL: This is a packet used by the protocol.
254 : - When this is processed, nothing is handed to the user.
255 : Header (2 bytes):
256 : [0] u8 type
257 : [1] u8 controltype
258 : controltype and data description:
259 : CONTROLTYPE_ACK
260 : [2] u16 seqnum
261 : CONTROLTYPE_SET_PEER_ID
262 : [2] u16 peer_id_new
263 : CONTROLTYPE_PING
264 : - There is no actual reply, but this can be sent in a reliable
265 : packet to get a reply
266 : CONTROLTYPE_DISCO
267 : */
// Packet type byte for protocol-internal control packets.
#define TYPE_CONTROL 0
// Values of the controltype byte that follows TYPE_CONTROL.
#define CONTROLTYPE_ACK 0
#define CONTROLTYPE_SET_PEER_ID 1
#define CONTROLTYPE_PING 2
#define CONTROLTYPE_DISCO 3
#define CONTROLTYPE_ENABLE_BIG_SEND_WINDOW 4
274 :
275 : /*
276 : ORIGINAL: This is a plain packet with no control and no error
277 : checking at all.
278 : - When this is processed, it is directly handed to the user.
279 : Header (1 byte):
280 : [0] u8 type
281 : */
// Packet type byte for plain packets handed directly to the user.
#define TYPE_ORIGINAL 1
// The ORIGINAL header consists of the type byte only.
#define ORIGINAL_HEADER_SIZE 1
284 : /*
285 : SPLIT: These are sequences of packets forming one bigger piece of
286 : data.
287 : - When processed and all the chunk_nums 0...chunk_count-1 are
288 : present (this should be buffered), the resulting data shall be
289 : directly handed to the user.
290 : - If the data fails to come up in a reasonable time, the buffer shall
291 : be silently discarded.
292 : - These can be sent as-is or atop of a RELIABLE packet stream.
293 : Header (7 bytes):
294 : [0] u8 type
295 : [1] u16 seqnum
296 : [3] u16 chunk_count
297 : [5] u16 chunk_num
298 : */
// Packet type byte for one chunk of a split packet.
#define TYPE_SPLIT 2
300 : /*
301 : RELIABLE: Delivery of all RELIABLE packets shall be forced by ACKs,
302 : and they shall be delivered in the same order as sent. This is done
303 : with a buffer in the receiving and transmitting end.
304 : - When this is processed, the contents of each packet is recursively
305 : processed as packets.
306 : Header (3 bytes):
307 : [0] u8 type
308 : [1] u16 seqnum
309 :
310 : */
// Packet type byte for reliable (ACKed, ordered) packets.
#define TYPE_RELIABLE 3
// RELIABLE header: u8 type + u16 seqnum.
#define RELIABLE_HEADER_SIZE 3
// Starting sequence number; sits 36 below the u16 limit of 65536
// (presumably so that wrap-around is exercised early - not stated here).
#define SEQNUM_INITIAL 65500
314 :
315 : /*
316 : A buffer which stores reliable packets and sorts them internally
317 : for fast access to the smallest one.
318 : */
319 :
320 : typedef std::list<BufferedPacket>::iterator RPBSearchResult;
321 :
322 12 : class ReliablePacketBuffer
323 : {
324 : public:
325 : ReliablePacketBuffer();
326 :
327 : bool getFirstSeqnum(u16& result);
328 :
329 : BufferedPacket popFirst();
330 : BufferedPacket popSeqnum(u16 seqnum);
331 : void insert(BufferedPacket &p,u16 next_expected);
332 :
333 : void incrementTimeouts(float dtime);
334 : std::list<BufferedPacket> getTimedOuts(float timeout,
335 : unsigned int max_packets);
336 :
337 : void print();
338 : bool empty();
339 : bool containsPacket(u16 seqnum);
340 : RPBSearchResult notFound();
341 : u32 size();
342 :
343 :
344 : private:
345 : RPBSearchResult findPacket(u16 seqnum);
346 :
347 : std::list<BufferedPacket> m_list;
348 : u32 m_list_size;
349 :
350 : u16 m_oldest_non_answered_ack;
351 :
352 : JMutex m_list_mutex;
353 : };
354 :
355 : /*
356 : A buffer for reconstructing split packets
357 : */
358 :
359 6 : class IncomingSplitBuffer
360 : {
361 : public:
362 : ~IncomingSplitBuffer();
363 : /*
364 : Returns a reference counted buffer of length != 0 when a full split
365 : packet is constructed. If not, returns one of length 0.
366 : */
367 : SharedBuffer<u8> insert(BufferedPacket &p, bool reliable);
368 :
369 : void removeUnreliableTimedOuts(float dtime, float timeout);
370 :
371 : private:
372 : // Key is seqnum
373 : std::map<u16, IncomingSplitPacket*> m_buf;
374 :
375 : JMutex m_map_mutex;
376 : };
377 :
378 19045 : struct OutgoingPacket
379 : {
380 : u16 peer_id;
381 : u8 channelnum;
382 : SharedBuffer<u8> data;
383 : bool reliable;
384 : bool ack;
385 :
386 3809 : OutgoingPacket(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_,
387 : bool reliable_,bool ack_=false):
388 : peer_id(peer_id_),
389 : channelnum(channelnum_),
390 : data(data_),
391 : reliable(reliable_),
392 3809 : ack(ack_)
393 : {
394 3809 : }
395 : };
396 :
// Discriminator stored in ConnectionCommand::type.
// Values are the implicit 0..9; note the last three use the CONCMD_ prefix.
enum ConnectionCommandType
{
	CONNCMD_NONE,
	CONNCMD_SERVE,
	CONNCMD_CONNECT,
	CONNCMD_DISCONNECT,
	CONNCMD_DISCONNECT_PEER,
	CONNCMD_SEND,
	CONNCMD_SEND_TO_ALL,
	CONCMD_ACK,
	CONCMD_CREATE_PEER,
	CONCMD_DISABLE_LEGACY
};
409 :
410 49473 : struct ConnectionCommand
411 : {
412 : enum ConnectionCommandType type;
413 : Address address;
414 : u16 peer_id;
415 : u8 channelnum;
416 : Buffer<u8> data;
417 : bool reliable;
418 : bool raw;
419 :
420 7988 : ConnectionCommand(): type(CONNCMD_NONE), peer_id(PEER_ID_INEXISTENT), reliable(false), raw(false) {}
421 :
422 0 : void serve(Address address_)
423 : {
424 0 : type = CONNCMD_SERVE;
425 0 : address = address_;
426 0 : }
427 1 : void connect(Address address_)
428 : {
429 1 : type = CONNCMD_CONNECT;
430 1 : address = address_;
431 1 : }
432 1 : void disconnect()
433 : {
434 1 : type = CONNCMD_DISCONNECT;
435 1 : }
436 0 : void disconnect_peer(u16 peer_id_)
437 : {
438 0 : type = CONNCMD_DISCONNECT_PEER;
439 0 : peer_id = peer_id_;
440 0 : }
441 992 : void send(u16 peer_id_, u8 channelnum_,
442 : NetworkPacket* pkt, bool reliable_)
443 : {
444 992 : type = CONNCMD_SEND;
445 992 : peer_id = peer_id_;
446 992 : channelnum = channelnum_;
447 992 : data = pkt->oldForgePacket();
448 992 : reliable = reliable_;
449 992 : }
450 :
451 3609 : void ack(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_)
452 : {
453 3609 : type = CONCMD_ACK;
454 3609 : peer_id = peer_id_;
455 3609 : channelnum = channelnum_;
456 3609 : data = data_;
457 3609 : reliable = false;
458 3609 : }
459 :
460 0 : void createPeer(u16 peer_id_, SharedBuffer<u8> data_)
461 : {
462 0 : type = CONCMD_CREATE_PEER;
463 0 : peer_id = peer_id_;
464 0 : data = data_;
465 0 : channelnum = 0;
466 0 : reliable = true;
467 0 : raw = true;
468 0 : }
469 :
470 1 : void disableLegacy(u16 peer_id_, SharedBuffer<u8> data_)
471 : {
472 1 : type = CONCMD_DISABLE_LEGACY;
473 1 : peer_id = peer_id_;
474 1 : data = data_;
475 1 : channelnum = 0;
476 1 : reliable = true;
477 1 : raw = true;
478 1 : }
479 : };
480 :
481 : class Channel
482 : {
483 :
484 : public:
485 : u16 readNextIncomingSeqNum();
486 : u16 incNextIncomingSeqNum();
487 :
488 : u16 getOutgoingSequenceNumber(bool& successfull);
489 : u16 readOutgoingSequenceNumber();
490 : bool putBackSequenceNumber(u16);
491 :
492 : u16 readNextSplitSeqNum();
493 : void setNextSplitSeqNum(u16 seqnum);
494 :
495 : // This is for buffering the incoming packets that are coming in
496 : // the wrong order
497 : ReliablePacketBuffer incoming_reliables;
498 : // This is for buffering the sent packets so that the sender can
499 : // re-send them if no ACK is received
500 : ReliablePacketBuffer outgoing_reliables_sent;
501 :
502 : //queued reliable packets
503 : std::queue<BufferedPacket> queued_reliables;
504 :
505 : //queue commands prior splitting to packets
506 : std::deque<ConnectionCommand> queued_commands;
507 :
508 : IncomingSplitBuffer incoming_splits;
509 :
510 : Channel();
511 : ~Channel();
512 :
513 : void UpdatePacketLossCounter(unsigned int count);
514 : void UpdatePacketTooLateCounter();
515 : void UpdateBytesSent(unsigned int bytes,unsigned int packages=1);
516 : void UpdateBytesLost(unsigned int bytes);
517 : void UpdateBytesReceived(unsigned int bytes);
518 :
519 : void UpdateTimers(float dtime, bool legacy_peer);
520 :
521 0 : const float getCurrentDownloadRateKB()
522 0 : { JMutexAutoLock lock(m_internal_mutex); return cur_kbps; };
523 : const float getMaxDownloadRateKB()
524 : { JMutexAutoLock lock(m_internal_mutex); return max_kbps; };
525 :
526 0 : const float getCurrentLossRateKB()
527 0 : { JMutexAutoLock lock(m_internal_mutex); return cur_kbps_lost; };
528 : const float getMaxLossRateKB()
529 : { JMutexAutoLock lock(m_internal_mutex); return max_kbps_lost; };
530 :
531 0 : const float getCurrentIncomingRateKB()
532 0 : { JMutexAutoLock lock(m_internal_mutex); return cur_incoming_kbps; };
533 : const float getMaxIncomingRateKB()
534 : { JMutexAutoLock lock(m_internal_mutex); return max_incoming_kbps; };
535 :
536 0 : const float getAvgDownloadRateKB()
537 0 : { JMutexAutoLock lock(m_internal_mutex); return avg_kbps; };
538 0 : const float getAvgLossRateKB()
539 0 : { JMutexAutoLock lock(m_internal_mutex); return avg_kbps_lost; };
540 0 : const float getAvgIncomingRateKB()
541 0 : { JMutexAutoLock lock(m_internal_mutex); return avg_incoming_kbps; };
542 :
543 1595 : const unsigned int getWindowSize() const { return window_size; };
544 :
545 10152 : void setWindowSize(unsigned int size) { window_size = size; };
546 : private:
547 : JMutex m_internal_mutex;
548 : int window_size;
549 :
550 : u16 next_incoming_seqnum;
551 :
552 : u16 next_outgoing_seqnum;
553 : u16 next_outgoing_split_seqnum;
554 :
555 : unsigned int current_packet_loss;
556 : unsigned int current_packet_too_late;
557 : unsigned int current_packet_successfull;
558 : float packet_loss_counter;
559 :
560 : unsigned int current_bytes_transfered;
561 : unsigned int current_bytes_received;
562 : unsigned int current_bytes_lost;
563 : float max_kbps;
564 : float cur_kbps;
565 : float avg_kbps;
566 : float max_incoming_kbps;
567 : float cur_incoming_kbps;
568 : float avg_incoming_kbps;
569 : float max_kbps_lost;
570 : float cur_kbps_lost;
571 : float avg_kbps_lost;
572 : float bpm_counter;
573 :
574 : unsigned int rate_samples;
575 : };
576 :
577 : class Peer;
578 :
// Kind of change recorded in a PeerChange (implicit values 0 and 1).
enum PeerChangeType {
	PEER_ADDED,
	PEER_REMOVED
};
584 : struct PeerChange
585 : {
586 : PeerChangeType type;
587 : u16 peer_id;
588 : bool timeout;
589 : };
590 :
591 : class PeerHandler
592 : {
593 : public:
594 :
595 1 : PeerHandler()
596 1 : {
597 1 : }
598 1 : virtual ~PeerHandler()
599 1 : {
600 1 : }
601 :
602 : /*
603 : This is called after the Peer has been inserted into the
604 : Connection's peer container.
605 : */
606 : virtual void peerAdded(Peer *peer) = 0;
607 : /*
608 : This is called before the Peer has been removed from the
609 : Connection's peer container.
610 : */
611 : virtual void deletingPeer(Peer *peer, bool timeout) = 0;
612 : };
613 :
614 : class PeerHelper
615 : {
616 : public:
617 : PeerHelper();
618 : PeerHelper(Peer* peer);
619 : ~PeerHelper();
620 :
621 : PeerHelper& operator=(Peer* peer);
622 : Peer* operator->() const;
623 : bool operator!();
624 : Peer* operator&() const;
625 : bool operator!=(void* ptr);
626 :
627 : private:
628 : Peer* m_peer;
629 : };
630 :
631 : class Connection;
632 :
// Selector for Peer::getStat(): which round-trip-time / jitter figure to read.
enum rtt_stat_type {
	MIN_RTT,
	MAX_RTT,
	AVG_RTT,
	MIN_JITTER,
	MAX_JITTER,
	AVG_JITTER
};
641 :
// Selector for Connection::getLocalStat(): which transfer rate to read.
enum rate_stat_type {
	CUR_DL_RATE,
	AVG_DL_RATE,
	CUR_INC_RATE,
	AVG_INC_RATE,
	CUR_LOSS_RATE,
	AVG_LOSS_RATE
};
650 :
651 : class Peer {
652 : public:
653 : friend class PeerHelper;
654 :
655 2 : Peer(Address address_,u16 id_,Connection* connection) :
656 : id(id_),
657 : m_increment_packets_remaining(9),
658 : m_increment_bytes_remaining(0),
659 : m_pending_deletion(false),
660 : m_connection(connection),
661 : address(address_),
662 : m_ping_timer(0.0),
663 : m_last_rtt(-1.0),
664 : m_usage(0),
665 : m_timeout_counter(0.0),
666 2 : m_last_timeout_check(porting::getTimeMs()),
667 4 : m_has_sent_with_id(false)
668 : {
669 2 : m_rtt.avg_rtt = -1.0;
670 2 : m_rtt.jitter_avg = -1.0;
671 2 : m_rtt.jitter_max = 0.0;
672 2 : m_rtt.max_rtt = 0.0;
673 2 : m_rtt.jitter_min = FLT_MAX;
674 2 : m_rtt.min_rtt = FLT_MAX;
675 2 : };
676 :
677 4 : virtual ~Peer() {
678 4 : JMutexAutoLock usage_lock(m_exclusive_access_mutex);
679 2 : FATAL_ERROR_IF(m_usage != 0, "Reference counting failure");
680 2 : };
681 :
682 : // Unique id of the peer
683 : u16 id;
684 :
685 : void Drop();
686 :
687 0 : virtual void PutReliableSendCommand(ConnectionCommand &c,
688 0 : unsigned int max_packet_size) {};
689 :
690 0 : virtual bool isActive() { return false; };
691 :
692 : virtual bool getAddress(MTProtocols type, Address& toset) = 0;
693 :
694 4795 : void ResetTimeout()
695 4795 : {JMutexAutoLock lock(m_exclusive_access_mutex); m_timeout_counter=0.0; };
696 :
697 : bool isTimedOut(float timeout);
698 :
699 4795 : void setSentWithID()
700 4795 : { JMutexAutoLock lock(m_exclusive_access_mutex); m_has_sent_with_id = true; };
701 :
702 0 : bool hasSentWithID()
703 0 : { JMutexAutoLock lock(m_exclusive_access_mutex); return m_has_sent_with_id; };
704 :
705 : unsigned int m_increment_packets_remaining;
706 : unsigned int m_increment_bytes_remaining;
707 :
708 0 : virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; };
709 0 : virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum) {};
710 0 : virtual SharedBuffer<u8> addSpiltPacket(u8 channel,
711 : BufferedPacket toadd,
712 : bool reliable)
713 : {
714 0 : fprintf(stderr,"Peer: addSplitPacket called, this is supposed to be never called!\n");
715 0 : return SharedBuffer<u8>(0);
716 : };
717 :
718 0 : virtual bool Ping(float dtime, SharedBuffer<u8>& data) { return false; };
719 :
720 513 : virtual float getStat(rtt_stat_type type) const {
721 513 : switch (type) {
722 : case MIN_RTT:
723 0 : return m_rtt.min_rtt;
724 : case MAX_RTT:
725 0 : return m_rtt.max_rtt;
726 : case AVG_RTT:
727 513 : return m_rtt.avg_rtt;
728 : case MIN_JITTER:
729 0 : return m_rtt.jitter_min;
730 : case MAX_JITTER:
731 0 : return m_rtt.jitter_max;
732 : case AVG_JITTER:
733 0 : return m_rtt.jitter_avg;
734 : }
735 0 : return -1;
736 : }
737 : protected:
738 0 : virtual void reportRTT(float rtt) {};
739 :
740 : void RTTStatistics(float rtt,
741 : std::string profiler_id="",
742 : unsigned int num_samples=1000);
743 :
744 : bool IncUseCount();
745 : void DecUseCount();
746 :
747 : JMutex m_exclusive_access_mutex;
748 :
749 : bool m_pending_deletion;
750 :
751 : Connection* m_connection;
752 :
753 : // Address of the peer
754 : Address address;
755 :
756 : // Ping timer
757 : float m_ping_timer;
758 : private:
759 :
760 : struct rttstats {
761 : float jitter_min;
762 : float jitter_max;
763 : float jitter_avg;
764 : float min_rtt;
765 : float max_rtt;
766 : float avg_rtt;
767 : };
768 :
769 : rttstats m_rtt;
770 : float m_last_rtt;
771 :
772 : // current usage count
773 : unsigned int m_usage;
774 :
775 : // Seconds from last receive
776 : float m_timeout_counter;
777 :
778 : u32 m_last_timeout_check;
779 :
780 : bool m_has_sent_with_id;
781 : };
782 :
783 : class UDPPeer : public Peer
784 : {
785 : public:
786 :
787 : friend class PeerHelper;
788 : friend class ConnectionReceiveThread;
789 : friend class ConnectionSendThread;
790 : friend class Connection;
791 :
792 : UDPPeer(u16 a_id, Address a_address, Connection* connection);
793 3 : virtual ~UDPPeer() {};
794 :
795 : void PutReliableSendCommand(ConnectionCommand &c,
796 : unsigned int max_packet_size);
797 :
798 0 : bool isActive()
799 0 : { return ((hasSentWithID()) && (!m_pending_deletion)); };
800 :
801 : bool getAddress(MTProtocols type, Address& toset);
802 :
803 : void setNonLegacyPeer();
804 :
805 20304 : bool getLegacyPeer()
806 20304 : { return m_legacy_peer; }
807 :
808 : u16 getNextSplitSequenceNumber(u8 channel);
809 : void setNextSplitSequenceNumber(u8 channel, u16 seqnum);
810 :
811 : SharedBuffer<u8> addSpiltPacket(u8 channel,
812 : BufferedPacket toadd,
813 : bool reliable);
814 :
815 :
816 : protected:
817 : /*
818 : Calculates avg_rtt and resend_timeout.
819 : rtt=-1 only recalculates resend_timeout
820 : */
821 : void reportRTT(float rtt);
822 :
823 : void RunCommandQueues(
824 : unsigned int max_packet_size,
825 : unsigned int maxcommands,
826 : unsigned int maxtransfer);
827 :
828 3384 : float getResendTimeout()
829 3384 : { JMutexAutoLock lock(m_exclusive_access_mutex); return resend_timeout; }
830 :
831 : void setResendTimeout(float timeout)
832 : { JMutexAutoLock lock(m_exclusive_access_mutex); resend_timeout = timeout; }
833 : bool Ping(float dtime,SharedBuffer<u8>& data);
834 :
835 : Channel channels[CHANNEL_COUNT];
836 : bool m_pending_disconnect;
837 : private:
838 : // This is changed dynamically
839 : float resend_timeout;
840 :
841 : bool processReliableSendCommand(
842 : ConnectionCommand &c,
843 : unsigned int max_packet_size);
844 :
845 : bool m_legacy_peer;
846 : };
847 :
848 : /*
849 : Connection
850 : */
851 :
// Discriminator stored in ConnectionEvent::type (implicit values 0..4).
enum ConnectionEventType
{
	CONNEVENT_NONE,
	CONNEVENT_DATA_RECEIVED,
	CONNEVENT_PEER_ADDED,
	CONNEVENT_PEER_REMOVED,
	CONNEVENT_BIND_FAILED
};
859 :
860 15488 : struct ConnectionEvent
861 : {
862 : enum ConnectionEventType type;
863 : u16 peer_id;
864 : Buffer<u8> data;
865 : bool timeout;
866 : Address address;
867 :
868 2516 : ConnectionEvent(): type(CONNEVENT_NONE), peer_id(0),
869 2516 : timeout(false) {}
870 :
871 1322 : std::string describe()
872 : {
873 1322 : switch(type) {
874 : case CONNEVENT_NONE:
875 0 : return "CONNEVENT_NONE";
876 : case CONNEVENT_DATA_RECEIVED:
877 1321 : return "CONNEVENT_DATA_RECEIVED";
878 : case CONNEVENT_PEER_ADDED:
879 1 : return "CONNEVENT_PEER_ADDED";
880 : case CONNEVENT_PEER_REMOVED:
881 0 : return "CONNEVENT_PEER_REMOVED";
882 : case CONNEVENT_BIND_FAILED:
883 0 : return "CONNEVENT_BIND_FAILED";
884 : }
885 0 : return "Invalid ConnectionEvent";
886 : }
887 :
888 1325 : void dataReceived(u16 peer_id_, SharedBuffer<u8> data_)
889 : {
890 1325 : type = CONNEVENT_DATA_RECEIVED;
891 1325 : peer_id = peer_id_;
892 1325 : data = data_;
893 1325 : }
894 1 : void peerAdded(u16 peer_id_, Address address_)
895 : {
896 1 : type = CONNEVENT_PEER_ADDED;
897 1 : peer_id = peer_id_;
898 1 : address = address_;
899 1 : }
900 0 : void peerRemoved(u16 peer_id_, bool timeout_, Address address_)
901 : {
902 0 : type = CONNEVENT_PEER_REMOVED;
903 0 : peer_id = peer_id_;
904 0 : timeout = timeout_;
905 0 : address = address_;
906 0 : }
907 0 : void bindFailed()
908 : {
909 0 : type = CONNEVENT_BIND_FAILED;
910 0 : }
911 : };
912 :
913 1 : class ConnectionSendThread : public JThread {
914 :
915 : public:
916 : friend class UDPPeer;
917 :
918 : ConnectionSendThread(unsigned int max_packet_size, float timeout);
919 :
920 : void * Thread ();
921 :
922 : void Trigger();
923 :
924 1 : void setParent(Connection* parent) {
925 : assert(parent != NULL); // Pre-condition
926 1 : m_connection = parent;
927 1 : }
928 :
929 1 : void setPeerTimeout(float peer_timeout)
930 1 : { m_timeout = peer_timeout; }
931 :
932 : private:
933 : void runTimeouts (float dtime);
934 : void rawSend (const BufferedPacket &packet);
935 : bool rawSendAsPacket(u16 peer_id, u8 channelnum,
936 : SharedBuffer<u8> data, bool reliable);
937 :
938 : void processReliableCommand (ConnectionCommand &c);
939 : void processNonReliableCommand (ConnectionCommand &c);
940 : void serve (Address bind_address);
941 : void connect (Address address);
942 : void disconnect ();
943 : void disconnect_peer(u16 peer_id);
944 : void send (u16 peer_id, u8 channelnum,
945 : SharedBuffer<u8> data);
946 : void sendReliable (ConnectionCommand &c);
947 : void sendToAll (u8 channelnum,
948 : SharedBuffer<u8> data);
949 : void sendToAllReliable(ConnectionCommand &c);
950 :
951 : void sendPackets (float dtime);
952 :
953 : void sendAsPacket (u16 peer_id, u8 channelnum,
954 : SharedBuffer<u8> data,bool ack=false);
955 :
956 : void sendAsPacketReliable(BufferedPacket& p, Channel* channel);
957 :
958 : bool packetsQueued();
959 :
960 : Connection* m_connection;
961 : unsigned int m_max_packet_size;
962 : float m_timeout;
963 : std::queue<OutgoingPacket> m_outgoing_queue;
964 : JSemaphore m_send_sleep_semaphore;
965 :
966 : unsigned int m_iteration_packets_avaialble;
967 : unsigned int m_max_commands_per_iteration;
968 : unsigned int m_max_data_packets_per_iteration;
969 : unsigned int m_max_packets_requeued;
970 : };
971 :
972 1 : class ConnectionReceiveThread : public JThread {
973 : public:
974 : ConnectionReceiveThread(unsigned int max_packet_size);
975 :
976 : void * Thread ();
977 :
978 1 : void setParent(Connection* parent) {
979 : assert(parent != NULL); // Pre-condition
980 1 : m_connection = parent;
981 1 : }
982 :
983 : private:
984 : void receive ();
985 :
986 : // Returns next data from a buffer if possible
987 : // If found, returns true; if not, false.
988 : // If found, sets peer_id and dst
989 : bool getFromBuffers (u16 &peer_id, SharedBuffer<u8> &dst);
990 :
991 : bool checkIncomingBuffers(Channel *channel, u16 &peer_id,
992 : SharedBuffer<u8> &dst);
993 :
994 : /*
995 : Processes a packet with the basic header stripped out.
996 : Parameters:
997 : packetdata: Data in packet (with no base headers)
998 : peer_id: peer id of the sender of the packet in question
999 : channelnum: channel on which the packet was sent
1000 : reliable: true if recursing into a reliable packet
1001 : */
1002 : SharedBuffer<u8> processPacket(Channel *channel,
1003 : SharedBuffer<u8> packetdata, u16 peer_id,
1004 : u8 channelnum, bool reliable);
1005 :
1006 :
1007 : Connection* m_connection;
1008 : };
1009 :
1010 : class Connection
1011 : {
1012 : public:
1013 : friend class ConnectionSendThread;
1014 : friend class ConnectionReceiveThread;
1015 :
1016 : Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6);
1017 : Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6,
1018 : PeerHandler *peerhandler);
1019 : ~Connection();
1020 :
1021 : /* Interface */
1022 : ConnectionEvent getEvent();
1023 : ConnectionEvent waitEvent(u32 timeout_ms);
1024 : void putCommand(ConnectionCommand &c);
1025 :
1026 1 : void SetTimeoutMs(int timeout) { m_bc_receive_timeout = timeout; }
1027 : void Serve(Address bind_addr);
1028 : void Connect(Address address);
1029 : bool Connected();
1030 : void Disconnect();
1031 : void Receive(NetworkPacket* pkt);
1032 : void Send(u16 peer_id, u8 channelnum, NetworkPacket* pkt, bool reliable);
1033 4829 : u16 GetPeerID() { return m_peer_id; }
1034 : Address GetPeerAddress(u16 peer_id);
1035 : float getPeerStat(u16 peer_id, rtt_stat_type type);
1036 : float getLocalStat(rate_stat_type type);
1037 13143 : const u32 GetProtocolID() const { return m_protocol_id; };
1038 : const std::string getDesc();
1039 : void DisconnectPeer(u16 peer_id);
1040 :
1041 : protected:
1042 : PeerHelper getPeer(u16 peer_id);
1043 : PeerHelper getPeerNoEx(u16 peer_id);
1044 : u16 lookupPeer(Address& sender);
1045 :
1046 : u16 createPeer(Address& sender, MTProtocols protocol, int fd);
1047 : UDPPeer* createServerPeer(Address& sender);
1048 : bool deletePeer(u16 peer_id, bool timeout);
1049 :
1050 2 : void SetPeerID(u16 id) { m_peer_id = id; }
1051 :
1052 : void sendAck(u16 peer_id, u8 channelnum, u16 seqnum);
1053 :
1054 : void PrintInfo(std::ostream &out);
1055 : void PrintInfo();
1056 :
1057 8109 : std::list<u16> getPeerIDs() { return m_peer_ids; }
1058 :
1059 : UDPSocket m_udpSocket;
1060 : MutexedQueue<ConnectionCommand> m_command_queue;
1061 :
1062 : void putEvent(ConnectionEvent &e);
1063 :
1064 680 : void TriggerSend()
1065 680 : { m_sendThread.Trigger(); }
1066 : private:
1067 : std::list<Peer*> getPeers();
1068 :
1069 : MutexedQueue<ConnectionEvent> m_event_queue;
1070 :
1071 : u16 m_peer_id;
1072 : u32 m_protocol_id;
1073 :
1074 : std::map<u16, Peer*> m_peers;
1075 : std::list<u16> m_peer_ids;
1076 : JMutex m_peers_mutex;
1077 :
1078 : ConnectionSendThread m_sendThread;
1079 : ConnectionReceiveThread m_receiveThread;
1080 :
1081 : JMutex m_info_mutex;
1082 :
1083 : // Backwards compatibility
1084 : PeerHandler *m_bc_peerhandler;
1085 : int m_bc_receive_timeout;
1086 :
1087 : bool m_shutting_down;
1088 :
1089 : u16 m_next_remote_peer_id;
1090 : };
1091 :
1092 : } // namespace
1093 :
1094 : #endif
|