source: trunk/third-party/libutp/utp.cpp @ 12663

Last change on this file since 12663 was 12663, checked in by jordan, 10 years ago

#4260 "µTP not working on embedded device" -- apply the changes from https://github.com/ghazel/libutp rather than http://github.com/bittorrent/libutp

File size: 86.4 KB
Line 
1#include <StdAfx.h>
2
3#include "utp.h"
4#include "templates.h"
5
6#include <stdio.h>
7#include <assert.h>
8#include <string.h>
9#include <string.h>
10#include <stdlib.h>
11#include <errno.h>
12#include <limits.h> // for UINT_MAX
13
14#ifdef WIN32
15#include "win32_inet_ntop.h"
16
17// newer versions of MSVC define these in errno.h
18#ifndef ECONNRESET
19#define ECONNRESET WSAECONNRESET
20#define EMSGSIZE WSAEMSGSIZE
21#define ECONNREFUSED WSAECONNREFUSED
22#define ETIMEDOUT WSAETIMEDOUT
23#endif
24#endif
25
26#ifdef POSIX
27typedef sockaddr_storage SOCKADDR_STORAGE;
28#endif // POSIX
29
30// number of bytes to increase max window size by, per RTT. This is
31// scaled down linearly proportional to off_target. i.e. if all packets
32// in one window have 0 delay, window size will increase by this number.
33// Typically it's less. TCP increases one MSS per RTT, which is 1500
34#define MAX_CWND_INCREASE_BYTES_PER_RTT 3000
35#define CUR_DELAY_SIZE 3
36// experiments suggest that a clock skew of 10 ms per 325 seconds
37// is not impossible. Reset delay_base every 13 minutes. The clock
38// skew is dealt with by observing the delay base in the other
39// direction, and adjusting our own upwards if the opposite direction
40// delay base keeps going down
41#define DELAY_BASE_HISTORY 13
42#define MAX_WINDOW_DECAY 100 // ms
43
44#define REORDER_BUFFER_SIZE 32
45#define REORDER_BUFFER_MAX_SIZE 511
46#define OUTGOING_BUFFER_MAX_SIZE 511
47
48#define PACKET_SIZE 350
49
50// this is the minimum max_window value. It can never drop below this
51#define MIN_WINDOW_SIZE 10
52
53// when window sizes are smaller than one packet_size, this
54// will pace the packets to average at the given window size
55// if it's not set, it will simply not send anything until
56// there's a timeout
57#define USE_PACKET_PACING 1
58
59// if we receive 4 or more duplicate acks, we resend the packet
60// that hasn't been acked yet
61#define DUPLICATE_ACKS_BEFORE_RESEND 3
62
63#define DELAYED_ACK_BYTE_THRESHOLD 2400 // bytes
64#define DELAYED_ACK_TIME_THRESHOLD 100 // milliseconds
65
66#define RST_INFO_TIMEOUT 10000
67#define RST_INFO_LIMIT 1000
68// 29 seconds determined from measuring many home NAT devices
69#define KEEPALIVE_INTERVAL 29000
70
71
72#define SEQ_NR_MASK 0xFFFF
73#define ACK_NR_MASK 0xFFFF
74
75#define DIV_ROUND_UP(num, denom) ((num + denom - 1) / denom)
76
77#include "utp_utils.h"
78#include "utp_config.h"
79
80#define LOG_UTP if (g_log_utp) utp_log
81#define LOG_UTPV if (g_log_utp_verbose) utp_log
82
83uint32 g_current_ms;
84
85// The totals are derived from the following data:
86//  45: IPv6 address including embedded IPv4 address
87//  11: Scope Id
88//   2: Brackets around IPv6 address when port is present
89//   6: Port (including colon)
90//   1: Terminating null byte
91char addrbuf[65];
92char addrbuf2[65];
93#define addrfmt(x, s) x.fmt(s, sizeof(s))
94
95#pragma pack(push,1)
96
97struct PACKED_ATTRIBUTE PackedSockAddr {
98
99        // The values are always stored here in network byte order
100        union {
101                byte _in6[16];          // IPv6
102                uint16 _in6w[8];        // IPv6, word based (for convenience)
103                uint32 _in6d[4];        // Dword access
104                in6_addr _in6addr;      // For convenience
105        } _in;
106
107        // Host byte order
108        uint16 _port;
109
110#define _sin4 _in._in6d[3]      // IPv4 is stored where it goes if mapped
111
112#define _sin6 _in._in6
113#define _sin6w _in._in6w
114#define _sin6d _in._in6d
115
116        byte get_family() const
117        {
118                return (IN6_IS_ADDR_V4MAPPED(&_in._in6addr) != 0) ? AF_INET : AF_INET6;
119        }
120
121        bool operator==(const PackedSockAddr& rhs) const
122        {
123                if (&rhs == this)
124                        return true;
125                if (_port != rhs._port)
126                        return false;
127                return memcmp(_sin6, rhs._sin6, sizeof(_sin6)) == 0;
128        }
129        bool operator!=(const PackedSockAddr& rhs) const { return !(*this == rhs); }
130
131        PackedSockAddr(const SOCKADDR_STORAGE* sa, socklen_t len)
132        {
133                if (sa->ss_family == AF_INET) {
134                        assert(len >= sizeof(sockaddr_in));
135                        const sockaddr_in *sin = (sockaddr_in*)sa;
136                        _sin6w[0] = 0;
137                        _sin6w[1] = 0;
138                        _sin6w[2] = 0;
139                        _sin6w[3] = 0;
140                        _sin6w[4] = 0;
141                        _sin6w[5] = 0xffff;
142                        _sin4 = sin->sin_addr.s_addr;
143                        _port = ntohs(sin->sin_port);
144                } else {
145                        assert(len >= sizeof(sockaddr_in6));
146                        const sockaddr_in6 *sin6 = (sockaddr_in6*)sa;
147                        _in._in6addr = sin6->sin6_addr;
148                        _port = ntohs(sin6->sin6_port);
149                }
150        }
151
152        SOCKADDR_STORAGE get_sockaddr_storage(socklen_t *len = NULL) const
153        {
154                SOCKADDR_STORAGE sa;
155                const byte family = get_family();
156                if (family == AF_INET) {
157                        sockaddr_in *sin = (sockaddr_in*)&sa;
158                        if (len) *len = sizeof(sockaddr_in);
159                        memset(sin, 0, sizeof(sockaddr_in));
160                        sin->sin_family = family;
161                        sin->sin_port = htons(_port);
162                        sin->sin_addr.s_addr = _sin4;
163                } else {
164                        sockaddr_in6 *sin6 = (sockaddr_in6*)&sa;
165                        memset(sin6, 0, sizeof(sockaddr_in6));
166                        if (len) *len = sizeof(sockaddr_in6);
167                        sin6->sin6_family = family;
168                        sin6->sin6_addr = _in._in6addr;
169                        sin6->sin6_port = htons(_port);
170                }
171                return sa;
172        }
173
174        cstr fmt(str s, size_t len) const
175        {
176                memset(s, 0, len);
177                const byte family = get_family();
178                str i;
179                if (family == AF_INET) {
180                        inet_ntop(family, (uint32*)&_sin4, s, len);
181                        i = s;
182                        while (*++i) {}
183                } else {
184                        i = s;
185                        *i++ = '[';
186                        inet_ntop(family, (in6_addr*)&_in._in6addr, i, len-1);
187                        while (*++i) {}
188                        *i++ = ']';
189                }
190                snprintf(i, len - (i-s), ":%u", _port);
191                return s;
192        }
193};
194
195struct PACKED_ATTRIBUTE RST_Info {
196        PackedSockAddr addr;
197        uint32 connid;
198        uint32 timestamp;
199        uint16 ack_nr;
200};
201
202// these packet sizes are including the uTP header wich
203// is either 20 or 23 bytes depending on version
204#define PACKET_SIZE_EMPTY_BUCKET 0
205#define PACKET_SIZE_EMPTY 23
206#define PACKET_SIZE_SMALL_BUCKET 1
207#define PACKET_SIZE_SMALL 373
208#define PACKET_SIZE_MID_BUCKET 2
209#define PACKET_SIZE_MID 723
210#define PACKET_SIZE_BIG_BUCKET 3
211#define PACKET_SIZE_BIG 1400
212#define PACKET_SIZE_HUGE_BUCKET 4
213
214struct PACKED_ATTRIBUTE PacketFormat {
215        // connection ID
216        uint32_big connid;
217        uint32_big tv_sec;
218        uint32_big tv_usec;
219        uint32_big reply_micro;
220        // receive window size in PACKET_SIZE chunks
221        byte windowsize;
222        // Type of the first extension header
223        byte ext;
224        // Flags
225        byte flags;
226        // Sequence number
227        uint16_big seq_nr;
228        // Acknowledgment number
229        uint16_big ack_nr;
230};
231
232struct PACKED_ATTRIBUTE PacketFormatAck {
233        PacketFormat pf;
234        byte ext_next;
235        byte ext_len;
236        byte acks[4];
237};
238
239struct PACKED_ATTRIBUTE PacketFormatExtensions {
240        PacketFormat pf;
241        byte ext_next;
242        byte ext_len;
243        byte extensions[8];
244};
245
246struct PACKED_ATTRIBUTE PacketFormatV1 {
247        // packet_type (4 high bits)
248        // protocol version (4 low bits)
249        byte ver_type;
250        byte version() const { return ver_type & 0xf; }
251        byte type() const { return ver_type >> 4; }
252        void set_version(byte v) { ver_type = (ver_type & 0xf0) | (v & 0xf); }
253        void set_type(byte t) { ver_type = (ver_type & 0xf) | (t << 4); }
254
255        // Type of the first extension header
256        byte ext;
257        // connection ID
258        uint16_big connid;
259        uint32_big tv_usec;
260        uint32_big reply_micro;
261        // receive window size in bytes
262        uint32_big windowsize;
263        // Sequence number
264        uint16_big seq_nr;
265        // Acknowledgment number
266        uint16_big ack_nr;
267};
268
269struct PACKED_ATTRIBUTE PacketFormatAckV1 {
270        PacketFormatV1 pf;
271        byte ext_next;
272        byte ext_len;
273        byte acks[4];
274};
275
276struct PACKED_ATTRIBUTE PacketFormatExtensionsV1 {
277        PacketFormatV1 pf;
278        byte ext_next;
279        byte ext_len;
280        byte extensions[8];
281};
282
283#pragma pack(pop)
284
285enum {
286        ST_DATA = 0,            // Data packet.
287        ST_FIN = 1,                     // Finalize the connection. This is the last packet.
288        ST_STATE = 2,           // State packet. Used to transmit an ACK with no data.
289        ST_RESET = 3,           // Terminate connection forcefully.
290        ST_SYN = 4,                     // Connect SYN
291        ST_NUM_STATES,          // used for bounds checking
292};
293
294static const cstr flagnames[] = {
295        "ST_DATA","ST_FIN","ST_STATE","ST_RESET","ST_SYN"
296};
297
298enum CONN_STATE {
299        CS_IDLE = 0,
300        CS_SYN_SENT = 1,
301        CS_CONNECTED = 2,
302        CS_CONNECTED_FULL = 3,
303        CS_GOT_FIN = 4,
304        CS_DESTROY_DELAY = 5,
305        CS_FIN_SENT = 6,
306        CS_RESET = 7,
307        CS_DESTROY = 8,
308};
309
310static const cstr statenames[] = {
311        "IDLE","SYN_SENT","CONNECTED","CONNECTED_FULL","GOT_FIN","DESTROY_DELAY","FIN_SENT","RESET","DESTROY"
312};
313
314struct OutgoingPacket {
315        size_t length;
316        size_t payload;
317        uint64 time_sent; // microseconds
318        uint transmissions:31;
319        bool need_resend:1;
320        byte data[1];
321};
322
323void no_read(void *socket, const byte *bytes, size_t count) {}
324void no_write(void *socket, byte *bytes, size_t count) {}
325size_t no_rb_size(void *socket) { return 0; }
326void no_state(void *socket, int state) {}
327void no_error(void *socket, int errcode) {}
328void no_overhead(void *socket, bool send, size_t count, int type) {}
329
330UTPFunctionTable zero_funcs = {
331        &no_read,
332        &no_write,
333        &no_rb_size,
334        &no_state,
335        &no_error,
336        &no_overhead,
337};
338
339struct SizableCircularBuffer {
340        // This is the mask. Since it's always a power of 2, adding 1 to this value will return the size.
341        size_t mask;
342        // This is the elements that the circular buffer points to
343        void **elements;
344
345        void *get(size_t i) { assert(elements); return elements ? elements[i & mask] : NULL; }
346        void put(size_t i, void *data) { assert(elements); elements[i&mask] = data; }
347
348        void grow(size_t item, size_t index);
349        void ensure_size(size_t item, size_t index) { if (index > mask) grow(item, index); }
350        size_t size() { return mask + 1; }
351};
352
353static struct UTPGlobalStats _global_stats;
354
355// Item contains the element we want to make space for
356// index is the index in the list.
357void SizableCircularBuffer::grow(size_t item, size_t index)
358{
359        // Figure out the new size.
360        size_t size = mask + 1;
361        do size *= 2; while (index >= size);
362
363        // Allocate the new buffer
364        void **buf = (void**)calloc(size, sizeof(void*));
365
366        size--;
367
368        // Copy elements from the old buffer to the new buffer
369        for (size_t i = 0; i <= mask; i++) {
370                buf[(item - index + i) & size] = get(item - index + i);
371        }
372
373        // Swap to the newly allocated buffer
374        mask = size;
375        free(elements);
376        elements = buf;
377}
378
379// compare if lhs is less than rhs, taking wrapping
380// into account. if lhs is close to UINT_MAX and rhs
381// is close to 0, lhs is assumed to have wrapped and
382// considered smaller
383bool wrapping_compare_less(uint32 lhs, uint32 rhs)
384{
385        // distance walking from lhs to rhs, downwards
386        const uint32 dist_down = lhs - rhs;
387        // distance walking from lhs to rhs, upwards
388        const uint32 dist_up = rhs - lhs;
389
390        // if the distance walking up is shorter, lhs
391        // is less than rhs. If the distance walking down
392        // is shorter, then rhs is less than lhs
393        return dist_up < dist_down;
394}
395
396struct DelayHist {
397        uint32 delay_base;
398
399        // this is the history of delay samples,
400        // normalized by using the delay_base. These
401        // values are always greater than 0 and measures
402        // the queuing delay in microseconds
403        uint32 cur_delay_hist[CUR_DELAY_SIZE];
404        size_t cur_delay_idx;
405
406        // this is the history of delay_base. It's
407        // a number that doesn't have an absolute meaning
408        // only relative. It doesn't make sense to initialize
409        // it to anything other than values relative to
410        // what's been seen in the real world.
411        uint32 delay_base_hist[DELAY_BASE_HISTORY];
412        size_t delay_base_idx;
413        // the time when we last stepped the delay_base_idx
414        uint32 delay_base_time;
415
416        bool delay_base_initialized;
417
418        void clear()
419        {
420                delay_base_initialized = false;
421                delay_base = 0;
422                cur_delay_idx = 0;
423                delay_base_idx = 0;
424                delay_base_time = g_current_ms;
425                for (size_t i = 0; i < CUR_DELAY_SIZE; i++) {
426                        cur_delay_hist[i] = 0;
427                }
428                for (size_t i = 0; i < DELAY_BASE_HISTORY; i++) {
429                        delay_base_hist[i] = 0;
430                }
431        }
432
433        void shift(const uint32 offset)
434        {
435                // the offset should never be "negative"
436                // assert(offset < 0x10000000);
437
438                // increase all of our base delays by this amount
439                // this is used to take clock skew into account
440                // by observing the other side's changes in its base_delay
441                for (size_t i = 0; i < DELAY_BASE_HISTORY; i++) {
442                        delay_base_hist[i] += offset;
443                }
444                delay_base += offset;
445        }
446
447        void add_sample(const uint32 sample)
448        {
449                // The two clocks (in the two peers) are assumed not to
450                // progress at the exact same rate. They are assumed to be
451                // drifting, which causes the delay samples to contain
452                // a systematic error, either they are under-
453                // estimated or over-estimated. This is why we update the
454                // delay_base every two minutes, to adjust for this.
455
456                // This means the values will keep drifting and eventually wrap.
457                // We can cross the wrapping boundry in two directions, either
458                // going up, crossing the highest value, or going down, crossing 0.
459
460                // if the delay_base is close to the max value and sample actually
461                // wrapped on the other end we would see something like this:
462                // delay_base = 0xffffff00, sample = 0x00000400
463                // sample - delay_base = 0x500 which is the correct difference
464
465                // if the delay_base is instead close to 0, and we got an even lower
466                // sample (that will eventually update the delay_base), we may see
467                // something like this:
468                // delay_base = 0x00000400, sample = 0xffffff00
469                // sample - delay_base = 0xfffffb00
470                // this needs to be interpreted as a negative number and the actual
471                // recorded delay should be 0.
472
473                // It is important that all arithmetic that assume wrapping
474                // is done with unsigned intergers. Signed integers are not guaranteed
475                // to wrap the way unsigned integers do. At least GCC takes advantage
476                // of this relaxed rule and won't necessarily wrap signed ints.
477
478                // remove the clock offset and propagation delay.
479                // delay base is min of the sample and the current
480                // delay base. This min-operation is subject to wrapping
481                // and care needs to be taken to correctly choose the
482                // true minimum.
483
484                // specifically the problem case is when delay_base is very small
485                // and sample is very large (because it wrapped past zero), sample
486                // needs to be considered the smaller
487
488                if (!delay_base_initialized) {
489                        // delay_base being 0 suggests that we haven't initialized
490                        // it or its history with any real measurements yet. Initialize
491                        // everything with this sample.
492                        for (size_t i = 0; i < DELAY_BASE_HISTORY; i++) {
493                                // if we don't have a value, set it to the current sample
494                                delay_base_hist[i] = sample;
495                                continue;
496                        }
497                        delay_base = sample;
498                        delay_base_initialized = true;
499                }
500
501                if (wrapping_compare_less(sample, delay_base_hist[delay_base_idx])) {
502                        // sample is smaller than the current delay_base_hist entry
503                        // update it
504                        delay_base_hist[delay_base_idx] = sample;
505                }
506
507                // is sample lower than delay_base? If so, update delay_base
508                if (wrapping_compare_less(sample, delay_base)) {
509                        // sample is smaller than the current delay_base
510                        // update it
511                        delay_base = sample;
512                }
513               
514                // this operation may wrap, and is supposed to
515                const uint32 delay = sample - delay_base;
516                // sanity check. If this is triggered, something fishy is going on
517                // it means the measured sample was greater than 32 seconds!
518//              assert(delay < 0x2000000);
519
520                cur_delay_hist[cur_delay_idx] = delay;
521                cur_delay_idx = (cur_delay_idx + 1) % CUR_DELAY_SIZE;
522
523                // once every minute
524                if (g_current_ms - delay_base_time > 60 * 1000) {
525                        delay_base_time = g_current_ms;
526                        delay_base_idx = (delay_base_idx + 1) % DELAY_BASE_HISTORY;
527                        // clear up the new delay base history spot by initializing
528                        // it to the current sample, then update it
529                        delay_base_hist[delay_base_idx] = sample;
530                        delay_base = delay_base_hist[0];
531                        // Assign the lowest delay in the last 2 minutes to delay_base
532                        for (size_t i = 0; i < DELAY_BASE_HISTORY; i++) {
533                                if (wrapping_compare_less(delay_base_hist[i], delay_base))
534                                        delay_base = delay_base_hist[i];
535                        }
536                }
537        }
538
539        uint32 get_value()
540        {
541                uint32 value = UINT_MAX;
542                for (size_t i = 0; i < CUR_DELAY_SIZE; i++) {
543                        value = min<uint32>(cur_delay_hist[i], value);
544                }
545                // value could be UINT_MAX if we have no samples yet...
546                return value;
547        }
548};
549
550struct UTPSocket {
551        PackedSockAddr addr;
552
553        size_t idx;
554
555        uint16 reorder_count;
556        byte duplicate_ack;
557
558        // the number of bytes we've received but not acked yet
559        size_t bytes_since_ack;
560
561        // the number of packets in the send queue. Packets that haven't
562        // yet been sent count as well as packets marked as needing resend
563        // the oldest un-acked packet in the send queue is seq_nr - cur_window_packets
564        uint16 cur_window_packets;
565
566        // how much of the window is used, number of bytes in-flight
567        // packets that have not yet been sent do not count, packets
568        // that are marked as needing to be re-sent (due to a timeout)
569        // don't count either
570        size_t cur_window;
571        // maximum window size, in bytes
572        size_t max_window;
573        // SO_SNDBUF setting, in bytes
574        size_t opt_sndbuf;
575        // SO_RCVBUF setting, in bytes
576        size_t opt_rcvbuf;
577
578        // Is a FIN packet in the reassembly buffer?
579        bool got_fin:1;
580        // Timeout procedure
581        bool fast_timeout:1;
582
583        // max receive window for other end, in bytes
584        size_t max_window_user;
585        // 0 = original uTP header, 1 = second revision
586        byte version;
587        CONN_STATE state;
588        // TickCount when we last decayed window (wraps)
589        int32 last_rwin_decay;
590
591        // the sequence number of the FIN packet. This field is only set
592        // when we have received a FIN, and the flag field has the FIN flag set.
593        // it is used to know when it is safe to destroy the socket, we must have
594        // received all packets up to this sequence number first.
595        uint16 eof_pkt;
596
597        // All sequence numbers up to including this have been properly received
598        // by us
599        uint16 ack_nr;
600        // This is the sequence number for the next packet to be sent.
601        uint16 seq_nr;
602
603        uint16 timeout_seq_nr;
604
605        // This is the sequence number of the next packet we're allowed to
606        // do a fast resend with. This makes sure we only do a fast-resend
607        // once per packet. We can resend the packet with this sequence number
608        // or any later packet (with a higher sequence number).
609        uint16 fast_resend_seq_nr;
610
611        uint32 reply_micro;
612
613        // the time when we need to send another ack. If there's
614        // nothing to ack, this is a very large number
615        uint32 ack_time;
616
617        uint32 last_got_packet;
618        uint32 last_sent_packet;
619        uint32 last_measured_delay;
620        uint32 last_maxed_out_window;
621
622        // the last time we added send quota to the connection
623        // when adding send quota, this is subtracted from the
624        // current time multiplied by max_window / rtt
625        // which is the current allowed send rate.
626        int32 last_send_quota;
627
628        // the number of bytes we are allowed to send on
629        // this connection. If this is more than one packet
630        // size when we run out of data to send, it is clamped
631        // to the packet size
632        // this value is multiplied by 100 in order to get
633        // higher accuracy when dealing with low rates
634        int32 send_quota;
635
636        SendToProc *send_to_proc;
637        void *send_to_userdata;
638        UTPFunctionTable func;
639        void *userdata;
640
641        // Round trip time
642        uint rtt;
643        // Round trip time variance
644        uint rtt_var;
645        // Round trip timeout
646        uint rto;
647        DelayHist rtt_hist;
648        uint retransmit_timeout;
649        // The RTO timer will timeout here.
650        uint rto_timeout;
651        // When the window size is set to zero, start this timer. It will send a new packet every 30secs.
652        uint32 zerowindow_time;
653
654        uint32 conn_seed;
655        // Connection ID for packets I receive
656        uint32 conn_id_recv;
657        // Connection ID for packets I send
658        uint32 conn_id_send;
659        // Last rcv window we advertised, in bytes
660        size_t last_rcv_win;
661
662        DelayHist our_hist;
663        DelayHist their_hist;
664
665        // extension bytes from SYN packet
666        byte extensions[8];
667
668        SizableCircularBuffer inbuf, outbuf;
669
670#ifdef _DEBUG
671        // Public stats, returned by UTP_GetStats().  See utp.h
672        UTPStats _stats;
673#endif // _DEBUG
674
675        // Calculates the current receive window
676        size_t get_rcv_window() const
677        {
678                // If we don't have a connection (such as during connection
679                // establishment, always act as if we have an empty buffer).
680                if (!userdata) return opt_rcvbuf;
681
682                // Trim window down according to what's already in buffer.
683                const size_t numbuf = func.get_rb_size(userdata);
684                assert((int)numbuf >= 0);
685                return opt_rcvbuf > numbuf ? opt_rcvbuf - numbuf : 0;
686        }
687
688        // Test if we're ready to decay max_window
689        // XXX this breaks when spaced by > INT_MAX/2, which is 49
690        // days; the failure mode in that case is we do an extra decay
691        // or fail to do one when we really shouldn't.
692        bool can_decay_win(int32 msec) const
693        {
694                return msec - last_rwin_decay >= MAX_WINDOW_DECAY;
695        }
696
697        // If we can, decay max window, returns true if we actually did so
698        void maybe_decay_win()
699        {
700                if (can_decay_win(g_current_ms)) {
701                        // TCP uses 0.5
702                        max_window = (size_t)(max_window * .5);
703                        last_rwin_decay = g_current_ms;
704                        if (max_window < MIN_WINDOW_SIZE)
705                                max_window = MIN_WINDOW_SIZE;
706                }
707        }
708
709        size_t get_header_size() const
710        {
711                return (version ? sizeof(PacketFormatV1) : sizeof(PacketFormat));
712        }
713
714        size_t get_header_extensions_size() const
715        {
716                return (version ? sizeof(PacketFormatExtensionsV1) : sizeof(PacketFormatExtensions));
717        }
718
719        void sent_ack()
720        {
721                ack_time = g_current_ms + 0x70000000;
722                bytes_since_ack = 0;
723        }
724
725        size_t get_udp_mtu() const
726        {
727                socklen_t len;
728                SOCKADDR_STORAGE sa = addr.get_sockaddr_storage(&len);
729                return UTP_GetUDPMTU((const struct sockaddr *)&sa, len);
730        }
731
732        size_t get_udp_overhead() const
733        {
734                socklen_t len;
735                SOCKADDR_STORAGE sa = addr.get_sockaddr_storage(&len);
736                return UTP_GetUDPOverhead((const struct sockaddr *)&sa, len);
737        }
738
739        uint64 get_global_utp_bytes_sent() const
740        {
741                socklen_t len;
742                SOCKADDR_STORAGE sa = addr.get_sockaddr_storage(&len);
743                return UTP_GetGlobalUTPBytesSent((const struct sockaddr *)&sa, len);
744        }
745
746        size_t get_overhead() const
747        {
748                return get_udp_overhead() + get_header_size();
749        }
750
751        void send_data(PacketFormat* b, size_t length, bandwidth_type_t type);
752
753        void send_ack(bool synack = false);
754
755        void send_keep_alive();
756
757        static void send_rst(SendToProc *send_to_proc, void *send_to_userdata,
758                                                 const PackedSockAddr &addr, uint32 conn_id_send,
759                                                 uint16 ack_nr, uint16 seq_nr, byte version);
760
761        void send_packet(OutgoingPacket *pkt);
762
763        bool is_writable(size_t to_write);
764
765        bool flush_packets();
766
767        void write_outgoing_packet(size_t payload, uint flags);
768
769        void update_send_quota();
770
771#ifdef _DEBUG
772        void check_invariant();
773#endif
774
775        void check_timeouts();
776
777        int ack_packet(uint16 seq);
778
779        size_t selective_ack_bytes(uint base, const byte* mask, byte len, int64& min_rtt);
780
781        void selective_ack(uint base, const byte *mask, byte len);
782
783        void apply_ledbat_ccontrol(size_t bytes_acked, uint32 actual_delay, int64 min_rtt);
784
785        size_t get_packet_size();
786};
787
788Array<RST_Info> g_rst_info;
789Array<UTPSocket*> g_utp_sockets;
790
791static void UTP_RegisterSentPacket(size_t length) {
792        if (length <= PACKET_SIZE_MID) {
793                if (length <= PACKET_SIZE_EMPTY) {
794                        _global_stats._nraw_send[PACKET_SIZE_EMPTY_BUCKET]++;
795                } else if (length <= PACKET_SIZE_SMALL) {
796                        _global_stats._nraw_send[PACKET_SIZE_SMALL_BUCKET]++;
797                } else
798                        _global_stats._nraw_send[PACKET_SIZE_MID_BUCKET]++;
799        } else {
800                if (length <= PACKET_SIZE_BIG) {
801                        _global_stats._nraw_send[PACKET_SIZE_BIG_BUCKET]++;
802                } else
803                        _global_stats._nraw_send[PACKET_SIZE_HUGE_BUCKET]++;
804        }
805}
806
807void send_to_addr(SendToProc *send_to_proc, void *send_to_userdata, const byte *p, size_t len, const PackedSockAddr &addr)
808{
809        socklen_t tolen;
810        SOCKADDR_STORAGE to = addr.get_sockaddr_storage(&tolen);
811        UTP_RegisterSentPacket(len);
812        send_to_proc(send_to_userdata, p, len, (const struct sockaddr *)&to, tolen);
813}
814
815void UTPSocket::send_data(PacketFormat* b, size_t length, bandwidth_type_t type)
816{
817        // time stamp this packet with local time, the stamp goes into
818        // the header of every packet at the 8th byte for 8 bytes :
819        // two integers, check packet.h for more
820        uint64 time = UTP_GetMicroseconds();
821
822        PacketFormatV1* b1 = (PacketFormatV1*)b;
823        if (version == 0) {
824                b->tv_sec = (uint32)(time / 1000000);
825                b->tv_usec = time % 1000000;
826                b->reply_micro = reply_micro;
827        } else {
828                b1->tv_usec = (uint32)time;
829                b1->reply_micro = reply_micro;
830        }
831
832        last_sent_packet = g_current_ms;
833
834#ifdef _DEBUG
835        _stats._nbytes_xmit += length;
836        ++_stats._nxmit;
837#endif
838        if (userdata) {
839                size_t n;
840                if (type == payload_bandwidth) {
841                        // if this packet carries payload, just
842                        // count the header as overhead
843                        type = header_overhead;
844                        n = get_overhead();
845                } else {
846                        n = length + get_udp_overhead();
847                }
848                func.on_overhead(userdata, true, n, type);
849        }
850#if g_log_utp_verbose
851        int flags = version == 0 ? b->flags : b1->type();
852        uint16 seq_nr = version == 0 ? b->seq_nr : b1->seq_nr;
853        uint16 ack_nr = version == 0 ? b->ack_nr : b1->ack_nr;
854        LOG_UTPV("0x%08x: send %s len:%u id:%u timestamp:"I64u" reply_micro:%u flags:%s seq_nr:%u ack_nr:%u",
855                         this, addrfmt(addr, addrbuf), (uint)length, conn_id_send, time, reply_micro, flagnames[flags],
856                         seq_nr, ack_nr);
857#endif
858        send_to_addr(send_to_proc, send_to_userdata, (const byte*)b, length, addr);
859}
860
861void UTPSocket::send_ack(bool synack)
862{
863        PacketFormatExtensions pfe;
864        zeromem(&pfe);
865        PacketFormatExtensionsV1& pfe1 = (PacketFormatExtensionsV1&)pfe;
866        PacketFormatAck& pfa = (PacketFormatAck&)pfe1;
867        PacketFormatAckV1& pfa1 = (PacketFormatAckV1&)pfe1;
868
869        size_t len;
870        last_rcv_win = get_rcv_window();
871        if (version == 0) {
872                pfa.pf.connid = conn_id_send;
873                pfa.pf.ack_nr = (uint16)ack_nr;
874                pfa.pf.seq_nr = (uint16)seq_nr;
875                pfa.pf.flags = ST_STATE;
876                pfa.pf.ext = 0;
877                pfa.pf.windowsize = (byte)DIV_ROUND_UP(last_rcv_win, PACKET_SIZE);
878                len = sizeof(PacketFormat);
879        } else {
880                pfa1.pf.set_version(1);
881                pfa1.pf.set_type(ST_STATE);
882                pfa1.pf.ext = 0;
883                pfa1.pf.connid = conn_id_send;
884                pfa1.pf.ack_nr = ack_nr;
885                pfa1.pf.seq_nr = seq_nr;
886                pfa1.pf.windowsize = (uint32)last_rcv_win;
887                len = sizeof(PacketFormatV1);
888        }
889
890        // we never need to send EACK for connections
891        // that are shutting down
892        if (reorder_count != 0 && state < CS_GOT_FIN) {
893                // if reorder count > 0, send an EACK.
894                // reorder count should always be 0
895                // for synacks, so this should not be
896                // as synack
897                assert(!synack);
898                if (version == 0) {
899                        pfa.pf.ext = 1;
900                        pfa.ext_next = 0;
901                        pfa.ext_len = 4;
902                } else {
903                        pfa1.pf.ext = 1;
904                        pfa1.ext_next = 0;
905                        pfa1.ext_len = 4;
906                }
907                uint m = 0;
908
909                // reorder count should only be non-zero
910                // if the packet ack_nr + 1 has not yet
911                // been received
912                assert(inbuf.get(ack_nr + 1) == NULL);
913                size_t window = min<size_t>(14+16, inbuf.size());
914                // Generate bit mask of segments received.
915                for (size_t i = 0; i < window; i++) {
916                        if (inbuf.get(ack_nr + i + 2) != NULL) {
917                                m |= 1 << i;
918                                LOG_UTPV("0x%08x: EACK packet [%u]", this, ack_nr + i + 2);
919                        }
920                }
921                if (version == 0) {
922                        pfa.acks[0] = (byte)m;
923                        pfa.acks[1] = (byte)(m >> 8);
924                        pfa.acks[2] = (byte)(m >> 16);
925                        pfa.acks[3] = (byte)(m >> 24);
926                } else {
927                        pfa1.acks[0] = (byte)m;
928                        pfa1.acks[1] = (byte)(m >> 8);
929                        pfa1.acks[2] = (byte)(m >> 16);
930                        pfa1.acks[3] = (byte)(m >> 24);
931                }
932                len += 4 + 2;
933                LOG_UTPV("0x%08x: Sending EACK %u [%u] bits:[%032b]", this, ack_nr, conn_id_send, m);
934        } else if (synack) {
935                // we only send "extensions" in response to SYN
936                // and the reorder count is 0 in that state
937
938                LOG_UTPV("0x%08x: Sending ACK %u [%u] with extension bits", this, ack_nr, conn_id_send);
939                if (version == 0) {
940                        pfe.pf.ext = 2;
941                        pfe.ext_next = 0;
942                        pfe.ext_len = 8;
943                        memset(pfe.extensions, 0, 8);
944                } else {
945                        pfe1.pf.ext = 2;
946                        pfe1.ext_next = 0;
947                        pfe1.ext_len = 8;
948                        memset(pfe1.extensions, 0, 8);
949                }
950                len += 8 + 2;
951        } else {
952                LOG_UTPV("0x%08x: Sending ACK %u [%u]", this, ack_nr, conn_id_send);
953        }
954
955        sent_ack();
956        send_data((PacketFormat*)&pfe, len, ack_overhead);
957}
958
959void UTPSocket::send_keep_alive()
960{
961        ack_nr--;
962        LOG_UTPV("0x%08x: Sending KeepAlive ACK %u [%u]", this, ack_nr, conn_id_send);
963        send_ack();
964        ack_nr++;
965}
966
967void UTPSocket::send_rst(SendToProc *send_to_proc, void *send_to_userdata,
968                                                 const PackedSockAddr &addr, uint32 conn_id_send, uint16 ack_nr, uint16 seq_nr, byte version)
969{
970        PacketFormat pf;
971        zeromem(&pf);
972        PacketFormatV1& pf1 = (PacketFormatV1&)pf;
973
974        size_t len;
975        if (version == 0) {
976                pf.connid = conn_id_send;
977                pf.ack_nr = ack_nr;
978                pf.seq_nr = seq_nr;
979                pf.flags = ST_RESET;
980                pf.ext = 0;
981                pf.windowsize = 0;
982                len = sizeof(PacketFormat);
983        } else {
984                pf1.set_version(1);
985                pf1.set_type(ST_RESET);
986                pf1.ext = 0;
987                pf1.connid = conn_id_send;
988                pf1.ack_nr = ack_nr;
989                pf1.seq_nr = seq_nr;
990                pf1.windowsize = 0;
991                len = sizeof(PacketFormatV1);
992        }
993
994        LOG_UTPV("%s: Sending RST id:%u seq_nr:%u ack_nr:%u", addrfmt(addr, addrbuf), conn_id_send, seq_nr, ack_nr);
995        LOG_UTPV("send %s len:%u id:%u", addrfmt(addr, addrbuf), (uint)len, conn_id_send);
996        send_to_addr(send_to_proc, send_to_userdata, (const byte*)&pf1, len, addr);
997}
998
999void UTPSocket::send_packet(OutgoingPacket *pkt)
1000{
1001        // only count against the quota the first time we
1002        // send the packet. Don't enforce quota when closing
1003        // a socket. Only enforce the quota when we're sending
1004        // at slow rates (max window < packet size)
1005        size_t max_send = min(max_window, opt_sndbuf, max_window_user);
1006
1007        if (pkt->transmissions == 0 || pkt->need_resend) {
1008                cur_window += pkt->payload;
1009        }
1010
1011        size_t packet_size = get_packet_size();
1012        if (pkt->transmissions == 0 && max_send < packet_size) {
1013                assert(state == CS_FIN_SENT ||
1014                           (int32)pkt->payload <= send_quota / 100);
1015                send_quota = send_quota - (int32)(pkt->payload * 100);
1016        }
1017
1018        pkt->need_resend = false;
1019
1020        PacketFormatV1* p1 = (PacketFormatV1*)pkt->data;
1021        PacketFormat* p = (PacketFormat*)pkt->data;
1022        if (version == 0) {
1023                p->ack_nr = ack_nr;
1024        } else {
1025                p1->ack_nr = ack_nr;
1026        }
1027        pkt->time_sent = UTP_GetMicroseconds();
1028        pkt->transmissions++;
1029        sent_ack();
1030        send_data((PacketFormat*)pkt->data, pkt->length,
1031                (state == CS_SYN_SENT) ? connect_overhead
1032                : (pkt->transmissions == 1) ? payload_bandwidth
1033                : retransmit_overhead);
1034}
1035
1036bool UTPSocket::is_writable(size_t to_write)
1037{
1038        // return true if it's OK to stuff another packet into the
1039        // outgoing queue. Since we may be using packet pacing, we
1040        // might not actually send the packet right away to affect the
1041        // cur_window. The only thing that happens when we add another
1042        // packet is that cur_window_packets is increased.
1043        size_t max_send = min(max_window, opt_sndbuf, max_window_user);
1044
1045        size_t packet_size = get_packet_size();
1046
1047        if (cur_window + packet_size >= max_window)
1048                last_maxed_out_window = g_current_ms;
1049
1050        // if we don't have enough quota, we can't write regardless
1051        if (USE_PACKET_PACING) {
1052                if (send_quota / 100 < (int32)to_write) return false;
1053        }
1054
1055        // subtract one to save space for the FIN packet
1056        if (cur_window_packets >= OUTGOING_BUFFER_MAX_SIZE - 1) return false;
1057
1058        // if sending another packet would not make the window exceed
1059        // the max_window, we can write
1060        if (cur_window + packet_size <= max_send) return true;
1061
1062        // if the window size is less than a packet, and we have enough
1063        // quota to send a packet, we can write, even though it would
1064        // make the window exceed the max size
1065        // the last condition is needed to not put too many packets
1066        // in the send buffer. cur_window isn't updated until we flush
1067        // the send buffer, so we need to take the number of packets
1068        // into account
1069        if (USE_PACKET_PACING) {
1070                if (max_window < to_write &&
1071                        cur_window < max_window &&
1072                        cur_window_packets == 0) {
1073                        return true;
1074                }
1075        }
1076
1077        return false;
1078}
1079
1080bool UTPSocket::flush_packets()
1081{
1082        size_t packet_size = get_packet_size();
1083
1084        // send packets that are waiting on the pacer to be sent
1085        // i has to be an unsigned 16 bit counter to wrap correctly
1086        // signed types are not guaranteed to wrap the way you expect
1087        for (uint16 i = seq_nr - cur_window_packets; i != seq_nr; ++i) {
1088                OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(i);
1089                if (pkt == 0 || (pkt->transmissions > 0 && pkt->need_resend == false)) continue;
1090                // have we run out of quota?
1091                if (!is_writable(pkt->payload)) {
1092                        return true;
1093                }
1094
1095                // Nagle check
1096                // don't send the last packet if we have one packet in-flight
1097                // and the current packet is still smaller than packet_size.
1098                if (i != ((seq_nr - 1) & ACK_NR_MASK) ||
1099                        cur_window_packets == 1 ||
1100                        pkt->payload >= packet_size) {
1101                        send_packet(pkt);
1102
1103                        // No need to send another ack if there is nothing to reorder.
1104                        if (reorder_count == 0) {
1105                                sent_ack();
1106                        }
1107                }
1108        }
1109        return false;
1110}
1111
1112void UTPSocket::write_outgoing_packet(size_t payload, uint flags)
1113{
1114        // Setup initial timeout timer
1115        if (cur_window_packets == 0) {
1116                retransmit_timeout = rto;
1117                rto_timeout = g_current_ms + retransmit_timeout;
1118                assert(cur_window == 0);
1119        }
1120
1121        size_t packet_size = get_packet_size();
1122        do {
1123                assert(cur_window_packets < OUTGOING_BUFFER_MAX_SIZE);
1124                assert(flags == ST_DATA || flags == ST_FIN);
1125
1126                size_t added = 0;
1127
1128                OutgoingPacket *pkt = NULL;
1129               
1130                if (cur_window_packets > 0) {
1131                        pkt = (OutgoingPacket*)outbuf.get(seq_nr - 1);
1132                }
1133
1134                const size_t header_size = get_header_size();
1135                bool append = true;
1136
1137                // if there's any room left in the last packet in the window
1138                // and it hasn't been sent yet, fill that frame first
1139                if (payload && pkt && !pkt->transmissions && pkt->payload < packet_size) {
1140                        // Use the previous unsent packet
1141                        added = min(payload + pkt->payload, max<size_t>(packet_size, pkt->payload)) - pkt->payload;
1142                        pkt = (OutgoingPacket*)realloc(pkt,
1143                                                                                   (sizeof(OutgoingPacket) - 1) +
1144                                                                                   header_size +
1145                                                                                   pkt->payload + added);
1146                        outbuf.put(seq_nr - 1, pkt);
1147                        append = false;
1148                        assert(!pkt->need_resend);
1149                } else {
1150                        // Create the packet to send.
1151                        added = payload;
1152                        pkt = (OutgoingPacket*)malloc((sizeof(OutgoingPacket) - 1) +
1153                                                                                  header_size +
1154                                                                                  added);
1155                        pkt->payload = 0;
1156                        pkt->transmissions = 0;
1157                        pkt->need_resend = false;
1158                }
1159
1160                if (added) {
1161                        // Fill it with data from the upper layer.
1162                        func.on_write(userdata, pkt->data + header_size + pkt->payload, added);
1163                }
1164                pkt->payload += added;
1165                pkt->length = header_size + pkt->payload;
1166
1167                last_rcv_win = get_rcv_window();
1168
1169                PacketFormat* p = (PacketFormat*)pkt->data;
1170                PacketFormatV1* p1 = (PacketFormatV1*)pkt->data;
1171                if (version == 0) {
1172                        p->connid = conn_id_send;
1173                        p->ext = 0;
1174                        p->windowsize = (byte)DIV_ROUND_UP(last_rcv_win, PACKET_SIZE);
1175                        p->ack_nr = ack_nr;
1176                        p->flags = flags;
1177                } else {
1178                        p1->set_version(1);
1179                        p1->set_type(flags);
1180                        p1->ext = 0;
1181                        p1->connid = conn_id_send;
1182                        p1->windowsize = (uint32)last_rcv_win;
1183                        p1->ack_nr = ack_nr;
1184                }
1185
1186                if (append) {
1187                        // Remember the message in the outgoing queue.
1188                        outbuf.ensure_size(seq_nr, cur_window_packets);
1189                        outbuf.put(seq_nr, pkt);
1190                        if (version == 0) p->seq_nr = seq_nr;
1191                        else p1->seq_nr = seq_nr;
1192                        seq_nr++;
1193                        cur_window_packets++;
1194                }
1195
1196                payload -= added;
1197
1198        } while (payload);
1199
1200        flush_packets();
1201}
1202
1203void UTPSocket::update_send_quota()
1204{
1205        int dt = g_current_ms - last_send_quota;
1206        if (dt == 0) return;
1207        last_send_quota = g_current_ms;
1208        size_t add = max_window * dt * 100 / (rtt_hist.delay_base?rtt_hist.delay_base:50);
1209        if (add > max_window * 100 && add > MAX_CWND_INCREASE_BYTES_PER_RTT * 100) add = max_window;
1210        send_quota += (int32)add;
1211//      LOG_UTPV("0x%08x: UTPSocket::update_send_quota dt:%d rtt:%u max_window:%u quota:%d",
1212//                       this, dt, rtt, (uint)max_window, send_quota / 100);
1213}
1214
1215#ifdef _DEBUG
1216void UTPSocket::check_invariant()
1217{
1218        if (reorder_count > 0) {
1219                assert(inbuf.get(ack_nr + 1) == NULL);
1220        }
1221
1222        size_t outstanding_bytes = 0;
1223        for (int i = 0; i < cur_window_packets; ++i) {
1224                OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - i - 1);
1225                if (pkt == 0 || pkt->transmissions == 0 || pkt->need_resend) continue;
1226                outstanding_bytes += pkt->payload;
1227        }
1228        assert(outstanding_bytes == cur_window);
1229}
1230#endif
1231
1232void UTPSocket::check_timeouts()
1233{
1234#ifdef _DEBUG
1235        check_invariant();
1236#endif
1237
1238        // this invariant should always be true
1239        assert(cur_window_packets == 0 || outbuf.get(seq_nr - cur_window_packets));
1240
1241        LOG_UTPV("0x%08x: CheckTimeouts timeout:%d max_window:%u cur_window:%u quota:%d "
1242                         "state:%s cur_window_packets:%u bytes_since_ack:%u ack_time:%d",
1243                         this, (int)(rto_timeout - g_current_ms), (uint)max_window, (uint)cur_window,
1244                         send_quota / 100, statenames[state], cur_window_packets,
1245                         (uint)bytes_since_ack, (int)(g_current_ms - ack_time));
1246
1247        update_send_quota();
1248        flush_packets();
1249
1250
1251        if (USE_PACKET_PACING) {
1252                // In case the new send quota made it possible to send another packet
1253                // Mark the socket as writable. If we don't use pacing, the send
1254                // quota does not affect if the socket is writeable
1255                // if we don't use packet pacing, the writable event is triggered
1256                // whenever the cur_window falls below the max_window, so we don't
1257                // need this check then
1258                if (state == CS_CONNECTED_FULL && is_writable(get_packet_size())) {
1259                        state = CS_CONNECTED;
1260                        LOG_UTPV("0x%08x: Socket writable. max_window:%u cur_window:%u quota:%d packet_size:%u",
1261                                         this, (uint)max_window, (uint)cur_window, send_quota / 100, (uint)get_packet_size());
1262                        func.on_state(userdata, UTP_STATE_WRITABLE);
1263                }
1264        }
1265
1266        switch (state) {
1267        case CS_SYN_SENT:
1268        case CS_CONNECTED_FULL:
1269        case CS_CONNECTED:
1270        case CS_FIN_SENT: {
1271
1272                // Reset max window...
1273                if ((int)(g_current_ms - zerowindow_time) >= 0 && max_window_user == 0) {
1274                        max_window_user = PACKET_SIZE;
1275                }
1276
1277                if ((int)(g_current_ms - rto_timeout) >= 0 &&
1278                        (!(USE_PACKET_PACING) || cur_window_packets > 0) &&
1279                        rto_timeout > 0) {
1280
1281                        /*
1282                        OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - cur_window_packets);
1283                       
1284                        // If there were a lot of retransmissions, force recomputation of round trip time
1285                        if (pkt->transmissions >= 4)
1286                                rtt = 0;
1287                        */
1288
1289                        // Increase RTO
1290                        const uint new_timeout = retransmit_timeout * 2;
1291                        if (new_timeout >= 30000 || (state == CS_SYN_SENT && new_timeout > 6000)) {
1292                                // more than 30 seconds with no reply. kill it.
1293                                // if we haven't even connected yet, give up sooner. 6 seconds
1294                                // means 2 tries at the following timeouts: 3, 6 seconds
1295                                if (state == CS_FIN_SENT)
1296                                        state = CS_DESTROY;
1297                                else
1298                                        state = CS_RESET;
1299                                func.on_error(userdata, ETIMEDOUT);
1300                                goto getout;
1301                        }
1302
1303                        retransmit_timeout = new_timeout;
1304                        rto_timeout = g_current_ms + new_timeout;
1305
1306                        // On Timeout
1307                        duplicate_ack = 0;
1308
1309                        // rate = min_rate
1310                        max_window = get_packet_size();
1311                        send_quota = max<int32>((int32)max_window * 100, send_quota);
1312
1313                        // every packet should be considered lost
1314                        for (int i = 0; i < cur_window_packets; ++i) {
1315                                OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - i - 1);
1316                                if (pkt == 0 || pkt->transmissions == 0 || pkt->need_resend) continue;
1317                                pkt->need_resend = true;
1318                                assert(cur_window >= pkt->payload);
1319                                cur_window -= pkt->payload;
1320                        }
1321
1322                        // used in parse_log.py
1323                        LOG_UTP("0x%08x: Packet timeout. Resend. seq_nr:%u. timeout:%u max_window:%u",
1324                                        this, seq_nr - cur_window_packets, retransmit_timeout, (uint)max_window);
1325
1326                        fast_timeout = true;
1327                        timeout_seq_nr = seq_nr;
1328
1329                        if (cur_window_packets > 0) {
1330                                OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - cur_window_packets);
1331                                assert(pkt);
1332                                send_quota = max<int32>((int32)pkt->length * 100, send_quota);
1333
1334                                // Re-send the packet.
1335                                send_packet(pkt);
1336                        }
1337                }
1338
1339                // Mark the socket as writable
1340                if (state == CS_CONNECTED_FULL && is_writable(get_packet_size())) {
1341                        state = CS_CONNECTED;
1342                        LOG_UTPV("0x%08x: Socket writable. max_window:%u cur_window:%u quota:%d packet_size:%u",
1343                                         this, (uint)max_window, (uint)cur_window, send_quota / 100, (uint)get_packet_size());
1344                        func.on_state(userdata, UTP_STATE_WRITABLE);
1345                }
1346
1347                if (state >= CS_CONNECTED && state <= CS_FIN_SENT) {
1348                        // Send acknowledgment packets periodically, or when the threshold is reached
1349                        if (bytes_since_ack > DELAYED_ACK_BYTE_THRESHOLD ||
1350                                (int)(g_current_ms - ack_time) >= 0) {
1351                                send_ack();
1352                        }
1353
1354                        if ((int)(g_current_ms - last_sent_packet) >= KEEPALIVE_INTERVAL) {
1355                                send_keep_alive();
1356                        }
1357                }
1358
1359                break;
1360        }
1361
1362        // Close?
1363        case CS_GOT_FIN:
1364        case CS_DESTROY_DELAY:
1365                if ((int)(g_current_ms - rto_timeout) >= 0) {
1366                        state = (state == CS_DESTROY_DELAY) ? CS_DESTROY : CS_RESET;
1367                        if (cur_window_packets > 0 && userdata) {
1368                                func.on_error(userdata, ECONNRESET);
1369                        }
1370                }
1371                break;
1372        // prevent warning
1373        case CS_IDLE:
1374        case CS_RESET:
1375        case CS_DESTROY:
1376                break;
1377        }
1378
1379        getout:
1380
1381        // make sure we don't accumulate quota when we don't have
1382        // anything to send
1383        int32 limit = max<int32>((int32)max_window / 2, 5 * (int32)get_packet_size()) * 100;
1384        if (send_quota > limit) send_quota = limit;
1385}
1386
1387// returns:
1388// 0: the packet was acked.
1389// 1: it means that the packet had already been acked
1390// 2: the packet has not been sent yet
1391int UTPSocket::ack_packet(uint16 seq)
1392{
1393        OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq);
1394
1395        // the packet has already been acked (or not sent)
1396        if (pkt == NULL) {
1397                LOG_UTPV("0x%08x: got ack for:%u (already acked, or never sent)", this, seq);
1398                return 1;
1399        }
1400
1401        // can't ack packets that haven't been sent yet!
1402        if (pkt->transmissions == 0) {
1403                LOG_UTPV("0x%08x: got ack for:%u (never sent, pkt_size:%u need_resend:%u)",
1404                                 this, seq, (uint)pkt->payload, pkt->need_resend);
1405                return 2;
1406        }
1407
1408        LOG_UTPV("0x%08x: got ack for:%u (pkt_size:%u need_resend:%u)",
1409                         this, seq, (uint)pkt->payload, pkt->need_resend);
1410
1411        outbuf.put(seq, NULL);
1412
1413        // if we never re-sent the packet, update the RTT estimate
1414        if (pkt->transmissions == 1) {
1415                // Estimate the round trip time.
1416                const uint32 ertt = (uint32)((UTP_GetMicroseconds() - pkt->time_sent) / 1000);
1417                if (rtt == 0) {
1418                        // First round trip time sample
1419                        rtt = ertt;
1420                        rtt_var = ertt / 2;
1421                        // sanity check. rtt should never be more than 6 seconds
1422//                      assert(rtt < 6000);
1423                } else {
1424                        // Compute new round trip times
1425                        const int delta = (int)rtt - ertt;
1426                        rtt_var = rtt_var + (int)(abs(delta) - rtt_var) / 4;
1427                        rtt = rtt - rtt/8 + ertt/8;
1428                        // sanity check. rtt should never be more than 6 seconds
1429//                      assert(rtt < 6000);
1430                        rtt_hist.add_sample(ertt);
1431                }
1432                rto = max<uint>(rtt + rtt_var * 4, 500);
1433                LOG_UTPV("0x%08x: rtt:%u avg:%u var:%u rto:%u",
1434                                 this, ertt, rtt, rtt_var, rto);
1435        }
1436        retransmit_timeout = rto;
1437        rto_timeout = g_current_ms + rto;
1438        // if need_resend is set, this packet has already
1439        // been considered timed-out, and is not included in
1440        // the cur_window anymore
1441        if (!pkt->need_resend) {
1442                assert(cur_window >= pkt->payload);
1443                cur_window -= pkt->payload;
1444        }
1445        free(pkt);
1446        return 0;
1447}
1448
1449// count the number of bytes that were acked by the EACK header
1450size_t UTPSocket::selective_ack_bytes(uint base, const byte* mask, byte len, int64& min_rtt)
1451{
1452        if (cur_window_packets == 0) return 0;
1453
1454        size_t acked_bytes = 0;
1455        int bits = len * 8;
1456
1457        do {
1458                uint v = base + bits;
1459
1460                // ignore bits that haven't been sent yet
1461                // see comment in UTPSocket::selective_ack
1462                if (((seq_nr - v - 1) & ACK_NR_MASK) >= (uint16)(cur_window_packets - 1))
1463                        continue;
1464
1465                // ignore bits that represents packets we haven't sent yet
1466                // or packets that have already been acked
1467                OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(v);
1468                if (!pkt || pkt->transmissions == 0)
1469                        continue;
1470
1471                // Count the number of segments that were successfully received past it.
1472                if (bits >= 0 && mask[bits>>3] & (1 << (bits & 7))) {
1473                        assert((int)(pkt->payload) >= 0);
1474                        acked_bytes += pkt->payload;
1475                        min_rtt = min<int64>(min_rtt, UTP_GetMicroseconds() - pkt->time_sent);
1476                        continue;
1477                }
1478        } while (--bits >= -1);
1479        return acked_bytes;
1480}
1481
1482void UTPSocket::selective_ack(uint base, const byte *mask, byte len)
1483{
1484        if (cur_window_packets == 0) return;
1485
1486        // the range is inclusive [0, 31] bits
1487        int bits = len * 8 - 1;
1488
1489        int count = 0;
1490
1491        // resends is a stack of sequence numbers we need to resend. Since we
1492        // iterate in reverse over the acked packets, at the end, the top packets
1493        // are the ones we want to resend
1494        int resends[32];
1495        int nr = 0;
1496
1497        LOG_UTPV("0x%08x: Got EACK [%032b] base:%u", this, *(uint32*)mask, base);
1498        do {
1499                // we're iterating over the bits from higher sequence numbers
1500                // to lower (kind of in reverse order, wich might not be very
1501                // intuitive)
1502                uint v = base + bits;
1503
1504                // ignore bits that haven't been sent yet
1505                // and bits that fall below the ACKed sequence number
1506                // this can happen if an EACK message gets
1507                // reordered and arrives after a packet that ACKs up past
1508                // the base for thie EACK message
1509
1510                // this is essentially the same as:
1511                // if v >= seq_nr || v <= seq_nr - cur_window_packets
1512                // but it takes wrapping into account
1513
1514                // if v == seq_nr the -1 will make it wrap. if v > seq_nr
1515                // it will also wrap (since it will fall further below 0)
1516                // and be > cur_window_packets.
1517                // if v == seq_nr - cur_window_packets, the result will be
1518                // seq_nr - (seq_nr - cur_window_packets) - 1
1519                // == seq_nr - seq_nr + cur_window_packets - 1
1520                // == cur_window_packets - 1 which will be caught by the
1521                // test. If v < seq_nr - cur_window_packets the result will grow
1522                // fall furhter outside of the cur_window_packets range.
1523
1524                // sequence number space:
1525                //
1526                //     rejected <   accepted   > rejected
1527                // <============+--------------+============>
1528                //              ^              ^
1529                //              |              |
1530                //        (seq_nr-wnd)         seq_nr
1531
1532                if (((seq_nr - v - 1) & ACK_NR_MASK) >= (uint16)(cur_window_packets - 1))
1533                        continue;
1534
1535                // this counts as a duplicate ack, even though we might have
1536                // received an ack for this packet previously (in another EACK
1537                // message for instance)
1538                bool bit_set = bits >= 0 && mask[bits>>3] & (1 << (bits & 7));
1539
1540                // if this packet is acked, it counts towards the duplicate ack counter
1541                if (bit_set) count++;
1542
1543                // ignore bits that represents packets we haven't sent yet
1544                // or packets that have already been acked
1545                OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(v);
1546                if (!pkt || pkt->transmissions == 0) {
1547                        LOG_UTPV("0x%08x: skipping %u. pkt:%08x transmissions:%u %s",
1548                                         this, v, pkt, pkt?pkt->transmissions:0, pkt?"(not sent yet?)":"(already acked?)");
1549                        continue;
1550                }
1551
1552                // Count the number of segments that were successfully received past it.
1553                if (bit_set) {
1554                        // the selective ack should never ACK the packet we're waiting for to decrement cur_window_packets
1555                        assert((v & outbuf.mask) != ((seq_nr - cur_window_packets) & outbuf.mask));
1556                        ack_packet(v);
1557                        continue;
1558                }
1559
1560                // Resend segments
1561                // if count is less than our re-send limit, we haven't seen enough
1562                // acked packets in front of this one to warrant a re-send.
1563                // if count == 0, we're still going through the tail of zeroes
1564                if (((v - fast_resend_seq_nr) & ACK_NR_MASK) <= OUTGOING_BUFFER_MAX_SIZE &&
1565                        count >= DUPLICATE_ACKS_BEFORE_RESEND &&
1566                        duplicate_ack < DUPLICATE_ACKS_BEFORE_RESEND) {
1567                        resends[nr++] = v;
1568                        LOG_UTPV("0x%08x: no ack for %u", this, v);
1569                } else {
1570                        LOG_UTPV("0x%08x: not resending %u count:%d dup_ack:%u fast_resend_seq_nr:%u",
1571                                         this, v, count, duplicate_ack, fast_resend_seq_nr);
1572                }
1573        } while (--bits >= -1);
1574
1575        if (((base - 1 - fast_resend_seq_nr) & ACK_NR_MASK) < 256 &&
1576                count >= DUPLICATE_ACKS_BEFORE_RESEND &&
1577                duplicate_ack < DUPLICATE_ACKS_BEFORE_RESEND) {
1578                // if we get enough duplicate acks to start
1579                // resending, the first packet we should resend
1580                // is base-1
1581                resends[nr++] = base - 1;
1582        } else {
1583                LOG_UTPV("0x%08x: not resending %u count:%d dup_ack:%u fast_resend_seq_nr:%u",
1584                                 this, base - 1, count, duplicate_ack, fast_resend_seq_nr);
1585        }
1586
1587        bool back_off = false;
1588        int i = 0;
1589        while (nr > 0) {
1590                uint v = resends[--nr];
1591                // don't consider the tail of 0:es to be lost packets
1592                // only unacked packets with acked packets after should
1593                // be considered lost
1594                OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(v);
1595
1596                // this may be an old (re-ordered) packet, and some of the
1597                // packets in here may have been acked already. In which
1598                // case they will not be in the send queue anymore
1599                if (!pkt) continue;
1600
1601                // used in parse_log.py
1602                LOG_UTP("0x%08x: Packet %u lost. Resending", this, v);
1603
1604                // On Loss
1605                back_off = true;
1606#ifdef _DEBUG
1607                ++_stats._rexmit;
1608#endif
1609                send_packet(pkt);
1610                fast_resend_seq_nr = v + 1;
1611
1612                // Re-send max 4 packets.
1613                if (++i >= 4) break;
1614        }
1615
1616        if (back_off)
1617                maybe_decay_win();
1618
1619        duplicate_ack = count;
1620}
1621
1622void UTPSocket::apply_ledbat_ccontrol(size_t bytes_acked, uint32 actual_delay, int64 min_rtt)
1623{
1624        // the delay can never be greater than the rtt. The min_rtt
1625        // variable is the RTT in microseconds
1626       
1627        assert(min_rtt >= 0);
1628        int32 our_delay = min<uint32>(our_hist.get_value(), uint32(min_rtt));
1629        assert(our_delay != INT_MAX);
1630        assert(our_delay >= 0);
1631
1632        SOCKADDR_STORAGE sa = addr.get_sockaddr_storage();
1633        UTP_DelaySample((sockaddr*)&sa, our_delay / 1000);
1634
1635        // This test the connection under heavy load from foreground
1636        // traffic. Pretend that our delays are very high to force the
1637        // connection to use sub-packet size window sizes
1638        //our_delay *= 4;
1639
1640        // target is microseconds
1641        int target = CCONTROL_TARGET;
1642        if (target <= 0) target = 100000;
1643
1644        double off_target = target - our_delay;
1645
1646        // this is the same as:
1647        //
1648        //    (min(off_target, target) / target) * (bytes_acked / max_window) * MAX_CWND_INCREASE_BYTES_PER_RTT
1649        //
1650        // so, it's scaling the max increase by the fraction of the window this ack represents, and the fraction
1651        // of the target delay the current delay represents.
1652        // The min() around off_target protects against crazy values of our_delay, which may happen when th
1653        // timestamps wraps, or by just having a malicious peer sending garbage. This caps the increase
1654        // of the window size to MAX_CWND_INCREASE_BYTES_PER_RTT per rtt.
1655        // as for large negative numbers, this direction is already capped at the min packet size further down
1656        // the min around the bytes_acked protects against the case where the window size was recently
1657        // shrunk and the number of acked bytes exceeds that. This is considered no more than one full
1658        // window, in order to keep the gain within sane boundries.
1659
1660        assert(bytes_acked > 0);
1661        double window_factor = (double)min(bytes_acked, max_window) / (double)max(max_window, bytes_acked);
1662        double delay_factor = off_target / target;
1663        double scaled_gain = MAX_CWND_INCREASE_BYTES_PER_RTT * window_factor * delay_factor;
1664
1665        // since MAX_CWND_INCREASE_BYTES_PER_RTT is a cap on how much the window size (max_window)
1666        // may increase per RTT, we may not increase the window size more than that proportional
1667        // to the number of bytes that were acked, so that once one window has been acked (one rtt)
1668        // the increase limit is not exceeded
1669        // the +1. is to allow for floating point imprecision
1670        assert(scaled_gain <= 1. + MAX_CWND_INCREASE_BYTES_PER_RTT * (int)min(bytes_acked, max_window) / (double)max(max_window, bytes_acked));
1671
1672        if (scaled_gain > 0 && g_current_ms - last_maxed_out_window > 300) {
1673                // if it was more than 300 milliseconds since we tried to send a packet
1674                // and stopped because we hit the max window, we're most likely rate
1675                // limited (which prevents us from ever hitting the window size)
1676                // if this is the case, we cannot let the max_window grow indefinitely
1677                scaled_gain = 0;
1678        }
1679
1680        if (scaled_gain + max_window < MIN_WINDOW_SIZE) {
1681                max_window = MIN_WINDOW_SIZE;
1682        } else {
1683                max_window = (size_t)(max_window + scaled_gain);
1684        }
1685
1686        // make sure that the congestion window is below max
1687        // make sure that we don't shrink our window too small
1688        max_window = clamp<size_t>(max_window, MIN_WINDOW_SIZE, opt_sndbuf);
1689
1690        // used in parse_log.py
1691        LOG_UTP("0x%08x: actual_delay:%u our_delay:%d their_delay:%u off_target:%d max_window:%u "
1692                        "delay_base:%u delay_sum:%d target_delay:%d acked_bytes:%u cur_window:%u "
1693                        "scaled_gain:%f rtt:%u rate:%u quota:%d wnduser:%u rto:%u timeout:%d get_microseconds:"I64u" "
1694                        "cur_window_packets:%u packet_size:%u their_delay_base:%u their_actual_delay:%u",
1695                        this, actual_delay, our_delay / 1000, their_hist.get_value() / 1000,
1696                        (int)off_target / 1000, (uint)(max_window),  our_hist.delay_base,
1697                        (our_delay + their_hist.get_value()) / 1000, target / 1000, (uint)bytes_acked,
1698                        (uint)(cur_window - bytes_acked), (float)(scaled_gain), rtt,
1699                        (uint)(max_window * 1000 / (rtt_hist.delay_base?rtt_hist.delay_base:50)),
1700                        send_quota / 100, (uint)max_window_user, rto, (int)(rto_timeout - g_current_ms),
1701                        UTP_GetMicroseconds(), cur_window_packets, (uint)get_packet_size(),
1702                        their_hist.delay_base, their_hist.delay_base + their_hist.get_value());
1703}
1704
1705static void UTP_RegisterRecvPacket(UTPSocket *conn, size_t len)
1706{
1707#ifdef _DEBUG
1708        ++conn->_stats._nrecv;
1709        conn->_stats._nbytes_recv += len;
1710#endif
1711
1712        if (len <= PACKET_SIZE_MID) {
1713                if (len <= PACKET_SIZE_EMPTY) {
1714                        _global_stats._nraw_recv[PACKET_SIZE_EMPTY_BUCKET]++;
1715                } else if (len <= PACKET_SIZE_SMALL) {
1716                        _global_stats._nraw_recv[PACKET_SIZE_SMALL_BUCKET]++;
1717                } else 
1718                        _global_stats._nraw_recv[PACKET_SIZE_MID_BUCKET]++;
1719        } else {
1720                if (len <= PACKET_SIZE_BIG) {
1721                        _global_stats._nraw_recv[PACKET_SIZE_BIG_BUCKET]++;
1722                } else 
1723                        _global_stats._nraw_recv[PACKET_SIZE_HUGE_BUCKET]++;
1724        }
1725}
1726
1727// returns the max number of bytes of payload the uTP
1728// connection is allowed to send
1729size_t UTPSocket::get_packet_size()
1730{
1731        int header_size = version == 1
1732                ? sizeof(PacketFormatV1)
1733                : sizeof(PacketFormat);
1734
1735        size_t mtu = get_udp_mtu();
1736
1737        if (DYNAMIC_PACKET_SIZE_ENABLED) {
1738                SOCKADDR_STORAGE sa = addr.get_sockaddr_storage();
1739                size_t max_packet_size = UTP_GetPacketSize((sockaddr*)&sa);
1740                return min(mtu - header_size, max_packet_size);
1741        }
1742        else
1743        {
1744                return mtu - header_size;
1745        }
1746}
1747
1748// Process an incoming packet
1749// syn is true if this is the first packet received. It will cut off parsing
1750// as soon as the header is done
1751size_t UTP_ProcessIncoming(UTPSocket *conn, const byte *packet, size_t len, bool syn = false)
1752{
1753        UTP_RegisterRecvPacket(conn, len);
1754
1755        g_current_ms = UTP_GetMilliseconds();
1756
1757        conn->update_send_quota();
1758
1759        const PacketFormat *pf = (PacketFormat*)packet;
1760        const PacketFormatV1 *pf1 = (PacketFormatV1*)packet;
1761        const byte *packet_end = packet + len;
1762
1763        uint16 pk_seq_nr;
1764        uint16 pk_ack_nr;
1765        uint8 pk_flags;
1766        if (conn->version == 0) {
1767                pk_seq_nr = pf->seq_nr;
1768                pk_ack_nr = pf->ack_nr;
1769                pk_flags = pf->flags;
1770        } else {
1771                pk_seq_nr = pf1->seq_nr;
1772                pk_ack_nr = pf1->ack_nr;
1773                pk_flags = pf1->type();
1774        }
1775
1776        if (pk_flags >= ST_NUM_STATES) return 0;
1777
1778        LOG_UTPV("0x%08x: Got %s. seq_nr:%u ack_nr:%u state:%s version:%u timestamp:"I64u" reply_micro:%u",
1779                         conn, flagnames[pk_flags], pk_seq_nr, pk_ack_nr, statenames[conn->state], conn->version,
1780                         conn->version == 0?(uint64)(pf->tv_sec) * 1000000 + pf->tv_usec:uint64(pf1->tv_usec),
1781                         conn->version == 0?(uint32)(pf->reply_micro):(uint32)(pf1->reply_micro));
1782
1783        // mark receipt time
1784        uint64 time = UTP_GetMicroseconds();
1785
1786        // RSTs are handled earlier, since the connid matches the send id not the recv id
1787        assert(pk_flags != ST_RESET);
1788
1789        // TODO: maybe send a ST_RESET if we're in CS_RESET?
1790
1791        const byte *selack_ptr = NULL;
1792
1793        // Unpack UTP packet options
1794        // Data pointer
1795        const byte *data = (const byte*)pf + conn->get_header_size();
1796        if (conn->get_header_size() > len) {
1797                LOG_UTPV("0x%08x: Invalid packet size (less than header size)", conn);
1798                return 0;
1799        }
1800        // Skip the extension headers
1801        uint extension = conn->version == 0 ? pf->ext : pf1->ext;
1802        if (extension != 0) {
1803                do {
1804                        // Verify that the packet is valid.
1805                        data += 2;
1806
1807                        if ((int)(packet_end - data) < 0 || (int)(packet_end - data) < data[-1]) {
1808                                LOG_UTPV("0x%08x: Invalid len of extensions", conn);
1809                                return 0;
1810                        }
1811
1812                        switch(extension) {
1813                        case 1: // Selective Acknowledgment
1814                                selack_ptr = data;
1815                                break;
1816                        case 2: // extension bits
1817                                if (data[-1] != 8) {
1818                                        LOG_UTPV("0x%08x: Invalid len of extension bits header", conn);
1819                                        return 0;
1820                                }
1821                                memcpy(conn->extensions, data, 8);
1822                                LOG_UTPV("0x%08x: got extension bits:%02x%02x%02x%02x%02x%02x%02x%02x", conn,
1823                                        conn->extensions[0], conn->extensions[1], conn->extensions[2], conn->extensions[3],
1824                                        conn->extensions[4], conn->extensions[5], conn->extensions[6], conn->extensions[7]);
1825                        }
1826                        extension = data[-2];
1827                        data += data[-1];
1828                } while (extension);
1829        }
1830
1831        if (conn->state == CS_SYN_SENT) {
1832                // if this is a syn-ack, initialize our ack_nr
1833                // to match the sequence number we got from
1834                // the other end
1835                conn->ack_nr = (pk_seq_nr - 1) & SEQ_NR_MASK;
1836        }
1837
1838        g_current_ms = UTP_GetMilliseconds();
1839        conn->last_got_packet = g_current_ms;
1840
1841        if (syn) {
1842                return 0;
1843        }
1844
1845        // seqnr is the number of packets past the expected
1846        // packet this is. ack_nr is the last acked, seq_nr is the
1847        // current. Subtracring 1 makes 0 mean "this is the next
1848        // expected packet".
1849        const uint seqnr = (pk_seq_nr - conn->ack_nr - 1) & SEQ_NR_MASK;
1850
1851        // Getting an invalid sequence number?
1852        if (seqnr >= REORDER_BUFFER_MAX_SIZE) {
1853                if (seqnr >= (SEQ_NR_MASK + 1) - REORDER_BUFFER_MAX_SIZE && pk_flags != ST_STATE) {
1854                        conn->ack_time = g_current_ms + min<uint>(conn->ack_time - g_current_ms, DELAYED_ACK_TIME_THRESHOLD);
1855                }
1856                LOG_UTPV("    Got old Packet/Ack (%u/%u)=%u!", pk_seq_nr, conn->ack_nr, seqnr);
1857                return 0;
1858        }
1859
1860        // Process acknowledgment
1861        // acks is the number of packets that was acked
1862        int acks = (pk_ack_nr - (conn->seq_nr - 1 - conn->cur_window_packets)) & ACK_NR_MASK;
1863
1864        // this happens when we receive an old ack nr
1865        if (acks > conn->cur_window_packets) acks = 0;
1866
1867        // if we get the same ack_nr as in the last packet
1868        // increase the duplicate_ack counter, otherwise reset
1869        // it to 0
1870        if (conn->cur_window_packets > 0) {
1871                if (pk_ack_nr == ((conn->seq_nr - conn->cur_window_packets - 1) & ACK_NR_MASK) &&
1872                        conn->cur_window_packets > 0) {
1873                        //++conn->duplicate_ack;
1874                } else {
1875                        conn->duplicate_ack = 0;
1876                }
1877
1878                // TODO: if duplicate_ack == DUPLICATE_ACK_BEFORE_RESEND
1879                // and fast_resend_seq_nr <= ack_nr + 1
1880                //    resend ack_nr + 1
1881        }
1882
1883        // figure out how many bytes were acked
1884        size_t acked_bytes = 0;
1885
1886        // the minimum rtt of all acks
1887        // this is the upper limit on the delay we get back
1888        // from the other peer. Our delay cannot exceed
1889        // the rtt of the packet. If it does, clamp it.
1890        // this is done in apply_ledbat_ccontrol()
1891        int64 min_rtt = INT64_MAX;
1892
1893        for (int i = 0; i < acks; ++i) {
1894                int seq = conn->seq_nr - conn->cur_window_packets + i;
1895                OutgoingPacket *pkt = (OutgoingPacket*)conn->outbuf.get(seq);
1896                if (pkt == 0 || pkt->transmissions == 0) continue;
1897                assert((int)(pkt->payload) >= 0);
1898                acked_bytes += pkt->payload;
1899                min_rtt = min<int64>(min_rtt, UTP_GetMicroseconds() - pkt->time_sent);
1900        }
1901       
1902        // count bytes acked by EACK
1903        if (selack_ptr != NULL) {
1904                acked_bytes += conn->selective_ack_bytes((pk_ack_nr + 2) & ACK_NR_MASK,
1905                                                                                                 selack_ptr, selack_ptr[-1], min_rtt);
1906        }
1907
1908        LOG_UTPV("0x%08x: acks:%d acked_bytes:%u seq_nr:%d cur_window:%u cur_window_packets:%u relative_seqnr:%u max_window:%u min_rtt:%u rtt:%u",
1909                         conn, acks, (uint)acked_bytes, conn->seq_nr, (uint)conn->cur_window, conn->cur_window_packets,
1910                         seqnr, (uint)conn->max_window, (uint)(min_rtt / 1000), conn->rtt);
1911
1912        uint64 p;
1913
1914        if (conn->version == 0) {
1915                p = uint64(pf->tv_sec) * 1000000 + pf->tv_usec;
1916        } else {
1917                p = pf1->tv_usec;
1918        }
1919
1920        conn->last_measured_delay = g_current_ms;
1921
1922        // get delay in both directions
1923        // record the delay to report back
1924        const uint32 their_delay = (uint32)(p == 0 ? 0 : time - p);
1925        conn->reply_micro = their_delay;
1926        uint32 prev_delay_base = conn->their_hist.delay_base;
1927        if (their_delay != 0) conn->their_hist.add_sample(their_delay);
1928
1929        // if their new delay base is less than their previous one
1930        // we should shift our delay base in the other direction in order
1931        // to take the clock skew into account
1932        if (prev_delay_base != 0 &&
1933                wrapping_compare_less(conn->their_hist.delay_base, prev_delay_base)) {
1934                // never adjust more than 10 milliseconds
1935                if (prev_delay_base - conn->their_hist.delay_base <= 10000) {
1936                        conn->our_hist.shift(prev_delay_base - conn->their_hist.delay_base);
1937                }
1938        }
1939
1940        const uint32 actual_delay = conn->version==0
1941                ?(pf->reply_micro==INT_MAX?0:uint32(pf->reply_micro))
1942                :(uint32(pf1->reply_micro)==INT_MAX?0:uint32(pf1->reply_micro));
1943
1944        // if the actual delay is 0, it means the other end
1945        // hasn't received a sample from us yet, and doesn't
1946        // know what it is. We can't update out history unless
1947        // we have a true measured sample
1948        prev_delay_base = conn->our_hist.delay_base;
1949        if (actual_delay != 0) conn->our_hist.add_sample(actual_delay);
1950
1951        // if our new delay base is less than our previous one
1952        // we should shift the other end's delay base in the other
1953        // direction in order to take the clock skew into account
1954        // This is commented out because it creates bad interactions
1955        // with our adjustment in the other direction. We don't really
1956        // need our estimates of the other peer to be very accurate
1957        // anyway. The problem with shifting here is that we're more
1958        // likely shift it back later because of a low latency. This
1959        // second shift back would cause us to shift our delay base
1960        // which then get's into a death spiral of shifting delay bases
1961/*      if (prev_delay_base != 0 &&
1962                wrapping_compare_less(conn->our_hist.delay_base, prev_delay_base)) {
1963                // never adjust more than 10 milliseconds
1964                if (prev_delay_base - conn->our_hist.delay_base <= 10000) {
1965                        conn->their_hist.Shift(prev_delay_base - conn->our_hist.delay_base);
1966                }
1967        }
1968*/
1969
1970        // if the delay estimate exceeds the RTT, adjust the base_delay to
1971        // compensate
1972        if (conn->our_hist.get_value() > uint32(min_rtt)) {
1973                conn->our_hist.shift(conn->our_hist.get_value() - min_rtt);
1974        }
1975
1976        // only apply the congestion controller on acks
1977        // if we don't have a delay measurement, there's
1978        // no point in invoking the congestion control
1979        if (actual_delay != 0 && acked_bytes >= 1)
1980                conn->apply_ledbat_ccontrol(acked_bytes, actual_delay, min_rtt);
1981
1982        // sanity check, the other end should never ack packets
1983        // past the point we've sent
1984        if (acks <= conn->cur_window_packets) {
1985                conn->max_window_user = conn->version == 0
1986                        ? pf->windowsize * PACKET_SIZE : pf1->windowsize;
1987
1988                // If max user window is set to 0, then we startup a timer
1989                // That will reset it to 1 after 15 seconds.
1990                if (conn->max_window_user == 0)
1991                        // Reset max_window_user to 1 every 15 seconds.
1992                        conn->zerowindow_time = g_current_ms + 15000;
1993
1994                // Respond to connect message
1995                // Switch to CONNECTED state.
1996                if (conn->state == CS_SYN_SENT) {
1997                        conn->state = CS_CONNECTED;
1998                        conn->func.on_state(conn->userdata, UTP_STATE_CONNECT);
1999
2000                // We've sent a fin, and everything was ACKed (including the FIN),
2001                // it's safe to destroy the socket. cur_window_packets == acks
2002                // means that this packet acked all the remaining packets that
2003                // were in-flight.
2004                } else if (conn->state == CS_FIN_SENT && conn->cur_window_packets == acks) {
2005                        conn->state = CS_DESTROY;
2006                }
2007
2008                // Update fast resend counter
2009                if (wrapping_compare_less(conn->fast_resend_seq_nr, (pk_ack_nr + 1) & ACK_NR_MASK))
2010                        conn->fast_resend_seq_nr = pk_ack_nr + 1;
2011
2012                LOG_UTPV("0x%08x: fast_resend_seq_nr:%u", conn, conn->fast_resend_seq_nr);
2013
2014                for (int i = 0; i < acks; ++i) {
2015                        int ack_status = conn->ack_packet(conn->seq_nr - conn->cur_window_packets);
2016                        // if ack_status is 0, the packet was acked.
2017                        // if acl_stauts is 1, it means that the packet had already been acked
2018                        // if it's 2, the packet has not been sent yet
2019                        // We need to break this loop in the latter case. This could potentially
2020                        // happen if we get an ack_nr that does not exceed what we have stuffed
2021                        // into the outgoing buffer, but does exceed what we have sent
2022                        if (ack_status == 2) {
2023#ifdef _DEBUG
2024                                OutgoingPacket* pkt = (OutgoingPacket*)conn->outbuf.get(conn->seq_nr - conn->cur_window_packets);
2025                                assert(pkt->transmissions == 0);
2026#endif
2027                                break;
2028                        }
2029                        conn->cur_window_packets--;
2030                }
2031#ifdef _DEBUG
2032                if (conn->cur_window_packets == 0) assert(conn->cur_window == 0);
2033#endif
2034
2035                // packets in front of this may have been acked by a
2036                // selective ack (EACK). Keep decreasing the window packet size
2037                // until we hit a packet that is still waiting to be acked
2038                // in the send queue
2039                // this is especially likely to happen when the other end
2040                // has the EACK send bug older versions of uTP had
2041                while (conn->cur_window_packets > 0 && !conn->outbuf.get(conn->seq_nr - conn->cur_window_packets))
2042                        conn->cur_window_packets--;
2043
2044#ifdef _DEBUG
2045                if (conn->cur_window_packets == 0) assert(conn->cur_window == 0);
2046#endif
2047
2048                // this invariant should always be true
2049                assert(conn->cur_window_packets == 0 || conn->outbuf.get(conn->seq_nr - conn->cur_window_packets));
2050
2051                // flush Nagle
2052                if (conn->cur_window_packets == 1) {
2053                        OutgoingPacket *pkt = (OutgoingPacket*)conn->outbuf.get(conn->seq_nr - 1);
2054                        // do we still have quota?
2055                        if (pkt->transmissions == 0 &&
2056                                (!(USE_PACKET_PACING) || conn->send_quota / 100 >= (int32)pkt->length)) {
2057                                conn->send_packet(pkt);
2058
2059                                // No need to send another ack if there is nothing to reorder.
2060                                if (conn->reorder_count == 0) {
2061                                        conn->sent_ack();
2062                                }
2063                        }
2064                }
2065
2066                // Fast timeout-retry
2067                if (conn->fast_timeout) {
2068                        LOG_UTPV("Fast timeout %u,%u,%u?", (uint)conn->cur_window, conn->seq_nr - conn->timeout_seq_nr, conn->timeout_seq_nr);
2069                        // if the fast_resend_seq_nr is not pointing to the oldest outstanding packet, it suggests that we've already
2070                        // resent the packet that timed out, and we should leave the fast-timeout mode.
2071                        if (((conn->seq_nr - conn->cur_window_packets) & ACK_NR_MASK) != conn->fast_resend_seq_nr) {
2072                                conn->fast_timeout = false;
2073                        } else {
2074                                // resend the oldest packet and increment fast_resend_seq_nr
2075                                // to not allow another fast resend on it again
2076                                OutgoingPacket *pkt = (OutgoingPacket*)conn->outbuf.get(conn->seq_nr - conn->cur_window_packets);
2077                                if (pkt && pkt->transmissions > 0) {
2078                                        LOG_UTPV("0x%08x: Packet %u fast timeout-retry.", conn, conn->seq_nr - conn->cur_window_packets);
2079#ifdef _DEBUG
2080                                        ++conn->_stats._fastrexmit;
2081#endif
2082                                        conn->fast_resend_seq_nr++;
2083                                        conn->send_packet(pkt);
2084                                }
2085                        }
2086                }
2087        }
2088
2089        // Process selective acknowledgent
2090        if (selack_ptr != NULL) {
2091                conn->selective_ack(pk_ack_nr + 2, selack_ptr, selack_ptr[-1]);
2092        }
2093
2094        // this invariant should always be true
2095        assert(conn->cur_window_packets == 0 || conn->outbuf.get(conn->seq_nr - conn->cur_window_packets));
2096
2097        LOG_UTPV("0x%08x: acks:%d acked_bytes:%u seq_nr:%u cur_window:%u cur_window_packets:%u quota:%d",
2098                         conn, acks, (uint)acked_bytes, conn->seq_nr, (uint)conn->cur_window, conn->cur_window_packets,
2099                         conn->send_quota / 100);
2100
2101        // In case the ack dropped the current window below
2102        // the max_window size, Mark the socket as writable
2103        if (conn->state == CS_CONNECTED_FULL && conn->is_writable(conn->get_packet_size())) {
2104                conn->state = CS_CONNECTED;
2105                LOG_UTPV("0x%08x: Socket writable. max_window:%u cur_window:%u quota:%d packet_size:%u",
2106                                 conn, (uint)conn->max_window, (uint)conn->cur_window, conn->send_quota / 100, (uint)conn->get_packet_size());
2107                conn->func.on_state(conn->userdata, UTP_STATE_WRITABLE);
2108        }
2109
2110        if (pk_flags == ST_STATE) {
2111                // This is a state packet only.
2112                return 0;
2113        }
2114
2115        // The connection is not in a state that can accept data?
2116        if (conn->state != CS_CONNECTED &&
2117                conn->state != CS_CONNECTED_FULL &&
2118                conn->state != CS_FIN_SENT) {
2119                return 0;
2120        }
2121
2122        // Is this a finalize packet?
2123        if (pk_flags == ST_FIN && !conn->got_fin) {
2124                LOG_UTPV("Got FIN eof_pkt:%u", pk_seq_nr);
2125                conn->got_fin = true;
2126                conn->eof_pkt = pk_seq_nr;
2127                // at this point, it is possible for the
2128                // other end to have sent packets with
2129                // sequence numbers higher than seq_nr.
2130                // if this is the case, our reorder_count
2131                // is out of sync. This case is dealt with
2132                // when we re-order and hit the eof_pkt.
2133                // we'll just ignore any packets with
2134                // sequence numbers past this
2135        }
2136
2137        // Getting an in-order packet?
2138        if (seqnr == 0) {
2139                size_t count = packet_end - data;
2140                if (count > 0 && conn->state != CS_FIN_SENT) {
2141                        LOG_UTPV("0x%08x: Got Data len:%u (rb:%u)", conn, (uint)count, (uint)conn->func.get_rb_size(conn->userdata));
2142                        // Post bytes to the upper layer
2143                        conn->func.on_read(conn->userdata, data, count);
2144                }
2145                conn->ack_nr++;
2146                conn->bytes_since_ack += count;
2147
2148                // Check if the next packet has been received too, but waiting
2149                // in the reorder buffer.
2150                for (;;) {
2151
2152                        if (conn->got_fin && conn->eof_pkt == conn->ack_nr) {
2153                                if (conn->state != CS_FIN_SENT) {
2154                                        conn->state = CS_GOT_FIN;
2155                                        conn->rto_timeout = g_current_ms + min<uint>(conn->rto * 3, 60);
2156
2157                                        LOG_UTPV("0x%08x: Posting EOF", conn);
2158                                        conn->func.on_state(conn->userdata, UTP_STATE_EOF);
2159                                }
2160
2161                                // if the other end wants to close, ack immediately
2162                                conn->send_ack();
2163
2164                                // reorder_count is not necessarily 0 at this point.
2165                                // even though it is most of the time, the other end
2166                                // may have sent packets with higher sequence numbers
2167                                // than what later end up being eof_pkt
2168                                // since we have received all packets up to eof_pkt
2169                                // just ignore the ones after it.
2170                                conn->reorder_count = 0;
2171                        }
2172
2173                        // Quick get-out in case there is nothing to reorder
2174                        if (conn->reorder_count == 0)
2175                                break;
2176
2177                        // Check if there are additional buffers in the reorder buffers
2178                        // that need delivery.
2179                        byte *p = (byte*)conn->inbuf.get(conn->ack_nr+1);
2180                        if (p == NULL)
2181                                break;
2182                        conn->inbuf.put(conn->ack_nr+1, NULL);
2183                        count = *(uint*)p;
2184                        if (count > 0 && conn->state != CS_FIN_SENT) {
2185                                // Pass the bytes to the upper layer
2186                                conn->func.on_read(conn->userdata, p + sizeof(uint), count);
2187                        }
2188                        conn->ack_nr++;
2189                        conn->bytes_since_ack += count;
2190
2191                        // Free the element from the reorder buffer
2192                        free(p);
2193                        assert(conn->reorder_count > 0);
2194                        conn->reorder_count--;
2195                }
2196
2197                // start the delayed ACK timer
2198                conn->ack_time = g_current_ms + min<uint>(conn->ack_time - g_current_ms, DELAYED_ACK_TIME_THRESHOLD);
2199        } else {
2200                // Getting an out of order packet.
2201                // The packet needs to be remembered and rearranged later.
2202
2203                // if we have received a FIN packet, and the EOF-sequence number
2204                // is lower than the sequence number of the packet we just received
2205                // something is wrong.
2206                if (conn->got_fin && pk_seq_nr > conn->eof_pkt) {
2207                        LOG_UTPV("0x%08x: Got an invalid packet sequence number, past EOF "
2208                                "reorder_count:%u len:%u (rb:%u)",
2209                                conn, conn->reorder_count, (uint)(packet_end - data), (uint)conn->func.get_rb_size(conn->userdata));
2210                        return 0;
2211                }
2212
2213                // if the sequence number is entirely off the expected
2214                // one, just drop it. We can't allocate buffer space in
2215                // the inbuf entirely based on untrusted input
2216                if (seqnr > 0x3ff) {
2217                        LOG_UTPV("0x%08x: Got an invalid packet sequence number, too far off "
2218                                "reorder_count:%u len:%u (rb:%u)",
2219                                conn, conn->reorder_count, (uint)(packet_end - data), (uint)conn->func.get_rb_size(conn->userdata));
2220                        return 0;
2221                }
2222
2223                // we need to grow the circle buffer before we
2224                // check if the packet is already in here, so that
2225                // we don't end up looking at an older packet (since
2226                // the indices wraps around).
2227                conn->inbuf.ensure_size(pk_seq_nr + 1, seqnr + 1);
2228
2229                // Has this packet already been received? (i.e. a duplicate)
2230                // If that is the case, just discard it.
2231                if (conn->inbuf.get(pk_seq_nr) != NULL) {
2232#ifdef _DEBUG
2233                        ++conn->_stats._nduprecv;
2234#endif
2235                        return 0;
2236                }
2237
2238                // Allocate memory to fit the packet that needs to re-ordered
2239                byte *mem = (byte*)malloc((packet_end - data) + sizeof(uint));
2240                *(uint*)mem = (uint)(packet_end - data);
2241                memcpy(mem + sizeof(uint), data, packet_end - data);
2242
2243                // Insert into reorder buffer and increment the count
2244                // of # of packets to be reordered.
2245                // we add one to seqnr in order to leave the last
2246                // entry empty, that way the assert in send_ack
2247                // is valid. we have to add one to seqnr too, in order
2248                // to make the circular buffer grow around the correct
2249                // point (which is conn->ack_nr + 1).
2250                assert(conn->inbuf.get(pk_seq_nr) == NULL);
2251                assert((pk_seq_nr & conn->inbuf.mask) != ((conn->ack_nr+1) & conn->inbuf.mask));
2252                conn->inbuf.put(pk_seq_nr, mem);
2253                conn->reorder_count++;
2254
2255                LOG_UTPV("0x%08x: Got out of order data reorder_count:%u len:%u (rb:%u)",
2256                        conn, conn->reorder_count, (uint)(packet_end - data), (uint)conn->func.get_rb_size(conn->userdata));
2257
2258                // Setup so the partial ACK message will get sent immediately.
2259                conn->ack_time = g_current_ms + min<uint>(conn->ack_time - g_current_ms, 1);
2260        }
2261
2262        // If ack_time or ack_bytes indicate that we need to send and ack, send one
2263        // here instead of waiting for the timer to trigger
2264        LOG_UTPV("bytes_since_ack:%u ack_time:%d",
2265                         (uint)conn->bytes_since_ack, (int)(g_current_ms - conn->ack_time));
2266        if (conn->state == CS_CONNECTED || conn->state == CS_CONNECTED_FULL) {
2267                if (conn->bytes_since_ack > DELAYED_ACK_BYTE_THRESHOLD ||
2268                        (int)(g_current_ms - conn->ack_time) >= 0) {
2269                        conn->send_ack();
2270                }
2271        }
2272        return (size_t)(packet_end - data);
2273}
2274
2275inline bool UTP_IsV1(PacketFormatV1 const* pf)
2276{
2277        return pf->version() == 1 && pf->type() < ST_NUM_STATES && pf->ext < 3;
2278}
2279
2280void UTP_Free(UTPSocket *conn)
2281{
2282        LOG_UTPV("0x%08x: Killing socket", conn);
2283
2284        conn->func.on_state(conn->userdata, UTP_STATE_DESTROYING);
2285        UTP_SetCallbacks(conn, NULL, NULL);
2286
2287        assert(conn->idx < g_utp_sockets.GetCount());
2288        assert(g_utp_sockets[conn->idx] == conn);
2289
2290        // Unlink object from the global list
2291        assert(g_utp_sockets.GetCount() > 0);
2292
2293        UTPSocket *last = g_utp_sockets[g_utp_sockets.GetCount() - 1];
2294
2295        assert(last->idx < g_utp_sockets.GetCount());
2296        assert(g_utp_sockets[last->idx] == last);
2297
2298        last->idx = conn->idx;
2299       
2300        g_utp_sockets[conn->idx] = last;
2301
2302        // Decrease the count
2303        g_utp_sockets.SetCount(g_utp_sockets.GetCount() - 1);
2304
2305        // Free all memory occupied by the socket object.
2306        for (size_t i = 0; i <= conn->inbuf.mask; i++) {
2307                free(conn->inbuf.elements[i]);
2308        }
2309        for (size_t i = 0; i <= conn->outbuf.mask; i++) {
2310                free(conn->outbuf.elements[i]);
2311        }
2312        free(conn->inbuf.elements);
2313        free(conn->outbuf.elements);
2314
2315        // Finally free the socket object
2316        free(conn);
2317}
2318
2319
2320// Public functions:
2321///////////////////////////////////////////////////////////////////////////////
2322
2323// Create a UTP socket
2324UTPSocket *UTP_Create(SendToProc *send_to_proc, void *send_to_userdata, const struct sockaddr *addr, socklen_t addrlen)
2325{
2326        UTPSocket *conn = (UTPSocket*)calloc(1, sizeof(UTPSocket));
2327
2328        g_current_ms = UTP_GetMilliseconds();
2329
2330        UTP_SetCallbacks(conn, NULL, NULL);
2331        conn->our_hist.clear();
2332        conn->their_hist.clear();
2333        conn->rto = 3000;
2334        conn->rtt_var = 800;
2335        conn->seq_nr = 1;
2336        conn->ack_nr = 0;
2337        conn->max_window_user = 255 * PACKET_SIZE;
2338        conn->addr = PackedSockAddr((const SOCKADDR_STORAGE*)addr, addrlen);
2339        conn->send_to_proc = send_to_proc;
2340        conn->send_to_userdata = send_to_userdata;
2341        conn->ack_time = g_current_ms + 0x70000000;
2342        conn->last_got_packet = g_current_ms;
2343        conn->last_sent_packet = g_current_ms;
2344        conn->last_measured_delay = g_current_ms + 0x70000000;
2345        conn->last_rwin_decay = int32(g_current_ms) - MAX_WINDOW_DECAY;
2346        conn->last_send_quota = g_current_ms;
2347        conn->send_quota = PACKET_SIZE * 100;
2348        conn->cur_window_packets = 0;
2349        conn->fast_resend_seq_nr = conn->seq_nr;
2350
2351        // default to version 1
2352        UTP_SetSockopt(conn, SO_UTPVERSION, 1);
2353
2354        // we need to fit one packet in the window
2355        // when we start the connection
2356        conn->max_window = conn->get_packet_size();
2357        conn->state = CS_IDLE;
2358
2359        conn->outbuf.mask = 15;
2360        conn->inbuf.mask = 15;
2361
2362        conn->outbuf.elements = (void**)calloc(16, sizeof(void*));
2363        conn->inbuf.elements = (void**)calloc(16, sizeof(void*));
2364
2365        conn->idx = g_utp_sockets.Append(conn);
2366
2367        LOG_UTPV("0x%08x: UTP_Create", conn);
2368
2369        return conn;
2370}
2371
2372void UTP_SetCallbacks(UTPSocket *conn, UTPFunctionTable *funcs, void *userdata)
2373{
2374        assert(conn);
2375
2376        if (funcs == NULL) {
2377                funcs = &zero_funcs;
2378        }
2379        conn->func = *funcs;
2380        conn->userdata = userdata;
2381}
2382
2383bool UTP_SetSockopt(UTPSocket* conn, int opt, int val)
2384{
2385        assert(conn);
2386
2387        switch (opt) {
2388        case SO_SNDBUF:
2389                assert(val >= 1);
2390                conn->opt_sndbuf = val;
2391                return true;
2392        case SO_RCVBUF:
2393                conn->opt_rcvbuf = val;
2394                return true;
2395        case SO_UTPVERSION:
2396                assert(conn->state == CS_IDLE);
2397                if (conn->state != CS_IDLE) {
2398                        // too late
2399                        return false;
2400                }
2401                if (conn->version == 1 && val == 0) {
2402                        conn->reply_micro = INT_MAX;
2403                        conn->opt_rcvbuf = 200 * 1024;
2404                        conn->opt_sndbuf = OUTGOING_BUFFER_MAX_SIZE * PACKET_SIZE;
2405                } else if (conn->version == 0 && val == 1) {
2406                        conn->reply_micro = 0;
2407                        conn->opt_rcvbuf = 3 * 1024 * 1024 + 512 * 1024;
2408                        conn->opt_sndbuf = conn->opt_rcvbuf;
2409                }
2410                conn->version = val;
2411                return true;
2412        }
2413
2414        return false;
2415}
2416
2417// Try to connect to a specified host.
2418// 'initial' is the number of data bytes to send in the connect packet.
2419void UTP_Connect(UTPSocket *conn)
2420{
2421        assert(conn);
2422
2423        assert(conn->state == CS_IDLE);
2424        assert(conn->cur_window_packets == 0);
2425        assert(conn->outbuf.get(conn->seq_nr) == NULL);
2426        assert(sizeof(PacketFormatV1) == 20);
2427
2428        conn->state = CS_SYN_SENT;
2429
2430        g_current_ms = UTP_GetMilliseconds();
2431
2432        // Create and send a connect message
2433        uint32 conn_seed = UTP_Random();
2434
2435        // we identify newer versions by setting the
2436        // first two bytes to 0x0001
2437        if (conn->version > 0) {
2438                conn_seed &= 0xffff;
2439        }
2440
2441        // used in parse_log.py
2442        LOG_UTP("0x%08x: UTP_Connect conn_seed:%u packet_size:%u (B) "
2443                        "target_delay:%u (ms) delay_history:%u "
2444                        "delay_base_history:%u (minutes)",
2445                        conn, conn_seed, PACKET_SIZE, CCONTROL_TARGET / 1000,
2446                        CUR_DELAY_SIZE, DELAY_BASE_HISTORY);
2447
2448        // Setup initial timeout timer.
2449        conn->retransmit_timeout = 3000;
2450        conn->rto_timeout = g_current_ms + conn->retransmit_timeout;
2451        conn->last_rcv_win = conn->get_rcv_window();
2452
2453        conn->conn_seed = conn_seed;
2454        conn->conn_id_recv = conn_seed;
2455        conn->conn_id_send = conn_seed+1;
2456        // if you need compatibiltiy with 1.8.1, use this. it increases attackability though.
2457        //conn->seq_nr = 1;
2458        conn->seq_nr = UTP_Random();
2459
2460        // Create the connect packet.
2461        const size_t header_ext_size = conn->get_header_extensions_size();
2462
2463        OutgoingPacket *pkt = (OutgoingPacket*)malloc(sizeof(OutgoingPacket) - 1 + header_ext_size);
2464
2465        PacketFormatExtensions* p = (PacketFormatExtensions*)pkt->data;
2466        PacketFormatExtensionsV1* p1 = (PacketFormatExtensionsV1*)pkt->data;
2467
2468        memset(p, 0, header_ext_size);
2469        // SYN packets are special, and have the receive ID in the connid field,
2470        // instead of conn_id_send.
2471        if (conn->version == 0) {
2472                p->pf.connid = conn->conn_id_recv;
2473                p->pf.ext = 2;
2474                p->pf.windowsize = (byte)DIV_ROUND_UP(conn->last_rcv_win, PACKET_SIZE);
2475                p->pf.seq_nr = conn->seq_nr;
2476                p->pf.flags = ST_SYN;
2477                p->ext_next = 0;
2478                p->ext_len = 8;
2479                memset(p->extensions, 0, 8);
2480        } else {
2481                p1->pf.set_version(1);
2482                p1->pf.set_type(ST_SYN);
2483                p1->pf.ext = 2;
2484                p1->pf.connid = conn->conn_id_recv;
2485                p1->pf.windowsize = (uint32)conn->last_rcv_win;
2486                p1->pf.seq_nr = conn->seq_nr;
2487                p1->ext_next = 0;
2488                p1->ext_len = 8;
2489                memset(p1->extensions, 0, 8);
2490        }
2491        pkt->transmissions = 0;
2492        pkt->length = header_ext_size;
2493        pkt->payload = 0;
2494
2495        //LOG_UTPV("0x%08x: Sending connect %s [%u].",
2496        //               conn, addrfmt(conn->addr, addrbuf), conn_seed);
2497
2498        // Remember the message in the outgoing queue.
2499        conn->outbuf.ensure_size(conn->seq_nr, conn->cur_window_packets);
2500        conn->outbuf.put(conn->seq_nr, pkt);
2501        conn->seq_nr++;
2502        conn->cur_window_packets++;
2503
2504        conn->send_packet(pkt);
2505}
2506
2507bool UTP_IsIncomingUTP(UTPGotIncomingConnection *incoming_proc,
2508                                           SendToProc *send_to_proc, void *send_to_userdata,
2509                                           const byte *buffer, size_t len, const struct sockaddr *to, socklen_t tolen)
2510{
2511        const PackedSockAddr addr((const SOCKADDR_STORAGE*)to, tolen);
2512
2513        if (len < sizeof(PacketFormat) && len < sizeof(PacketFormatV1)) {
2514                LOG_UTPV("recv %s len:%u too small", addrfmt(addr, addrbuf), (uint)len);
2515                return false;
2516        }
2517
2518        const PacketFormat* p = (PacketFormat*)buffer;
2519        const PacketFormatV1* p1 = (PacketFormatV1*)buffer;
2520
2521        const byte version = UTP_IsV1(p1);
2522        const uint32 id = (version == 0) ? p->connid : uint32(p1->connid);
2523
2524        if (version == 0 && len < sizeof(PacketFormat)) {
2525                LOG_UTPV("recv %s len:%u version:%u too small", addrfmt(addr, addrbuf), (uint)len, version);
2526                return false;
2527        }
2528
2529        if (version == 1 && len < sizeof(PacketFormatV1)) {
2530                LOG_UTPV("recv %s len:%u version:%u too small", addrfmt(addr, addrbuf), (uint)len, version);
2531                return false;
2532        }
2533
2534        LOG_UTPV("recv %s len:%u id:%u", addrfmt(addr, addrbuf), (uint)len, id);
2535
2536        const PacketFormat *pf = (PacketFormat*)p;
2537        const PacketFormatV1 *pf1 = (PacketFormatV1*)p;
2538
2539        if (version == 0) {
2540                LOG_UTPV("recv id:%u seq_nr:%u ack_nr:%u", id, (uint)pf->seq_nr, (uint)pf->ack_nr);
2541        } else {
2542                LOG_UTPV("recv id:%u seq_nr:%u ack_nr:%u", id, (uint)pf1->seq_nr, (uint)pf1->ack_nr);
2543        }
2544
2545        const byte flags = version == 0 ? pf->flags : pf1->type();
2546
2547        for (size_t i = 0; i < g_utp_sockets.GetCount(); i++) {
2548                UTPSocket *conn = g_utp_sockets[i];
2549                //LOG_UTPV("Examining UTPSocket %s for %s and (seed:%u s:%u r:%u) for %u",
2550                //              addrfmt(conn->addr, addrbuf), addrfmt(addr, addrbuf2), conn->conn_seed, conn->conn_id_send, conn->conn_id_recv, id);
2551                if (conn->addr != addr)
2552                        continue;
2553
2554                if (flags == ST_RESET && (conn->conn_id_send == id || conn->conn_id_recv == id)) {
2555                        LOG_UTPV("0x%08x: recv RST for existing connection", conn);
2556                        if (!conn->userdata || conn->state == CS_FIN_SENT) {
2557                                conn->state = CS_DESTROY;
2558                        } else {
2559                                conn->state = CS_RESET;
2560                        }
2561                        if (conn->userdata) {
2562                                conn->func.on_overhead(conn->userdata, false, len + conn->get_udp_overhead(),
2563                                                                           close_overhead);
2564                                const int err = conn->state == CS_SYN_SENT ?
2565                                        ECONNREFUSED :
2566                                        ECONNRESET;
2567                                conn->func.on_error(conn->userdata, err);
2568                        }
2569                        return true;
2570                } else if (flags != ST_SYN && conn->conn_id_recv == id) {
2571                        LOG_UTPV("0x%08x: recv processing", conn);
2572                        const size_t read = UTP_ProcessIncoming(conn, buffer, len);
2573                        if (conn->userdata) {
2574                                conn->func.on_overhead(conn->userdata, false,
2575                                        (len - read) + conn->get_udp_overhead(),
2576                                        header_overhead);
2577                        }
2578                        return true;
2579                }
2580        }
2581
2582        if (flags == ST_RESET) {
2583                LOG_UTPV("recv RST for unknown connection");
2584                return true;
2585        }
2586
2587        const uint32 seq_nr = version == 0 ? pf->seq_nr : pf1->seq_nr;
2588        if (flags != ST_SYN) {
2589                for (size_t i = 0; i < g_rst_info.GetCount(); i++) {
2590                        if (g_rst_info[i].connid != id)
2591                                continue;
2592                        if (g_rst_info[i].addr != addr)
2593                                continue;
2594                        if (seq_nr != g_rst_info[i].ack_nr)
2595                                continue;
2596                        g_rst_info[i].timestamp = UTP_GetMilliseconds();
2597                        LOG_UTPV("recv not sending RST to non-SYN (stored)");
2598                        return true;
2599                }
2600                if (g_rst_info.GetCount() > RST_INFO_LIMIT) {
2601                        LOG_UTPV("recv not sending RST to non-SYN (limit at %u stored)", (uint)g_rst_info.GetCount());
2602                        return true;
2603                }
2604                LOG_UTPV("recv send RST to non-SYN (%u stored)", (uint)g_rst_info.GetCount());
2605                RST_Info &r = g_rst_info.Append();
2606                r.addr = addr;
2607                r.connid = id;
2608                r.ack_nr = seq_nr;
2609                r.timestamp = UTP_GetMilliseconds();
2610
2611                UTPSocket::send_rst(send_to_proc, send_to_userdata, addr, id, seq_nr, UTP_Random(), version);
2612                return true;
2613        }
2614
2615        if (incoming_proc) {
2616                LOG_UTPV("Incoming connection from %s uTP version:%u", addrfmt(addr, addrbuf), version);
2617
2618                // Create a new UTP socket to handle this new connection
2619                UTPSocket *conn = UTP_Create(send_to_proc, send_to_userdata, to, tolen);
2620                // Need to track this value to be able to detect duplicate CONNECTs
2621                conn->conn_seed = id;
2622                // This is value that identifies this connection for them.
2623                conn->conn_id_send = id;
2624                // This is value that identifies this connection for us.
2625                conn->conn_id_recv = id+1;
2626                conn->ack_nr = seq_nr;
2627                conn->seq_nr = UTP_Random();
2628                conn->fast_resend_seq_nr = conn->seq_nr;
2629
2630                UTP_SetSockopt(conn, SO_UTPVERSION, version);
2631                conn->state = CS_CONNECTED;
2632
2633                const size_t read = UTP_ProcessIncoming(conn, buffer, len, true);
2634
2635                LOG_UTPV("0x%08x: recv send connect ACK", conn);
2636                conn->send_ack(true);
2637
2638                incoming_proc(send_to_userdata, conn);
2639
2640                // we report overhead after incoming_proc, because the callbacks are setup now
2641                if (conn->userdata) {
2642                        // SYN
2643                        conn->func.on_overhead(conn->userdata, false, (len - read) + conn->get_udp_overhead(),
2644                                                                   header_overhead);
2645                        // SYNACK
2646                        conn->func.on_overhead(conn->userdata, true, conn->get_overhead(),
2647                                                                   ack_overhead);
2648                }
2649        }
2650
2651        return true;
2652}
2653
2654bool UTP_HandleICMP(const byte* buffer, size_t len, const struct sockaddr *to, socklen_t tolen)
2655{
2656        const PackedSockAddr addr((const SOCKADDR_STORAGE*)to, tolen);
2657
2658        // Want the whole packet so we have connection ID
2659        if (len < sizeof(PacketFormat)) {
2660                return false;
2661        }
2662
2663        const PacketFormat* p = (PacketFormat*)buffer;
2664        const PacketFormatV1* p1 = (PacketFormatV1*)buffer;
2665
2666        const byte version = UTP_IsV1(p1);
2667        const uint32 id = (version == 0) ? p->connid : uint32(p1->connid);
2668
2669        for (size_t i = 0; i < g_utp_sockets.GetCount(); ++i) {
2670                UTPSocket *conn = g_utp_sockets[i];
2671                if (conn->addr == addr &&
2672                        conn->conn_id_recv == id) {
2673                        // Don't pass on errors for idle/closed connections
2674                        if (conn->state != CS_IDLE) {
2675                                if (!conn->userdata || conn->state == CS_FIN_SENT) {
2676                                        LOG_UTPV("0x%08x: icmp packet causing socket destruction", conn);
2677                                        conn->state = CS_DESTROY;
2678                                } else {
2679                                        conn->state = CS_RESET;
2680                                }
2681                                if (conn->userdata) {
2682                                        const int err = conn->state == CS_SYN_SENT ?
2683                                                ECONNREFUSED :
2684                                                ECONNRESET;
2685                                        LOG_UTPV("0x%08x: icmp packet causing error on socket:%d", conn, err);
2686                                        conn->func.on_error(conn->userdata, err);
2687                                }
2688                        }
2689                        return true;
2690                }
2691        }
2692        return false;
2693}
2694
2695// Write bytes to the UTP socket.
2696// Returns true if the socket is still writable.
2697bool UTP_Write(UTPSocket *conn, size_t bytes)
2698{
2699        assert(conn);
2700
2701#ifdef g_log_utp_verbose
2702        size_t param = bytes;
2703#endif
2704
2705        if (conn->state != CS_CONNECTED) {
2706                LOG_UTPV("0x%08x: UTP_Write %u bytes = false (not CS_CONNECTED)", conn, (uint)bytes);
2707                return false;
2708        }
2709
2710        g_current_ms = UTP_GetMilliseconds();
2711
2712        conn->update_send_quota();
2713
2714        // don't send unless it will all fit in the window
2715        size_t packet_size = conn->get_packet_size();
2716        size_t num_to_send = min<size_t>(bytes, packet_size);
2717        while (conn->is_writable(num_to_send)) {
2718                // Send an outgoing packet.
2719                // Also add it to the outgoing of packets that have been sent but not ACKed.
2720
2721                if (num_to_send == 0) {
2722                        LOG_UTPV("0x%08x: UTP_Write %u bytes = true", conn, (uint)param);
2723                        return true;
2724                }
2725                bytes -= num_to_send;
2726
2727                LOG_UTPV("0x%08x: Sending packet. seq_nr:%u ack_nr:%u wnd:%u/%u/%u rcv_win:%u size:%u quota:%d cur_window_packets:%u",
2728                                 conn, conn->seq_nr, conn->ack_nr,
2729                                 (uint)(conn->cur_window + num_to_send),
2730                                 (uint)conn->max_window, (uint)conn->max_window_user,
2731                                 (uint)conn->last_rcv_win, num_to_send, conn->send_quota / 100,
2732                                 conn->cur_window_packets);
2733                conn->write_outgoing_packet(num_to_send, ST_DATA);
2734                num_to_send = min<size_t>(bytes, packet_size);
2735        }
2736
2737        // mark the socket as not being writable.
2738        conn->state = CS_CONNECTED_FULL;
2739        LOG_UTPV("0x%08x: UTP_Write %u bytes = false", conn, (uint)bytes);
2740        return false;
2741}
2742
2743void UTP_RBDrained(UTPSocket *conn)
2744{
2745        assert(conn);
2746
2747        const size_t rcvwin = conn->get_rcv_window();
2748
2749        if (rcvwin > conn->last_rcv_win) {
2750                // If last window was 0 send ACK immediately, otherwise should set timer
2751                if (conn->last_rcv_win == 0) {
2752                        conn->send_ack();
2753                } else {
2754                        conn->ack_time = g_current_ms + min<uint>(conn->ack_time - g_current_ms, DELAYED_ACK_TIME_THRESHOLD);
2755                }
2756        }
2757}
2758
2759void UTP_CheckTimeouts()
2760{
2761        g_current_ms = UTP_GetMilliseconds();
2762
2763        for (size_t i = 0; i < g_rst_info.GetCount(); i++) {
2764                if ((int)(g_current_ms - g_rst_info[i].timestamp) >= RST_INFO_TIMEOUT) {
2765                        g_rst_info.MoveUpLast(i);
2766                        i--;
2767                }
2768        }
2769        if (g_rst_info.GetCount() != g_rst_info.GetAlloc()) {
2770                g_rst_info.Compact();
2771        }
2772
2773        for (size_t i = 0; i != g_utp_sockets.GetCount(); i++) {
2774                UTPSocket *conn = g_utp_sockets[i];
2775                conn->check_timeouts();
2776
2777                // Check if the object was deleted
2778                if (conn->state == CS_DESTROY) {
2779                        LOG_UTPV("0x%08x: Destroying", conn);
2780                        UTP_Free(conn);
2781                        i--;
2782                }
2783        }
2784}
2785
2786size_t UTP_GetPacketSize(UTPSocket *socket)
2787{
2788        return socket->get_packet_size();
2789}
2790
2791void UTP_GetPeerName(UTPSocket *conn, struct sockaddr *addr, socklen_t *addrlen)
2792{
2793        assert(conn);
2794
2795        socklen_t len;
2796        const SOCKADDR_STORAGE sa = conn->addr.get_sockaddr_storage(&len);
2797        *addrlen = min(len, *addrlen);
2798        memcpy(addr, &sa, *addrlen);
2799}
2800
2801void UTP_GetDelays(UTPSocket *conn, int32 *ours, int32 *theirs, uint32 *age)
2802{
2803        assert(conn);
2804
2805        if (ours) *ours = conn->our_hist.get_value();
2806        if (theirs) *theirs = conn->their_hist.get_value();
2807        if (age) *age = g_current_ms - conn->last_measured_delay;
2808}
2809
2810#ifdef _DEBUG
2811void UTP_GetStats(UTPSocket *conn, UTPStats *stats)
2812{
2813        assert(conn);
2814
2815        *stats = conn->_stats;
2816}
2817#endif // _DEBUG
2818
2819void UTP_GetGlobalStats(UTPGlobalStats *stats)
2820{
2821        *stats = _global_stats;
2822}
2823
2824// Close the UTP socket.
2825// It is not valid for the upper layer to refer to socket after it is closed.
2826// Data will keep to try being delivered after the close.
2827void UTP_Close(UTPSocket *conn)
2828{
2829        assert(conn);
2830
2831        assert(conn->state != CS_DESTROY_DELAY && conn->state != CS_FIN_SENT && conn->state != CS_DESTROY);
2832
2833        LOG_UTPV("0x%08x: UTP_Close in state:%s", conn, statenames[conn->state]);
2834
2835        switch(conn->state) {
2836        case CS_CONNECTED:
2837        case CS_CONNECTED_FULL:
2838                conn->state = CS_FIN_SENT;
2839                conn->write_outgoing_packet(0, ST_FIN);
2840                break;
2841
2842        case CS_SYN_SENT:
2843                conn->rto_timeout = UTP_GetMilliseconds() + min<uint>(conn->rto * 2, 60);
2844        case CS_GOT_FIN:
2845                conn->state = CS_DESTROY_DELAY;
2846                break;
2847
2848        default:
2849                conn->state = CS_DESTROY;
2850                break;
2851        }
2852}
Note: See TracBrowser for help on using the repository browser.