source: trunk/third-party/libutp/utp.cpp @ 13317

Last change on this file since 13317 was 13317, checked in by livings124, 9 years ago

update libutp, fixing #4915

File size: 89.4 KB
Line 
1#include <StdAfx.h>
2
3#include "utp.h"
4#include "templates.h"
5
6#include <stdio.h>
7#include <assert.h>
8#include <string.h>
9#include <string.h>
10#include <stdlib.h>
11#include <errno.h>
12#include <limits.h> // for UINT_MAX
13
14#ifdef WIN32
15#include "win32_inet_ntop.h"
16
17// newer versions of MSVC define these in errno.h
18#ifndef ECONNRESET
19#define ECONNRESET WSAECONNRESET
20#define EMSGSIZE WSAEMSGSIZE
21#define ECONNREFUSED WSAECONNREFUSED
22#define ETIMEDOUT WSAETIMEDOUT
23#endif
24#endif
25
26#ifdef POSIX
27typedef sockaddr_storage SOCKADDR_STORAGE;
28#endif // POSIX
29
30// number of bytes to increase max window size by, per RTT. This is
31// scaled down linearly proportional to off_target. i.e. if all packets
32// in one window have 0 delay, window size will increase by this number.
33// Typically it's less. TCP increases one MSS per RTT, which is 1500
34#define MAX_CWND_INCREASE_BYTES_PER_RTT 3000
35#define CUR_DELAY_SIZE 3
36// experiments suggest that a clock skew of 10 ms per 325 seconds
37// is not impossible. Reset delay_base every 13 minutes. The clock
38// skew is dealt with by observing the delay base in the other
39// direction, and adjusting our own upwards if the opposite direction
40// delay base keeps going down
41#define DELAY_BASE_HISTORY 13
42#define MAX_WINDOW_DECAY 100 // ms
43
44#define REORDER_BUFFER_SIZE 32
45#define REORDER_BUFFER_MAX_SIZE 511
46#define OUTGOING_BUFFER_MAX_SIZE 511
47
48#define PACKET_SIZE 350
49
50// this is the minimum max_window value. It can never drop below this
51#define MIN_WINDOW_SIZE 10
52
53// when window sizes are smaller than one packet_size, this
54// will pace the packets to average at the given window size
55// if it's not set, it will simply not send anything until
56// there's a timeout
57#define USE_PACKET_PACING 1
58
59// if we receive 4 or more duplicate acks, we resend the packet
60// that hasn't been acked yet
61#define DUPLICATE_ACKS_BEFORE_RESEND 3
62
63#define DELAYED_ACK_BYTE_THRESHOLD 2400 // bytes
64#define DELAYED_ACK_TIME_THRESHOLD 100 // milliseconds
65
66#define RST_INFO_TIMEOUT 10000
67#define RST_INFO_LIMIT 1000
68// 29 seconds determined from measuring many home NAT devices
69#define KEEPALIVE_INTERVAL 29000
70
71
72#define SEQ_NR_MASK 0xFFFF
73#define ACK_NR_MASK 0xFFFF
74
75#define DIV_ROUND_UP(num, denom) ((num + denom - 1) / denom)
76
77#include "utp_utils.h"
78#include "utp_config.h"
79
80#define LOG_UTP if (g_log_utp) utp_log
81#define LOG_UTPV if (g_log_utp_verbose) utp_log
82
83uint32 g_current_ms;
84
85// The totals are derived from the following data:
86//  45: IPv6 address including embedded IPv4 address
87//  11: Scope Id
88//   2: Brackets around IPv6 address when port is present
89//   6: Port (including colon)
90//   1: Terminating null byte
91char addrbuf[65];
92char addrbuf2[65];
93#define addrfmt(x, s) x.fmt(s, sizeof(s))
94
95#if (defined(__SVR4) && defined(__sun))
96#pragma pack(1)
97#else
98#pragma pack(push,1)
99#endif
100
101struct PACKED_ATTRIBUTE PackedSockAddr {
102
103        // The values are always stored here in network byte order
104        union {
105                byte _in6[16];          // IPv6
106                uint16 _in6w[8];        // IPv6, word based (for convenience)
107                uint32 _in6d[4];        // Dword access
108                in6_addr _in6addr;      // For convenience
109        } _in;
110
111        // Host byte order
112        uint16 _port;
113
114#define _sin4 _in._in6d[3]      // IPv4 is stored where it goes if mapped
115
116#define _sin6 _in._in6
117#define _sin6w _in._in6w
118#define _sin6d _in._in6d
119
120        byte get_family() const
121        {
122                return (IN6_IS_ADDR_V4MAPPED(&_in._in6addr) != 0) ? AF_INET : AF_INET6;
123        }
124
125        bool operator==(const PackedSockAddr& rhs) const
126        {
127                if (&rhs == this)
128                        return true;
129                if (_port != rhs._port)
130                        return false;
131                return memcmp(_sin6, rhs._sin6, sizeof(_sin6)) == 0;
132        }
133        bool operator!=(const PackedSockAddr& rhs) const { return !(*this == rhs); }
134
135        PackedSockAddr(const SOCKADDR_STORAGE* sa, socklen_t len)
136        {
137                if (sa->ss_family == AF_INET) {
138                        assert(len >= sizeof(sockaddr_in));
139                        const sockaddr_in *sin = (sockaddr_in*)sa;
140                        _sin6w[0] = 0;
141                        _sin6w[1] = 0;
142                        _sin6w[2] = 0;
143                        _sin6w[3] = 0;
144                        _sin6w[4] = 0;
145                        _sin6w[5] = 0xffff;
146                        _sin4 = sin->sin_addr.s_addr;
147                        _port = ntohs(sin->sin_port);
148                } else {
149                        assert(len >= sizeof(sockaddr_in6));
150                        const sockaddr_in6 *sin6 = (sockaddr_in6*)sa;
151                        _in._in6addr = sin6->sin6_addr;
152                        _port = ntohs(sin6->sin6_port);
153                }
154        }
155
156        SOCKADDR_STORAGE get_sockaddr_storage(socklen_t *len = NULL) const
157        {
158                SOCKADDR_STORAGE sa;
159                const byte family = get_family();
160                if (family == AF_INET) {
161                        sockaddr_in *sin = (sockaddr_in*)&sa;
162                        if (len) *len = sizeof(sockaddr_in);
163                        memset(sin, 0, sizeof(sockaddr_in));
164                        sin->sin_family = family;
165                        sin->sin_port = htons(_port);
166                        sin->sin_addr.s_addr = _sin4;
167                } else {
168                        sockaddr_in6 *sin6 = (sockaddr_in6*)&sa;
169                        memset(sin6, 0, sizeof(sockaddr_in6));
170                        if (len) *len = sizeof(sockaddr_in6);
171                        sin6->sin6_family = family;
172                        sin6->sin6_addr = _in._in6addr;
173                        sin6->sin6_port = htons(_port);
174                }
175                return sa;
176        }
177
178        cstr fmt(str s, size_t len) const
179        {
180                memset(s, 0, len);
181                const byte family = get_family();
182                str i;
183                if (family == AF_INET) {
184                        inet_ntop(family, (uint32*)&_sin4, s, len);
185                        i = s;
186                        while (*++i) {}
187                } else {
188                        i = s;
189                        *i++ = '[';
190                        inet_ntop(family, (in6_addr*)&_in._in6addr, i, len-1);
191                        while (*++i) {}
192                        *i++ = ']';
193                }
194                snprintf(i, len - (i-s), ":%u", _port);
195                return s;
196        }
197};
198
199struct PACKED_ATTRIBUTE RST_Info {
200        PackedSockAddr addr;
201        uint32 connid;
202        uint32 timestamp;
203        uint16 ack_nr;
204};
205
206// these packet sizes are including the uTP header wich
207// is either 20 or 23 bytes depending on version
208#define PACKET_SIZE_EMPTY_BUCKET 0
209#define PACKET_SIZE_EMPTY 23
210#define PACKET_SIZE_SMALL_BUCKET 1
211#define PACKET_SIZE_SMALL 373
212#define PACKET_SIZE_MID_BUCKET 2
213#define PACKET_SIZE_MID 723
214#define PACKET_SIZE_BIG_BUCKET 3
215#define PACKET_SIZE_BIG 1400
216#define PACKET_SIZE_HUGE_BUCKET 4
217
218struct PACKED_ATTRIBUTE PacketFormat {
219        // connection ID
220        uint32_big connid;
221        uint32_big tv_sec;
222        uint32_big tv_usec;
223        uint32_big reply_micro;
224        // receive window size in PACKET_SIZE chunks
225        byte windowsize;
226        // Type of the first extension header
227        byte ext;
228        // Flags
229        byte flags;
230        // Sequence number
231        uint16_big seq_nr;
232        // Acknowledgment number
233        uint16_big ack_nr;
234};
235
236struct PACKED_ATTRIBUTE PacketFormatAck {
237        PacketFormat pf;
238        byte ext_next;
239        byte ext_len;
240        byte acks[4];
241};
242
243struct PACKED_ATTRIBUTE PacketFormatExtensions {
244        PacketFormat pf;
245        byte ext_next;
246        byte ext_len;
247        byte extensions[8];
248};
249
250struct PACKED_ATTRIBUTE PacketFormatV1 {
251        // packet_type (4 high bits)
252        // protocol version (4 low bits)
253        byte ver_type;
254        byte version() const { return ver_type & 0xf; }
255        byte type() const { return ver_type >> 4; }
256        void set_version(byte v) { ver_type = (ver_type & 0xf0) | (v & 0xf); }
257        void set_type(byte t) { ver_type = (ver_type & 0xf) | (t << 4); }
258
259        // Type of the first extension header
260        byte ext;
261        // connection ID
262        uint16_big connid;
263        uint32_big tv_usec;
264        uint32_big reply_micro;
265        // receive window size in bytes
266        uint32_big windowsize;
267        // Sequence number
268        uint16_big seq_nr;
269        // Acknowledgment number
270        uint16_big ack_nr;
271};
272
273struct PACKED_ATTRIBUTE PacketFormatAckV1 {
274        PacketFormatV1 pf;
275        byte ext_next;
276        byte ext_len;
277        byte acks[4];
278};
279
280struct PACKED_ATTRIBUTE PacketFormatExtensionsV1 {
281        PacketFormatV1 pf;
282        byte ext_next;
283        byte ext_len;
284        byte extensions[8];
285};
286
287#if (defined(__SVR4) && defined(__sun))
288#pragma pack(0)
289#else
290#pragma pack(pop)
291#endif
292
293enum {
294        ST_DATA = 0,            // Data packet.
295        ST_FIN = 1,                     // Finalize the connection. This is the last packet.
296        ST_STATE = 2,           // State packet. Used to transmit an ACK with no data.
297        ST_RESET = 3,           // Terminate connection forcefully.
298        ST_SYN = 4,                     // Connect SYN
299        ST_NUM_STATES,          // used for bounds checking
300};
301
302static const cstr flagnames[] = {
303        "ST_DATA","ST_FIN","ST_STATE","ST_RESET","ST_SYN"
304};
305
306enum CONN_STATE {
307        CS_IDLE = 0,
308        CS_SYN_SENT = 1,
309        CS_CONNECTED = 2,
310        CS_CONNECTED_FULL = 3,
311        CS_GOT_FIN = 4,
312        CS_DESTROY_DELAY = 5,
313        CS_FIN_SENT = 6,
314        CS_RESET = 7,
315        CS_DESTROY = 8,
316};
317
318static const cstr statenames[] = {
319        "IDLE","SYN_SENT","CONNECTED","CONNECTED_FULL","GOT_FIN","DESTROY_DELAY","FIN_SENT","RESET","DESTROY"
320};
321
322struct OutgoingPacket {
323        size_t length;
324        size_t payload;
325        uint64 time_sent; // microseconds
326        uint transmissions:31;
327        bool need_resend:1;
328        byte data[1];
329};
330
331void no_read(void *socket, const byte *bytes, size_t count) {}
332void no_write(void *socket, byte *bytes, size_t count) {}
333size_t no_rb_size(void *socket) { return 0; }
334void no_state(void *socket, int state) {}
335void no_error(void *socket, int errcode) {}
336void no_overhead(void *socket, bool send, size_t count, int type) {}
337
338UTPFunctionTable zero_funcs = {
339        &no_read,
340        &no_write,
341        &no_rb_size,
342        &no_state,
343        &no_error,
344        &no_overhead,
345};
346
347struct SizableCircularBuffer {
348        // This is the mask. Since it's always a power of 2, adding 1 to this value will return the size.
349        size_t mask;
350        // This is the elements that the circular buffer points to
351        void **elements;
352
353        void *get(size_t i) { assert(elements); return elements ? elements[i & mask] : NULL; }
354        void put(size_t i, void *data) { assert(elements); elements[i&mask] = data; }
355
356        void grow(size_t item, size_t index);
357        void ensure_size(size_t item, size_t index) { if (index > mask) grow(item, index); }
358        size_t size() { return mask + 1; }
359};
360
361static struct UTPGlobalStats _global_stats;
362
363// Item contains the element we want to make space for
364// index is the index in the list.
365void SizableCircularBuffer::grow(size_t item, size_t index)
366{
367        // Figure out the new size.
368        size_t size = mask + 1;
369        do size *= 2; while (index >= size);
370
371        // Allocate the new buffer
372        void **buf = (void**)calloc(size, sizeof(void*));
373
374        size--;
375
376        // Copy elements from the old buffer to the new buffer
377        for (size_t i = 0; i <= mask; i++) {
378                buf[(item - index + i) & size] = get(item - index + i);
379        }
380
381        // Swap to the newly allocated buffer
382        mask = size;
383        free(elements);
384        elements = buf;
385}
386
387// compare if lhs is less than rhs, taking wrapping
388// into account. if lhs is close to UINT_MAX and rhs
389// is close to 0, lhs is assumed to have wrapped and
390// considered smaller
391bool wrapping_compare_less(uint32 lhs, uint32 rhs)
392{
393        // distance walking from lhs to rhs, downwards
394        const uint32 dist_down = lhs - rhs;
395        // distance walking from lhs to rhs, upwards
396        const uint32 dist_up = rhs - lhs;
397
398        // if the distance walking up is shorter, lhs
399        // is less than rhs. If the distance walking down
400        // is shorter, then rhs is less than lhs
401        return dist_up < dist_down;
402}
403
404struct DelayHist {
405        uint32 delay_base;
406
407        // this is the history of delay samples,
408        // normalized by using the delay_base. These
409        // values are always greater than 0 and measures
410        // the queuing delay in microseconds
411        uint32 cur_delay_hist[CUR_DELAY_SIZE];
412        size_t cur_delay_idx;
413
414        // this is the history of delay_base. It's
415        // a number that doesn't have an absolute meaning
416        // only relative. It doesn't make sense to initialize
417        // it to anything other than values relative to
418        // what's been seen in the real world.
419        uint32 delay_base_hist[DELAY_BASE_HISTORY];
420        size_t delay_base_idx;
421        // the time when we last stepped the delay_base_idx
422        uint32 delay_base_time;
423
424        bool delay_base_initialized;
425
426        void clear()
427        {
428                delay_base_initialized = false;
429                delay_base = 0;
430                cur_delay_idx = 0;
431                delay_base_idx = 0;
432                delay_base_time = g_current_ms;
433                for (size_t i = 0; i < CUR_DELAY_SIZE; i++) {
434                        cur_delay_hist[i] = 0;
435                }
436                for (size_t i = 0; i < DELAY_BASE_HISTORY; i++) {
437                        delay_base_hist[i] = 0;
438                }
439        }
440
441        void shift(const uint32 offset)
442        {
443                // the offset should never be "negative"
444                // assert(offset < 0x10000000);
445
446                // increase all of our base delays by this amount
447                // this is used to take clock skew into account
448                // by observing the other side's changes in its base_delay
449                for (size_t i = 0; i < DELAY_BASE_HISTORY; i++) {
450                        delay_base_hist[i] += offset;
451                }
452                delay_base += offset;
453        }
454
455        void add_sample(const uint32 sample)
456        {
457                // The two clocks (in the two peers) are assumed not to
458                // progress at the exact same rate. They are assumed to be
459                // drifting, which causes the delay samples to contain
460                // a systematic error, either they are under-
461                // estimated or over-estimated. This is why we update the
462                // delay_base every two minutes, to adjust for this.
463
464                // This means the values will keep drifting and eventually wrap.
465                // We can cross the wrapping boundry in two directions, either
466                // going up, crossing the highest value, or going down, crossing 0.
467
468                // if the delay_base is close to the max value and sample actually
469                // wrapped on the other end we would see something like this:
470                // delay_base = 0xffffff00, sample = 0x00000400
471                // sample - delay_base = 0x500 which is the correct difference
472
473                // if the delay_base is instead close to 0, and we got an even lower
474                // sample (that will eventually update the delay_base), we may see
475                // something like this:
476                // delay_base = 0x00000400, sample = 0xffffff00
477                // sample - delay_base = 0xfffffb00
478                // this needs to be interpreted as a negative number and the actual
479                // recorded delay should be 0.
480
481                // It is important that all arithmetic that assume wrapping
482                // is done with unsigned intergers. Signed integers are not guaranteed
483                // to wrap the way unsigned integers do. At least GCC takes advantage
484                // of this relaxed rule and won't necessarily wrap signed ints.
485
486                // remove the clock offset and propagation delay.
487                // delay base is min of the sample and the current
488                // delay base. This min-operation is subject to wrapping
489                // and care needs to be taken to correctly choose the
490                // true minimum.
491
492                // specifically the problem case is when delay_base is very small
493                // and sample is very large (because it wrapped past zero), sample
494                // needs to be considered the smaller
495
496                if (!delay_base_initialized) {
497                        // delay_base being 0 suggests that we haven't initialized
498                        // it or its history with any real measurements yet. Initialize
499                        // everything with this sample.
500                        for (size_t i = 0; i < DELAY_BASE_HISTORY; i++) {
501                                // if we don't have a value, set it to the current sample
502                                delay_base_hist[i] = sample;
503                                continue;
504                        }
505                        delay_base = sample;
506                        delay_base_initialized = true;
507                }
508
509                if (wrapping_compare_less(sample, delay_base_hist[delay_base_idx])) {
510                        // sample is smaller than the current delay_base_hist entry
511                        // update it
512                        delay_base_hist[delay_base_idx] = sample;
513                }
514
515                // is sample lower than delay_base? If so, update delay_base
516                if (wrapping_compare_less(sample, delay_base)) {
517                        // sample is smaller than the current delay_base
518                        // update it
519                        delay_base = sample;
520                }
521               
522                // this operation may wrap, and is supposed to
523                const uint32 delay = sample - delay_base;
524                // sanity check. If this is triggered, something fishy is going on
525                // it means the measured sample was greater than 32 seconds!
526//              assert(delay < 0x2000000);
527
528                cur_delay_hist[cur_delay_idx] = delay;
529                cur_delay_idx = (cur_delay_idx + 1) % CUR_DELAY_SIZE;
530
531                // once every minute
532                if (g_current_ms - delay_base_time > 60 * 1000) {
533                        delay_base_time = g_current_ms;
534                        delay_base_idx = (delay_base_idx + 1) % DELAY_BASE_HISTORY;
535                        // clear up the new delay base history spot by initializing
536                        // it to the current sample, then update it
537                        delay_base_hist[delay_base_idx] = sample;
538                        delay_base = delay_base_hist[0];
539                        // Assign the lowest delay in the last 2 minutes to delay_base
540                        for (size_t i = 0; i < DELAY_BASE_HISTORY; i++) {
541                                if (wrapping_compare_less(delay_base_hist[i], delay_base))
542                                        delay_base = delay_base_hist[i];
543                        }
544                }
545        }
546
547        uint32 get_value()
548        {
549                uint32 value = UINT_MAX;
550                for (size_t i = 0; i < CUR_DELAY_SIZE; i++) {
551                        value = min<uint32>(cur_delay_hist[i], value);
552                }
553                // value could be UINT_MAX if we have no samples yet...
554                return value;
555        }
556};
557
558struct UTPSocket {
559        PackedSockAddr addr;
560
561        size_t idx;
562
563        uint16 reorder_count;
564        byte duplicate_ack;
565
566        // the number of bytes we've received but not acked yet
567        size_t bytes_since_ack;
568
569        // the number of packets in the send queue. Packets that haven't
570        // yet been sent count as well as packets marked as needing resend
571        // the oldest un-acked packet in the send queue is seq_nr - cur_window_packets
572        uint16 cur_window_packets;
573
574        // how much of the window is used, number of bytes in-flight
575        // packets that have not yet been sent do not count, packets
576        // that are marked as needing to be re-sent (due to a timeout)
577        // don't count either
578        size_t cur_window;
579        // maximum window size, in bytes
580        size_t max_window;
581        // SO_SNDBUF setting, in bytes
582        size_t opt_sndbuf;
583        // SO_RCVBUF setting, in bytes
584        size_t opt_rcvbuf;
585
586        // Is a FIN packet in the reassembly buffer?
587        bool got_fin:1;
588        // Timeout procedure
589        bool fast_timeout:1;
590
591        // max receive window for other end, in bytes
592        size_t max_window_user;
593        // 0 = original uTP header, 1 = second revision
594        byte version;
595        CONN_STATE state;
596        // TickCount when we last decayed window (wraps)
597        int32 last_rwin_decay;
598
599        // the sequence number of the FIN packet. This field is only set
600        // when we have received a FIN, and the flag field has the FIN flag set.
601        // it is used to know when it is safe to destroy the socket, we must have
602        // received all packets up to this sequence number first.
603        uint16 eof_pkt;
604
605        // All sequence numbers up to including this have been properly received
606        // by us
607        uint16 ack_nr;
608        // This is the sequence number for the next packet to be sent.
609        uint16 seq_nr;
610
611        uint16 timeout_seq_nr;
612
613        // This is the sequence number of the next packet we're allowed to
614        // do a fast resend with. This makes sure we only do a fast-resend
615        // once per packet. We can resend the packet with this sequence number
616        // or any later packet (with a higher sequence number).
617        uint16 fast_resend_seq_nr;
618
619        uint32 reply_micro;
620
621        // the time when we need to send another ack. If there's
622        // nothing to ack, this is a very large number
623        uint32 ack_time;
624
625        uint32 last_got_packet;
626        uint32 last_sent_packet;
627        uint32 last_measured_delay;
628        uint32 last_maxed_out_window;
629
630        // the last time we added send quota to the connection
631        // when adding send quota, this is subtracted from the
632        // current time multiplied by max_window / rtt
633        // which is the current allowed send rate.
634        int32 last_send_quota;
635
636        // the number of bytes we are allowed to send on
637        // this connection. If this is more than one packet
638        // size when we run out of data to send, it is clamped
639        // to the packet size
640        // this value is multiplied by 100 in order to get
641        // higher accuracy when dealing with low rates
642        int32 send_quota;
643
644        SendToProc *send_to_proc;
645        void *send_to_userdata;
646        UTPFunctionTable func;
647        void *userdata;
648
649        // Round trip time
650        uint rtt;
651        // Round trip time variance
652        uint rtt_var;
653        // Round trip timeout
654        uint rto;
655        DelayHist rtt_hist;
656        uint retransmit_timeout;
657        // The RTO timer will timeout here.
658        uint rto_timeout;
659        // When the window size is set to zero, start this timer. It will send a new packet every 30secs.
660        uint32 zerowindow_time;
661
662        uint32 conn_seed;
663        // Connection ID for packets I receive
664        uint32 conn_id_recv;
665        // Connection ID for packets I send
666        uint32 conn_id_send;
667        // Last rcv window we advertised, in bytes
668        size_t last_rcv_win;
669
670        DelayHist our_hist;
671        DelayHist their_hist;
672
673        // extension bytes from SYN packet
674        byte extensions[8];
675
676        SizableCircularBuffer inbuf, outbuf;
677
678#ifdef _DEBUG
679        // Public stats, returned by UTP_GetStats().  See utp.h
680        UTPStats _stats;
681#endif // _DEBUG
682
683        // Calculates the current receive window
684        size_t get_rcv_window() const
685        {
686                // If we don't have a connection (such as during connection
687                // establishment, always act as if we have an empty buffer).
688                if (!userdata) return opt_rcvbuf;
689
690                // Trim window down according to what's already in buffer.
691                const size_t numbuf = func.get_rb_size(userdata);
692                assert((int)numbuf >= 0);
693                return opt_rcvbuf > numbuf ? opt_rcvbuf - numbuf : 0;
694        }
695
696        // Test if we're ready to decay max_window
697        // XXX this breaks when spaced by > INT_MAX/2, which is 49
698        // days; the failure mode in that case is we do an extra decay
699        // or fail to do one when we really shouldn't.
700        bool can_decay_win(int32 msec) const
701        {
702                return msec - last_rwin_decay >= MAX_WINDOW_DECAY;
703        }
704
705        // If we can, decay max window, returns true if we actually did so
706        void maybe_decay_win()
707        {
708                if (can_decay_win(g_current_ms)) {
709                        // TCP uses 0.5
710                        max_window = (size_t)(max_window * .5);
711                        last_rwin_decay = g_current_ms;
712                        if (max_window < MIN_WINDOW_SIZE)
713                                max_window = MIN_WINDOW_SIZE;
714                }
715        }
716
717        size_t get_header_size() const
718        {
719                return (version ? sizeof(PacketFormatV1) : sizeof(PacketFormat));
720        }
721
722        size_t get_header_extensions_size() const
723        {
724                return (version ? sizeof(PacketFormatExtensionsV1) : sizeof(PacketFormatExtensions));
725        }
726
727        void sent_ack()
728        {
729                ack_time = g_current_ms + 0x70000000;
730                bytes_since_ack = 0;
731        }
732
733        size_t get_udp_mtu() const
734        {
735                socklen_t len;
736                SOCKADDR_STORAGE sa = addr.get_sockaddr_storage(&len);
737                return UTP_GetUDPMTU((const struct sockaddr *)&sa, len);
738        }
739
740        size_t get_udp_overhead() const
741        {
742                socklen_t len;
743                SOCKADDR_STORAGE sa = addr.get_sockaddr_storage(&len);
744                return UTP_GetUDPOverhead((const struct sockaddr *)&sa, len);
745        }
746
747        uint64 get_global_utp_bytes_sent() const
748        {
749                socklen_t len;
750                SOCKADDR_STORAGE sa = addr.get_sockaddr_storage(&len);
751                return UTP_GetGlobalUTPBytesSent((const struct sockaddr *)&sa, len);
752        }
753
754        size_t get_overhead() const
755        {
756                return get_udp_overhead() + get_header_size();
757        }
758
759        void send_data(PacketFormat* b, size_t length, bandwidth_type_t type);
760
761        void send_ack(bool synack = false);
762
763        void send_keep_alive();
764
765        static void send_rst(SendToProc *send_to_proc, void *send_to_userdata,
766                                                 const PackedSockAddr &addr, uint32 conn_id_send,
767                                                 uint16 ack_nr, uint16 seq_nr, byte version);
768
769        void send_packet(OutgoingPacket *pkt);
770
771        bool is_writable(size_t to_write);
772
773        bool flush_packets();
774
775        void write_outgoing_packet(size_t payload, uint flags);
776
777        void update_send_quota();
778
779#ifdef _DEBUG
780        void check_invariant();
781#endif
782
783        void check_timeouts();
784
785        int ack_packet(uint16 seq);
786
787        size_t selective_ack_bytes(uint base, const byte* mask, byte len, int64& min_rtt);
788
789        void selective_ack(uint base, const byte *mask, byte len);
790
791        void apply_ledbat_ccontrol(size_t bytes_acked, uint32 actual_delay, int64 min_rtt);
792
793        size_t get_packet_size();
794};
795
796Array<RST_Info> g_rst_info;
797Array<UTPSocket*> g_utp_sockets;
798
799static void UTP_RegisterSentPacket(size_t length) {
800        if (length <= PACKET_SIZE_MID) {
801                if (length <= PACKET_SIZE_EMPTY) {
802                        _global_stats._nraw_send[PACKET_SIZE_EMPTY_BUCKET]++;
803                } else if (length <= PACKET_SIZE_SMALL) {
804                        _global_stats._nraw_send[PACKET_SIZE_SMALL_BUCKET]++;
805                } else
806                        _global_stats._nraw_send[PACKET_SIZE_MID_BUCKET]++;
807        } else {
808                if (length <= PACKET_SIZE_BIG) {
809                        _global_stats._nraw_send[PACKET_SIZE_BIG_BUCKET]++;
810                } else
811                        _global_stats._nraw_send[PACKET_SIZE_HUGE_BUCKET]++;
812        }
813}
814
815void send_to_addr(SendToProc *send_to_proc, void *send_to_userdata, const byte *p, size_t len, const PackedSockAddr &addr)
816{
817        socklen_t tolen;
818        SOCKADDR_STORAGE to = addr.get_sockaddr_storage(&tolen);
819        UTP_RegisterSentPacket(len);
820        send_to_proc(send_to_userdata, p, len, (const struct sockaddr *)&to, tolen);
821}
822
823void UTPSocket::send_data(PacketFormat* b, size_t length, bandwidth_type_t type)
824{
825        // time stamp this packet with local time, the stamp goes into
826        // the header of every packet at the 8th byte for 8 bytes :
827        // two integers, check packet.h for more
828        uint64 time = UTP_GetMicroseconds();
829
830        PacketFormatV1* b1 = (PacketFormatV1*)b;
831        if (version == 0) {
832                b->tv_sec = (uint32)(time / 1000000);
833                b->tv_usec = time % 1000000;
834                b->reply_micro = reply_micro;
835        } else {
836                b1->tv_usec = (uint32)time;
837                b1->reply_micro = reply_micro;
838        }
839
840        last_sent_packet = g_current_ms;
841
842#ifdef _DEBUG
843        _stats._nbytes_xmit += length;
844        ++_stats._nxmit;
845#endif
846        if (userdata) {
847                size_t n;
848                if (type == payload_bandwidth) {
849                        // if this packet carries payload, just
850                        // count the header as overhead
851                        type = header_overhead;
852                        n = get_overhead();
853                } else {
854                        n = length + get_udp_overhead();
855                }
856                func.on_overhead(userdata, true, n, type);
857        }
858#if g_log_utp_verbose
859        int flags = version == 0 ? b->flags : b1->type();
860        uint16 seq_nr = version == 0 ? b->seq_nr : b1->seq_nr;
861        uint16 ack_nr = version == 0 ? b->ack_nr : b1->ack_nr;
862        LOG_UTPV("0x%08x: send %s len:%u id:%u timestamp:"I64u" reply_micro:%u flags:%s seq_nr:%u ack_nr:%u",
863                         this, addrfmt(addr, addrbuf), (uint)length, conn_id_send, time, reply_micro, flagnames[flags],
864                         seq_nr, ack_nr);
865#endif
866        send_to_addr(send_to_proc, send_to_userdata, (const byte*)b, length, addr);
867}
868
869void UTPSocket::send_ack(bool synack)
870{
871        PacketFormatExtensions pfe;
872        zeromem(&pfe);
873        PacketFormatExtensionsV1& pfe1 = (PacketFormatExtensionsV1&)pfe;
874        PacketFormatAck& pfa = (PacketFormatAck&)pfe1;
875        PacketFormatAckV1& pfa1 = (PacketFormatAckV1&)pfe1;
876
877        size_t len;
878        last_rcv_win = get_rcv_window();
879        if (version == 0) {
880                pfa.pf.connid = conn_id_send;
881                pfa.pf.ack_nr = (uint16)ack_nr;
882                pfa.pf.seq_nr = (uint16)seq_nr;
883                pfa.pf.flags = ST_STATE;
884                pfa.pf.ext = 0;
885                pfa.pf.windowsize = (byte)DIV_ROUND_UP(last_rcv_win, PACKET_SIZE);
886                len = sizeof(PacketFormat);
887        } else {
888                pfa1.pf.set_version(1);
889                pfa1.pf.set_type(ST_STATE);
890                pfa1.pf.ext = 0;
891                pfa1.pf.connid = conn_id_send;
892                pfa1.pf.ack_nr = ack_nr;
893                pfa1.pf.seq_nr = seq_nr;
894                pfa1.pf.windowsize = (uint32)last_rcv_win;
895                len = sizeof(PacketFormatV1);
896        }
897
898        // we never need to send EACK for connections
899        // that are shutting down
900        if (reorder_count != 0 && state < CS_GOT_FIN) {
901                // if reorder count > 0, send an EACK.
902                // reorder count should always be 0
903                // for synacks, so this should not be
904                // as synack
905                assert(!synack);
906                if (version == 0) {
907                        pfa.pf.ext = 1;
908                        pfa.ext_next = 0;
909                        pfa.ext_len = 4;
910                } else {
911                        pfa1.pf.ext = 1;
912                        pfa1.ext_next = 0;
913                        pfa1.ext_len = 4;
914                }
915                uint m = 0;
916
917                // reorder count should only be non-zero
918                // if the packet ack_nr + 1 has not yet
919                // been received
920                assert(inbuf.get(ack_nr + 1) == NULL);
921                size_t window = min<size_t>(14+16, inbuf.size());
922                // Generate bit mask of segments received.
923                for (size_t i = 0; i < window; i++) {
924                        if (inbuf.get(ack_nr + i + 2) != NULL) {
925                                m |= 1 << i;
926                                LOG_UTPV("0x%08x: EACK packet [%u]", this, ack_nr + i + 2);
927                        }
928                }
929                if (version == 0) {
930                        pfa.acks[0] = (byte)m;
931                        pfa.acks[1] = (byte)(m >> 8);
932                        pfa.acks[2] = (byte)(m >> 16);
933                        pfa.acks[3] = (byte)(m >> 24);
934                } else {
935                        pfa1.acks[0] = (byte)m;
936                        pfa1.acks[1] = (byte)(m >> 8);
937                        pfa1.acks[2] = (byte)(m >> 16);
938                        pfa1.acks[3] = (byte)(m >> 24);
939                }
940                len += 4 + 2;
941                LOG_UTPV("0x%08x: Sending EACK %u [%u] bits:[%032b]", this, ack_nr, conn_id_send, m);
942        } else if (synack) {
943                // we only send "extensions" in response to SYN
944                // and the reorder count is 0 in that state
945
946                LOG_UTPV("0x%08x: Sending ACK %u [%u] with extension bits", this, ack_nr, conn_id_send);
947                if (version == 0) {
948                        pfe.pf.ext = 2;
949                        pfe.ext_next = 0;
950                        pfe.ext_len = 8;
951                        memset(pfe.extensions, 0, 8);
952                } else {
953                        pfe1.pf.ext = 2;
954                        pfe1.ext_next = 0;
955                        pfe1.ext_len = 8;
956                        memset(pfe1.extensions, 0, 8);
957                }
958                len += 8 + 2;
959        } else {
960                LOG_UTPV("0x%08x: Sending ACK %u [%u]", this, ack_nr, conn_id_send);
961        }
962
963        sent_ack();
964        send_data((PacketFormat*)&pfe, len, ack_overhead);
965}
966
967void UTPSocket::send_keep_alive()
968{
969        ack_nr--;
970        LOG_UTPV("0x%08x: Sending KeepAlive ACK %u [%u]", this, ack_nr, conn_id_send);
971        send_ack();
972        ack_nr++;
973}
974
975void UTPSocket::send_rst(SendToProc *send_to_proc, void *send_to_userdata,
976                                                 const PackedSockAddr &addr, uint32 conn_id_send, uint16 ack_nr, uint16 seq_nr, byte version)
977{
978        PacketFormat pf;
979        zeromem(&pf);
980        PacketFormatV1& pf1 = (PacketFormatV1&)pf;
981
982        size_t len;
983        if (version == 0) {
984                pf.connid = conn_id_send;
985                pf.ack_nr = ack_nr;
986                pf.seq_nr = seq_nr;
987                pf.flags = ST_RESET;
988                pf.ext = 0;
989                pf.windowsize = 0;
990                len = sizeof(PacketFormat);
991        } else {
992                pf1.set_version(1);
993                pf1.set_type(ST_RESET);
994                pf1.ext = 0;
995                pf1.connid = conn_id_send;
996                pf1.ack_nr = ack_nr;
997                pf1.seq_nr = seq_nr;
998                pf1.windowsize = 0;
999                len = sizeof(PacketFormatV1);
1000        }
1001
1002        LOG_UTPV("%s: Sending RST id:%u seq_nr:%u ack_nr:%u", addrfmt(addr, addrbuf), conn_id_send, seq_nr, ack_nr);
1003        LOG_UTPV("send %s len:%u id:%u", addrfmt(addr, addrbuf), (uint)len, conn_id_send);
1004        send_to_addr(send_to_proc, send_to_userdata, (const byte*)&pf1, len, addr);
1005}
1006
1007void UTPSocket::send_packet(OutgoingPacket *pkt)
1008{
1009        // only count against the quota the first time we
1010        // send the packet. Don't enforce quota when closing
1011        // a socket. Only enforce the quota when we're sending
1012        // at slow rates (max window < packet size)
1013        size_t max_send = min(max_window, opt_sndbuf, max_window_user);
1014
1015        if (pkt->transmissions == 0 || pkt->need_resend) {
1016                cur_window += pkt->payload;
1017        }
1018
1019        size_t packet_size = get_packet_size();
1020        if (pkt->transmissions == 0 && max_send < packet_size) {
1021                assert(state == CS_FIN_SENT ||
1022                           (int32)pkt->payload <= send_quota / 100);
1023                send_quota = send_quota - (int32)(pkt->payload * 100);
1024        }
1025
1026        pkt->need_resend = false;
1027
1028        PacketFormatV1* p1 = (PacketFormatV1*)pkt->data;
1029        PacketFormat* p = (PacketFormat*)pkt->data;
1030        if (version == 0) {
1031                p->ack_nr = ack_nr;
1032        } else {
1033                p1->ack_nr = ack_nr;
1034        }
1035        pkt->time_sent = UTP_GetMicroseconds();
1036        pkt->transmissions++;
1037        sent_ack();
1038        send_data((PacketFormat*)pkt->data, pkt->length,
1039                (state == CS_SYN_SENT) ? connect_overhead
1040                : (pkt->transmissions == 1) ? payload_bandwidth
1041                : retransmit_overhead);
1042}
1043
1044bool UTPSocket::is_writable(size_t to_write)
1045{
1046        // return true if it's OK to stuff another packet into the
1047        // outgoing queue. Since we may be using packet pacing, we
1048        // might not actually send the packet right away to affect the
1049        // cur_window. The only thing that happens when we add another
1050        // packet is that cur_window_packets is increased.
1051        size_t max_send = min(max_window, opt_sndbuf, max_window_user);
1052
1053        size_t packet_size = get_packet_size();
1054
1055        if (cur_window + packet_size >= max_window)
1056                last_maxed_out_window = g_current_ms;
1057
1058        // if we don't have enough quota, we can't write regardless
1059        if (USE_PACKET_PACING) {
1060                if (send_quota / 100 < (int32)to_write) return false;
1061        }
1062
1063        // subtract one to save space for the FIN packet
1064        if (cur_window_packets >= OUTGOING_BUFFER_MAX_SIZE - 1) return false;
1065
1066        // if sending another packet would not make the window exceed
1067        // the max_window, we can write
1068        if (cur_window + packet_size <= max_send) return true;
1069
1070        // if the window size is less than a packet, and we have enough
1071        // quota to send a packet, we can write, even though it would
1072        // make the window exceed the max size
1073        // the last condition is needed to not put too many packets
1074        // in the send buffer. cur_window isn't updated until we flush
1075        // the send buffer, so we need to take the number of packets
1076        // into account
1077        if (USE_PACKET_PACING) {
1078                if (max_window < to_write &&
1079                        cur_window < max_window &&
1080                        cur_window_packets == 0) {
1081                        return true;
1082                }
1083        }
1084
1085        return false;
1086}
1087
1088bool UTPSocket::flush_packets()
1089{
1090        size_t packet_size = get_packet_size();
1091
1092        // send packets that are waiting on the pacer to be sent
1093        // i has to be an unsigned 16 bit counter to wrap correctly
1094        // signed types are not guaranteed to wrap the way you expect
1095        for (uint16 i = seq_nr - cur_window_packets; i != seq_nr; ++i) {
1096                OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(i);
1097                if (pkt == 0 || (pkt->transmissions > 0 && pkt->need_resend == false)) continue;
1098                // have we run out of quota?
1099                if (!is_writable(pkt->payload)) {
1100                        return true;
1101                }
1102
1103                // Nagle check
1104                // don't send the last packet if we have one packet in-flight
1105                // and the current packet is still smaller than packet_size.
1106                if (i != ((seq_nr - 1) & ACK_NR_MASK) ||
1107                        cur_window_packets == 1 ||
1108                        pkt->payload >= packet_size) {
1109                        send_packet(pkt);
1110
1111                        // No need to send another ack if there is nothing to reorder.
1112                        if (reorder_count == 0) {
1113                                sent_ack();
1114                        }
1115                }
1116        }
1117        return false;
1118}
1119
1120void UTPSocket::write_outgoing_packet(size_t payload, uint flags)
1121{
1122        // Setup initial timeout timer
1123        if (cur_window_packets == 0) {
1124                retransmit_timeout = rto;
1125                rto_timeout = g_current_ms + retransmit_timeout;
1126                assert(cur_window == 0);
1127        }
1128
1129        size_t packet_size = get_packet_size();
1130        do {
1131                assert(cur_window_packets < OUTGOING_BUFFER_MAX_SIZE);
1132                assert(flags == ST_DATA || flags == ST_FIN);
1133
1134                size_t added = 0;
1135
1136                OutgoingPacket *pkt = NULL;
1137               
1138                if (cur_window_packets > 0) {
1139                        pkt = (OutgoingPacket*)outbuf.get(seq_nr - 1);
1140                }
1141
1142                const size_t header_size = get_header_size();
1143                bool append = true;
1144
1145                // if there's any room left in the last packet in the window
1146                // and it hasn't been sent yet, fill that frame first
1147                if (payload && pkt && !pkt->transmissions && pkt->payload < packet_size) {
1148                        // Use the previous unsent packet
1149                        added = min(payload + pkt->payload, max<size_t>(packet_size, pkt->payload)) - pkt->payload;
1150                        pkt = (OutgoingPacket*)realloc(pkt,
1151                                                                                   (sizeof(OutgoingPacket) - 1) +
1152                                                                                   header_size +
1153                                                                                   pkt->payload + added);
1154                        outbuf.put(seq_nr - 1, pkt);
1155                        append = false;
1156                        assert(!pkt->need_resend);
1157                } else {
1158                        // Create the packet to send.
1159                        added = payload;
1160                        pkt = (OutgoingPacket*)malloc((sizeof(OutgoingPacket) - 1) +
1161                                                                                  header_size +
1162                                                                                  added);
1163                        pkt->payload = 0;
1164                        pkt->transmissions = 0;
1165                        pkt->need_resend = false;
1166                }
1167
1168                if (added) {
1169                        // Fill it with data from the upper layer.
1170                        func.on_write(userdata, pkt->data + header_size + pkt->payload, added);
1171                }
1172                pkt->payload += added;
1173                pkt->length = header_size + pkt->payload;
1174
1175                last_rcv_win = get_rcv_window();
1176
1177                PacketFormat* p = (PacketFormat*)pkt->data;
1178                PacketFormatV1* p1 = (PacketFormatV1*)pkt->data;
1179                if (version == 0) {
1180                        p->connid = conn_id_send;
1181                        p->ext = 0;
1182                        p->windowsize = (byte)DIV_ROUND_UP(last_rcv_win, PACKET_SIZE);
1183                        p->ack_nr = ack_nr;
1184                        p->flags = flags;
1185                } else {
1186                        p1->set_version(1);
1187                        p1->set_type(flags);
1188                        p1->ext = 0;
1189                        p1->connid = conn_id_send;
1190                        p1->windowsize = (uint32)last_rcv_win;
1191                        p1->ack_nr = ack_nr;
1192                }
1193
1194                if (append) {
1195                        // Remember the message in the outgoing queue.
1196                        outbuf.ensure_size(seq_nr, cur_window_packets);
1197                        outbuf.put(seq_nr, pkt);
1198                        if (version == 0) p->seq_nr = seq_nr;
1199                        else p1->seq_nr = seq_nr;
1200                        seq_nr++;
1201                        cur_window_packets++;
1202                }
1203
1204                payload -= added;
1205
1206        } while (payload);
1207
1208        flush_packets();
1209}
1210
1211void UTPSocket::update_send_quota()
1212{
1213        int dt = g_current_ms - last_send_quota;
1214        if (dt == 0) return;
1215        last_send_quota = g_current_ms;
1216        size_t add = max_window * dt * 100 / (rtt_hist.delay_base?rtt_hist.delay_base:50);
1217        if (add > max_window * 100 && add > MAX_CWND_INCREASE_BYTES_PER_RTT * 100) add = max_window;
1218        send_quota += (int32)add;
1219//      LOG_UTPV("0x%08x: UTPSocket::update_send_quota dt:%d rtt:%u max_window:%u quota:%d",
1220//                       this, dt, rtt, (uint)max_window, send_quota / 100);
1221}
1222
1223#ifdef _DEBUG
1224void UTPSocket::check_invariant()
1225{
1226        if (reorder_count > 0) {
1227                assert(inbuf.get(ack_nr + 1) == NULL);
1228        }
1229
1230        size_t outstanding_bytes = 0;
1231        for (int i = 0; i < cur_window_packets; ++i) {
1232                OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - i - 1);
1233                if (pkt == 0 || pkt->transmissions == 0 || pkt->need_resend) continue;
1234                outstanding_bytes += pkt->payload;
1235        }
1236        assert(outstanding_bytes == cur_window);
1237}
1238#endif
1239
1240void UTPSocket::check_timeouts()
1241{
1242#ifdef _DEBUG
1243        check_invariant();
1244#endif
1245
1246        // this invariant should always be true
1247        assert(cur_window_packets == 0 || outbuf.get(seq_nr - cur_window_packets));
1248
1249        LOG_UTPV("0x%08x: CheckTimeouts timeout:%d max_window:%u cur_window:%u quota:%d "
1250                         "state:%s cur_window_packets:%u bytes_since_ack:%u ack_time:%d",
1251                         this, (int)(rto_timeout - g_current_ms), (uint)max_window, (uint)cur_window,
1252                         send_quota / 100, statenames[state], cur_window_packets,
1253                         (uint)bytes_since_ack, (int)(g_current_ms - ack_time));
1254
1255        update_send_quota();
1256        flush_packets();
1257
1258
1259        if (USE_PACKET_PACING) {
1260                // In case the new send quota made it possible to send another packet
1261                // Mark the socket as writable. If we don't use pacing, the send
1262                // quota does not affect if the socket is writeable
1263                // if we don't use packet pacing, the writable event is triggered
1264                // whenever the cur_window falls below the max_window, so we don't
1265                // need this check then
1266                if (state == CS_CONNECTED_FULL && is_writable(get_packet_size())) {
1267                        state = CS_CONNECTED;
1268                        LOG_UTPV("0x%08x: Socket writable. max_window:%u cur_window:%u quota:%d packet_size:%u",
1269                                         this, (uint)max_window, (uint)cur_window, send_quota / 100, (uint)get_packet_size());
1270                        func.on_state(userdata, UTP_STATE_WRITABLE);
1271                }
1272        }
1273
1274        switch (state) {
1275        case CS_SYN_SENT:
1276        case CS_CONNECTED_FULL:
1277        case CS_CONNECTED:
1278        case CS_FIN_SENT: {
1279
1280                // Reset max window...
1281                if ((int)(g_current_ms - zerowindow_time) >= 0 && max_window_user == 0) {
1282                        max_window_user = PACKET_SIZE;
1283                }
1284
1285                if ((int)(g_current_ms - rto_timeout) >= 0 &&
1286                        (!(USE_PACKET_PACING) || cur_window_packets > 0) &&
1287                        rto_timeout > 0) {
1288
1289                        /*
1290                        OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - cur_window_packets);
1291                       
1292                        // If there were a lot of retransmissions, force recomputation of round trip time
1293                        if (pkt->transmissions >= 4)
1294                                rtt = 0;
1295                        */
1296
1297                        // Increase RTO
1298                        const uint new_timeout = retransmit_timeout * 2;
1299                        if (new_timeout >= 30000 || (state == CS_SYN_SENT && new_timeout > 6000)) {
1300                                // more than 30 seconds with no reply. kill it.
1301                                // if we haven't even connected yet, give up sooner. 6 seconds
1302                                // means 2 tries at the following timeouts: 3, 6 seconds
1303                                if (state == CS_FIN_SENT)
1304                                        state = CS_DESTROY;
1305                                else
1306                                        state = CS_RESET;
1307                                func.on_error(userdata, ETIMEDOUT);
1308                                goto getout;
1309                        }
1310
1311                        retransmit_timeout = new_timeout;
1312                        rto_timeout = g_current_ms + new_timeout;
1313
1314                        // On Timeout
1315                        duplicate_ack = 0;
1316
1317                        // rate = min_rate
1318                        max_window = get_packet_size();
1319                        send_quota = max<int32>((int32)max_window * 100, send_quota);
1320
1321                        // every packet should be considered lost
1322                        for (int i = 0; i < cur_window_packets; ++i) {
1323                                OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - i - 1);
1324                                if (pkt == 0 || pkt->transmissions == 0 || pkt->need_resend) continue;
1325                                pkt->need_resend = true;
1326                                assert(cur_window >= pkt->payload);
1327                                cur_window -= pkt->payload;
1328                        }
1329
1330                        // used in parse_log.py
1331                        LOG_UTP("0x%08x: Packet timeout. Resend. seq_nr:%u. timeout:%u max_window:%u",
1332                                        this, seq_nr - cur_window_packets, retransmit_timeout, (uint)max_window);
1333
1334                        fast_timeout = true;
1335                        timeout_seq_nr = seq_nr;
1336
1337                        if (cur_window_packets > 0) {
1338                                OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - cur_window_packets);
1339                                assert(pkt);
1340                                send_quota = max<int32>((int32)pkt->length * 100, send_quota);
1341
1342                                // Re-send the packet.
1343                                send_packet(pkt);
1344                        }
1345                }
1346
1347                // Mark the socket as writable
1348                if (state == CS_CONNECTED_FULL && is_writable(get_packet_size())) {
1349                        state = CS_CONNECTED;
1350                        LOG_UTPV("0x%08x: Socket writable. max_window:%u cur_window:%u quota:%d packet_size:%u",
1351                                         this, (uint)max_window, (uint)cur_window, send_quota / 100, (uint)get_packet_size());
1352                        func.on_state(userdata, UTP_STATE_WRITABLE);
1353                }
1354
1355                if (state >= CS_CONNECTED && state <= CS_FIN_SENT) {
1356                        // Send acknowledgment packets periodically, or when the threshold is reached
1357                        if (bytes_since_ack > DELAYED_ACK_BYTE_THRESHOLD ||
1358                                (int)(g_current_ms - ack_time) >= 0) {
1359                                send_ack();
1360                        }
1361
1362                        if ((int)(g_current_ms - last_sent_packet) >= KEEPALIVE_INTERVAL) {
1363                                send_keep_alive();
1364                        }
1365                }
1366
1367                break;
1368        }
1369
1370        // Close?
1371        case CS_GOT_FIN:
1372        case CS_DESTROY_DELAY:
1373                if ((int)(g_current_ms - rto_timeout) >= 0) {
1374                        state = (state == CS_DESTROY_DELAY) ? CS_DESTROY : CS_RESET;
1375                        if (cur_window_packets > 0 && userdata) {
1376                                func.on_error(userdata, ECONNRESET);
1377                        }
1378                }
1379                break;
1380        // prevent warning
1381        case CS_IDLE:
1382        case CS_RESET:
1383        case CS_DESTROY:
1384                break;
1385        }
1386
1387        getout:
1388
1389        // make sure we don't accumulate quota when we don't have
1390        // anything to send
1391        int32 limit = max<int32>((int32)max_window / 2, 5 * (int32)get_packet_size()) * 100;
1392        if (send_quota > limit) send_quota = limit;
1393}
1394
1395// returns:
1396// 0: the packet was acked.
1397// 1: it means that the packet had already been acked
1398// 2: the packet has not been sent yet
1399int UTPSocket::ack_packet(uint16 seq)
1400{
1401        OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq);
1402
1403        // the packet has already been acked (or not sent)
1404        if (pkt == NULL) {
1405                LOG_UTPV("0x%08x: got ack for:%u (already acked, or never sent)", this, seq);
1406                return 1;
1407        }
1408
1409        // can't ack packets that haven't been sent yet!
1410        if (pkt->transmissions == 0) {
1411                LOG_UTPV("0x%08x: got ack for:%u (never sent, pkt_size:%u need_resend:%u)",
1412                                 this, seq, (uint)pkt->payload, pkt->need_resend);
1413                return 2;
1414        }
1415
1416        LOG_UTPV("0x%08x: got ack for:%u (pkt_size:%u need_resend:%u)",
1417                         this, seq, (uint)pkt->payload, pkt->need_resend);
1418
1419        outbuf.put(seq, NULL);
1420
1421        // if we never re-sent the packet, update the RTT estimate
1422        if (pkt->transmissions == 1) {
1423                // Estimate the round trip time.
1424                const uint32 ertt = (uint32)((UTP_GetMicroseconds() - pkt->time_sent) / 1000);
1425                if (rtt == 0) {
1426                        // First round trip time sample
1427                        rtt = ertt;
1428                        rtt_var = ertt / 2;
1429                        // sanity check. rtt should never be more than 6 seconds
1430//                      assert(rtt < 6000);
1431                } else {
1432                        // Compute new round trip times
1433                        const int delta = (int)rtt - ertt;
1434                        rtt_var = rtt_var + (int)(abs(delta) - rtt_var) / 4;
1435                        rtt = rtt - rtt/8 + ertt/8;
1436                        // sanity check. rtt should never be more than 6 seconds
1437//                      assert(rtt < 6000);
1438                        rtt_hist.add_sample(ertt);
1439                }
1440                rto = max<uint>(rtt + rtt_var * 4, 500);
1441                LOG_UTPV("0x%08x: rtt:%u avg:%u var:%u rto:%u",
1442                                 this, ertt, rtt, rtt_var, rto);
1443        }
1444        retransmit_timeout = rto;
1445        rto_timeout = g_current_ms + rto;
1446        // if need_resend is set, this packet has already
1447        // been considered timed-out, and is not included in
1448        // the cur_window anymore
1449        if (!pkt->need_resend) {
1450                assert(cur_window >= pkt->payload);
1451                cur_window -= pkt->payload;
1452        }
1453        free(pkt);
1454        return 0;
1455}
1456
1457// count the number of bytes that were acked by the EACK header
1458size_t UTPSocket::selective_ack_bytes(uint base, const byte* mask, byte len, int64& min_rtt)
1459{
1460        if (cur_window_packets == 0) return 0;
1461
1462        size_t acked_bytes = 0;
1463        int bits = len * 8;
1464
1465        do {
1466                uint v = base + bits;
1467
1468                // ignore bits that haven't been sent yet
1469                // see comment in UTPSocket::selective_ack
1470                if (((seq_nr - v - 1) & ACK_NR_MASK) >= (uint16)(cur_window_packets - 1))
1471                        continue;
1472
1473                // ignore bits that represents packets we haven't sent yet
1474                // or packets that have already been acked
1475                OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(v);
1476                if (!pkt || pkt->transmissions == 0)
1477                        continue;
1478
1479                // Count the number of segments that were successfully received past it.
1480                if (bits >= 0 && mask[bits>>3] & (1 << (bits & 7))) {
1481                        assert((int)(pkt->payload) >= 0);
1482                        acked_bytes += pkt->payload;
1483                        min_rtt = min<int64>(min_rtt, UTP_GetMicroseconds() - pkt->time_sent);
1484                        continue;
1485                }
1486        } while (--bits >= -1);
1487        return acked_bytes;
1488}
1489
1490void UTPSocket::selective_ack(uint base, const byte *mask, byte len)
1491{
1492        if (cur_window_packets == 0) return;
1493
1494        // the range is inclusive [0, 31] bits
1495        int bits = len * 8 - 1;
1496
1497        int count = 0;
1498
1499        // resends is a stack of sequence numbers we need to resend. Since we
1500        // iterate in reverse over the acked packets, at the end, the top packets
1501        // are the ones we want to resend
1502        int resends[32];
1503        int nr = 0;
1504
1505        LOG_UTPV("0x%08x: Got EACK [%032b] base:%u", this, *(uint32*)mask, base);
1506        do {
1507                // we're iterating over the bits from higher sequence numbers
1508                // to lower (kind of in reverse order, wich might not be very
1509                // intuitive)
1510                uint v = base + bits;
1511
1512                // ignore bits that haven't been sent yet
1513                // and bits that fall below the ACKed sequence number
1514                // this can happen if an EACK message gets
1515                // reordered and arrives after a packet that ACKs up past
1516                // the base for thie EACK message
1517
1518                // this is essentially the same as:
1519                // if v >= seq_nr || v <= seq_nr - cur_window_packets
1520                // but it takes wrapping into account
1521
1522                // if v == seq_nr the -1 will make it wrap. if v > seq_nr
1523                // it will also wrap (since it will fall further below 0)
1524                // and be > cur_window_packets.
1525                // if v == seq_nr - cur_window_packets, the result will be
1526                // seq_nr - (seq_nr - cur_window_packets) - 1
1527                // == seq_nr - seq_nr + cur_window_packets - 1
1528                // == cur_window_packets - 1 which will be caught by the
1529                // test. If v < seq_nr - cur_window_packets the result will grow
1530                // fall furhter outside of the cur_window_packets range.
1531
1532                // sequence number space:
1533                //
1534                //     rejected <   accepted   > rejected
1535                // <============+--------------+============>
1536                //              ^              ^
1537                //              |              |
1538                //        (seq_nr-wnd)         seq_nr
1539
1540                if (((seq_nr - v - 1) & ACK_NR_MASK) >= (uint16)(cur_window_packets - 1))
1541                        continue;
1542
1543                // this counts as a duplicate ack, even though we might have
1544                // received an ack for this packet previously (in another EACK
1545                // message for instance)
1546                bool bit_set = bits >= 0 && mask[bits>>3] & (1 << (bits & 7));
1547
1548                // if this packet is acked, it counts towards the duplicate ack counter
1549                if (bit_set) count++;
1550
1551                // ignore bits that represents packets we haven't sent yet
1552                // or packets that have already been acked
1553                OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(v);
1554                if (!pkt || pkt->transmissions == 0) {
1555                        LOG_UTPV("0x%08x: skipping %u. pkt:%08x transmissions:%u %s",
1556                                         this, v, pkt, pkt?pkt->transmissions:0, pkt?"(not sent yet?)":"(already acked?)");
1557                        continue;
1558                }
1559
1560                // Count the number of segments that were successfully received past it.
1561                if (bit_set) {
1562                        // the selective ack should never ACK the packet we're waiting for to decrement cur_window_packets
1563                        assert((v & outbuf.mask) != ((seq_nr - cur_window_packets) & outbuf.mask));
1564                        ack_packet(v);
1565                        continue;
1566                }
1567
1568                // Resend segments
1569                // if count is less than our re-send limit, we haven't seen enough
1570                // acked packets in front of this one to warrant a re-send.
1571                // if count == 0, we're still going through the tail of zeroes
1572                if (((v - fast_resend_seq_nr) & ACK_NR_MASK) <= OUTGOING_BUFFER_MAX_SIZE &&
1573                        count >= DUPLICATE_ACKS_BEFORE_RESEND &&
1574                        duplicate_ack < DUPLICATE_ACKS_BEFORE_RESEND) {
1575                        resends[nr++] = v;
1576                        LOG_UTPV("0x%08x: no ack for %u", this, v);
1577                } else {
1578                        LOG_UTPV("0x%08x: not resending %u count:%d dup_ack:%u fast_resend_seq_nr:%u",
1579                                         this, v, count, duplicate_ack, fast_resend_seq_nr);
1580                }
1581        } while (--bits >= -1);
1582
1583        if (((base - 1 - fast_resend_seq_nr) & ACK_NR_MASK) < 256 &&
1584                count >= DUPLICATE_ACKS_BEFORE_RESEND &&
1585                duplicate_ack < DUPLICATE_ACKS_BEFORE_RESEND) {
1586                // if we get enough duplicate acks to start
1587                // resending, the first packet we should resend
1588                // is base-1
1589                resends[nr++] = base - 1;
1590        } else {
1591                LOG_UTPV("0x%08x: not resending %u count:%d dup_ack:%u fast_resend_seq_nr:%u",
1592                                 this, base - 1, count, duplicate_ack, fast_resend_seq_nr);
1593        }
1594
1595        bool back_off = false;
1596        int i = 0;
1597        while (nr > 0) {
1598                uint v = resends[--nr];
1599                // don't consider the tail of 0:es to be lost packets
1600                // only unacked packets with acked packets after should
1601                // be considered lost
1602                OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(v);
1603
1604                // this may be an old (re-ordered) packet, and some of the
1605                // packets in here may have been acked already. In which
1606                // case they will not be in the send queue anymore
1607                if (!pkt) continue;
1608
1609                // used in parse_log.py
1610                LOG_UTP("0x%08x: Packet %u lost. Resending", this, v);
1611
1612                // On Loss
1613                back_off = true;
1614#ifdef _DEBUG
1615                ++_stats._rexmit;
1616#endif
1617                send_packet(pkt);
1618                fast_resend_seq_nr = v + 1;
1619
1620                // Re-send max 4 packets.
1621                if (++i >= 4) break;
1622        }
1623
1624        if (back_off)
1625                maybe_decay_win();
1626
1627        duplicate_ack = count;
1628}
1629
1630void UTPSocket::apply_ledbat_ccontrol(size_t bytes_acked, uint32 actual_delay, int64 min_rtt)
1631{
1632        // the delay can never be greater than the rtt. The min_rtt
1633        // variable is the RTT in microseconds
1634       
1635        assert(min_rtt >= 0);
1636        int32 our_delay = min<uint32>(our_hist.get_value(), uint32(min_rtt));
1637        assert(our_delay != INT_MAX);
1638        assert(our_delay >= 0);
1639
1640        SOCKADDR_STORAGE sa = addr.get_sockaddr_storage();
1641        UTP_DelaySample((sockaddr*)&sa, our_delay / 1000);
1642
1643        // This test the connection under heavy load from foreground
1644        // traffic. Pretend that our delays are very high to force the
1645        // connection to use sub-packet size window sizes
1646        //our_delay *= 4;
1647
1648        // target is microseconds
1649        int target = CCONTROL_TARGET;
1650        if (target <= 0) target = 100000;
1651
1652        double off_target = target - our_delay;
1653
1654        // this is the same as:
1655        //
1656        //    (min(off_target, target) / target) * (bytes_acked / max_window) * MAX_CWND_INCREASE_BYTES_PER_RTT
1657        //
1658        // so, it's scaling the max increase by the fraction of the window this ack represents, and the fraction
1659        // of the target delay the current delay represents.
1660        // The min() around off_target protects against crazy values of our_delay, which may happen when th
1661        // timestamps wraps, or by just having a malicious peer sending garbage. This caps the increase
1662        // of the window size to MAX_CWND_INCREASE_BYTES_PER_RTT per rtt.
1663        // as for large negative numbers, this direction is already capped at the min packet size further down
1664        // the min around the bytes_acked protects against the case where the window size was recently
1665        // shrunk and the number of acked bytes exceeds that. This is considered no more than one full
1666        // window, in order to keep the gain within sane boundries.
1667
1668        assert(bytes_acked > 0);
1669        double window_factor = (double)min(bytes_acked, max_window) / (double)max(max_window, bytes_acked);
1670        double delay_factor = off_target / target;
1671        double scaled_gain = MAX_CWND_INCREASE_BYTES_PER_RTT * window_factor * delay_factor;
1672
1673        // since MAX_CWND_INCREASE_BYTES_PER_RTT is a cap on how much the window size (max_window)
1674        // may increase per RTT, we may not increase the window size more than that proportional
1675        // to the number of bytes that were acked, so that once one window has been acked (one rtt)
1676        // the increase limit is not exceeded
1677        // the +1. is to allow for floating point imprecision
1678        assert(scaled_gain <= 1. + MAX_CWND_INCREASE_BYTES_PER_RTT * (int)min(bytes_acked, max_window) / (double)max(max_window, bytes_acked));
1679
1680        if (scaled_gain > 0 && g_current_ms - last_maxed_out_window > 300) {
1681                // if it was more than 300 milliseconds since we tried to send a packet
1682                // and stopped because we hit the max window, we're most likely rate
1683                // limited (which prevents us from ever hitting the window size)
1684                // if this is the case, we cannot let the max_window grow indefinitely
1685                scaled_gain = 0;
1686        }
1687
1688        if (scaled_gain + max_window < MIN_WINDOW_SIZE) {
1689                max_window = MIN_WINDOW_SIZE;
1690        } else {
1691                max_window = (size_t)(max_window + scaled_gain);
1692        }
1693
1694        // make sure that the congestion window is below max
1695        // make sure that we don't shrink our window too small
1696        max_window = clamp<size_t>(max_window, MIN_WINDOW_SIZE, opt_sndbuf);
1697
1698        // used in parse_log.py
1699        LOG_UTP("0x%08x: actual_delay:%u our_delay:%d their_delay:%u off_target:%d max_window:%u "
1700                        "delay_base:%u delay_sum:%d target_delay:%d acked_bytes:%u cur_window:%u "
1701                        "scaled_gain:%f rtt:%u rate:%u quota:%d wnduser:%u rto:%u timeout:%d get_microseconds:"I64u" "
1702                        "cur_window_packets:%u packet_size:%u their_delay_base:%u their_actual_delay:%u",
1703                        this, actual_delay, our_delay / 1000, their_hist.get_value() / 1000,
1704                        (int)off_target / 1000, (uint)(max_window),  our_hist.delay_base,
1705                        (our_delay + their_hist.get_value()) / 1000, target / 1000, (uint)bytes_acked,
1706                        (uint)(cur_window - bytes_acked), (float)(scaled_gain), rtt,
1707                        (uint)(max_window * 1000 / (rtt_hist.delay_base?rtt_hist.delay_base:50)),
1708                        send_quota / 100, (uint)max_window_user, rto, (int)(rto_timeout - g_current_ms),
1709                        UTP_GetMicroseconds(), cur_window_packets, (uint)get_packet_size(),
1710                        their_hist.delay_base, their_hist.delay_base + their_hist.get_value());
1711}
1712
1713static void UTP_RegisterRecvPacket(UTPSocket *conn, size_t len)
1714{
1715#ifdef _DEBUG
1716        ++conn->_stats._nrecv;
1717        conn->_stats._nbytes_recv += len;
1718#endif
1719
1720        if (len <= PACKET_SIZE_MID) {
1721                if (len <= PACKET_SIZE_EMPTY) {
1722                        _global_stats._nraw_recv[PACKET_SIZE_EMPTY_BUCKET]++;
1723                } else if (len <= PACKET_SIZE_SMALL) {
1724                        _global_stats._nraw_recv[PACKET_SIZE_SMALL_BUCKET]++;
1725                } else 
1726                        _global_stats._nraw_recv[PACKET_SIZE_MID_BUCKET]++;
1727        } else {
1728                if (len <= PACKET_SIZE_BIG) {
1729                        _global_stats._nraw_recv[PACKET_SIZE_BIG_BUCKET]++;
1730                } else 
1731                        _global_stats._nraw_recv[PACKET_SIZE_HUGE_BUCKET]++;
1732        }
1733}
1734
1735// returns the max number of bytes of payload the uTP
1736// connection is allowed to send
1737size_t UTPSocket::get_packet_size()
1738{
1739        int header_size = version == 1
1740                ? sizeof(PacketFormatV1)
1741                : sizeof(PacketFormat);
1742
1743        size_t mtu = get_udp_mtu();
1744
1745        if (DYNAMIC_PACKET_SIZE_ENABLED) {
1746                SOCKADDR_STORAGE sa = addr.get_sockaddr_storage();
1747                size_t max_packet_size = UTP_GetPacketSize((sockaddr*)&sa);
1748                return min(mtu - header_size, max_packet_size);
1749        }
1750        else
1751        {
1752                return mtu - header_size;
1753        }
1754}
1755
1756// Process an incoming packet
1757// syn is true if this is the first packet received. It will cut off parsing
1758// as soon as the header is done
1759size_t UTP_ProcessIncoming(UTPSocket *conn, const byte *packet, size_t len, bool syn = false)
1760{
1761        UTP_RegisterRecvPacket(conn, len);
1762
1763        g_current_ms = UTP_GetMilliseconds();
1764
1765        conn->update_send_quota();
1766
1767        const PacketFormat *pf = (PacketFormat*)packet;
1768        const PacketFormatV1 *pf1 = (PacketFormatV1*)packet;
1769        const byte *packet_end = packet + len;
1770
1771        uint16 pk_seq_nr;
1772        uint16 pk_ack_nr;
1773        uint8 pk_flags;
1774        if (conn->version == 0) {
1775                pk_seq_nr = pf->seq_nr;
1776                pk_ack_nr = pf->ack_nr;
1777                pk_flags = pf->flags;
1778        } else {
1779                pk_seq_nr = pf1->seq_nr;
1780                pk_ack_nr = pf1->ack_nr;
1781                pk_flags = pf1->type();
1782        }
1783
1784        if (pk_flags >= ST_NUM_STATES) return 0;
1785
1786        LOG_UTPV("0x%08x: Got %s. seq_nr:%u ack_nr:%u state:%s version:%u timestamp:"I64u" reply_micro:%u",
1787                         conn, flagnames[pk_flags], pk_seq_nr, pk_ack_nr, statenames[conn->state], conn->version,
1788                         conn->version == 0?(uint64)(pf->tv_sec) * 1000000 + pf->tv_usec:uint64(pf1->tv_usec),
1789                         conn->version == 0?(uint32)(pf->reply_micro):(uint32)(pf1->reply_micro));
1790
1791        // mark receipt time
1792        uint64 time = UTP_GetMicroseconds();
1793
1794        // RSTs are handled earlier, since the connid matches the send id not the recv id
1795        assert(pk_flags != ST_RESET);
1796
1797        // TODO: maybe send a ST_RESET if we're in CS_RESET?
1798
1799        const byte *selack_ptr = NULL;
1800
1801        // Unpack UTP packet options
1802        // Data pointer
1803        const byte *data = (const byte*)pf + conn->get_header_size();
1804        if (conn->get_header_size() > len) {
1805                LOG_UTPV("0x%08x: Invalid packet size (less than header size)", conn);
1806                return 0;
1807        }
1808        // Skip the extension headers
1809        uint extension = conn->version == 0 ? pf->ext : pf1->ext;
1810        if (extension != 0) {
1811                do {
1812                        // Verify that the packet is valid.
1813                        data += 2;
1814
1815                        if ((int)(packet_end - data) < 0 || (int)(packet_end - data) < data[-1]) {
1816                                LOG_UTPV("0x%08x: Invalid len of extensions", conn);
1817                                return 0;
1818                        }
1819
1820                        switch(extension) {
1821                        case 1: // Selective Acknowledgment
1822                                selack_ptr = data;
1823                                break;
1824                        case 2: // extension bits
1825                                if (data[-1] != 8) {
1826                                        LOG_UTPV("0x%08x: Invalid len of extension bits header", conn);
1827                                        return 0;
1828                                }
1829                                memcpy(conn->extensions, data, 8);
1830                                LOG_UTPV("0x%08x: got extension bits:%02x%02x%02x%02x%02x%02x%02x%02x", conn,
1831                                        conn->extensions[0], conn->extensions[1], conn->extensions[2], conn->extensions[3],
1832                                        conn->extensions[4], conn->extensions[5], conn->extensions[6], conn->extensions[7]);
1833                        }
1834                        extension = data[-2];
1835                        data += data[-1];
1836                } while (extension);
1837        }
1838
1839        if (conn->state == CS_SYN_SENT) {
1840                // if this is a syn-ack, initialize our ack_nr
1841                // to match the sequence number we got from
1842                // the other end
1843                conn->ack_nr = (pk_seq_nr - 1) & SEQ_NR_MASK;
1844        }
1845
1846        g_current_ms = UTP_GetMilliseconds();
1847        conn->last_got_packet = g_current_ms;
1848
1849        if (syn) {
1850                return 0;
1851        }
1852
1853        // seqnr is the number of packets past the expected
1854        // packet this is. ack_nr is the last acked, seq_nr is the
1855        // current. Subtracring 1 makes 0 mean "this is the next
1856        // expected packet".
1857        const uint seqnr = (pk_seq_nr - conn->ack_nr - 1) & SEQ_NR_MASK;
1858
1859        // Getting an invalid sequence number?
1860        if (seqnr >= REORDER_BUFFER_MAX_SIZE) {
1861                if (seqnr >= (SEQ_NR_MASK + 1) - REORDER_BUFFER_MAX_SIZE && pk_flags != ST_STATE) {
1862                        conn->ack_time = g_current_ms + min<uint>(conn->ack_time - g_current_ms, DELAYED_ACK_TIME_THRESHOLD);
1863                }
1864                LOG_UTPV("    Got old Packet/Ack (%u/%u)=%u!", pk_seq_nr, conn->ack_nr, seqnr);
1865                return 0;
1866        }
1867
1868        // Process acknowledgment
1869        // acks is the number of packets that was acked
1870        int acks = (pk_ack_nr - (conn->seq_nr - 1 - conn->cur_window_packets)) & ACK_NR_MASK;
1871
1872        // this happens when we receive an old ack nr
1873        if (acks > conn->cur_window_packets) acks = 0;
1874
1875        // if we get the same ack_nr as in the last packet
1876        // increase the duplicate_ack counter, otherwise reset
1877        // it to 0
1878        if (conn->cur_window_packets > 0) {
1879                if (pk_ack_nr == ((conn->seq_nr - conn->cur_window_packets - 1) & ACK_NR_MASK) &&
1880                        conn->cur_window_packets > 0) {
1881                        //++conn->duplicate_ack;
1882                } else {
1883                        conn->duplicate_ack = 0;
1884                }
1885
1886                // TODO: if duplicate_ack == DUPLICATE_ACK_BEFORE_RESEND
1887                // and fast_resend_seq_nr <= ack_nr + 1
1888                //    resend ack_nr + 1
1889        }
1890
1891        // figure out how many bytes were acked
1892        size_t acked_bytes = 0;
1893
1894        // the minimum rtt of all acks
1895        // this is the upper limit on the delay we get back
1896        // from the other peer. Our delay cannot exceed
1897        // the rtt of the packet. If it does, clamp it.
1898        // this is done in apply_ledbat_ccontrol()
1899        int64 min_rtt = INT64_MAX;
1900
1901        for (int i = 0; i < acks; ++i) {
1902                int seq = conn->seq_nr - conn->cur_window_packets + i;
1903                OutgoingPacket *pkt = (OutgoingPacket*)conn->outbuf.get(seq);
1904                if (pkt == 0 || pkt->transmissions == 0) continue;
1905                assert((int)(pkt->payload) >= 0);
1906                acked_bytes += pkt->payload;
1907                min_rtt = min<int64>(min_rtt, UTP_GetMicroseconds() - pkt->time_sent);
1908        }
1909       
1910        // count bytes acked by EACK
1911        if (selack_ptr != NULL) {
1912                acked_bytes += conn->selective_ack_bytes((pk_ack_nr + 2) & ACK_NR_MASK,
1913                                                                                                 selack_ptr, selack_ptr[-1], min_rtt);
1914        }
1915
1916        LOG_UTPV("0x%08x: acks:%d acked_bytes:%u seq_nr:%d cur_window:%u cur_window_packets:%u relative_seqnr:%u max_window:%u min_rtt:%u rtt:%u",
1917                         conn, acks, (uint)acked_bytes, conn->seq_nr, (uint)conn->cur_window, conn->cur_window_packets,
1918                         seqnr, (uint)conn->max_window, (uint)(min_rtt / 1000), conn->rtt);
1919
1920        uint64 p;
1921
1922        if (conn->version == 0) {
1923                p = uint64(pf->tv_sec) * 1000000 + pf->tv_usec;
1924        } else {
1925                p = pf1->tv_usec;
1926        }
1927
1928        conn->last_measured_delay = g_current_ms;
1929
1930        // get delay in both directions
1931        // record the delay to report back
1932        const uint32 their_delay = (uint32)(p == 0 ? 0 : time - p);
1933        conn->reply_micro = their_delay;
1934        uint32 prev_delay_base = conn->their_hist.delay_base;
1935        if (their_delay != 0) conn->their_hist.add_sample(their_delay);
1936
1937        // if their new delay base is less than their previous one
1938        // we should shift our delay base in the other direction in order
1939        // to take the clock skew into account
1940        if (prev_delay_base != 0 &&
1941                wrapping_compare_less(conn->their_hist.delay_base, prev_delay_base)) {
1942                // never adjust more than 10 milliseconds
1943                if (prev_delay_base - conn->their_hist.delay_base <= 10000) {
1944                        conn->our_hist.shift(prev_delay_base - conn->their_hist.delay_base);
1945                }
1946        }
1947
1948        const uint32 actual_delay = conn->version==0
1949                ?(pf->reply_micro==INT_MAX?0:uint32(pf->reply_micro))
1950                :(uint32(pf1->reply_micro)==INT_MAX?0:uint32(pf1->reply_micro));
1951
1952        // if the actual delay is 0, it means the other end
1953        // hasn't received a sample from us yet, and doesn't
1954        // know what it is. We can't update out history unless
1955        // we have a true measured sample
1956        prev_delay_base = conn->our_hist.delay_base;
1957        if (actual_delay != 0) conn->our_hist.add_sample(actual_delay);
1958
1959        // if our new delay base is less than our previous one
1960        // we should shift the other end's delay base in the other
1961        // direction in order to take the clock skew into account
1962        // This is commented out because it creates bad interactions
1963        // with our adjustment in the other direction. We don't really
1964        // need our estimates of the other peer to be very accurate
1965        // anyway. The problem with shifting here is that we're more
1966        // likely shift it back later because of a low latency. This
1967        // second shift back would cause us to shift our delay base
1968        // which then get's into a death spiral of shifting delay bases
1969/*      if (prev_delay_base != 0 &&
1970                wrapping_compare_less(conn->our_hist.delay_base, prev_delay_base)) {
1971                // never adjust more than 10 milliseconds
1972                if (prev_delay_base - conn->our_hist.delay_base <= 10000) {
1973                        conn->their_hist.Shift(prev_delay_base - conn->our_hist.delay_base);
1974                }
1975        }
1976*/
1977
1978        // if the delay estimate exceeds the RTT, adjust the base_delay to
1979        // compensate
1980        if (conn->our_hist.get_value() > uint32(min_rtt)) {
1981                conn->our_hist.shift(conn->our_hist.get_value() - min_rtt);
1982        }
1983
1984        // only apply the congestion controller on acks
1985        // if we don't have a delay measurement, there's
1986        // no point in invoking the congestion control
1987        if (actual_delay != 0 && acked_bytes >= 1)
1988                conn->apply_ledbat_ccontrol(acked_bytes, actual_delay, min_rtt);
1989
1990        // sanity check, the other end should never ack packets
1991        // past the point we've sent
1992        if (acks <= conn->cur_window_packets) {
1993                conn->max_window_user = conn->version == 0
1994                        ? pf->windowsize * PACKET_SIZE : pf1->windowsize;
1995
1996                // If max user window is set to 0, then we startup a timer
1997                // That will reset it to 1 after 15 seconds.
1998                if (conn->max_window_user == 0)
1999                        // Reset max_window_user to 1 every 15 seconds.
2000                        conn->zerowindow_time = g_current_ms + 15000;
2001
2002                // Respond to connect message
2003                // Switch to CONNECTED state.
2004                if (conn->state == CS_SYN_SENT) {
2005                        conn->state = CS_CONNECTED;
2006                        conn->func.on_state(conn->userdata, UTP_STATE_CONNECT);
2007
2008                // We've sent a fin, and everything was ACKed (including the FIN),
2009                // it's safe to destroy the socket. cur_window_packets == acks
2010                // means that this packet acked all the remaining packets that
2011                // were in-flight.
2012                } else if (conn->state == CS_FIN_SENT && conn->cur_window_packets == acks) {
2013                        conn->state = CS_DESTROY;
2014                }
2015
2016                // Update fast resend counter
2017                if (wrapping_compare_less(conn->fast_resend_seq_nr, (pk_ack_nr + 1) & ACK_NR_MASK))
2018                        conn->fast_resend_seq_nr = pk_ack_nr + 1;
2019
2020                LOG_UTPV("0x%08x: fast_resend_seq_nr:%u", conn, conn->fast_resend_seq_nr);
2021
2022                for (int i = 0; i < acks; ++i) {
2023                        int ack_status = conn->ack_packet(conn->seq_nr - conn->cur_window_packets);
2024                        // if ack_status is 0, the packet was acked.
2025                        // if acl_stauts is 1, it means that the packet had already been acked
2026                        // if it's 2, the packet has not been sent yet
2027                        // We need to break this loop in the latter case. This could potentially
2028                        // happen if we get an ack_nr that does not exceed what we have stuffed
2029                        // into the outgoing buffer, but does exceed what we have sent
2030                        if (ack_status == 2) {
2031#ifdef _DEBUG
2032                                OutgoingPacket* pkt = (OutgoingPacket*)conn->outbuf.get(conn->seq_nr - conn->cur_window_packets);
2033                                assert(pkt->transmissions == 0);
2034#endif
2035                                break;
2036                        }
2037                        conn->cur_window_packets--;
2038                }
2039#ifdef _DEBUG
2040                if (conn->cur_window_packets == 0) assert(conn->cur_window == 0);
2041#endif
2042
2043                // packets in front of this may have been acked by a
2044                // selective ack (EACK). Keep decreasing the window packet size
2045                // until we hit a packet that is still waiting to be acked
2046                // in the send queue
2047                // this is especially likely to happen when the other end
2048                // has the EACK send bug older versions of uTP had
2049                while (conn->cur_window_packets > 0 && !conn->outbuf.get(conn->seq_nr - conn->cur_window_packets))
2050                        conn->cur_window_packets--;
2051
2052#ifdef _DEBUG
2053                if (conn->cur_window_packets == 0) assert(conn->cur_window == 0);
2054#endif
2055
2056                // this invariant should always be true
2057                assert(conn->cur_window_packets == 0 || conn->outbuf.get(conn->seq_nr - conn->cur_window_packets));
2058
2059                // flush Nagle
2060                if (conn->cur_window_packets == 1) {
2061                        OutgoingPacket *pkt = (OutgoingPacket*)conn->outbuf.get(conn->seq_nr - 1);
2062                        // do we still have quota?
2063                        if (pkt->transmissions == 0 &&
2064                                (!(USE_PACKET_PACING) || conn->send_quota / 100 >= (int32)pkt->length)) {
2065                                conn->send_packet(pkt);
2066
2067                                // No need to send another ack if there is nothing to reorder.
2068                                if (conn->reorder_count == 0) {
2069                                        conn->sent_ack();
2070                                }
2071                        }
2072                }
2073
2074                // Fast timeout-retry
2075                if (conn->fast_timeout) {
2076                        LOG_UTPV("Fast timeout %u,%u,%u?", (uint)conn->cur_window, conn->seq_nr - conn->timeout_seq_nr, conn->timeout_seq_nr);
2077                        // if the fast_resend_seq_nr is not pointing to the oldest outstanding packet, it suggests that we've already
2078                        // resent the packet that timed out, and we should leave the fast-timeout mode.
2079                        if (((conn->seq_nr - conn->cur_window_packets) & ACK_NR_MASK) != conn->fast_resend_seq_nr) {
2080                                conn->fast_timeout = false;
2081                        } else {
2082                                // resend the oldest packet and increment fast_resend_seq_nr
2083                                // to not allow another fast resend on it again
2084                                OutgoingPacket *pkt = (OutgoingPacket*)conn->outbuf.get(conn->seq_nr - conn->cur_window_packets);
2085                                if (pkt && pkt->transmissions > 0) {
2086                                        LOG_UTPV("0x%08x: Packet %u fast timeout-retry.", conn, conn->seq_nr - conn->cur_window_packets);
2087#ifdef _DEBUG
2088                                        ++conn->_stats._fastrexmit;
2089#endif
2090                                        conn->fast_resend_seq_nr++;
2091                                        conn->send_packet(pkt);
2092                                }
2093                        }
2094                }
2095        }
2096
2097        // Process selective acknowledgent
2098        if (selack_ptr != NULL) {
2099                conn->selective_ack(pk_ack_nr + 2, selack_ptr, selack_ptr[-1]);
2100        }
2101
2102        // this invariant should always be true
2103        assert(conn->cur_window_packets == 0 || conn->outbuf.get(conn->seq_nr - conn->cur_window_packets));
2104
2105        LOG_UTPV("0x%08x: acks:%d acked_bytes:%u seq_nr:%u cur_window:%u cur_window_packets:%u quota:%d",
2106                         conn, acks, (uint)acked_bytes, conn->seq_nr, (uint)conn->cur_window, conn->cur_window_packets,
2107                         conn->send_quota / 100);
2108
2109        // In case the ack dropped the current window below
2110        // the max_window size, Mark the socket as writable
2111        if (conn->state == CS_CONNECTED_FULL && conn->is_writable(conn->get_packet_size())) {
2112                conn->state = CS_CONNECTED;
2113                LOG_UTPV("0x%08x: Socket writable. max_window:%u cur_window:%u quota:%d packet_size:%u",
2114                                 conn, (uint)conn->max_window, (uint)conn->cur_window, conn->send_quota / 100, (uint)conn->get_packet_size());
2115                conn->func.on_state(conn->userdata, UTP_STATE_WRITABLE);
2116        }
2117
2118        if (pk_flags == ST_STATE) {
2119                // This is a state packet only.
2120                return 0;
2121        }
2122
2123        // The connection is not in a state that can accept data?
2124        if (conn->state != CS_CONNECTED &&
2125                conn->state != CS_CONNECTED_FULL &&
2126                conn->state != CS_FIN_SENT) {
2127                return 0;
2128        }
2129
2130        // Is this a finalize packet?
2131        if (pk_flags == ST_FIN && !conn->got_fin) {
2132                LOG_UTPV("Got FIN eof_pkt:%u", pk_seq_nr);
2133                conn->got_fin = true;
2134                conn->eof_pkt = pk_seq_nr;
2135                // at this point, it is possible for the
2136                // other end to have sent packets with
2137                // sequence numbers higher than seq_nr.
2138                // if this is the case, our reorder_count
2139                // is out of sync. This case is dealt with
2140                // when we re-order and hit the eof_pkt.
2141                // we'll just ignore any packets with
2142                // sequence numbers past this
2143        }
2144
2145        // Getting an in-order packet?
2146        if (seqnr == 0) {
2147                size_t count = packet_end - data;
2148                if (count > 0 && conn->state != CS_FIN_SENT) {
2149                        LOG_UTPV("0x%08x: Got Data len:%u (rb:%u)", conn, (uint)count, (uint)conn->func.get_rb_size(conn->userdata));
2150                        // Post bytes to the upper layer
2151                        conn->func.on_read(conn->userdata, data, count);
2152                }
2153                conn->ack_nr++;
2154                conn->bytes_since_ack += count;
2155
2156                // Check if the next packet has been received too, but waiting
2157                // in the reorder buffer.
2158                for (;;) {
2159
2160                        if (conn->got_fin && conn->eof_pkt == conn->ack_nr) {
2161                                if (conn->state != CS_FIN_SENT) {
2162                                        conn->state = CS_GOT_FIN;
2163                                        conn->rto_timeout = g_current_ms + min<uint>(conn->rto * 3, 60);
2164
2165                                        LOG_UTPV("0x%08x: Posting EOF", conn);
2166                                        conn->func.on_state(conn->userdata, UTP_STATE_EOF);
2167                                }
2168
2169                                // if the other end wants to close, ack immediately
2170                                conn->send_ack();
2171
2172                                // reorder_count is not necessarily 0 at this point.
2173                                // even though it is most of the time, the other end
2174                                // may have sent packets with higher sequence numbers
2175                                // than what later end up being eof_pkt
2176                                // since we have received all packets up to eof_pkt
2177                                // just ignore the ones after it.
2178                                conn->reorder_count = 0;
2179                        }
2180
2181                        // Quick get-out in case there is nothing to reorder
2182                        if (conn->reorder_count == 0)
2183                                break;
2184
2185                        // Check if there are additional buffers in the reorder buffers
2186                        // that need delivery.
2187                        byte *p = (byte*)conn->inbuf.get(conn->ack_nr+1);
2188                        if (p == NULL)
2189                                break;
2190                        conn->inbuf.put(conn->ack_nr+1, NULL);
2191                        count = *(uint*)p;
2192                        if (count > 0 && conn->state != CS_FIN_SENT) {
2193                                // Pass the bytes to the upper layer
2194                                conn->func.on_read(conn->userdata, p + sizeof(uint), count);
2195                        }
2196                        conn->ack_nr++;
2197                        conn->bytes_since_ack += count;
2198
2199                        // Free the element from the reorder buffer
2200                        free(p);
2201                        assert(conn->reorder_count > 0);
2202                        conn->reorder_count--;
2203                }
2204
2205                // start the delayed ACK timer
2206                conn->ack_time = g_current_ms + min<uint>(conn->ack_time - g_current_ms, DELAYED_ACK_TIME_THRESHOLD);
2207        } else {
2208                // Getting an out of order packet.
2209                // The packet needs to be remembered and rearranged later.
2210
2211                // if we have received a FIN packet, and the EOF-sequence number
2212                // is lower than the sequence number of the packet we just received
2213                // something is wrong.
2214                if (conn->got_fin && pk_seq_nr > conn->eof_pkt) {
2215                        LOG_UTPV("0x%08x: Got an invalid packet sequence number, past EOF "
2216                                "reorder_count:%u len:%u (rb:%u)",
2217                                conn, conn->reorder_count, (uint)(packet_end - data), (uint)conn->func.get_rb_size(conn->userdata));
2218                        return 0;
2219                }
2220
2221                // if the sequence number is entirely off the expected
2222                // one, just drop it. We can't allocate buffer space in
2223                // the inbuf entirely based on untrusted input
2224                if (seqnr > 0x3ff) {
2225                        LOG_UTPV("0x%08x: Got an invalid packet sequence number, too far off "
2226                                "reorder_count:%u len:%u (rb:%u)",
2227                                conn, conn->reorder_count, (uint)(packet_end - data), (uint)conn->func.get_rb_size(conn->userdata));
2228                        return 0;
2229                }
2230
2231                // we need to grow the circle buffer before we
2232                // check if the packet is already in here, so that
2233                // we don't end up looking at an older packet (since
2234                // the indices wraps around).
2235                conn->inbuf.ensure_size(pk_seq_nr + 1, seqnr + 1);
2236
2237                // Has this packet already been received? (i.e. a duplicate)
2238                // If that is the case, just discard it.
2239                if (conn->inbuf.get(pk_seq_nr) != NULL) {
2240#ifdef _DEBUG
2241                        ++conn->_stats._nduprecv;
2242#endif
2243                        return 0;
2244                }
2245
2246                // Allocate memory to fit the packet that needs to re-ordered
2247                byte *mem = (byte*)malloc((packet_end - data) + sizeof(uint));
2248                *(uint*)mem = (uint)(packet_end - data);
2249                memcpy(mem + sizeof(uint), data, packet_end - data);
2250
2251                // Insert into reorder buffer and increment the count
2252                // of # of packets to be reordered.
2253                // we add one to seqnr in order to leave the last
2254                // entry empty, that way the assert in send_ack
2255                // is valid. we have to add one to seqnr too, in order
2256                // to make the circular buffer grow around the correct
2257                // point (which is conn->ack_nr + 1).
2258                assert(conn->inbuf.get(pk_seq_nr) == NULL);
2259                assert((pk_seq_nr & conn->inbuf.mask) != ((conn->ack_nr+1) & conn->inbuf.mask));
2260                conn->inbuf.put(pk_seq_nr, mem);
2261                conn->reorder_count++;
2262
2263                LOG_UTPV("0x%08x: Got out of order data reorder_count:%u len:%u (rb:%u)",
2264                        conn, conn->reorder_count, (uint)(packet_end - data), (uint)conn->func.get_rb_size(conn->userdata));
2265
2266                // Setup so the partial ACK message will get sent immediately.
2267                conn->ack_time = g_current_ms + min<uint>(conn->ack_time - g_current_ms, 1);
2268        }
2269
2270        // If ack_time or ack_bytes indicate that we need to send and ack, send one
2271        // here instead of waiting for the timer to trigger
2272        LOG_UTPV("bytes_since_ack:%u ack_time:%d",
2273                         (uint)conn->bytes_since_ack, (int)(g_current_ms - conn->ack_time));
2274        if (conn->state == CS_CONNECTED || conn->state == CS_CONNECTED_FULL) {
2275                if (conn->bytes_since_ack > DELAYED_ACK_BYTE_THRESHOLD ||
2276                        (int)(g_current_ms - conn->ack_time) >= 0) {
2277                        conn->send_ack();
2278                }
2279        }
2280        return (size_t)(packet_end - data);
2281}
2282
2283inline bool UTP_IsV1(PacketFormatV1 const* pf)
2284{
2285        return pf->version() == 1 && pf->type() < ST_NUM_STATES && pf->ext < 3;
2286}
2287
2288void UTP_Free(UTPSocket *conn)
2289{
2290        LOG_UTPV("0x%08x: Killing socket", conn);
2291
2292        conn->func.on_state(conn->userdata, UTP_STATE_DESTROYING);
2293        UTP_SetCallbacks(conn, NULL, NULL);
2294
2295        assert(conn->idx < g_utp_sockets.GetCount());
2296        assert(g_utp_sockets[conn->idx] == conn);
2297
2298        // Unlink object from the global list
2299        assert(g_utp_sockets.GetCount() > 0);
2300
2301        UTPSocket *last = g_utp_sockets[g_utp_sockets.GetCount() - 1];
2302
2303        assert(last->idx < g_utp_sockets.GetCount());
2304        assert(g_utp_sockets[last->idx] == last);
2305
2306        last->idx = conn->idx;
2307       
2308        g_utp_sockets[conn->idx] = last;
2309
2310        // Decrease the count
2311        g_utp_sockets.SetCount(g_utp_sockets.GetCount() - 1);
2312
2313        // Free all memory occupied by the socket object.
2314        for (size_t i = 0; i <= conn->inbuf.mask; i++) {
2315                free(conn->inbuf.elements[i]);
2316        }
2317        for (size_t i = 0; i <= conn->outbuf.mask; i++) {
2318                free(conn->outbuf.elements[i]);
2319        }
2320        free(conn->inbuf.elements);
2321        free(conn->outbuf.elements);
2322
2323        // Finally free the socket object
2324        free(conn);
2325}
2326
2327
2328// Public functions:
2329///////////////////////////////////////////////////////////////////////////////
2330
2331// Create a UTP socket
2332UTPSocket *UTP_Create(SendToProc *send_to_proc, void *send_to_userdata, const struct sockaddr *addr, socklen_t addrlen)
2333{
2334        UTPSocket *conn = (UTPSocket*)calloc(1, sizeof(UTPSocket));
2335
2336        g_current_ms = UTP_GetMilliseconds();
2337
2338        UTP_SetCallbacks(conn, NULL, NULL);
2339        conn->our_hist.clear();
2340        conn->their_hist.clear();
2341        conn->rto = 3000;
2342        conn->rtt_var = 800;
2343        conn->seq_nr = 1;
2344        conn->ack_nr = 0;
2345        conn->max_window_user = 255 * PACKET_SIZE;
2346        conn->addr = PackedSockAddr((const SOCKADDR_STORAGE*)addr, addrlen);
2347        conn->send_to_proc = send_to_proc;
2348        conn->send_to_userdata = send_to_userdata;
2349        conn->ack_time = g_current_ms + 0x70000000;
2350        conn->last_got_packet = g_current_ms;
2351        conn->last_sent_packet = g_current_ms;
2352        conn->last_measured_delay = g_current_ms + 0x70000000;
2353        conn->last_rwin_decay = int32(g_current_ms) - MAX_WINDOW_DECAY;
2354        conn->last_send_quota = g_current_ms;
2355        conn->send_quota = PACKET_SIZE * 100;
2356        conn->cur_window_packets = 0;
2357        conn->fast_resend_seq_nr = conn->seq_nr;
2358
2359        // default to version 1
2360        UTP_SetSockopt(conn, SO_UTPVERSION, 1);
2361
2362        // we need to fit one packet in the window
2363        // when we start the connection
2364        conn->max_window = conn->get_packet_size();
2365        conn->state = CS_IDLE;
2366
2367        conn->outbuf.mask = 15;
2368        conn->inbuf.mask = 15;
2369
2370        conn->outbuf.elements = (void**)calloc(16, sizeof(void*));
2371        conn->inbuf.elements = (void**)calloc(16, sizeof(void*));
2372
2373        conn->idx = g_utp_sockets.Append(conn);
2374
2375        LOG_UTPV("0x%08x: UTP_Create", conn);
2376
2377        return conn;
2378}
2379
2380void UTP_SetCallbacks(UTPSocket *conn, UTPFunctionTable *funcs, void *userdata)
2381{
2382        assert(conn);
2383
2384        if (funcs == NULL) {
2385                funcs = &zero_funcs;
2386        }
2387        conn->func = *funcs;
2388        conn->userdata = userdata;
2389}
2390
2391bool UTP_SetSockopt(UTPSocket* conn, int opt, int val)
2392{
2393        assert(conn);
2394
2395        switch (opt) {
2396        case SO_SNDBUF:
2397                assert(val >= 1);
2398                conn->opt_sndbuf = val;
2399                return true;
2400        case SO_RCVBUF:
2401                conn->opt_rcvbuf = val;
2402                return true;
2403        case SO_UTPVERSION:
2404                assert(conn->state == CS_IDLE);
2405                if (conn->state != CS_IDLE) {
2406                        // too late
2407                        return false;
2408                }
2409                if (conn->version == 1 && val == 0) {
2410                        conn->reply_micro = INT_MAX;
2411                        conn->opt_rcvbuf = 200 * 1024;
2412                        conn->opt_sndbuf = OUTGOING_BUFFER_MAX_SIZE * PACKET_SIZE;
2413                } else if (conn->version == 0 && val == 1) {
2414                        conn->reply_micro = 0;
2415                        conn->opt_rcvbuf = 3 * 1024 * 1024 + 512 * 1024;
2416                        conn->opt_sndbuf = conn->opt_rcvbuf;
2417                }
2418                conn->version = val;
2419                return true;
2420        }
2421
2422        return false;
2423}
2424
2425// Try to connect to a specified host.
2426// 'initial' is the number of data bytes to send in the connect packet.
2427void UTP_Connect(UTPSocket *conn)
2428{
2429        assert(conn);
2430
2431        assert(conn->state == CS_IDLE);
2432        assert(conn->cur_window_packets == 0);
2433        assert(conn->outbuf.get(conn->seq_nr) == NULL);
2434        assert(sizeof(PacketFormatV1) == 20);
2435
2436        conn->state = CS_SYN_SENT;
2437
2438        g_current_ms = UTP_GetMilliseconds();
2439
2440        // Create and send a connect message
2441        uint32 conn_seed = UTP_Random();
2442
2443        // we identify newer versions by setting the
2444        // first two bytes to 0x0001
2445        if (conn->version > 0) {
2446                conn_seed &= 0xffff;
2447        }
2448
2449        // used in parse_log.py
2450        LOG_UTP("0x%08x: UTP_Connect conn_seed:%u packet_size:%u (B) "
2451                        "target_delay:%u (ms) delay_history:%u "
2452                        "delay_base_history:%u (minutes)",
2453                        conn, conn_seed, PACKET_SIZE, CCONTROL_TARGET / 1000,
2454                        CUR_DELAY_SIZE, DELAY_BASE_HISTORY);
2455
2456        // Setup initial timeout timer.
2457        conn->retransmit_timeout = 3000;
2458        conn->rto_timeout = g_current_ms + conn->retransmit_timeout;
2459        conn->last_rcv_win = conn->get_rcv_window();
2460
2461        conn->conn_seed = conn_seed;
2462        conn->conn_id_recv = conn_seed;
2463        conn->conn_id_send = conn_seed+1;
2464        // if you need compatibiltiy with 1.8.1, use this. it increases attackability though.
2465        //conn->seq_nr = 1;
2466        conn->seq_nr = UTP_Random();
2467
2468        // Create the connect packet.
2469        const size_t header_ext_size = conn->get_header_extensions_size();
2470
2471        OutgoingPacket *pkt = (OutgoingPacket*)malloc(sizeof(OutgoingPacket) - 1 + header_ext_size);
2472
2473        PacketFormatExtensions* p = (PacketFormatExtensions*)pkt->data;
2474        PacketFormatExtensionsV1* p1 = (PacketFormatExtensionsV1*)pkt->data;
2475
2476        memset(p, 0, header_ext_size);
2477        // SYN packets are special, and have the receive ID in the connid field,
2478        // instead of conn_id_send.
2479        if (conn->version == 0) {
2480                p->pf.connid = conn->conn_id_recv;
2481                p->pf.ext = 2;
2482                p->pf.windowsize = (byte)DIV_ROUND_UP(conn->last_rcv_win, PACKET_SIZE);
2483                p->pf.seq_nr = conn->seq_nr;
2484                p->pf.flags = ST_SYN;
2485                p->ext_next = 0;
2486                p->ext_len = 8;
2487                memset(p->extensions, 0, 8);
2488        } else {
2489                p1->pf.set_version(1);
2490                p1->pf.set_type(ST_SYN);
2491                p1->pf.ext = 2;
2492                p1->pf.connid = conn->conn_id_recv;
2493                p1->pf.windowsize = (uint32)conn->last_rcv_win;
2494                p1->pf.seq_nr = conn->seq_nr;
2495                p1->ext_next = 0;
2496                p1->ext_len = 8;
2497                memset(p1->extensions, 0, 8);
2498        }
2499        pkt->transmissions = 0;
2500        pkt->length = header_ext_size;
2501        pkt->payload = 0;
2502
2503        //LOG_UTPV("0x%08x: Sending connect %s [%u].",
2504        //               conn, addrfmt(conn->addr, addrbuf), conn_seed);
2505
2506        // Remember the message in the outgoing queue.
2507        conn->outbuf.ensure_size(conn->seq_nr, conn->cur_window_packets);
2508        conn->outbuf.put(conn->seq_nr, pkt);
2509        conn->seq_nr++;
2510        conn->cur_window_packets++;
2511
2512        conn->send_packet(pkt);
2513}
2514
2515bool UTP_IsIncomingUTP(UTPGotIncomingConnection *incoming_proc,
2516                                           SendToProc *send_to_proc, void *send_to_userdata,
2517                                           const byte *buffer, size_t len, const struct sockaddr *to, socklen_t tolen)
2518{
2519        const PackedSockAddr addr((const SOCKADDR_STORAGE*)to, tolen);
2520
2521        if (len < sizeof(PacketFormat) && len < sizeof(PacketFormatV1)) {
2522                LOG_UTPV("recv %s len:%u too small", addrfmt(addr, addrbuf), (uint)len);
2523                return false;
2524        }
2525
2526        const PacketFormat* p = (PacketFormat*)buffer;
2527        const PacketFormatV1* p1 = (PacketFormatV1*)buffer;
2528
2529        const byte version = UTP_IsV1(p1);
2530        const uint32 id = (version == 0) ? p->connid : uint32(p1->connid);
2531
2532        if (version == 0 && len < sizeof(PacketFormat)) {
2533                LOG_UTPV("recv %s len:%u version:%u too small", addrfmt(addr, addrbuf), (uint)len, version);
2534                return false;
2535        }
2536
2537        if (version == 1 && len < sizeof(PacketFormatV1)) {
2538                LOG_UTPV("recv %s len:%u version:%u too small", addrfmt(addr, addrbuf), (uint)len, version);
2539                return false;
2540        }
2541
2542        LOG_UTPV("recv %s len:%u id:%u", addrfmt(addr, addrbuf), (uint)len, id);
2543
2544        const PacketFormat *pf = (PacketFormat*)p;
2545        const PacketFormatV1 *pf1 = (PacketFormatV1*)p;
2546
2547        if (version == 0) {
2548                LOG_UTPV("recv id:%u seq_nr:%u ack_nr:%u", id, (uint)pf->seq_nr, (uint)pf->ack_nr);
2549        } else {
2550                LOG_UTPV("recv id:%u seq_nr:%u ack_nr:%u", id, (uint)pf1->seq_nr, (uint)pf1->ack_nr);
2551        }
2552
2553        const byte flags = version == 0 ? pf->flags : pf1->type();
2554
2555        for (size_t i = 0; i < g_utp_sockets.GetCount(); i++) {
2556                UTPSocket *conn = g_utp_sockets[i];
2557                //LOG_UTPV("Examining UTPSocket %s for %s and (seed:%u s:%u r:%u) for %u",
2558                //              addrfmt(conn->addr, addrbuf), addrfmt(addr, addrbuf2), conn->conn_seed, conn->conn_id_send, conn->conn_id_recv, id);
2559                if (conn->addr != addr)
2560                        continue;
2561
2562                if (flags == ST_RESET && (conn->conn_id_send == id || conn->conn_id_recv == id)) {
2563                        LOG_UTPV("0x%08x: recv RST for existing connection", conn);
2564                        if (!conn->userdata || conn->state == CS_FIN_SENT) {
2565                                conn->state = CS_DESTROY;
2566                        } else {
2567                                conn->state = CS_RESET;
2568                        }
2569                        if (conn->userdata) {
2570                                conn->func.on_overhead(conn->userdata, false, len + conn->get_udp_overhead(),
2571                                                                           close_overhead);
2572                                const int err = conn->state == CS_SYN_SENT ?
2573                                        ECONNREFUSED :
2574                                        ECONNRESET;
2575                                conn->func.on_error(conn->userdata, err);
2576                        }
2577                        return true;
2578                } else if (flags != ST_SYN && conn->conn_id_recv == id) {
2579                        LOG_UTPV("0x%08x: recv processing", conn);
2580                        const size_t read = UTP_ProcessIncoming(conn, buffer, len);
2581                        if (conn->userdata) {
2582                                conn->func.on_overhead(conn->userdata, false,
2583                                        (len - read) + conn->get_udp_overhead(),
2584                                        header_overhead);
2585                        }
2586                        return true;
2587                }
2588        }
2589
2590        if (flags == ST_RESET) {
2591                LOG_UTPV("recv RST for unknown connection");
2592                return true;
2593        }
2594
2595        const uint32 seq_nr = version == 0 ? pf->seq_nr : pf1->seq_nr;
2596        if (flags != ST_SYN) {
2597                for (size_t i = 0; i < g_rst_info.GetCount(); i++) {
2598                        if (g_rst_info[i].connid != id)
2599                                continue;
2600                        if (g_rst_info[i].addr != addr)
2601                                continue;
2602                        if (seq_nr != g_rst_info[i].ack_nr)
2603                                continue;
2604                        g_rst_info[i].timestamp = UTP_GetMilliseconds();
2605                        LOG_UTPV("recv not sending RST to non-SYN (stored)");
2606                        return true;
2607                }
2608                if (g_rst_info.GetCount() > RST_INFO_LIMIT) {
2609                        LOG_UTPV("recv not sending RST to non-SYN (limit at %u stored)", (uint)g_rst_info.GetCount());
2610                        return true;
2611                }
2612                LOG_UTPV("recv send RST to non-SYN (%u stored)", (uint)g_rst_info.GetCount());
2613                RST_Info &r = g_rst_info.Append();
2614                r.addr = addr;
2615                r.connid = id;
2616                r.ack_nr = seq_nr;
2617                r.timestamp = UTP_GetMilliseconds();
2618
2619                UTPSocket::send_rst(send_to_proc, send_to_userdata, addr, id, seq_nr, UTP_Random(), version);
2620                return true;
2621        }
2622
2623        if (incoming_proc) {
2624                LOG_UTPV("Incoming connection from %s uTP version:%u", addrfmt(addr, addrbuf), version);
2625
2626                // Create a new UTP socket to handle this new connection
2627                UTPSocket *conn = UTP_Create(send_to_proc, send_to_userdata, to, tolen);
2628                // Need to track this value to be able to detect duplicate CONNECTs
2629                conn->conn_seed = id;
2630                // This is value that identifies this connection for them.
2631                conn->conn_id_send = id;
2632                // This is value that identifies this connection for us.
2633                conn->conn_id_recv = id+1;
2634                conn->ack_nr = seq_nr;
2635                conn->seq_nr = UTP_Random();
2636                conn->fast_resend_seq_nr = conn->seq_nr;
2637
2638                UTP_SetSockopt(conn, SO_UTPVERSION, version);
2639                conn->state = CS_CONNECTED;
2640
2641                const size_t read = UTP_ProcessIncoming(conn, buffer, len, true);
2642
2643                LOG_UTPV("0x%08x: recv send connect ACK", conn);
2644                conn->send_ack(true);
2645
2646                incoming_proc(send_to_userdata, conn);
2647
2648                // we report overhead after incoming_proc, because the callbacks are setup now
2649                if (conn->userdata) {
2650                        // SYN
2651                        conn->func.on_overhead(conn->userdata, false, (len - read) + conn->get_udp_overhead(),
2652                                                                   header_overhead);
2653                        // SYNACK
2654                        conn->func.on_overhead(conn->userdata, true, conn->get_overhead(),
2655                                                                   ack_overhead);
2656                }
2657        }
2658
2659        return true;
2660}
2661
2662bool UTP_HandleICMP(const byte* buffer, size_t len, const struct sockaddr *to, socklen_t tolen)
2663{
2664        const PackedSockAddr addr((const SOCKADDR_STORAGE*)to, tolen);
2665
2666        // Want the whole packet so we have connection ID
2667        if (len < sizeof(PacketFormat)) {
2668                return false;
2669        }
2670
2671        const PacketFormat* p = (PacketFormat*)buffer;
2672        const PacketFormatV1* p1 = (PacketFormatV1*)buffer;
2673
2674        const byte version = UTP_IsV1(p1);
2675        const uint32 id = (version == 0) ? p->connid : uint32(p1->connid);
2676
2677        for (size_t i = 0; i < g_utp_sockets.GetCount(); ++i) {
2678                UTPSocket *conn = g_utp_sockets[i];
2679                if (conn->addr == addr &&
2680                        conn->conn_id_recv == id) {
2681                        // Don't pass on errors for idle/closed connections
2682                        if (conn->state != CS_IDLE) {
2683                                if (!conn->userdata || conn->state == CS_FIN_SENT) {
2684                                        LOG_UTPV("0x%08x: icmp packet causing socket destruction", conn);
2685                                        conn->state = CS_DESTROY;
2686                                } else {
2687                                        conn->state = CS_RESET;
2688                                }
2689                                if (conn->userdata) {
2690                                        const int err = conn->state == CS_SYN_SENT ?
2691                                                ECONNREFUSED :
2692                                                ECONNRESET;
2693                                        LOG_UTPV("0x%08x: icmp packet causing error on socket:%d", conn, err);
2694                                        conn->func.on_error(conn->userdata, err);
2695                                }
2696                        }
2697                        return true;
2698                }
2699        }
2700        return false;
2701}
2702
2703// Write bytes to the UTP socket.
2704// Returns true if the socket is still writable.
2705bool UTP_Write(UTPSocket *conn, size_t bytes)
2706{
2707        assert(conn);
2708
2709#ifdef g_log_utp_verbose
2710        size_t param = bytes;
2711#endif
2712
2713        if (conn->state != CS_CONNECTED) {
2714                LOG_UTPV("0x%08x: UTP_Write %u bytes = false (not CS_CONNECTED)", conn, (uint)bytes);
2715                return false;
2716        }
2717
2718        g_current_ms = UTP_GetMilliseconds();
2719
2720        conn->update_send_quota();
2721
2722        // don't send unless it will all fit in the window
2723        size_t packet_size = conn->get_packet_size();
2724        size_t num_to_send = min<size_t>(bytes, packet_size);
2725        while (conn->is_writable(num_to_send)) {
2726                // Send an outgoing packet.
2727                // Also add it to the outgoing of packets that have been sent but not ACKed.
2728
2729                if (num_to_send == 0) {
2730                        LOG_UTPV("0x%08x: UTP_Write %u bytes = true", conn, (uint)param);
2731                        return true;
2732                }
2733                bytes -= num_to_send;
2734
2735                LOG_UTPV("0x%08x: Sending packet. seq_nr:%u ack_nr:%u wnd:%u/%u/%u rcv_win:%u size:%u quota:%d cur_window_packets:%u",
2736                                 conn, conn->seq_nr, conn->ack_nr,
2737                                 (uint)(conn->cur_window + num_to_send),
2738                                 (uint)conn->max_window, (uint)conn->max_window_user,
2739                                 (uint)conn->last_rcv_win, num_to_send, conn->send_quota / 100,
2740                                 conn->cur_window_packets);
2741                conn->write_outgoing_packet(num_to_send, ST_DATA);
2742                num_to_send = min<size_t>(bytes, packet_size);
2743        }
2744
2745        // mark the socket as not being writable.
2746        conn->state = CS_CONNECTED_FULL;
2747        LOG_UTPV("0x%08x: UTP_Write %u bytes = false", conn, (uint)bytes);
2748        return false;
2749}
2750
2751void UTP_RBDrained(UTPSocket *conn)
2752{
2753        assert(conn);
2754
2755        const size_t rcvwin = conn->get_rcv_window();
2756
2757        if (rcvwin > conn->last_rcv_win) {
2758                // If last window was 0 send ACK immediately, otherwise should set timer
2759                if (conn->last_rcv_win == 0) {
2760                        conn->send_ack();
2761                } else {
2762                        conn->ack_time = g_current_ms + min<uint>(conn->ack_time - g_current_ms, DELAYED_ACK_TIME_THRESHOLD);
2763                }
2764        }
2765}
2766
2767void UTP_CheckTimeouts()
2768{
2769        g_current_ms = UTP_GetMilliseconds();
2770
2771        for (size_t i = 0; i < g_rst_info.GetCount(); i++) {
2772                if ((int)(g_current_ms - g_rst_info[i].timestamp) >= RST_INFO_TIMEOUT) {
2773                        g_rst_info.MoveUpLast(i);
2774                        i--;
2775                }
2776        }
2777        if (g_rst_info.GetCount() != g_rst_info.GetAlloc()) {
2778                g_rst_info.Compact();
2779        }
2780
2781        for (size_t i = 0; i != g_utp_sockets.GetCount(); i++) {
2782                UTPSocket *conn = g_utp_sockets[i];
2783                conn->check_timeouts();
2784
2785                // Check if the object was deleted
2786                if (conn->state == CS_DESTROY) {
2787                        LOG_UTPV("0x%08x: Destroying", conn);
2788                        UTP_Free(conn);
2789                        i--;
2790                }
2791        }
2792}
2793
2794size_t UTP_GetPacketSize(UTPSocket *socket)
2795{
2796        return socket->get_packet_size();
2797}
2798
2799void UTP_GetPeerName(UTPSocket *conn, struct sockaddr *addr, socklen_t *addrlen)
2800{
2801        assert(conn);
2802
2803        socklen_t len;
2804        const SOCKADDR_STORAGE sa = conn->addr.get_sockaddr_storage(&len);
2805        *addrlen = min(len, *addrlen);
2806        memcpy(addr, &sa, *addrlen);
2807}
2808
2809void UTP_GetDelays(UTPSocket *conn, int32 *ours, int32 *theirs, uint32 *age)
2810{
2811        assert(conn);
2812
2813        if (ours) *ours = conn->our_hist.get_value();
2814        if (theirs) *theirs = conn->their_hist.get_value();
2815        if (age) *age = g_current_ms - conn->last_measured_delay;
2816}
2817
2818#ifdef _DEBUG
2819void UTP_GetStats(UTPSocket *conn, UTPStats *stats)
2820{
2821        assert(conn);
2822
2823        *stats = conn->_stats;
2824}
2825#endif // _DEBUG
2826
2827void UTP_GetGlobalStats(UTPGlobalStats *stats)
2828{
2829        *stats = _global_stats;
2830}
2831
2832// Close the UTP socket.
2833// It is not valid for the upper layer to refer to socket after it is closed.
2834// Data will keep to try being delivered after the close.
2835void UTP_Close(UTPSocket *conn)
2836{
2837        assert(conn);
2838
2839        assert(conn->state != CS_DESTROY_DELAY && conn->state != CS_FIN_SENT && conn->state != CS_DESTROY);
2840
2841        LOG_UTPV("0x%08x: UTP_Close in state:%s", conn, statenames[conn->state]);
2842
2843        switch(conn->state) {
2844        case CS_CONNECTED:
2845        case CS_CONNECTED_FULL:
2846                conn->state = CS_FIN_SENT;
2847                conn->write_outgoing_packet(0, ST_FIN);
2848                break;
2849
2850        case CS_SYN_SENT:
2851                conn->rto_timeout = UTP_GetMilliseconds() + min<uint>(conn->rto * 2, 60);
2852        case CS_GOT_FIN:
2853                conn->state = CS_DESTROY_DELAY;
2854                break;
2855
2856        default:
2857                conn->state = CS_DESTROY;
2858                break;
2859        }
2860}
Note: See TracBrowser for help on using the repository browser.