#include <StdAfx.h>

#include "utp.h"
#include "templates.h"

#include <stdio.h>
#include <assert.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <limits.h> // for UINT_MAX

#ifdef WIN32
#include "win32_inet_ntop.h"

// newer versions of MSVC define these in errno.h
#ifndef ECONNRESET
#define ECONNRESET WSAECONNRESET
#define EMSGSIZE WSAEMSGSIZE
#define ECONNREFUSED WSAECONNREFUSED
#define ETIMEDOUT WSAETIMEDOUT
#endif
#endif

#ifdef POSIX
typedef sockaddr_storage SOCKADDR_STORAGE;
#endif // POSIX

// number of bytes to increase max window size by, per RTT. This is
// scaled down linearly proportional to off_target. i.e. if all packets
// in one window have 0 delay, window size will increase by this number.
// Typically it's less. TCP increases one MSS per RTT, which is 1500 bytes.
#define MAX_CWND_INCREASE_BYTES_PER_RTT 3000
#define CUR_DELAY_SIZE 3
// experiments suggest that a clock skew of 10 ms per 325 seconds
// is not impossible. Reset delay_base every 13 minutes. The clock
// skew is dealt with by observing the delay base in the other
// direction, and adjusting our own upwards if the opposite direction
// delay base keeps going down
#define DELAY_BASE_HISTORY 13
#define MAX_WINDOW_DECAY 100 // ms

#define REORDER_BUFFER_SIZE 32
#define REORDER_BUFFER_MAX_SIZE 511
#define OUTGOING_BUFFER_MAX_SIZE 511

#define PACKET_SIZE 350

// this is the minimum max_window value. It can never drop below this
#define MIN_WINDOW_SIZE 10

// when window sizes are smaller than one packet_size, this
// will pace the packets to average at the given window size.
// If it's not set, nothing is sent until there's a timeout.
#define USE_PACKET_PACING 1

// if we receive 4 or more duplicate acks, we resend the packet
// that hasn't been acked yet
#define DUPLICATE_ACKS_BEFORE_RESEND 3

#define DELAYED_ACK_BYTE_THRESHOLD 2400 // bytes
#define DELAYED_ACK_TIME_THRESHOLD 100 // milliseconds

#define RST_INFO_TIMEOUT 10000
#define RST_INFO_LIMIT 1000
// 29 seconds determined from measuring many home NAT devices
#define KEEPALIVE_INTERVAL 29000


#define SEQ_NR_MASK 0xFFFF
#define ACK_NR_MASK 0xFFFF

#define DIV_ROUND_UP(num, denom) (((num) + (denom) - 1) / (denom))
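// For example, the original (v0) header advertises the receive window in
// PACKET_SIZE (350 byte) chunks, so a 1000 byte window is reported as
// DIV_ROUND_UP(1000, 350) == 3 chunks (see send_ack() below).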

#include "utp_utils.h"
#include "utp_config.h"

#define LOG_UTP if (g_log_utp) utp_log
#define LOG_UTPV if (g_log_utp_verbose) utp_log

uint32 g_current_ms;

// The totals are derived from the following data:
//  45: IPv6 address including embedded IPv4 address
//  11: Scope Id
//   2: Brackets around IPv6 address when port is present
//   6: Port (including colon)
//   1: Terminating null byte
char addrbuf[65];
char addrbuf2[65];
#define addrfmt(x, s) x.fmt(s, sizeof(s))

#pragma pack(push,1)

struct PACKED_ATTRIBUTE PackedSockAddr {

	// The values are always stored here in network byte order
	union {
		byte _in6[16];		// IPv6
		uint16 _in6w[8];	// IPv6, word based (for convenience)
		uint32 _in6d[4];	// Dword access
		in6_addr _in6addr;	// For convenience
	} _in;

	// Host byte order
	uint16 _port;

#define _sin4 _in._in6d[3]	// IPv4 is stored where it goes if mapped

#define _sin6 _in._in6
#define _sin6w _in._in6w
#define _sin6d _in._in6d

	byte get_family() const
	{
		return (IN6_IS_ADDR_V4MAPPED(&_in._in6addr) != 0) ? AF_INET : AF_INET6;
	}

	bool operator==(const PackedSockAddr& rhs) const
	{
		if (&rhs == this)
			return true;
		if (_port != rhs._port)
			return false;
		return memcmp(_sin6, rhs._sin6, sizeof(_sin6)) == 0;
	}
	bool operator!=(const PackedSockAddr& rhs) const { return !(*this == rhs); }

	PackedSockAddr(const SOCKADDR_STORAGE* sa, socklen_t len)
	{
		if (sa->ss_family == AF_INET) {
			assert(len >= sizeof(sockaddr_in));
			const sockaddr_in *sin = (sockaddr_in*)sa;
			_sin6w[0] = 0;
			_sin6w[1] = 0;
			_sin6w[2] = 0;
			_sin6w[3] = 0;
			_sin6w[4] = 0;
			_sin6w[5] = 0xffff;
			_sin4 = sin->sin_addr.s_addr;
			_port = ntohs(sin->sin_port);
		} else {
			assert(len >= sizeof(sockaddr_in6));
			const sockaddr_in6 *sin6 = (sockaddr_in6*)sa;
			_in._in6addr = sin6->sin6_addr;
			_port = ntohs(sin6->sin6_port);
		}
	}

	SOCKADDR_STORAGE get_sockaddr_storage(socklen_t *len = NULL) const
	{
		SOCKADDR_STORAGE sa;
		const byte family = get_family();
		if (family == AF_INET) {
			sockaddr_in *sin = (sockaddr_in*)&sa;
			if (len) *len = sizeof(sockaddr_in);
			memset(sin, 0, sizeof(sockaddr_in));
			sin->sin_family = family;
			sin->sin_port = htons(_port);
			sin->sin_addr.s_addr = _sin4;
		} else {
			sockaddr_in6 *sin6 = (sockaddr_in6*)&sa;
			memset(sin6, 0, sizeof(sockaddr_in6));
			if (len) *len = sizeof(sockaddr_in6);
			sin6->sin6_family = family;
			sin6->sin6_addr = _in._in6addr;
			sin6->sin6_port = htons(_port);
		}
		return sa;
	}

	cstr fmt(str s, size_t len) const
	{
		memset(s, 0, len);
		const byte family = get_family();
		str i;
		if (family == AF_INET) {
			inet_ntop(family, (uint32*)&_sin4, s, len);
			i = s;
			while (*++i) {}
		} else {
			i = s;
			*i++ = '[';
			inet_ntop(family, (in6_addr*)&_in._in6addr, i, len-1);
			while (*++i) {}
			*i++ = ']';
		}
		snprintf(i, len - (i-s), ":%u", _port);
		return s;
	}
};

struct PACKED_ATTRIBUTE RST_Info {
	PackedSockAddr addr;
	uint32 connid;
	uint32 timestamp;
	uint16 ack_nr;
};

// these packet sizes are including the uTP header which
// is either 20 or 23 bytes depending on version
#define PACKET_SIZE_EMPTY_BUCKET 0
#define PACKET_SIZE_EMPTY 23
#define PACKET_SIZE_SMALL_BUCKET 1
#define PACKET_SIZE_SMALL 373
#define PACKET_SIZE_MID_BUCKET 2
#define PACKET_SIZE_MID 723
#define PACKET_SIZE_BIG_BUCKET 3
#define PACKET_SIZE_BIG 1400
#define PACKET_SIZE_HUGE_BUCKET 4

struct PACKED_ATTRIBUTE PacketFormat {
	// connection ID
	uint32_big connid;
	uint32_big tv_sec;
	uint32_big tv_usec;
	uint32_big reply_micro;
	// receive window size in PACKET_SIZE chunks
	byte windowsize;
	// Type of the first extension header
	byte ext;
	// Flags
	byte flags;
	// Sequence number
	uint16_big seq_nr;
	// Acknowledgment number
	uint16_big ack_nr;
};

struct PACKED_ATTRIBUTE PacketFormatAck {
	PacketFormat pf;
	byte ext_next;
	byte ext_len;
	byte acks[4];
};

struct PACKED_ATTRIBUTE PacketFormatExtensions {
	PacketFormat pf;
	byte ext_next;
	byte ext_len;
	byte extensions[8];
};

struct PACKED_ATTRIBUTE PacketFormatV1 {
	// packet_type (4 high bits)
	// protocol version (4 low bits)
	byte ver_type;
	byte version() const { return ver_type & 0xf; }
	byte type() const { return ver_type >> 4; }
	void set_version(byte v) { ver_type = (ver_type & 0xf0) | (v & 0xf); }
	void set_type(byte t) { ver_type = (ver_type & 0xf) | (t << 4); }

	// Type of the first extension header
	byte ext;
	// connection ID
	uint16_big connid;
	uint32_big tv_usec;
	uint32_big reply_micro;
	// receive window size in bytes
	uint32_big windowsize;
	// Sequence number
	uint16_big seq_nr;
	// Acknowledgment number
	uint16_big ack_nr;
};
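
// Example of the ver_type encoding above: the packet type goes in the high
// nibble and the protocol version in the low nibble, so a version-1 ST_STATE
// (type 2) packet has ver_type == 0x21 and a version-1 ST_SYN (type 4) packet
// has ver_type == 0x41. With the packing in effect, sizeof(PacketFormatV1) is
// 20 bytes and sizeof(PacketFormat) is 23 bytes, which is where the
// "20 or 23 bytes" header sizes mentioned above come from.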

struct PACKED_ATTRIBUTE PacketFormatAckV1 {
	PacketFormatV1 pf;
	byte ext_next;
	byte ext_len;
	byte acks[4];
};

struct PACKED_ATTRIBUTE PacketFormatExtensionsV1 {
	PacketFormatV1 pf;
	byte ext_next;
	byte ext_len;
	byte extensions[8];
};

#pragma pack(pop)

enum {
	ST_DATA = 0,	// Data packet.
	ST_FIN = 1,		// Finalize the connection. This is the last packet.
	ST_STATE = 2,	// State packet. Used to transmit an ACK with no data.
	ST_RESET = 3,	// Terminate connection forcefully.
	ST_SYN = 4,		// Connect SYN
	ST_NUM_STATES,	// used for bounds checking
};

static const cstr flagnames[] = {
	"ST_DATA","ST_FIN","ST_STATE","ST_RESET","ST_SYN"
};

enum CONN_STATE {
	CS_IDLE = 0,
	CS_SYN_SENT = 1,
	CS_CONNECTED = 2,
	CS_CONNECTED_FULL = 3,
	CS_GOT_FIN = 4,
	CS_DESTROY_DELAY = 5,
	CS_FIN_SENT = 6,
	CS_RESET = 7,
	CS_DESTROY = 8,
};

static const cstr statenames[] = {
	"IDLE","SYN_SENT","CONNECTED","CONNECTED_FULL","GOT_FIN","DESTROY_DELAY","FIN_SENT","RESET","DESTROY"
};

struct OutgoingPacket {
	size_t length;
	size_t payload;
	uint64 time_sent; // microseconds
	uint transmissions:31;
	bool need_resend:1;
	byte data[1];
};

void no_read(void *socket, const byte *bytes, size_t count) {}
void no_write(void *socket, byte *bytes, size_t count) {}
size_t no_rb_size(void *socket) { return 0; }
void no_state(void *socket, int state) {}
void no_error(void *socket, int errcode) {}
void no_overhead(void *socket, bool send, size_t count, int type) {}

UTPFunctionTable zero_funcs = {
	&no_read,
	&no_write,
	&no_rb_size,
	&no_state,
	&no_error,
	&no_overhead,
};

struct SizableCircularBuffer {
	// This is the mask. Since it's always a power of 2, adding 1 to this value will return the size.
	size_t mask;
	// This is the elements that the circular buffer points to
	void **elements;

	void *get(size_t i) { assert(elements); return elements ? elements[i & mask] : NULL; }
	void put(size_t i, void *data) { assert(elements); elements[i&mask] = data; }

	void grow(size_t item, size_t index);
	void ensure_size(size_t item, size_t index) { if (index > mask) grow(item, index); }
	size_t size() { return mask + 1; }
};

static struct UTPGlobalStats _global_stats;

// Item contains the element we want to make space for
// index is the index in the list.
void SizableCircularBuffer::grow(size_t item, size_t index)
{
	// Figure out the new size.
	size_t size = mask + 1;
	do size *= 2; while (index >= size);

	// Allocate the new buffer
	void **buf = (void**)calloc(size, sizeof(void*));

	size--;

	// Copy elements from the old buffer to the new buffer
	for (size_t i = 0; i <= mask; i++) {
		buf[(item - index + i) & size] = get(item - index + i);
	}

	// Swap to the newly allocated buffer
	mask = size;
	free(elements);
	elements = buf;
}

// compare if lhs is less than rhs, taking wrapping
// into account. if lhs is close to UINT_MAX and rhs
// is close to 0, lhs is assumed to have wrapped and
// considered smaller
bool wrapping_compare_less(uint32 lhs, uint32 rhs)
{
	// distance walking from lhs to rhs, downwards
	const uint32 dist_down = lhs - rhs;
	// distance walking from lhs to rhs, upwards
	const uint32 dist_up = rhs - lhs;

	// if the distance walking up is shorter, lhs
	// is less than rhs. If the distance walking down
	// is shorter, then rhs is less than lhs
	return dist_up < dist_down;
}
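
// Worked example: lhs = 0xfffffff0, rhs = 0x00000010.
// dist_up = rhs - lhs = 0x20 and dist_down = lhs - rhs = 0xffffffe0, so
// dist_up < dist_down and lhs is treated as the smaller value even though it
// is numerically larger -- it is assumed to be about to wrap past zero.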

struct DelayHist {
	uint32 delay_base;

	// this is the history of delay samples,
	// normalized by using the delay_base. These
	// values are always greater than 0 and measure
	// the queuing delay in microseconds
	uint32 cur_delay_hist[CUR_DELAY_SIZE];
	size_t cur_delay_idx;

	// this is the history of delay_base. It's
	// a number that doesn't have an absolute meaning
	// only relative. It doesn't make sense to initialize
	// it to anything other than values relative to
	// what's been seen in the real world.
	uint32 delay_base_hist[DELAY_BASE_HISTORY];
	size_t delay_base_idx;
	// the time when we last stepped the delay_base_idx
	uint32 delay_base_time;

	bool delay_base_initialized;

	void clear()
	{
		delay_base_initialized = false;
		delay_base = 0;
		cur_delay_idx = 0;
		delay_base_idx = 0;
		delay_base_time = g_current_ms;
		for (size_t i = 0; i < CUR_DELAY_SIZE; i++) {
			cur_delay_hist[i] = 0;
		}
		for (size_t i = 0; i < DELAY_BASE_HISTORY; i++) {
			delay_base_hist[i] = 0;
		}
	}

	void shift(const uint32 offset)
	{
		// the offset should never be "negative"
		// assert(offset < 0x10000000);

		// increase all of our base delays by this amount
		// this is used to take clock skew into account
		// by observing the other side's changes in its base_delay
		for (size_t i = 0; i < DELAY_BASE_HISTORY; i++) {
			delay_base_hist[i] += offset;
		}
		delay_base += offset;
	}

	void add_sample(const uint32 sample)
	{
		// The two clocks (in the two peers) are assumed not to
		// progress at the exact same rate. They are assumed to be
		// drifting, which causes the delay samples to contain
		// a systematic error, either they are under-
		// estimated or over-estimated. This is why we update the
		// delay_base periodically (once a minute, see below), to adjust for this.

		// This means the values will keep drifting and eventually wrap.
		// We can cross the wrapping boundary in two directions, either
		// going up, crossing the highest value, or going down, crossing 0.

		// if the delay_base is close to the max value and sample actually
		// wrapped on the other end we would see something like this:
		// delay_base = 0xffffff00, sample = 0x00000400
		// sample - delay_base = 0x500 which is the correct difference

		// if the delay_base is instead close to 0, and we got an even lower
		// sample (that will eventually update the delay_base), we may see
		// something like this:
		// delay_base = 0x00000400, sample = 0xffffff00
		// sample - delay_base = 0xfffffb00
		// this needs to be interpreted as a negative number and the actual
		// recorded delay should be 0.

		// It is important that all arithmetic that assumes wrapping
		// is done with unsigned integers. Signed integers are not guaranteed
		// to wrap the way unsigned integers do. At least GCC takes advantage
		// of this relaxed rule and won't necessarily wrap signed ints.

		// remove the clock offset and propagation delay.
		// delay base is min of the sample and the current
		// delay base. This min-operation is subject to wrapping
		// and care needs to be taken to correctly choose the
		// true minimum.

		// specifically the problem case is when delay_base is very small
		// and sample is very large (because it wrapped past zero), sample
		// needs to be considered the smaller

		if (!delay_base_initialized) {
			// we haven't seeded delay_base or its history with any real
			// measurements yet. Initialize everything with this sample.
			for (size_t i = 0; i < DELAY_BASE_HISTORY; i++) {
				// if we don't have a value, set it to the current sample
				delay_base_hist[i] = sample;
			}
			delay_base = sample;
			delay_base_initialized = true;
		}

		if (wrapping_compare_less(sample, delay_base_hist[delay_base_idx])) {
			// sample is smaller than the current delay_base_hist entry
			// update it
			delay_base_hist[delay_base_idx] = sample;
		}

		// is sample lower than delay_base? If so, update delay_base
		if (wrapping_compare_less(sample, delay_base)) {
			// sample is smaller than the current delay_base
			// update it
			delay_base = sample;
		}

		// this operation may wrap, and is supposed to
		const uint32 delay = sample - delay_base;
		// sanity check. If this is triggered, something fishy is going on
		// it means the measured sample was greater than 32 seconds!
		// assert(delay < 0x2000000);

		cur_delay_hist[cur_delay_idx] = delay;
		cur_delay_idx = (cur_delay_idx + 1) % CUR_DELAY_SIZE;

		// once every minute
		if (g_current_ms - delay_base_time > 60 * 1000) {
			delay_base_time = g_current_ms;
			delay_base_idx = (delay_base_idx + 1) % DELAY_BASE_HISTORY;
			// clear up the new delay base history spot by initializing
			// it to the current sample, then update it
			delay_base_hist[delay_base_idx] = sample;
			delay_base = delay_base_hist[0];
			// Assign the lowest delay in the history to delay_base
			for (size_t i = 0; i < DELAY_BASE_HISTORY; i++) {
				if (wrapping_compare_less(delay_base_hist[i], delay_base))
					delay_base = delay_base_hist[i];
			}
		}
	}

	uint32 get_value()
	{
		uint32 value = UINT_MAX;
		for (size_t i = 0; i < CUR_DELAY_SIZE; i++) {
			value = min<uint32>(cur_delay_hist[i], value);
		}
		// value could be UINT_MAX if we have no samples yet...
		return value;
	}
};

struct UTPSocket {
	PackedSockAddr addr;

	size_t idx;

	uint16 reorder_count;
	byte duplicate_ack;

	// the number of bytes we've received but not acked yet
	size_t bytes_since_ack;

	// the number of packets in the send queue. Packets that haven't
	// yet been sent count, as well as packets marked as needing resend.
	// The oldest un-acked packet in the send queue is seq_nr - cur_window_packets
	uint16 cur_window_packets;

	// how much of the window is used, i.e. the number of bytes in-flight.
	// Packets that have not yet been sent do not count; packets
	// that are marked as needing to be re-sent (due to a timeout)
	// don't count either
	size_t cur_window;
	// maximum window size, in bytes
	size_t max_window;
	// SO_SNDBUF setting, in bytes
	size_t opt_sndbuf;
	// SO_RCVBUF setting, in bytes
	size_t opt_rcvbuf;

	// Is a FIN packet in the reassembly buffer?
	bool got_fin:1;
	// Timeout procedure
	bool fast_timeout:1;

	// max receive window for other end, in bytes
	size_t max_window_user;
	// 0 = original uTP header, 1 = second revision
	byte version;
	CONN_STATE state;
	// TickCount when we last decayed window (wraps)
	int32 last_rwin_decay;

	// the sequence number of the FIN packet. This field is only set
	// when we have received a FIN, and the flag field has the FIN flag set.
	// It is used to know when it is safe to destroy the socket; we must have
	// received all packets up to this sequence number first.
	uint16 eof_pkt;

	// All sequence numbers up to and including this have been properly received
	// by us
	uint16 ack_nr;
	// This is the sequence number for the next packet to be sent.
	uint16 seq_nr;

	uint16 timeout_seq_nr;

	// This is the sequence number of the next packet we're allowed to
	// do a fast resend with. This makes sure we only do a fast-resend
	// once per packet. We can resend the packet with this sequence number
	// or any later packet (with a higher sequence number).
	uint16 fast_resend_seq_nr;

	uint32 reply_micro;

	// the time when we need to send another ack. If there's
	// nothing to ack, this is a very large number
	uint32 ack_time;

	uint32 last_got_packet;
	uint32 last_sent_packet;
	uint32 last_measured_delay;
	uint32 last_maxed_out_window;

	// the last time we added send quota to the connection.
	// When adding send quota, the time elapsed since this value is
	// multiplied by max_window / rtt, which is the current allowed
	// send rate.
	int32 last_send_quota;

	// the number of bytes we are allowed to send on
	// this connection. If this is more than one packet
	// size when we run out of data to send, it is clamped
	// to the packet size.
	// This value is multiplied by 100 in order to get
	// higher accuracy when dealing with low rates
	int32 send_quota;

	SendToProc *send_to_proc;
	void *send_to_userdata;
	UTPFunctionTable func;
	void *userdata;

	// Round trip time
	uint rtt;
	// Round trip time variance
	uint rtt_var;
	// Round trip timeout
	uint rto;
	DelayHist rtt_hist;
	uint retransmit_timeout;
	// The RTO timer will timeout here.
	uint rto_timeout;
	// When the window size is set to zero, start this timer. It will send a new packet every 30 seconds.
	uint32 zerowindow_time;

	uint32 conn_seed;
	// Connection ID for packets I receive
	uint32 conn_id_recv;
	// Connection ID for packets I send
	uint32 conn_id_send;
	// Last rcv window we advertised, in bytes
	size_t last_rcv_win;

	DelayHist our_hist;
	DelayHist their_hist;

	// extension bytes from SYN packet
	byte extensions[8];

	SizableCircularBuffer inbuf, outbuf;

#ifdef _DEBUG
	// Public stats, returned by UTP_GetStats(). See utp.h
	UTPStats _stats;
#endif // _DEBUG

	// Calculates the current receive window
	size_t get_rcv_window() const
	{
		// If we don't have a connection (such as during connection
		// establishment), always act as if we have an empty buffer.
		if (!userdata) return opt_rcvbuf;

		// Trim window down according to what's already in buffer.
		const size_t numbuf = func.get_rb_size(userdata);
		assert((int)numbuf >= 0);
		return opt_rcvbuf > numbuf ? opt_rcvbuf - numbuf : 0;
	}

	// Test if we're ready to decay max_window
	// XXX this breaks when spaced by > INT_MAX/2, which is 49
	// days; the failure mode in that case is we do an extra decay
	// or fail to do one when we really shouldn't.
	bool can_decay_win(int32 msec) const
	{
		return msec - last_rwin_decay >= MAX_WINDOW_DECAY;
	}

	// If we can, decay max window (halve it, but never below MIN_WINDOW_SIZE)
	void maybe_decay_win()
	{
		if (can_decay_win(g_current_ms)) {
			// TCP uses 0.5
			max_window = (size_t)(max_window * .5);
			last_rwin_decay = g_current_ms;
			if (max_window < MIN_WINDOW_SIZE)
				max_window = MIN_WINDOW_SIZE;
		}
	}

	size_t get_header_size() const
	{
		return (version ? sizeof(PacketFormatV1) : sizeof(PacketFormat));
	}

	size_t get_header_extensions_size() const
	{
		return (version ? sizeof(PacketFormatExtensionsV1) : sizeof(PacketFormatExtensions));
	}

	void sent_ack()
	{
		ack_time = g_current_ms + 0x70000000;
		bytes_since_ack = 0;
	}

	size_t get_udp_mtu() const
	{
		socklen_t len;
		SOCKADDR_STORAGE sa = addr.get_sockaddr_storage(&len);
		return UTP_GetUDPMTU((const struct sockaddr *)&sa, len);
	}

	size_t get_udp_overhead() const
	{
		socklen_t len;
		SOCKADDR_STORAGE sa = addr.get_sockaddr_storage(&len);
		return UTP_GetUDPOverhead((const struct sockaddr *)&sa, len);
	}

	uint64 get_global_utp_bytes_sent() const
	{
		socklen_t len;
		SOCKADDR_STORAGE sa = addr.get_sockaddr_storage(&len);
		return UTP_GetGlobalUTPBytesSent((const struct sockaddr *)&sa, len);
	}

	size_t get_overhead() const
	{
		return get_udp_overhead() + get_header_size();
	}

	void send_data(PacketFormat* b, size_t length, bandwidth_type_t type);

	void send_ack(bool synack = false);

	void send_keep_alive();

	static void send_rst(SendToProc *send_to_proc, void *send_to_userdata,
						 const PackedSockAddr &addr, uint32 conn_id_send,
						 uint16 ack_nr, uint16 seq_nr, byte version);

	void send_packet(OutgoingPacket *pkt);

	bool is_writable(size_t to_write);

	bool flush_packets();

	void write_outgoing_packet(size_t payload, uint flags);

	void update_send_quota();

#ifdef _DEBUG
	void check_invariant();
#endif

	void check_timeouts();

	int ack_packet(uint16 seq);

	size_t selective_ack_bytes(uint base, const byte* mask, byte len, int64& min_rtt);

	void selective_ack(uint base, const byte *mask, byte len);

	void apply_ledbat_ccontrol(size_t bytes_acked, uint32 actual_delay, int64 min_rtt);

	size_t get_packet_size();
};

Array<RST_Info> g_rst_info;
Array<UTPSocket*> g_utp_sockets;

static void UTP_RegisterSentPacket(size_t length) {
	if (length <= PACKET_SIZE_MID) {
		if (length <= PACKET_SIZE_EMPTY) {
			_global_stats._nraw_send[PACKET_SIZE_EMPTY_BUCKET]++;
		} else if (length <= PACKET_SIZE_SMALL) {
			_global_stats._nraw_send[PACKET_SIZE_SMALL_BUCKET]++;
		} else
			_global_stats._nraw_send[PACKET_SIZE_MID_BUCKET]++;
	} else {
		if (length <= PACKET_SIZE_BIG) {
			_global_stats._nraw_send[PACKET_SIZE_BIG_BUCKET]++;
		} else
			_global_stats._nraw_send[PACKET_SIZE_HUGE_BUCKET]++;
	}
}

void send_to_addr(SendToProc *send_to_proc, void *send_to_userdata, const byte *p, size_t len, const PackedSockAddr &addr)
{
	socklen_t tolen;
	SOCKADDR_STORAGE to = addr.get_sockaddr_storage(&tolen);
	UTP_RegisterSentPacket(len);
	send_to_proc(send_to_userdata, p, len, (const struct sockaddr *)&to, tolen);
}

void UTPSocket::send_data(PacketFormat* b, size_t length, bandwidth_type_t type)
{
	// time stamp this packet with local time, the stamp goes into
	// the header of every packet at the 8th byte for 8 bytes:
	// two integers, check packet.h for more
	uint64 time = UTP_GetMicroseconds();

	PacketFormatV1* b1 = (PacketFormatV1*)b;
	if (version == 0) {
		b->tv_sec = (uint32)(time / 1000000);
		b->tv_usec = time % 1000000;
		b->reply_micro = reply_micro;
	} else {
		b1->tv_usec = (uint32)time;
		b1->reply_micro = reply_micro;
	}

	last_sent_packet = g_current_ms;

#ifdef _DEBUG
	_stats._nbytes_xmit += length;
	++_stats._nxmit;
#endif
	if (userdata) {
		size_t n;
		if (type == payload_bandwidth) {
			// if this packet carries payload, just
			// count the header as overhead
			type = header_overhead;
			n = get_overhead();
		} else {
			n = length + get_udp_overhead();
		}
		func.on_overhead(userdata, true, n, type);
	}
#if g_log_utp_verbose
	int flags = version == 0 ? b->flags : b1->type();
	uint16 seq_nr = version == 0 ? b->seq_nr : b1->seq_nr;
	uint16 ack_nr = version == 0 ? b->ack_nr : b1->ack_nr;
	LOG_UTPV("0x%08x: send %s len:%u id:%u timestamp:"I64u" reply_micro:%u flags:%s seq_nr:%u ack_nr:%u",
			 this, addrfmt(addr, addrbuf), (uint)length, conn_id_send, time, reply_micro, flagnames[flags],
			 seq_nr, ack_nr);
#endif
	send_to_addr(send_to_proc, send_to_userdata, (const byte*)b, length, addr);
}

void UTPSocket::send_ack(bool synack)
{
	PacketFormatExtensions pfe;
	zeromem(&pfe);
	PacketFormatExtensionsV1& pfe1 = (PacketFormatExtensionsV1&)pfe;
	PacketFormatAck& pfa = (PacketFormatAck&)pfe1;
	PacketFormatAckV1& pfa1 = (PacketFormatAckV1&)pfe1;

	size_t len;
	last_rcv_win = get_rcv_window();
	if (version == 0) {
		pfa.pf.connid = conn_id_send;
		pfa.pf.ack_nr = (uint16)ack_nr;
		pfa.pf.seq_nr = (uint16)seq_nr;
		pfa.pf.flags = ST_STATE;
		pfa.pf.ext = 0;
		pfa.pf.windowsize = (byte)DIV_ROUND_UP(last_rcv_win, PACKET_SIZE);
		len = sizeof(PacketFormat);
	} else {
		pfa1.pf.set_version(1);
		pfa1.pf.set_type(ST_STATE);
		pfa1.pf.ext = 0;
		pfa1.pf.connid = conn_id_send;
		pfa1.pf.ack_nr = ack_nr;
		pfa1.pf.seq_nr = seq_nr;
		pfa1.pf.windowsize = (uint32)last_rcv_win;
		len = sizeof(PacketFormatV1);
	}

	// we never need to send EACK for connections
	// that are shutting down
	if (reorder_count != 0 && state < CS_GOT_FIN) {
		// if reorder count > 0, send an EACK.
		// reorder count should always be 0
		// for synacks, so an EACK should never
		// be sent as a synack
		assert(!synack);
		if (version == 0) {
			pfa.pf.ext = 1;
			pfa.ext_next = 0;
			pfa.ext_len = 4;
		} else {
			pfa1.pf.ext = 1;
			pfa1.ext_next = 0;
			pfa1.ext_len = 4;
		}
		uint m = 0;

		// reorder count should only be non-zero
		// if the packet ack_nr + 1 has not yet
		// been received
		assert(inbuf.get(ack_nr + 1) == NULL);
		size_t window = min<size_t>(14+16, inbuf.size());
		// Generate bit mask of segments received.
		for (size_t i = 0; i < window; i++) {
			if (inbuf.get(ack_nr + i + 2) != NULL) {
				m |= 1 << i;
				LOG_UTPV("0x%08x: EACK packet [%u]", this, ack_nr + i + 2);
			}
		}
		if (version == 0) {
			pfa.acks[0] = (byte)m;
			pfa.acks[1] = (byte)(m >> 8);
			pfa.acks[2] = (byte)(m >> 16);
			pfa.acks[3] = (byte)(m >> 24);
		} else {
			pfa1.acks[0] = (byte)m;
			pfa1.acks[1] = (byte)(m >> 8);
			pfa1.acks[2] = (byte)(m >> 16);
			pfa1.acks[3] = (byte)(m >> 24);
		}
		len += 4 + 2;
		LOG_UTPV("0x%08x: Sending EACK %u [%u] bits:[%032b]", this, ack_nr, conn_id_send, m);
	} else if (synack) {
		// we only send "extensions" in response to SYN
		// and the reorder count is 0 in that state

		LOG_UTPV("0x%08x: Sending ACK %u [%u] with extension bits", this, ack_nr, conn_id_send);
		if (version == 0) {
			pfe.pf.ext = 2;
			pfe.ext_next = 0;
			pfe.ext_len = 8;
			memset(pfe.extensions, 0, 8);
		} else {
			pfe1.pf.ext = 2;
			pfe1.ext_next = 0;
			pfe1.ext_len = 8;
			memset(pfe1.extensions, 0, 8);
		}
		len += 8 + 2;
	} else {
		LOG_UTPV("0x%08x: Sending ACK %u [%u]", this, ack_nr, conn_id_send);
	}

	sent_ack();
	send_data((PacketFormat*)&pfe, len, ack_overhead);
}

void UTPSocket::send_keep_alive()
{
	ack_nr--;
	LOG_UTPV("0x%08x: Sending KeepAlive ACK %u [%u]", this, ack_nr, conn_id_send);
	send_ack();
	ack_nr++;
}
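
// Note on the keep-alive above: ack_nr is temporarily decremented so the
// STATE packet carries an ack_nr one below what has actually been received.
// The peer has already seen that acknowledgment, so the packet conveys no
// new information and simply keeps the connection (and, per the
// KEEPALIVE_INTERVAL comment, any NAT binding along the path) alive.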

void UTPSocket::send_rst(SendToProc *send_to_proc, void *send_to_userdata,
						 const PackedSockAddr &addr, uint32 conn_id_send, uint16 ack_nr, uint16 seq_nr, byte version)
{
	PacketFormat pf;
	zeromem(&pf);
	PacketFormatV1& pf1 = (PacketFormatV1&)pf;

	size_t len;
	if (version == 0) {
		pf.connid = conn_id_send;
		pf.ack_nr = ack_nr;
		pf.seq_nr = seq_nr;
		pf.flags = ST_RESET;
		pf.ext = 0;
		pf.windowsize = 0;
		len = sizeof(PacketFormat);
	} else {
		pf1.set_version(1);
		pf1.set_type(ST_RESET);
		pf1.ext = 0;
		pf1.connid = conn_id_send;
		pf1.ack_nr = ack_nr;
		pf1.seq_nr = seq_nr;
		pf1.windowsize = 0;
		len = sizeof(PacketFormatV1);
	}

	LOG_UTPV("%s: Sending RST id:%u seq_nr:%u ack_nr:%u", addrfmt(addr, addrbuf), conn_id_send, seq_nr, ack_nr);
	LOG_UTPV("send %s len:%u id:%u", addrfmt(addr, addrbuf), (uint)len, conn_id_send);
	send_to_addr(send_to_proc, send_to_userdata, (const byte*)&pf1, len, addr);
}

void UTPSocket::send_packet(OutgoingPacket *pkt)
{
	// only count against the quota the first time we
	// send the packet. Don't enforce quota when closing
	// a socket. Only enforce the quota when we're sending
	// at slow rates (max window < packet size)
	size_t max_send = min(max_window, opt_sndbuf, max_window_user);

	if (pkt->transmissions == 0 || pkt->need_resend) {
		cur_window += pkt->payload;
	}

	size_t packet_size = get_packet_size();
	if (pkt->transmissions == 0 && max_send < packet_size) {
		assert(state == CS_FIN_SENT ||
			   (int32)pkt->payload <= send_quota / 100);
		send_quota = send_quota - (int32)(pkt->payload * 100);
	}

	pkt->need_resend = false;

	PacketFormatV1* p1 = (PacketFormatV1*)pkt->data;
	PacketFormat* p = (PacketFormat*)pkt->data;
	if (version == 0) {
		p->ack_nr = ack_nr;
	} else {
		p1->ack_nr = ack_nr;
	}
	pkt->time_sent = UTP_GetMicroseconds();
	pkt->transmissions++;
	sent_ack();
	send_data((PacketFormat*)pkt->data, pkt->length,
			  (state == CS_SYN_SENT) ? connect_overhead
			  : (pkt->transmissions == 1) ? payload_bandwidth
			  : retransmit_overhead);
}

bool UTPSocket::is_writable(size_t to_write)
{
	// return true if it's OK to stuff another packet into the
	// outgoing queue. Since we may be using packet pacing, we
	// might not actually send the packet right away to affect the
	// cur_window. The only thing that happens when we add another
	// packet is that cur_window_packets is increased.
	size_t max_send = min(max_window, opt_sndbuf, max_window_user);

	size_t packet_size = get_packet_size();

	if (cur_window + packet_size >= max_window)
		last_maxed_out_window = g_current_ms;

	// if we don't have enough quota, we can't write regardless
	if (USE_PACKET_PACING) {
		if (send_quota / 100 < (int32)to_write) return false;
	}

	// subtract one to save space for the FIN packet
	if (cur_window_packets >= OUTGOING_BUFFER_MAX_SIZE - 1) return false;

	// if sending another packet would not make the window exceed
	// the max_window, we can write
	if (cur_window + packet_size <= max_send) return true;

	// if the window size is less than a packet, and we have enough
	// quota to send a packet, we can write, even though it would
	// make the window exceed the max size
	// the last condition is needed to not put too many packets
	// in the send buffer. cur_window isn't updated until we flush
	// the send buffer, so we need to take the number of packets
	// into account
	if (USE_PACKET_PACING) {
		if (max_window < to_write &&
			cur_window < max_window &&
			cur_window_packets == 0) {
			return true;
		}
	}

	return false;
}
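
// Reminder on units for the pacing checks above: send_quota is kept in
// hundredths of a byte (the value is scaled by 100 for better precision at
// low rates, see the field comment in UTPSocket), so send_quota / 100 is the
// number of whole bytes we are currently allowed to send.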

bool UTPSocket::flush_packets()
{
	size_t packet_size = get_packet_size();

	// send packets that are waiting on the pacer to be sent
	// i has to be an unsigned 16 bit counter to wrap correctly
	// signed types are not guaranteed to wrap the way you expect
	for (uint16 i = seq_nr - cur_window_packets; i != seq_nr; ++i) {
		OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(i);
		if (pkt == 0 || (pkt->transmissions > 0 && pkt->need_resend == false)) continue;
		// have we run out of quota?
		if (!is_writable(pkt->payload)) {
			return true;
		}

		// Nagle check
		// don't send the last packet if we have one packet in-flight
		// and the current packet is still smaller than packet_size.
		if (i != ((seq_nr - 1) & ACK_NR_MASK) ||
			cur_window_packets == 1 ||
			pkt->payload >= packet_size) {
			send_packet(pkt);

			// No need to send another ack if there is nothing to reorder.
			if (reorder_count == 0) {
				sent_ack();
			}
		}
	}
	return false;
}

void UTPSocket::write_outgoing_packet(size_t payload, uint flags)
{
	// Setup initial timeout timer
	if (cur_window_packets == 0) {
		retransmit_timeout = rto;
		rto_timeout = g_current_ms + retransmit_timeout;
		assert(cur_window == 0);
	}

	size_t packet_size = get_packet_size();
	do {
		assert(cur_window_packets < OUTGOING_BUFFER_MAX_SIZE);
		assert(flags == ST_DATA || flags == ST_FIN);

		size_t added = 0;

		OutgoingPacket *pkt = NULL;

		if (cur_window_packets > 0) {
			pkt = (OutgoingPacket*)outbuf.get(seq_nr - 1);
		}

		const size_t header_size = get_header_size();
		bool append = true;

		// if there's any room left in the last packet in the window
		// and it hasn't been sent yet, fill that frame first
		if (payload && pkt && !pkt->transmissions && pkt->payload < packet_size) {
			// Use the previous unsent packet
			added = min(payload + pkt->payload, max<size_t>(packet_size, pkt->payload)) - pkt->payload;
			pkt = (OutgoingPacket*)realloc(pkt,
										   (sizeof(OutgoingPacket) - 1) +
										   header_size +
										   pkt->payload + added);
			outbuf.put(seq_nr - 1, pkt);
			append = false;
			assert(!pkt->need_resend);
		} else {
			// Create the packet to send.
			added = payload;
			pkt = (OutgoingPacket*)malloc((sizeof(OutgoingPacket) - 1) +
										  header_size +
										  added);
			pkt->payload = 0;
			pkt->transmissions = 0;
			pkt->need_resend = false;
		}

		if (added) {
			// Fill it with data from the upper layer.
			func.on_write(userdata, pkt->data + header_size + pkt->payload, added);
		}
		pkt->payload += added;
		pkt->length = header_size + pkt->payload;

		last_rcv_win = get_rcv_window();

		PacketFormat* p = (PacketFormat*)pkt->data;
		PacketFormatV1* p1 = (PacketFormatV1*)pkt->data;
		if (version == 0) {
			p->connid = conn_id_send;
			p->ext = 0;
			p->windowsize = (byte)DIV_ROUND_UP(last_rcv_win, PACKET_SIZE);
			p->ack_nr = ack_nr;
			p->flags = flags;
		} else {
			p1->set_version(1);
			p1->set_type(flags);
			p1->ext = 0;
			p1->connid = conn_id_send;
			p1->windowsize = (uint32)last_rcv_win;
			p1->ack_nr = ack_nr;
		}

		if (append) {
			// Remember the message in the outgoing queue.
			outbuf.ensure_size(seq_nr, cur_window_packets);
			outbuf.put(seq_nr, pkt);
			if (version == 0) p->seq_nr = seq_nr;
			else p1->seq_nr = seq_nr;
			seq_nr++;
			cur_window_packets++;
		}

		payload -= added;

	} while (payload);

	flush_packets();
}

void UTPSocket::update_send_quota()
{
	int dt = g_current_ms - last_send_quota;
	if (dt == 0) return;
	last_send_quota = g_current_ms;
	size_t add = max_window * dt * 100 / (rtt_hist.delay_base?rtt_hist.delay_base:50);
	if (add > max_window * 100 && add > MAX_CWND_INCREASE_BYTES_PER_RTT * 100) add = max_window;
	send_quota += (int32)add;
//	LOG_UTPV("0x%08x: UTPSocket::update_send_quota dt:%d rtt:%u max_window:%u quota:%d",
//		this, dt, rtt, (uint)max_window, send_quota / 100);
}
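
// Rough example of the quota refill above (numbers are illustrative only):
// with max_window = 10000 bytes, dt = 100 ms and rtt_hist.delay_base = 50 ms,
// add = 10000 * 100 * 100 / 50 = 2,000,000 quota units, i.e. 20,000 bytes --
// two windows' worth, matching a pacing rate of about max_window bytes per RTT.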

#ifdef _DEBUG
void UTPSocket::check_invariant()
{
	if (reorder_count > 0) {
		assert(inbuf.get(ack_nr + 1) == NULL);
	}

	size_t outstanding_bytes = 0;
	for (int i = 0; i < cur_window_packets; ++i) {
		OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - i - 1);
		if (pkt == 0 || pkt->transmissions == 0 || pkt->need_resend) continue;
		outstanding_bytes += pkt->payload;
	}
	assert(outstanding_bytes == cur_window);
}
#endif

void UTPSocket::check_timeouts()
{
#ifdef _DEBUG
	check_invariant();
#endif

	// this invariant should always be true
	assert(cur_window_packets == 0 || outbuf.get(seq_nr - cur_window_packets));

	LOG_UTPV("0x%08x: CheckTimeouts timeout:%d max_window:%u cur_window:%u quota:%d "
			 "state:%s cur_window_packets:%u bytes_since_ack:%u ack_time:%d",
			 this, (int)(rto_timeout - g_current_ms), (uint)max_window, (uint)cur_window,
			 send_quota / 100, statenames[state], cur_window_packets,
			 (uint)bytes_since_ack, (int)(g_current_ms - ack_time));

	update_send_quota();
	flush_packets();


	if (USE_PACKET_PACING) {
		// In case the new send quota made it possible to send another packet,
		// mark the socket as writable. If we don't use packet pacing, the send
		// quota does not affect whether the socket is writable; the writable
		// event is then triggered whenever cur_window falls below max_window,
		// so this check isn't needed in that case.
		if (state == CS_CONNECTED_FULL && is_writable(get_packet_size())) {
			state = CS_CONNECTED;
			LOG_UTPV("0x%08x: Socket writable. max_window:%u cur_window:%u quota:%d packet_size:%u",
					 this, (uint)max_window, (uint)cur_window, send_quota / 100, (uint)get_packet_size());
			func.on_state(userdata, UTP_STATE_WRITABLE);
		}
	}

	switch (state) {
	case CS_SYN_SENT:
	case CS_CONNECTED_FULL:
	case CS_CONNECTED:
	case CS_FIN_SENT: {

		// Reset max window...
		if ((int)(g_current_ms - zerowindow_time) >= 0 && max_window_user == 0) {
			max_window_user = PACKET_SIZE;
		}

		if ((int)(g_current_ms - rto_timeout) >= 0 &&
			(!(USE_PACKET_PACING) || cur_window_packets > 0) &&
			rto_timeout > 0) {

			/*
			OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - cur_window_packets);

			// If there were a lot of retransmissions, force recomputation of round trip time
			if (pkt->transmissions >= 4)
				rtt = 0;
			*/

			// Increase RTO
			const uint new_timeout = retransmit_timeout * 2;
			if (new_timeout >= 30000 || (state == CS_SYN_SENT && new_timeout > 6000)) {
				// more than 30 seconds with no reply. kill it.
				// if we haven't even connected yet, give up sooner. 6 seconds
				// means 2 tries at the following timeouts: 3, 6 seconds
				if (state == CS_FIN_SENT)
					state = CS_DESTROY;
				else
					state = CS_RESET;
				func.on_error(userdata, ETIMEDOUT);
				goto getout;
			}

			retransmit_timeout = new_timeout;
			rto_timeout = g_current_ms + new_timeout;

			// On Timeout
			duplicate_ack = 0;

			// rate = min_rate
			max_window = get_packet_size();
			send_quota = max<int32>((int32)max_window * 100, send_quota);

			// every packet should be considered lost
			for (int i = 0; i < cur_window_packets; ++i) {
				OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - i - 1);
				if (pkt == 0 || pkt->transmissions == 0 || pkt->need_resend) continue;
				pkt->need_resend = true;
				assert(cur_window >= pkt->payload);
				cur_window -= pkt->payload;
			}

			// used in parse_log.py
			LOG_UTP("0x%08x: Packet timeout. Resend. seq_nr:%u. timeout:%u max_window:%u",
					this, seq_nr - cur_window_packets, retransmit_timeout, (uint)max_window);

			fast_timeout = true;
			timeout_seq_nr = seq_nr;

			if (cur_window_packets > 0) {
				OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - cur_window_packets);
				assert(pkt);
				send_quota = max<int32>((int32)pkt->length * 100, send_quota);

				// Re-send the packet.
				send_packet(pkt);
			}
		}

		// Mark the socket as writable
		if (state == CS_CONNECTED_FULL && is_writable(get_packet_size())) {
			state = CS_CONNECTED;
			LOG_UTPV("0x%08x: Socket writable. max_window:%u cur_window:%u quota:%d packet_size:%u",
					 this, (uint)max_window, (uint)cur_window, send_quota / 100, (uint)get_packet_size());
			func.on_state(userdata, UTP_STATE_WRITABLE);
		}

		if (state >= CS_CONNECTED && state <= CS_FIN_SENT) {
			// Send acknowledgment packets periodically, or when the threshold is reached
			if (bytes_since_ack > DELAYED_ACK_BYTE_THRESHOLD ||
				(int)(g_current_ms - ack_time) >= 0) {
				send_ack();
			}

			if ((int)(g_current_ms - last_sent_packet) >= KEEPALIVE_INTERVAL) {
				send_keep_alive();
			}
		}

		break;
	}

	// Close?
	case CS_GOT_FIN:
	case CS_DESTROY_DELAY:
		if ((int)(g_current_ms - rto_timeout) >= 0) {
			state = (state == CS_DESTROY_DELAY) ? CS_DESTROY : CS_RESET;
			if (cur_window_packets > 0 && userdata) {
				func.on_error(userdata, ECONNRESET);
			}
		}
		break;
	// prevent warning
	case CS_IDLE:
	case CS_RESET:
	case CS_DESTROY:
		break;
	}

	getout:

	// make sure we don't accumulate quota when we don't have
	// anything to send
	int32 limit = max<int32>((int32)max_window / 2, 5 * (int32)get_packet_size()) * 100;
	if (send_quota > limit) send_quota = limit;
}

// returns:
// 0: the packet was acked
// 1: the packet had already been acked (or was never in the buffer)
// 2: the packet has not been sent yet
int UTPSocket::ack_packet(uint16 seq)
{
	OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq);

	// the packet has already been acked (or not sent)
	if (pkt == NULL) {
		LOG_UTPV("0x%08x: got ack for:%u (already acked, or never sent)", this, seq);
		return 1;
	}

	// can't ack packets that haven't been sent yet!
	if (pkt->transmissions == 0) {
		LOG_UTPV("0x%08x: got ack for:%u (never sent, pkt_size:%u need_resend:%u)",
				 this, seq, (uint)pkt->payload, pkt->need_resend);
		return 2;
	}

	LOG_UTPV("0x%08x: got ack for:%u (pkt_size:%u need_resend:%u)",
			 this, seq, (uint)pkt->payload, pkt->need_resend);

	outbuf.put(seq, NULL);

	// if we never re-sent the packet, update the RTT estimate
	if (pkt->transmissions == 1) {
		// Estimate the round trip time.
		const uint32 ertt = (uint32)((UTP_GetMicroseconds() - pkt->time_sent) / 1000);
		if (rtt == 0) {
			// First round trip time sample
			rtt = ertt;
			rtt_var = ertt / 2;
			// sanity check. rtt should never be more than 6 seconds
			// assert(rtt < 6000);
		} else {
			// Compute new round trip times
			const int delta = (int)rtt - ertt;
			rtt_var = rtt_var + (int)(abs(delta) - rtt_var) / 4;
			rtt = rtt - rtt/8 + ertt/8;
			// sanity check. rtt should never be more than 6 seconds
			// assert(rtt < 6000);
			rtt_hist.add_sample(ertt);
		}
		rto = max<uint>(rtt + rtt_var * 4, 500);
		LOG_UTPV("0x%08x: rtt:%u avg:%u var:%u rto:%u",
				 this, ertt, rtt, rtt_var, rto);
	}
---|
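| // Editorial illustration (not part of the original source) of the |
---|
| // smoothed-RTT update above, with concrete numbers: |
---|
| //   rtt = 100, rtt_var = 20, new sample ertt = 140 |
---|
| //   delta   = 100 - 140              = -40 |
---|
| //   rtt_var = 20 + (40 - 20) / 4     = 25 |
---|
| //   rtt     = 100 - 100/8 + 140/8    = 105   (integer division) |
---|
| //   rto     = max(105 + 4 * 25, 500) = 500 |
---|
| // The 500 ms floor dominates until rtt + 4 * rtt_var exceeds it. |
---|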
1436 | retransmit_timeout = rto; |
---|
1437 | rto_timeout = g_current_ms + rto; |
---|
1438 | // if need_resend is set, this packet has already |
---|
1439 | // been considered timed-out, and is not included in |
---|
1440 | // the cur_window anymore |
---|
1441 | if (!pkt->need_resend) { |
---|
1442 | assert(cur_window >= pkt->payload); |
---|
1443 | cur_window -= pkt->payload; |
---|
1444 | } |
---|
1445 | free(pkt); |
---|
1446 | return 0; |
---|
1447 | } |
---|
1448 | |
---|
1449 | // count the number of bytes that were acked by the EACK header |
---|
1450 | size_t UTPSocket::selective_ack_bytes(uint base, const byte* mask, byte len, int64& min_rtt) |
---|
1451 | { |
---|
1452 | if (cur_window_packets == 0) return 0; |
---|
1453 | |
---|
1454 | size_t acked_bytes = 0; |
---|
1455 | int bits = len * 8; |
---|
1456 | |
---|
1457 | do { |
---|
1458 | uint v = base + bits; |
---|
1459 | |
---|
1460 | // ignore bits that haven't been sent yet |
---|
1461 | // see comment in UTPSocket::selective_ack |
---|
1462 | if (((seq_nr - v - 1) & ACK_NR_MASK) >= (uint16)(cur_window_packets - 1)) |
---|
1463 | continue; |
---|
1464 | |
---|
1465 | // ignore bits that represent packets we haven't sent yet |
---|
1466 | // or packets that have already been acked |
---|
1467 | OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(v); |
---|
1468 | if (!pkt || pkt->transmissions == 0) |
---|
1469 | continue; |
---|
1470 | |
---|
1471 | // Count the number of segments that were successfully received past it. |
---|
1472 | if (bits >= 0 && mask[bits>>3] & (1 << (bits & 7))) { |
---|
1473 | assert((int)(pkt->payload) >= 0); |
---|
1474 | acked_bytes += pkt->payload; |
---|
1475 | min_rtt = min<int64>(min_rtt, UTP_GetMicroseconds() - pkt->time_sent); |
---|
1476 | continue; |
---|
1477 | } |
---|
1478 | } while (--bits >= -1); |
---|
1479 | return acked_bytes; |
---|
1480 | } |
---|
1481 | |
---|
1482 | void UTPSocket::selective_ack(uint base, const byte *mask, byte len) |
---|
1483 | { |
---|
1484 | if (cur_window_packets == 0) return; |
---|
1485 | |
---|
1486 | // the range is inclusive [0, 31] bits |
---|
1487 | int bits = len * 8 - 1; |
---|
1488 | |
---|
1489 | int count = 0; |
---|
1490 | |
---|
1491 | // resends is a stack of sequence numbers we need to resend. Since we |
---|
1492 | // iterate in reverse over the acked packets, at the end, the top packets |
---|
1493 | // are the ones we want to resend |
---|
1494 | int resends[32]; |
---|
1495 | int nr = 0; |
---|
1496 | |
---|
1497 | LOG_UTPV("0x%08x: Got EACK [%032b] base:%u", this, *(uint32*)mask, base); |
---|
1498 | do { |
---|
1499 | // we're iterating over the bits from higher sequence numbers |
---|
1500 | // to lower (kind of in reverse order, which might not be very |
---|
1501 | // intuitive) |
---|
1502 | uint v = base + bits; |
---|
1503 | |
---|
1504 | // ignore bits that haven't been sent yet |
---|
1505 | // and bits that fall below the ACKed sequence number |
---|
1506 | // this can happen if an EACK message gets |
---|
1507 | // reordered and arrives after a packet that ACKs up past |
---|
1508 | // the base for this EACK message |
---|
1509 | |
---|
1510 | // this is essentially the same as: |
---|
1511 | // if v >= seq_nr || v <= seq_nr - cur_window_packets |
---|
1512 | // but it takes wrapping into account |
---|
1513 | |
---|
1514 | // if v == seq_nr the -1 will make it wrap. if v > seq_nr |
---|
1515 | // it will also wrap (since it will fall further below 0) |
---|
1516 | // and be > cur_window_packets. |
---|
1517 | // if v == seq_nr - cur_window_packets, the result will be |
---|
1518 | // seq_nr - (seq_nr - cur_window_packets) - 1 |
---|
1519 | // == seq_nr - seq_nr + cur_window_packets - 1 |
---|
1520 | // == cur_window_packets - 1 which will be caught by the |
---|
1521 | // test. If v < seq_nr - cur_window_packets the result will |
---|
1522 | // fall further outside of the cur_window_packets range. |
---|
1523 | |
---|
1524 | // sequence number space: |
---|
1525 | // |
---|
1526 | // rejected < accepted > rejected |
---|
1527 | // <============+--------------+============> |
---|
1528 | // ^ ^ |
---|
1529 | // | | |
---|
1530 | // (seq_nr-wnd) seq_nr |
---|
1531 | |
---|
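| // Editorial illustration (not part of the original source): with |
---|
| // seq_nr == 105 and cur_window_packets == 5, packets 100..104 are |
---|
| // outstanding, and the test below accepts only 101..104: |
---|
| //   v == 102: (105 - 102 - 1) & 0xFFFF == 2      <  4  -> examined |
---|
| //   v == 105: (105 - 105 - 1) & 0xFFFF == 0xFFFF >= 4  -> skipped |
---|
| //   v == 100: (105 - 100 - 1) & 0xFFFF == 4      >= 4  -> skipped |
---|
| // 100, the oldest outstanding packet, is deliberately excluded since |
---|
| // only the cumulative ack may cover it (see the assert further down). |
---|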
1532 | if (((seq_nr - v - 1) & ACK_NR_MASK) >= (uint16)(cur_window_packets - 1)) |
---|
1533 | continue; |
---|
1534 | |
---|
1535 | // this counts as a duplicate ack, even though we might have |
---|
1536 | // received an ack for this packet previously (in another EACK |
---|
1537 | // message for instance) |
---|
1538 | bool bit_set = bits >= 0 && mask[bits>>3] & (1 << (bits & 7)); |
---|
1539 | |
---|
1540 | // if this packet is acked, it counts towards the duplicate ack counter |
---|
1541 | if (bit_set) count++; |
---|
1542 | |
---|
1543 | // ignore bits that represent packets we haven't sent yet |
---|
1544 | // or packets that have already been acked |
---|
1545 | OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(v); |
---|
1546 | if (!pkt || pkt->transmissions == 0) { |
---|
1547 | LOG_UTPV("0x%08x: skipping %u. pkt:%08x transmissions:%u %s", |
---|
1548 | this, v, pkt, pkt?pkt->transmissions:0, pkt?"(not sent yet?)":"(already acked?)"); |
---|
1549 | continue; |
---|
1550 | } |
---|
1551 | |
---|
1552 | // Count the number of segments that were successfully received past it. |
---|
1553 | if (bit_set) { |
---|
1554 | // the selective ack should never ACK the packet we're waiting for to decrement cur_window_packets |
---|
1555 | assert((v & outbuf.mask) != ((seq_nr - cur_window_packets) & outbuf.mask)); |
---|
1556 | ack_packet(v); |
---|
1557 | continue; |
---|
1558 | } |
---|
1559 | |
---|
1560 | // Resend segments |
---|
1561 | // if count is less than our re-send limit, we haven't seen enough |
---|
1562 | // acked packets in front of this one to warrant a re-send. |
---|
1563 | // if count == 0, we're still going through the tail of zeroes |
---|
1564 | if (((v - fast_resend_seq_nr) & ACK_NR_MASK) <= OUTGOING_BUFFER_MAX_SIZE && |
---|
1565 | count >= DUPLICATE_ACKS_BEFORE_RESEND && |
---|
1566 | duplicate_ack < DUPLICATE_ACKS_BEFORE_RESEND) { |
---|
1567 | resends[nr++] = v; |
---|
1568 | LOG_UTPV("0x%08x: no ack for %u", this, v); |
---|
1569 | } else { |
---|
1570 | LOG_UTPV("0x%08x: not resending %u count:%d dup_ack:%u fast_resend_seq_nr:%u", |
---|
1571 | this, v, count, duplicate_ack, fast_resend_seq_nr); |
---|
1572 | } |
---|
1573 | } while (--bits >= -1); |
---|
1574 | |
---|
1575 | if (((base - 1 - fast_resend_seq_nr) & ACK_NR_MASK) < 256 && |
---|
1576 | count >= DUPLICATE_ACKS_BEFORE_RESEND && |
---|
1577 | duplicate_ack < DUPLICATE_ACKS_BEFORE_RESEND) { |
---|
1578 | // if we get enough duplicate acks to start |
---|
1579 | // resending, the first packet we should resend |
---|
1580 | // is base-1 |
---|
1581 | resends[nr++] = base - 1; |
---|
1582 | } else { |
---|
1583 | LOG_UTPV("0x%08x: not resending %u count:%d dup_ack:%u fast_resend_seq_nr:%u", |
---|
1584 | this, base - 1, count, duplicate_ack, fast_resend_seq_nr); |
---|
1585 | } |
---|
1586 | |
---|
1587 | bool back_off = false; |
---|
1588 | int i = 0; |
---|
1589 | while (nr > 0) { |
---|
1590 | uint v = resends[--nr]; |
---|
1591 | // don't consider the tail of zeroes to be lost packets |
---|
1592 | // only unacked packets with acked packets after should |
---|
1593 | // be considered lost |
---|
1594 | OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(v); |
---|
1595 | |
---|
1596 | // this may be an old (re-ordered) packet, and some of the |
---|
1597 | // packets in here may have been acked already. In which |
---|
1598 | // case they will not be in the send queue anymore |
---|
1599 | if (!pkt) continue; |
---|
1600 | |
---|
1601 | // used in parse_log.py |
---|
1602 | LOG_UTP("0x%08x: Packet %u lost. Resending", this, v); |
---|
1603 | |
---|
1604 | // On Loss |
---|
1605 | back_off = true; |
---|
1606 | #ifdef _DEBUG |
---|
1607 | ++_stats._rexmit; |
---|
1608 | #endif |
---|
1609 | send_packet(pkt); |
---|
1610 | fast_resend_seq_nr = v + 1; |
---|
1611 | |
---|
1612 | // Re-send max 4 packets. |
---|
1613 | if (++i >= 4) break; |
---|
1614 | } |
---|
1615 | |
---|
1616 | if (back_off) |
---|
1617 | maybe_decay_win(); |
---|
1618 | |
---|
1619 | duplicate_ack = count; |
---|
1620 | } |
---|
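| // Editorial summary of the function above (not original text): a hole |
---|
| // in the EACK bitmask is treated as lost only once at least |
---|
| // DUPLICATE_ACKS_BEFORE_RESEND packets after it are reported received, |
---|
| // at most 4 such packets are resent per EACK, and if anything was |
---|
| // resent, maybe_decay_win() is called once to back off the window. |
---|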
1621 | |
---|
1622 | void UTPSocket::apply_ledbat_ccontrol(size_t bytes_acked, uint32 actual_delay, int64 min_rtt) |
---|
1623 | { |
---|
1624 | // the delay can never be greater than the rtt. The min_rtt |
---|
1625 | // variable is the RTT in microseconds |
---|
1626 | |
---|
1627 | assert(min_rtt >= 0); |
---|
1628 | int32 our_delay = min<uint32>(our_hist.get_value(), uint32(min_rtt)); |
---|
1629 | assert(our_delay != INT_MAX); |
---|
1630 | assert(our_delay >= 0); |
---|
1631 | |
---|
1632 | SOCKADDR_STORAGE sa = addr.get_sockaddr_storage(); |
---|
1633 | UTP_DelaySample((sockaddr*)&sa, our_delay / 1000); |
---|
1634 | |
---|
1635 | // This tests the connection under heavy load from foreground |
---|
1636 | // traffic. Pretend that our delays are very high to force the |
---|
1637 | // connection to use sub-packet size window sizes |
---|
1638 | //our_delay *= 4; |
---|
1639 | |
---|
1640 | // target is in microseconds |
---|
1641 | int target = CCONTROL_TARGET; |
---|
1642 | if (target <= 0) target = 100000; |
---|
1643 | |
---|
1644 | double off_target = target - our_delay; |
---|
1645 | |
---|
1646 | // this is the same as: |
---|
1647 | // |
---|
1648 | // (min(off_target, target) / target) * (bytes_acked / max_window) * MAX_CWND_INCREASE_BYTES_PER_RTT |
---|
1649 | // |
---|
1650 | // so, it's scaling the max increase by the fraction of the window this ack represents, and the fraction |
---|
1651 | // of the target delay the current delay represents. |
---|
1652 | // The min() around off_target protects against crazy values of our_delay, which may happen when the |
---|
1653 | // timestamps wraps, or by just having a malicious peer sending garbage. This caps the increase |
---|
1654 | // of the window size to MAX_CWND_INCREASE_BYTES_PER_RTT per rtt. |
---|
1655 | // as for large negative numbers, this direction is already capped at the min packet size further down |
---|
1656 | // the min around the bytes_acked protects against the case where the window size was recently |
---|
1657 | // shrunk and the number of acked bytes exceeds that. This is considered no more than one full |
---|
1658 | // window, in order to keep the gain within sane boundaries. |
---|
1659 | |
---|
1660 | assert(bytes_acked > 0); |
---|
1661 | double window_factor = (double)min(bytes_acked, max_window) / (double)max(max_window, bytes_acked); |
---|
1662 | double delay_factor = off_target / target; |
---|
1663 | double scaled_gain = MAX_CWND_INCREASE_BYTES_PER_RTT * window_factor * delay_factor; |
---|
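| // Editorial illustration with concrete numbers (not from the original |
---|
| // source): target = 100000 us, our_delay = 50000 us, bytes_acked = 3000 |
---|
| // and max_window = 30000 give |
---|
| //   off_target    = 50000 |
---|
| //   window_factor = 3000 / 30000     = 0.1 |
---|
| //   delay_factor  = 50000 / 100000   = 0.5 |
---|
| //   scaled_gain   = 3000 * 0.1 * 0.5 = 150 bytes |
---|
| // A delay above the target makes off_target negative, so the same |
---|
| // formula shrinks max_window instead of growing it. |
---|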
1664 | |
---|
1665 | // since MAX_CWND_INCREASE_BYTES_PER_RTT is a cap on how much the window size (max_window) |
---|
1666 | // may increase per RTT, we may not increase the window size more than that proportional |
---|
1667 | // to the number of bytes that were acked, so that once one window has been acked (one rtt) |
---|
1668 | // the increase limit is not exceeded |
---|
1669 | // the +1. is to allow for floating point imprecision |
---|
1670 | assert(scaled_gain <= 1. + MAX_CWND_INCREASE_BYTES_PER_RTT * (int)min(bytes_acked, max_window) / (double)max(max_window, bytes_acked)); |
---|
1671 | |
---|
1672 | if (scaled_gain > 0 && g_current_ms - last_maxed_out_window > 300) { |
---|
1673 | // if it was more than 300 milliseconds since we tried to send a packet |
---|
1674 | // and stopped because we hit the max window, we're most likely rate |
---|
1675 | // limited (which prevents us from ever hitting the window size) |
---|
1676 | // if this is the case, we cannot let the max_window grow indefinitely |
---|
1677 | scaled_gain = 0; |
---|
1678 | } |
---|
1679 | |
---|
1680 | if (scaled_gain + max_window < MIN_WINDOW_SIZE) { |
---|
1681 | max_window = MIN_WINDOW_SIZE; |
---|
1682 | } else { |
---|
1683 | max_window = (size_t)(max_window + scaled_gain); |
---|
1684 | } |
---|
1685 | |
---|
1686 | // make sure that the congestion window is below max |
---|
1687 | // make sure that we don't shrink our window too small |
---|
1688 | max_window = clamp<size_t>(max_window, MIN_WINDOW_SIZE, opt_sndbuf); |
---|
1689 | |
---|
1690 | // used in parse_log.py |
---|
1691 | LOG_UTP("0x%08x: actual_delay:%u our_delay:%d their_delay:%u off_target:%d max_window:%u " |
---|
1692 | "delay_base:%u delay_sum:%d target_delay:%d acked_bytes:%u cur_window:%u " |
---|
1693 | "scaled_gain:%f rtt:%u rate:%u quota:%d wnduser:%u rto:%u timeout:%d get_microseconds:"I64u" " |
---|
1694 | "cur_window_packets:%u packet_size:%u their_delay_base:%u their_actual_delay:%u", |
---|
1695 | this, actual_delay, our_delay / 1000, their_hist.get_value() / 1000, |
---|
1696 | (int)off_target / 1000, (uint)(max_window), our_hist.delay_base, |
---|
1697 | (our_delay + their_hist.get_value()) / 1000, target / 1000, (uint)bytes_acked, |
---|
1698 | (uint)(cur_window - bytes_acked), (float)(scaled_gain), rtt, |
---|
1699 | (uint)(max_window * 1000 / (rtt_hist.delay_base?rtt_hist.delay_base:50)), |
---|
1700 | send_quota / 100, (uint)max_window_user, rto, (int)(rto_timeout - g_current_ms), |
---|
1701 | UTP_GetMicroseconds(), cur_window_packets, (uint)get_packet_size(), |
---|
1702 | their_hist.delay_base, their_hist.delay_base + their_hist.get_value()); |
---|
1703 | } |
---|
1704 | |
---|
1705 | static void UTP_RegisterRecvPacket(UTPSocket *conn, size_t len) |
---|
1706 | { |
---|
1707 | #ifdef _DEBUG |
---|
1708 | ++conn->_stats._nrecv; |
---|
1709 | conn->_stats._nbytes_recv += len; |
---|
1710 | #endif |
---|
1711 | |
---|
1712 | if (len <= PACKET_SIZE_MID) { |
---|
1713 | if (len <= PACKET_SIZE_EMPTY) { |
---|
1714 | _global_stats._nraw_recv[PACKET_SIZE_EMPTY_BUCKET]++; |
---|
1715 | } else if (len <= PACKET_SIZE_SMALL) { |
---|
1716 | _global_stats._nraw_recv[PACKET_SIZE_SMALL_BUCKET]++; |
---|
1717 | } else |
---|
1718 | _global_stats._nraw_recv[PACKET_SIZE_MID_BUCKET]++; |
---|
1719 | } else { |
---|
1720 | if (len <= PACKET_SIZE_BIG) { |
---|
1721 | _global_stats._nraw_recv[PACKET_SIZE_BIG_BUCKET]++; |
---|
1722 | } else |
---|
1723 | _global_stats._nraw_recv[PACKET_SIZE_HUGE_BUCKET]++; |
---|
1724 | } |
---|
1725 | } |
---|
1726 | |
---|
1727 | // returns the max number of bytes of payload the uTP |
---|
1728 | // connection is allowed to send |
---|
1729 | size_t UTPSocket::get_packet_size() |
---|
1730 | { |
---|
1731 | int header_size = version == 1 |
---|
1732 | ? sizeof(PacketFormatV1) |
---|
1733 | : sizeof(PacketFormat); |
---|
1734 | |
---|
1735 | size_t mtu = get_udp_mtu(); |
---|
1736 | |
---|
1737 | if (DYNAMIC_PACKET_SIZE_ENABLED) { |
---|
1738 | SOCKADDR_STORAGE sa = addr.get_sockaddr_storage(); |
---|
1739 | size_t max_packet_size = UTP_GetPacketSize((sockaddr*)&sa); |
---|
1740 | return min(mtu - header_size, max_packet_size); |
---|
1741 | } |
---|
1742 | else |
---|
1743 | { |
---|
1744 | return mtu - header_size; |
---|
1745 | } |
---|
1746 | } |
---|
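| // Editorial example (illustrative; it assumes get_udp_mtu() reports the |
---|
| // UDP payload budget, e.g. 1472 bytes for a 1500-byte IPv4 link MTU |
---|
| // after 20 bytes of IP and 8 bytes of UDP header): a version-1 |
---|
| // connection with its 20-byte header would then carry 1452 bytes of |
---|
| // uTP payload per packet, unless UTP_GetPacketSize() imposes a smaller |
---|
| // dynamic limit. |
---|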
1747 | |
---|
1748 | // Process an incoming packet |
---|
1749 | // syn is true if this is the first packet received. It will cut off parsing |
---|
1750 | // as soon as the header is done |
---|
1751 | size_t UTP_ProcessIncoming(UTPSocket *conn, const byte *packet, size_t len, bool syn = false) |
---|
1752 | { |
---|
1753 | UTP_RegisterRecvPacket(conn, len); |
---|
1754 | |
---|
1755 | g_current_ms = UTP_GetMilliseconds(); |
---|
1756 | |
---|
1757 | conn->update_send_quota(); |
---|
1758 | |
---|
1759 | const PacketFormat *pf = (PacketFormat*)packet; |
---|
1760 | const PacketFormatV1 *pf1 = (PacketFormatV1*)packet; |
---|
1761 | const byte *packet_end = packet + len; |
---|
1762 | |
---|
1763 | uint16 pk_seq_nr; |
---|
1764 | uint16 pk_ack_nr; |
---|
1765 | uint8 pk_flags; |
---|
1766 | if (conn->version == 0) { |
---|
1767 | pk_seq_nr = pf->seq_nr; |
---|
1768 | pk_ack_nr = pf->ack_nr; |
---|
1769 | pk_flags = pf->flags; |
---|
1770 | } else { |
---|
1771 | pk_seq_nr = pf1->seq_nr; |
---|
1772 | pk_ack_nr = pf1->ack_nr; |
---|
1773 | pk_flags = pf1->type(); |
---|
1774 | } |
---|
1775 | |
---|
1776 | if (pk_flags >= ST_NUM_STATES) return 0; |
---|
1777 | |
---|
1778 | LOG_UTPV("0x%08x: Got %s. seq_nr:%u ack_nr:%u state:%s version:%u timestamp:"I64u" reply_micro:%u", |
---|
1779 | conn, flagnames[pk_flags], pk_seq_nr, pk_ack_nr, statenames[conn->state], conn->version, |
---|
1780 | conn->version == 0?(uint64)(pf->tv_sec) * 1000000 + pf->tv_usec:uint64(pf1->tv_usec), |
---|
1781 | conn->version == 0?(uint32)(pf->reply_micro):(uint32)(pf1->reply_micro)); |
---|
1782 | |
---|
1783 | // mark receipt time |
---|
1784 | uint64 time = UTP_GetMicroseconds(); |
---|
1785 | |
---|
1786 | // RSTs are handled earlier, since the connid matches the send id not the recv id |
---|
1787 | assert(pk_flags != ST_RESET); |
---|
1788 | |
---|
1789 | // TODO: maybe send a ST_RESET if we're in CS_RESET? |
---|
1790 | |
---|
1791 | const byte *selack_ptr = NULL; |
---|
1792 | |
---|
1793 | // Unpack UTP packet options |
---|
1794 | // Data pointer |
---|
1795 | const byte *data = (const byte*)pf + conn->get_header_size(); |
---|
1796 | if (conn->get_header_size() > len) { |
---|
1797 | LOG_UTPV("0x%08x: Invalid packet size (less than header size)", conn); |
---|
1798 | return 0; |
---|
1799 | } |
---|
1800 | // Skip the extension headers |
---|
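| // Layout of the extension chain as parsed below: the header's ext field |
---|
| // names the type of the first extension; each extension then starts |
---|
| // with two bytes, [type of next extension][length], followed by |
---|
| // 'length' payload bytes, and a next-type of 0 ends the chain. For |
---|
| // example (editorial illustration), a packet carrying only a selective |
---|
| // ack has ext == 1 in the header, then the bytes 0x00 0x04 followed by |
---|
| // a 4-byte ack bitmask. |
---|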
1801 | uint extension = conn->version == 0 ? pf->ext : pf1->ext; |
---|
1802 | if (extension != 0) { |
---|
1803 | do { |
---|
1804 | // Verify that the packet is valid. |
---|
1805 | data += 2; |
---|
1806 | |
---|
1807 | if ((int)(packet_end - data) < 0 || (int)(packet_end - data) < data[-1]) { |
---|
1808 | LOG_UTPV("0x%08x: Invalid len of extensions", conn); |
---|
1809 | return 0; |
---|
1810 | } |
---|
1811 | |
---|
1812 | switch(extension) { |
---|
1813 | case 1: // Selective Acknowledgment |
---|
1814 | selack_ptr = data; |
---|
1815 | break; |
---|
1816 | case 2: // extension bits |
---|
1817 | if (data[-1] != 8) { |
---|
1818 | LOG_UTPV("0x%08x: Invalid len of extension bits header", conn); |
---|
1819 | return 0; |
---|
1820 | } |
---|
1821 | memcpy(conn->extensions, data, 8); |
---|
1822 | LOG_UTPV("0x%08x: got extension bits:%02x%02x%02x%02x%02x%02x%02x%02x", conn, |
---|
1823 | conn->extensions[0], conn->extensions[1], conn->extensions[2], conn->extensions[3], |
---|
1824 | conn->extensions[4], conn->extensions[5], conn->extensions[6], conn->extensions[7]); |
---|
1825 | } |
---|
1826 | extension = data[-2]; |
---|
1827 | data += data[-1]; |
---|
1828 | } while (extension); |
---|
1829 | } |
---|
1830 | |
---|
1831 | if (conn->state == CS_SYN_SENT) { |
---|
1832 | // if this is a syn-ack, initialize our ack_nr |
---|
1833 | // to match the sequence number we got from |
---|
1834 | // the other end |
---|
1835 | conn->ack_nr = (pk_seq_nr - 1) & SEQ_NR_MASK; |
---|
1836 | } |
---|
1837 | |
---|
1838 | g_current_ms = UTP_GetMilliseconds(); |
---|
1839 | conn->last_got_packet = g_current_ms; |
---|
1840 | |
---|
1841 | if (syn) { |
---|
1842 | return 0; |
---|
1843 | } |
---|
1844 | |
---|
1845 | // seqnr is the number of packets past the expected |
---|
1846 | // packet this is. ack_nr is the last acked, seq_nr is the |
---|
1847 | // current. Subtracting 1 makes 0 mean "this is the next |
---|
1848 | // expected packet". |
---|
1849 | const uint seqnr = (pk_seq_nr - conn->ack_nr - 1) & SEQ_NR_MASK; |
---|
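| // Editorial example (not from the original source): with ack_nr == 10, |
---|
| // pk_seq_nr == 11 gives seqnr == 0 (exactly the next expected packet), |
---|
| // pk_seq_nr == 13 gives seqnr == 2 (two packets still missing before |
---|
| // it), and pk_seq_nr == 10 wraps to 0xFFFF, which the check below |
---|
| // treats as an old packet. |
---|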
1850 | |
---|
1851 | // Getting an invalid sequence number? |
---|
1852 | if (seqnr >= REORDER_BUFFER_MAX_SIZE) { |
---|
1853 | if (seqnr >= (SEQ_NR_MASK + 1) - REORDER_BUFFER_MAX_SIZE && pk_flags != ST_STATE) { |
---|
1854 | conn->ack_time = g_current_ms + min<uint>(conn->ack_time - g_current_ms, DELAYED_ACK_TIME_THRESHOLD); |
---|
1855 | } |
---|
1856 | LOG_UTPV(" Got old Packet/Ack (%u/%u)=%u!", pk_seq_nr, conn->ack_nr, seqnr); |
---|
1857 | return 0; |
---|
1858 | } |
---|
1859 | |
---|
1860 | // Process acknowledgment |
---|
1861 | // acks is the number of packets that were acked |
---|
1862 | int acks = (pk_ack_nr - (conn->seq_nr - 1 - conn->cur_window_packets)) & ACK_NR_MASK; |
---|
1863 | |
---|
1864 | // this happens when we receive an old ack nr |
---|
1865 | if (acks > conn->cur_window_packets) acks = 0; |
---|
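| // Editorial example (not from the original source): with seq_nr == 105 |
---|
| // and cur_window_packets == 5, packets 100..104 are in flight and 99 |
---|
| // was the last one acked. pk_ack_nr == 102 yields acks == 3 (covering |
---|
| // 100, 101 and 102), while a stale pk_ack_nr == 97 wraps to 65534 and |
---|
| // is clamped to 0 by the test above. |
---|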
1866 | |
---|
1867 | // if we get the same ack_nr as in the last packet |
---|
1868 | // increase the duplicate_ack counter, otherwise reset |
---|
1869 | // it to 0 |
---|
1870 | if (conn->cur_window_packets > 0) { |
---|
1871 | if (pk_ack_nr == ((conn->seq_nr - conn->cur_window_packets - 1) & ACK_NR_MASK) && |
---|
1872 | conn->cur_window_packets > 0) { |
---|
1873 | //++conn->duplicate_ack; |
---|
1874 | } else { |
---|
1875 | conn->duplicate_ack = 0; |
---|
1876 | } |
---|
1877 | |
---|
1878 | // TODO: if duplicate_ack == DUPLICATE_ACKS_BEFORE_RESEND |
---|
1879 | // and fast_resend_seq_nr <= ack_nr + 1 |
---|
1880 | // resend ack_nr + 1 |
---|
1881 | } |
---|
1882 | |
---|
1883 | // figure out how many bytes were acked |
---|
1884 | size_t acked_bytes = 0; |
---|
1885 | |
---|
1886 | // the minimum rtt of all acks |
---|
1887 | // this is the upper limit on the delay we get back |
---|
1888 | // from the other peer. Our delay cannot exceed |
---|
1889 | // the rtt of the packet. If it does, clamp it. |
---|
1890 | // this is done in apply_ledbat_ccontrol() |
---|
1891 | int64 min_rtt = INT64_MAX; |
---|
1892 | |
---|
1893 | for (int i = 0; i < acks; ++i) { |
---|
1894 | int seq = conn->seq_nr - conn->cur_window_packets + i; |
---|
1895 | OutgoingPacket *pkt = (OutgoingPacket*)conn->outbuf.get(seq); |
---|
1896 | if (pkt == 0 || pkt->transmissions == 0) continue; |
---|
1897 | assert((int)(pkt->payload) >= 0); |
---|
1898 | acked_bytes += pkt->payload; |
---|
1899 | min_rtt = min<int64>(min_rtt, UTP_GetMicroseconds() - pkt->time_sent); |
---|
1900 | } |
---|
1901 | |
---|
1902 | // count bytes acked by EACK |
---|
1903 | if (selack_ptr != NULL) { |
---|
1904 | acked_bytes += conn->selective_ack_bytes((pk_ack_nr + 2) & ACK_NR_MASK, |
---|
1905 | selack_ptr, selack_ptr[-1], min_rtt); |
---|
1906 | } |
---|
1907 | |
---|
1908 | LOG_UTPV("0x%08x: acks:%d acked_bytes:%u seq_nr:%d cur_window:%u cur_window_packets:%u relative_seqnr:%u max_window:%u min_rtt:%u rtt:%u", |
---|
1909 | conn, acks, (uint)acked_bytes, conn->seq_nr, (uint)conn->cur_window, conn->cur_window_packets, |
---|
1910 | seqnr, (uint)conn->max_window, (uint)(min_rtt / 1000), conn->rtt); |
---|
1911 | |
---|
1912 | uint64 p; |
---|
1913 | |
---|
1914 | if (conn->version == 0) { |
---|
1915 | p = uint64(pf->tv_sec) * 1000000 + pf->tv_usec; |
---|
1916 | } else { |
---|
1917 | p = pf1->tv_usec; |
---|
1918 | } |
---|
1919 | |
---|
1920 | conn->last_measured_delay = g_current_ms; |
---|
1921 | |
---|
1922 | // get delay in both directions |
---|
1923 | // record the delay to report back |
---|
1924 | const uint32 their_delay = (uint32)(p == 0 ? 0 : time - p); |
---|
1925 | conn->reply_micro = their_delay; |
---|
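| // Editorial note (not original text): this one-way "delay" is measured |
---|
| // with unsynchronized clocks, so it is the true transit time plus the |
---|
| // clock offset between the peers. Only changes relative to the tracked |
---|
| // delay bases are meaningful, which is why the code below compares |
---|
| // delay bases rather than absolute values. |
---|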
1926 | uint32 prev_delay_base = conn->their_hist.delay_base; |
---|
1927 | if (their_delay != 0) conn->their_hist.add_sample(their_delay); |
---|
1928 | |
---|
1929 | // if their new delay base is less than their previous one |
---|
1930 | // we should shift our delay base in the other direction in order |
---|
1931 | // to take the clock skew into account |
---|
1932 | if (prev_delay_base != 0 && |
---|
1933 | wrapping_compare_less(conn->their_hist.delay_base, prev_delay_base)) { |
---|
1934 | // never adjust more than 10 milliseconds |
---|
1935 | if (prev_delay_base - conn->their_hist.delay_base <= 10000) { |
---|
1936 | conn->our_hist.shift(prev_delay_base - conn->their_hist.delay_base); |
---|
1937 | } |
---|
1938 | } |
---|
1939 | |
---|
1940 | const uint32 actual_delay = conn->version==0 |
---|
1941 | ?(pf->reply_micro==INT_MAX?0:uint32(pf->reply_micro)) |
---|
1942 | :(uint32(pf1->reply_micro)==INT_MAX?0:uint32(pf1->reply_micro)); |
---|
1943 | |
---|
1944 | // if the actual delay is 0, it means the other end |
---|
1945 | // hasn't received a sample from us yet, and doesn't |
---|
1946 | // know what it is. We can't update our history unless |
---|
1947 | // we have a true measured sample |
---|
1948 | prev_delay_base = conn->our_hist.delay_base; |
---|
1949 | if (actual_delay != 0) conn->our_hist.add_sample(actual_delay); |
---|
1950 | |
---|
1951 | // if our new delay base is less than our previous one |
---|
1952 | // we should shift the other end's delay base in the other |
---|
1953 | // direction in order to take the clock skew into account |
---|
1954 | // This is commented out because it creates bad interactions |
---|
1955 | // with our adjustment in the other direction. We don't really |
---|
1956 | // need our estimates of the other peer to be very accurate |
---|
1957 | // anyway. The problem with shifting here is that we're more |
---|
1958 | // likely to shift it back later because of a low latency. This |
---|
1959 | // second shift back would cause us to shift our delay base |
---|
1960 | // which then gets into a death spiral of shifting delay bases |
---|
1961 | /* if (prev_delay_base != 0 && |
---|
1962 | wrapping_compare_less(conn->our_hist.delay_base, prev_delay_base)) { |
---|
1963 | // never adjust more than 10 milliseconds |
---|
1964 | if (prev_delay_base - conn->our_hist.delay_base <= 10000) { |
---|
1965 | conn->their_hist.Shift(prev_delay_base - conn->our_hist.delay_base); |
---|
1966 | } |
---|
1967 | } |
---|
1968 | */ |
---|
1969 | |
---|
1970 | // if the delay estimate exceeds the RTT, adjust the delay base to |
---|
1971 | // compensate |
---|
1972 | if (conn->our_hist.get_value() > uint32(min_rtt)) { |
---|
1973 | conn->our_hist.shift(conn->our_hist.get_value() - min_rtt); |
---|
1974 | } |
---|
1975 | |
---|
1976 | // only apply the congestion controller on acks |
---|
1977 | // if we don't have a delay measurement, there's |
---|
1978 | // no point in invoking the congestion control |
---|
1979 | if (actual_delay != 0 && acked_bytes >= 1) |
---|
1980 | conn->apply_ledbat_ccontrol(acked_bytes, actual_delay, min_rtt); |
---|
1981 | |
---|
1982 | // sanity check, the other end should never ack packets |
---|
1983 | // past the point we've sent |
---|
1984 | if (acks <= conn->cur_window_packets) { |
---|
1985 | conn->max_window_user = conn->version == 0 |
---|
1986 | ? pf->windowsize * PACKET_SIZE : pf1->windowsize; |
---|
1987 | |
---|
1988 | // If max user window is set to 0, then we start a timer |
---|
1989 | // that will reset it to 1 after 15 seconds. |
---|
1990 | if (conn->max_window_user == 0) |
---|
1991 | // Reset max_window_user to 1 every 15 seconds. |
---|
1992 | conn->zerowindow_time = g_current_ms + 15000; |
---|
1993 | |
---|
1994 | // Respond to connect message |
---|
1995 | // Switch to CONNECTED state. |
---|
1996 | if (conn->state == CS_SYN_SENT) { |
---|
1997 | conn->state = CS_CONNECTED; |
---|
1998 | conn->func.on_state(conn->userdata, UTP_STATE_CONNECT); |
---|
1999 | |
---|
2000 | // We've sent a fin, and everything was ACKed (including the FIN), |
---|
2001 | // it's safe to destroy the socket. cur_window_packets == acks |
---|
2002 | // means that this packet acked all the remaining packets that |
---|
2003 | // were in-flight. |
---|
2004 | } else if (conn->state == CS_FIN_SENT && conn->cur_window_packets == acks) { |
---|
2005 | conn->state = CS_DESTROY; |
---|
2006 | } |
---|
2007 | |
---|
2008 | // Update fast resend counter |
---|
2009 | if (wrapping_compare_less(conn->fast_resend_seq_nr, (pk_ack_nr + 1) & ACK_NR_MASK)) |
---|
2010 | conn->fast_resend_seq_nr = pk_ack_nr + 1; |
---|
2011 | |
---|
2012 | LOG_UTPV("0x%08x: fast_resend_seq_nr:%u", conn, conn->fast_resend_seq_nr); |
---|
2013 | |
---|
2014 | for (int i = 0; i < acks; ++i) { |
---|
2015 | int ack_status = conn->ack_packet(conn->seq_nr - conn->cur_window_packets); |
---|
2016 | // if ack_status is 0, the packet was acked. |
---|
2017 | // if ack_status is 1, the packet had already been acked |
---|
2018 | // if it's 2, the packet has not been sent yet |
---|
2019 | // We need to break this loop in the latter case. This could potentially |
---|
2020 | // happen if we get an ack_nr that does not exceed what we have stuffed |
---|
2021 | // into the outgoing buffer, but does exceed what we have sent |
---|
2022 | if (ack_status == 2) { |
---|
2023 | #ifdef _DEBUG |
---|
2024 | OutgoingPacket* pkt = (OutgoingPacket*)conn->outbuf.get(conn->seq_nr - conn->cur_window_packets); |
---|
2025 | assert(pkt->transmissions == 0); |
---|
2026 | #endif |
---|
2027 | break; |
---|
2028 | } |
---|
2029 | conn->cur_window_packets--; |
---|
2030 | } |
---|
2031 | #ifdef _DEBUG |
---|
2032 | if (conn->cur_window_packets == 0) assert(conn->cur_window == 0); |
---|
2033 | #endif |
---|
2034 | |
---|
2035 | // packets in front of this may have been acked by a |
---|
2036 | // selective ack (EACK). Keep decreasing the window packet count |
---|
2037 | // until we hit a packet that is still waiting to be acked |
---|
2038 | // in the send queue |
---|
2039 | // this is especially likely to happen when the other end |
---|
2040 | // has the EACK send bug that older versions of uTP had |
---|
2041 | while (conn->cur_window_packets > 0 && !conn->outbuf.get(conn->seq_nr - conn->cur_window_packets)) |
---|
2042 | conn->cur_window_packets--; |
---|
2043 | |
---|
2044 | #ifdef _DEBUG |
---|
2045 | if (conn->cur_window_packets == 0) assert(conn->cur_window == 0); |
---|
2046 | #endif |
---|
2047 | |
---|
2048 | // this invariant should always be true |
---|
2049 | assert(conn->cur_window_packets == 0 || conn->outbuf.get(conn->seq_nr - conn->cur_window_packets)); |
---|
2050 | |
---|
2051 | // flush Nagle |
---|
2052 | if (conn->cur_window_packets == 1) { |
---|
2053 | OutgoingPacket *pkt = (OutgoingPacket*)conn->outbuf.get(conn->seq_nr - 1); |
---|
2054 | // do we still have quota? |
---|
2055 | if (pkt->transmissions == 0 && |
---|
2056 | (!(USE_PACKET_PACING) || conn->send_quota / 100 >= (int32)pkt->length)) { |
---|
2057 | conn->send_packet(pkt); |
---|
2058 | |
---|
2059 | // No need to send another ack if there is nothing to reorder. |
---|
2060 | if (conn->reorder_count == 0) { |
---|
2061 | conn->sent_ack(); |
---|
2062 | } |
---|
2063 | } |
---|
2064 | } |
---|
2065 | |
---|
2066 | // Fast timeout-retry |
---|
2067 | if (conn->fast_timeout) { |
---|
2068 | LOG_UTPV("Fast timeout %u,%u,%u?", (uint)conn->cur_window, conn->seq_nr - conn->timeout_seq_nr, conn->timeout_seq_nr); |
---|
2069 | // if the fast_resend_seq_nr is not pointing to the oldest outstanding packet, it suggests that we've already |
---|
2070 | // resent the packet that timed out, and we should leave the fast-timeout mode. |
---|
2071 | if (((conn->seq_nr - conn->cur_window_packets) & ACK_NR_MASK) != conn->fast_resend_seq_nr) { |
---|
2072 | conn->fast_timeout = false; |
---|
2073 | } else { |
---|
2074 | // resend the oldest packet and increment fast_resend_seq_nr |
---|
2075 | // to not allow another fast resend on it again |
---|
2076 | OutgoingPacket *pkt = (OutgoingPacket*)conn->outbuf.get(conn->seq_nr - conn->cur_window_packets); |
---|
2077 | if (pkt && pkt->transmissions > 0) { |
---|
2078 | LOG_UTPV("0x%08x: Packet %u fast timeout-retry.", conn, conn->seq_nr - conn->cur_window_packets); |
---|
2079 | #ifdef _DEBUG |
---|
2080 | ++conn->_stats._fastrexmit; |
---|
2081 | #endif |
---|
2082 | conn->fast_resend_seq_nr++; |
---|
2083 | conn->send_packet(pkt); |
---|
2084 | } |
---|
2085 | } |
---|
2086 | } |
---|
2087 | } |
---|
2088 | |
---|
2089 | // Process selective acknowledgment |
---|
2090 | if (selack_ptr != NULL) { |
---|
2091 | conn->selective_ack(pk_ack_nr + 2, selack_ptr, selack_ptr[-1]); |
---|
2092 | } |
---|
2093 | |
---|
2094 | // this invariant should always be true |
---|
2095 | assert(conn->cur_window_packets == 0 || conn->outbuf.get(conn->seq_nr - conn->cur_window_packets)); |
---|
2096 | |
---|
2097 | LOG_UTPV("0x%08x: acks:%d acked_bytes:%u seq_nr:%u cur_window:%u cur_window_packets:%u quota:%d", |
---|
2098 | conn, acks, (uint)acked_bytes, conn->seq_nr, (uint)conn->cur_window, conn->cur_window_packets, |
---|
2099 | conn->send_quota / 100); |
---|
2100 | |
---|
2101 | // In case the ack dropped the current window below |
---|
2102 | // the max_window size, mark the socket as writable |
---|
2103 | if (conn->state == CS_CONNECTED_FULL && conn->is_writable(conn->get_packet_size())) { |
---|
2104 | conn->state = CS_CONNECTED; |
---|
2105 | LOG_UTPV("0x%08x: Socket writable. max_window:%u cur_window:%u quota:%d packet_size:%u", |
---|
2106 | conn, (uint)conn->max_window, (uint)conn->cur_window, conn->send_quota / 100, (uint)conn->get_packet_size()); |
---|
2107 | conn->func.on_state(conn->userdata, UTP_STATE_WRITABLE); |
---|
2108 | } |
---|
2109 | |
---|
2110 | if (pk_flags == ST_STATE) { |
---|
2111 | // This is a state packet only. |
---|
2112 | return 0; |
---|
2113 | } |
---|
2114 | |
---|
2115 | // The connection is not in a state that can accept data? |
---|
2116 | if (conn->state != CS_CONNECTED && |
---|
2117 | conn->state != CS_CONNECTED_FULL && |
---|
2118 | conn->state != CS_FIN_SENT) { |
---|
2119 | return 0; |
---|
2120 | } |
---|
2121 | |
---|
2122 | // Is this a finalize packet? |
---|
2123 | if (pk_flags == ST_FIN && !conn->got_fin) { |
---|
2124 | LOG_UTPV("Got FIN eof_pkt:%u", pk_seq_nr); |
---|
2125 | conn->got_fin = true; |
---|
2126 | conn->eof_pkt = pk_seq_nr; |
---|
2127 | // at this point, it is possible for the |
---|
2128 | // other end to have sent packets with |
---|
2129 | // sequence numbers higher than seq_nr. |
---|
2130 | // if this is the case, our reorder_count |
---|
2131 | // is out of sync. This case is dealt with |
---|
2132 | // when we re-order and hit the eof_pkt. |
---|
2133 | // we'll just ignore any packets with |
---|
2134 | // sequence numbers past this |
---|
2135 | } |
---|
2136 | |
---|
2137 | // Getting an in-order packet? |
---|
2138 | if (seqnr == 0) { |
---|
2139 | size_t count = packet_end - data; |
---|
2140 | if (count > 0 && conn->state != CS_FIN_SENT) { |
---|
2141 | LOG_UTPV("0x%08x: Got Data len:%u (rb:%u)", conn, (uint)count, (uint)conn->func.get_rb_size(conn->userdata)); |
---|
2142 | // Post bytes to the upper layer |
---|
2143 | conn->func.on_read(conn->userdata, data, count); |
---|
2144 | } |
---|
2145 | conn->ack_nr++; |
---|
2146 | conn->bytes_since_ack += count; |
---|
2147 | |
---|
2148 | // Check if the next packet has been received too, but waiting |
---|
2149 | // in the reorder buffer. |
---|
2150 | for (;;) { |
---|
2151 | |
---|
2152 | if (conn->got_fin && conn->eof_pkt == conn->ack_nr) { |
---|
2153 | if (conn->state != CS_FIN_SENT) { |
---|
2154 | conn->state = CS_GOT_FIN; |
---|
2155 | conn->rto_timeout = g_current_ms + min<uint>(conn->rto * 3, 60); |
---|
2156 | |
---|
2157 | LOG_UTPV("0x%08x: Posting EOF", conn); |
---|
2158 | conn->func.on_state(conn->userdata, UTP_STATE_EOF); |
---|
2159 | } |
---|
2160 | |
---|
2161 | // if the other end wants to close, ack immediately |
---|
2162 | conn->send_ack(); |
---|
2163 | |
---|
2164 | // reorder_count is not necessarily 0 at this point. |
---|
2165 | // even though it is most of the time, the other end |
---|
2166 | // may have sent packets with higher sequence numbers |
---|
2167 | // than what later ends up being eof_pkt. |
---|
2168 | // since we have received all packets up to eof_pkt |
---|
2169 | // just ignore the ones after it. |
---|
2170 | conn->reorder_count = 0; |
---|
2171 | } |
---|
2172 | |
---|
2173 | // Quick get-out in case there is nothing to reorder |
---|
2174 | if (conn->reorder_count == 0) |
---|
2175 | break; |
---|
2176 | |
---|
2177 | // Check if there are additional buffers in the reorder buffer |
---|
2178 | // that need delivery. |
---|
2179 | byte *p = (byte*)conn->inbuf.get(conn->ack_nr+1); |
---|
2180 | if (p == NULL) |
---|
2181 | break; |
---|
2182 | conn->inbuf.put(conn->ack_nr+1, NULL); |
---|
2183 | count = *(uint*)p; |
---|
2184 | if (count > 0 && conn->state != CS_FIN_SENT) { |
---|
2185 | // Pass the bytes to the upper layer |
---|
2186 | conn->func.on_read(conn->userdata, p + sizeof(uint), count); |
---|
2187 | } |
---|
2188 | conn->ack_nr++; |
---|
2189 | conn->bytes_since_ack += count; |
---|
2190 | |
---|
2191 | // Free the element from the reorder buffer |
---|
2192 | free(p); |
---|
2193 | assert(conn->reorder_count > 0); |
---|
2194 | conn->reorder_count--; |
---|
2195 | } |
---|
2196 | |
---|
2197 | // start the delayed ACK timer |
---|
2198 | conn->ack_time = g_current_ms + min<uint>(conn->ack_time - g_current_ms, DELAYED_ACK_TIME_THRESHOLD); |
---|
2199 | } else { |
---|
2200 | // Getting an out of order packet. |
---|
2201 | // The packet needs to be remembered and rearranged later. |
---|
2202 | |
---|
2203 | // if we have received a FIN packet, and the EOF-sequence number |
---|
2204 | // is lower than the sequence number of the packet we just received |
---|
2205 | // something is wrong. |
---|
2206 | if (conn->got_fin && pk_seq_nr > conn->eof_pkt) { |
---|
2207 | LOG_UTPV("0x%08x: Got an invalid packet sequence number, past EOF " |
---|
2208 | "reorder_count:%u len:%u (rb:%u)", |
---|
2209 | conn, conn->reorder_count, (uint)(packet_end - data), (uint)conn->func.get_rb_size(conn->userdata)); |
---|
2210 | return 0; |
---|
2211 | } |
---|
2212 | |
---|
2213 | // if the sequence number is entirely off the expected |
---|
2214 | // one, just drop it. We can't allocate buffer space in |
---|
2215 | // the inbuf entirely based on untrusted input |
---|
2216 | if (seqnr > 0x3ff) { |
---|
2217 | LOG_UTPV("0x%08x: Got an invalid packet sequence number, too far off " |
---|
2218 | "reorder_count:%u len:%u (rb:%u)", |
---|
2219 | conn, conn->reorder_count, (uint)(packet_end - data), (uint)conn->func.get_rb_size(conn->userdata)); |
---|
2220 | return 0; |
---|
2221 | } |
---|
2222 | |
---|
2223 | // we need to grow the circular buffer before we |
---|
2224 | // check if the packet is already in here, so that |
---|
2225 | // we don't end up looking at an older packet (since |
---|
2226 | // the indices wrap around). |
---|
2227 | conn->inbuf.ensure_size(pk_seq_nr + 1, seqnr + 1); |
---|
2228 | |
---|
2229 | // Has this packet already been received? (i.e. a duplicate) |
---|
2230 | // If that is the case, just discard it. |
---|
2231 | if (conn->inbuf.get(pk_seq_nr) != NULL) { |
---|
2232 | #ifdef _DEBUG |
---|
2233 | ++conn->_stats._nduprecv; |
---|
2234 | #endif |
---|
2235 | return 0; |
---|
2236 | } |
---|
2237 | |
---|
2238 | // Allocate memory to fit the packet that needs to be re-ordered |
---|
2239 | byte *mem = (byte*)malloc((packet_end - data) + sizeof(uint)); |
---|
2240 | *(uint*)mem = (uint)(packet_end - data); |
---|
2241 | memcpy(mem + sizeof(uint), data, packet_end - data); |
---|
2242 | |
---|
2243 | // Insert into reorder buffer and increment the count |
---|
2244 | // of # of packets to be reordered. |
---|
2245 | // we add one to seqnr in order to leave the last |
---|
2246 | // entry empty, that way the assert in send_ack |
---|
2247 | // is valid. we have to add one to seqnr too, in order |
---|
2248 | // to make the circular buffer grow around the correct |
---|
2249 | // point (which is conn->ack_nr + 1). |
---|
2250 | assert(conn->inbuf.get(pk_seq_nr) == NULL); |
---|
2251 | assert((pk_seq_nr & conn->inbuf.mask) != ((conn->ack_nr+1) & conn->inbuf.mask)); |
---|
2252 | conn->inbuf.put(pk_seq_nr, mem); |
---|
2253 | conn->reorder_count++; |
---|
2254 | |
---|
2255 | LOG_UTPV("0x%08x: Got out of order data reorder_count:%u len:%u (rb:%u)", |
---|
2256 | conn, conn->reorder_count, (uint)(packet_end - data), (uint)conn->func.get_rb_size(conn->userdata)); |
---|
2257 | |
---|
2258 | // Setup so the partial ACK message will get sent immediately. |
---|
2259 | conn->ack_time = g_current_ms + min<uint>(conn->ack_time - g_current_ms, 1); |
---|
2260 | } |
---|
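| // Editorial example (not from the original source): with ack_nr == 10, |
---|
| // a packet with pk_seq_nr == 13 arriving first takes this out-of-order |
---|
| // path and is stored at inbuf slot 13 (seqnr == 2). When packet 11 |
---|
| // later arrives it is delivered directly, and when 12 arrives the |
---|
| // in-order branch above delivers 12 and then drains 13 from inbuf, |
---|
| // advancing ack_nr to 13. |
---|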
2261 | |
---|
2262 | // If ack_time or bytes_since_ack indicate that we need to send an ack, send one |
---|
2263 | // here instead of waiting for the timer to trigger |
---|
2264 | LOG_UTPV("bytes_since_ack:%u ack_time:%d", |
---|
2265 | (uint)conn->bytes_since_ack, (int)(g_current_ms - conn->ack_time)); |
---|
2266 | if (conn->state == CS_CONNECTED || conn->state == CS_CONNECTED_FULL) { |
---|
2267 | if (conn->bytes_since_ack > DELAYED_ACK_BYTE_THRESHOLD || |
---|
2268 | (int)(g_current_ms - conn->ack_time) >= 0) { |
---|
2269 | conn->send_ack(); |
---|
2270 | } |
---|
2271 | } |
---|
2272 | return (size_t)(packet_end - data); |
---|
2273 | } |
---|
2274 | |
---|
2275 | inline bool UTP_IsV1(PacketFormatV1 const* pf) |
---|
2276 | { |
---|
2277 | return pf->version() == 1 && pf->type() < ST_NUM_STATES && pf->ext < 3; |
---|
2278 | } |
---|
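| // Editorial note (not original text): the version nibble alone is a |
---|
| // weak signal, since in a version-0 header that byte belongs to a |
---|
| // different field; also requiring a known packet type and ext < 3 |
---|
| // makes it less likely that a version-0 or corrupt packet is |
---|
| // misclassified as version 1. |
---|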
2279 | |
---|
2280 | void UTP_Free(UTPSocket *conn) |
---|
2281 | { |
---|
2282 | LOG_UTPV("0x%08x: Killing socket", conn); |
---|
2283 | |
---|
2284 | conn->func.on_state(conn->userdata, UTP_STATE_DESTROYING); |
---|
2285 | UTP_SetCallbacks(conn, NULL, NULL); |
---|
2286 | |
---|
2287 | assert(conn->idx < g_utp_sockets.GetCount()); |
---|
2288 | assert(g_utp_sockets[conn->idx] == conn); |
---|
2289 | |
---|
2290 | // Unlink object from the global list |
---|
2291 | assert(g_utp_sockets.GetCount() > 0); |
---|
2292 | |
---|
2293 | UTPSocket *last = g_utp_sockets[g_utp_sockets.GetCount() - 1]; |
---|
2294 | |
---|
2295 | assert(last->idx < g_utp_sockets.GetCount()); |
---|
2296 | assert(g_utp_sockets[last->idx] == last); |
---|
2297 | |
---|
2298 | last->idx = conn->idx; |
---|
2299 | |
---|
2300 | g_utp_sockets[conn->idx] = last; |
---|
2301 | |
---|
2302 | // Decrease the count |
---|
2303 | g_utp_sockets.SetCount(g_utp_sockets.GetCount() - 1); |
---|
2304 | |
---|
2305 | // Free all memory occupied by the socket object. |
---|
2306 | for (size_t i = 0; i <= conn->inbuf.mask; i++) { |
---|
2307 | free(conn->inbuf.elements[i]); |
---|
2308 | } |
---|
2309 | for (size_t i = 0; i <= conn->outbuf.mask; i++) { |
---|
2310 | free(conn->outbuf.elements[i]); |
---|
2311 | } |
---|
2312 | free(conn->inbuf.elements); |
---|
2313 | free(conn->outbuf.elements); |
---|
2314 | |
---|
2315 | // Finally free the socket object |
---|
2316 | free(conn); |
---|
2317 | } |
---|
2318 | |
---|
2319 | |
---|
2320 | // Public functions: |
---|
2321 | /////////////////////////////////////////////////////////////////////////////// |
---|
2322 | |
---|
2323 | // Create a UTP socket |
---|
2324 | UTPSocket *UTP_Create(SendToProc *send_to_proc, void *send_to_userdata, const struct sockaddr *addr, socklen_t addrlen) |
---|
2325 | { |
---|
2326 | UTPSocket *conn = (UTPSocket*)calloc(1, sizeof(UTPSocket)); |
---|
2327 | |
---|
2328 | g_current_ms = UTP_GetMilliseconds(); |
---|
2329 | |
---|
2330 | UTP_SetCallbacks(conn, NULL, NULL); |
---|
2331 | conn->our_hist.clear(); |
---|
2332 | conn->their_hist.clear(); |
---|
2333 | conn->rto = 3000; |
---|
2334 | conn->rtt_var = 800; |
---|
2335 | conn->seq_nr = 1; |
---|
2336 | conn->ack_nr = 0; |
---|
2337 | conn->max_window_user = 255 * PACKET_SIZE; |
---|
2338 | conn->addr = PackedSockAddr((const SOCKADDR_STORAGE*)addr, addrlen); |
---|
2339 | conn->send_to_proc = send_to_proc; |
---|
2340 | conn->send_to_userdata = send_to_userdata; |
---|
2341 | conn->ack_time = g_current_ms + 0x70000000; |
---|
2342 | conn->last_got_packet = g_current_ms; |
---|
2343 | conn->last_sent_packet = g_current_ms; |
---|
2344 | conn->last_measured_delay = g_current_ms + 0x70000000; |
---|
2345 | conn->last_rwin_decay = int32(g_current_ms) - MAX_WINDOW_DECAY; |
---|
2346 | conn->last_send_quota = g_current_ms; |
---|
2347 | conn->send_quota = PACKET_SIZE * 100; |
---|
2348 | conn->cur_window_packets = 0; |
---|
2349 | conn->fast_resend_seq_nr = conn->seq_nr; |
---|
2350 | |
---|
2351 | // default to version 1 |
---|
2352 | UTP_SetSockopt(conn, SO_UTPVERSION, 1); |
---|
2353 | |
---|
2354 | // we need to fit one packet in the window |
---|
2355 | // when we start the connection |
---|
2356 | conn->max_window = conn->get_packet_size(); |
---|
2357 | conn->state = CS_IDLE; |
---|
2358 | |
---|
2359 | conn->outbuf.mask = 15; |
---|
2360 | conn->inbuf.mask = 15; |
---|
2361 | |
---|
2362 | conn->outbuf.elements = (void**)calloc(16, sizeof(void*)); |
---|
2363 | conn->inbuf.elements = (void**)calloc(16, sizeof(void*)); |
---|
2364 | |
---|
2365 | conn->idx = g_utp_sockets.Append(conn); |
---|
2366 | |
---|
2367 | LOG_UTPV("0x%08x: UTP_Create", conn); |
---|
2368 | |
---|
2369 | return conn; |
---|
2370 | } |
---|
2371 | |
---|
2372 | void UTP_SetCallbacks(UTPSocket *conn, UTPFunctionTable *funcs, void *userdata) |
---|
2373 | { |
---|
2374 | assert(conn); |
---|
2375 | |
---|
2376 | if (funcs == NULL) { |
---|
2377 | funcs = &zero_funcs; |
---|
2378 | } |
---|
2379 | conn->func = *funcs; |
---|
2380 | conn->userdata = userdata; |
---|
2381 | } |
---|
2382 | |
---|
2383 | bool UTP_SetSockopt(UTPSocket* conn, int opt, int val) |
---|
2384 | { |
---|
2385 | assert(conn); |
---|
2386 | |
---|
2387 | switch (opt) { |
---|
2388 | case SO_SNDBUF: |
---|
2389 | assert(val >= 1); |
---|
2390 | conn->opt_sndbuf = val; |
---|
2391 | return true; |
---|
2392 | case SO_RCVBUF: |
---|
2393 | conn->opt_rcvbuf = val; |
---|
2394 | return true; |
---|
2395 | case SO_UTPVERSION: |
---|
2396 | assert(conn->state == CS_IDLE); |
---|
2397 | if (conn->state != CS_IDLE) { |
---|
2398 | // too late |
---|
2399 | return false; |
---|
2400 | } |
---|
2401 | if (conn->version == 1 && val == 0) { |
---|
2402 | conn->reply_micro = INT_MAX; |
---|
2403 | conn->opt_rcvbuf = 200 * 1024; |
---|
2404 | conn->opt_sndbuf = OUTGOING_BUFFER_MAX_SIZE * PACKET_SIZE; |
---|
2405 | } else if (conn->version == 0 && val == 1) { |
---|
2406 | conn->reply_micro = 0; |
---|
2407 | conn->opt_rcvbuf = 3 * 1024 * 1024 + 512 * 1024; |
---|
2408 | conn->opt_sndbuf = conn->opt_rcvbuf; |
---|
2409 | } |
---|
2410 | conn->version = val; |
---|
2411 | return true; |
---|
2412 | } |
---|
2413 | |
---|
2414 | return false; |
---|
2415 | } |
---|
2416 | |
---|
2417 | // Try to connect to a specified host. |
---|
2418 | // The connect (SYN) packet carries no payload; pkt->payload is set to 0 below. |
---|
2419 | void UTP_Connect(UTPSocket *conn) |
---|
2420 | { |
---|
2421 | assert(conn); |
---|
2422 | |
---|
2423 | assert(conn->state == CS_IDLE); |
---|
2424 | assert(conn->cur_window_packets == 0); |
---|
2425 | assert(conn->outbuf.get(conn->seq_nr) == NULL); |
---|
2426 | assert(sizeof(PacketFormatV1) == 20); |
---|
2427 | |
---|
2428 | conn->state = CS_SYN_SENT; |
---|
2429 | |
---|
2430 | g_current_ms = UTP_GetMilliseconds(); |
---|
2431 | |
---|
2432 | // Create and send a connect message |
---|
2433 | uint32 conn_seed = UTP_Random(); |
---|
2434 | |
---|
2435 | // we identify newer versions by setting the |
---|
2436 | // first two bytes to 0x0001 |
---|
2437 | if (conn->version > 0) { |
---|
2438 | conn_seed &= 0xffff; |
---|
2439 | } |
---|
2440 | |
---|
2441 | // used in parse_log.py |
---|
2442 | LOG_UTP("0x%08x: UTP_Connect conn_seed:%u packet_size:%u (B) " |
---|
2443 | "target_delay:%u (ms) delay_history:%u " |
---|
2444 | "delay_base_history:%u (minutes)", |
---|
2445 | conn, conn_seed, PACKET_SIZE, CCONTROL_TARGET / 1000, |
---|
2446 | CUR_DELAY_SIZE, DELAY_BASE_HISTORY); |
---|
2447 | |
---|
2448 | // Setup initial timeout timer. |
---|
2449 | conn->retransmit_timeout = 3000; |
---|
2450 | conn->rto_timeout = g_current_ms + conn->retransmit_timeout; |
---|
2451 | conn->last_rcv_win = conn->get_rcv_window(); |
---|
2452 | |
---|
2453 | conn->conn_seed = conn_seed; |
---|
2454 | conn->conn_id_recv = conn_seed; |
---|
2455 | conn->conn_id_send = conn_seed+1; |
---|
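| // Editorial note (derived from this file, not original text): the |
---|
| // initiator thus receives on conn_seed and sends with conn_seed + 1, |
---|
| // while the accepting side in UTP_IsIncomingUTP() mirrors this by |
---|
| // sending with the id taken from the SYN and receiving on id + 1. |
---|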
2456 | // if you need compatibility with 1.8.1, use this. it increases attackability though. |
---|
2457 | //conn->seq_nr = 1; |
---|
2458 | conn->seq_nr = UTP_Random(); |
---|
2459 | |
---|
2460 | // Create the connect packet. |
---|
2461 | const size_t header_ext_size = conn->get_header_extensions_size(); |
---|
2462 | |
---|
2463 | OutgoingPacket *pkt = (OutgoingPacket*)malloc(sizeof(OutgoingPacket) - 1 + header_ext_size); |
---|
2464 | |
---|
2465 | PacketFormatExtensions* p = (PacketFormatExtensions*)pkt->data; |
---|
2466 | PacketFormatExtensionsV1* p1 = (PacketFormatExtensionsV1*)pkt->data; |
---|
2467 | |
---|
2468 | memset(p, 0, header_ext_size); |
---|
2469 | // SYN packets are special, and have the receive ID in the connid field, |
---|
2470 | // instead of conn_id_send. |
---|
2471 | if (conn->version == 0) { |
---|
2472 | p->pf.connid = conn->conn_id_recv; |
---|
2473 | p->pf.ext = 2; |
---|
2474 | p->pf.windowsize = (byte)DIV_ROUND_UP(conn->last_rcv_win, PACKET_SIZE); |
---|
2475 | p->pf.seq_nr = conn->seq_nr; |
---|
2476 | p->pf.flags = ST_SYN; |
---|
2477 | p->ext_next = 0; |
---|
2478 | p->ext_len = 8; |
---|
2479 | memset(p->extensions, 0, 8); |
---|
2480 | } else { |
---|
2481 | p1->pf.set_version(1); |
---|
2482 | p1->pf.set_type(ST_SYN); |
---|
2483 | p1->pf.ext = 2; |
---|
2484 | p1->pf.connid = conn->conn_id_recv; |
---|
2485 | p1->pf.windowsize = (uint32)conn->last_rcv_win; |
---|
2486 | p1->pf.seq_nr = conn->seq_nr; |
---|
2487 | p1->ext_next = 0; |
---|
2488 | p1->ext_len = 8; |
---|
2489 | memset(p1->extensions, 0, 8); |
---|
2490 | } |
---|
2491 | pkt->transmissions = 0; |
---|
2492 | pkt->length = header_ext_size; |
---|
2493 | pkt->payload = 0; |
---|
2494 | |
---|
2495 | //LOG_UTPV("0x%08x: Sending connect %s [%u].", |
---|
2496 | // conn, addrfmt(conn->addr, addrbuf), conn_seed); |
---|
2497 | |
---|
2498 | // Remember the message in the outgoing queue. |
---|
2499 | conn->outbuf.ensure_size(conn->seq_nr, conn->cur_window_packets); |
---|
2500 | conn->outbuf.put(conn->seq_nr, pkt); |
---|
2501 | conn->seq_nr++; |
---|
2502 | conn->cur_window_packets++; |
---|
2503 | |
---|
2504 | conn->send_packet(pkt); |
---|
2505 | } |
---|
2506 | |
---|
2507 | bool UTP_IsIncomingUTP(UTPGotIncomingConnection *incoming_proc, |
---|
2508 | SendToProc *send_to_proc, void *send_to_userdata, |
---|
2509 | const byte *buffer, size_t len, const struct sockaddr *to, socklen_t tolen) |
---|
2510 | { |
---|
2511 | const PackedSockAddr addr((const SOCKADDR_STORAGE*)to, tolen); |
---|
2512 | |
---|
2513 | if (len < sizeof(PacketFormat) && len < sizeof(PacketFormatV1)) { |
---|
2514 | LOG_UTPV("recv %s len:%u too small", addrfmt(addr, addrbuf), (uint)len); |
---|
2515 | return false; |
---|
2516 | } |
---|
2517 | |
---|
2518 | const PacketFormat* p = (PacketFormat*)buffer; |
---|
2519 | const PacketFormatV1* p1 = (PacketFormatV1*)buffer; |
---|
2520 | |
---|
2521 | const byte version = UTP_IsV1(p1); |
---|
2522 | const uint32 id = (version == 0) ? p->connid : uint32(p1->connid); |
---|
2523 | |
---|
2524 | if (version == 0 && len < sizeof(PacketFormat)) { |
---|
2525 | LOG_UTPV("recv %s len:%u version:%u too small", addrfmt(addr, addrbuf), (uint)len, version); |
---|
2526 | return false; |
---|
2527 | } |
---|
2528 | |
---|
2529 | if (version == 1 && len < sizeof(PacketFormatV1)) { |
---|
2530 | LOG_UTPV("recv %s len:%u version:%u too small", addrfmt(addr, addrbuf), (uint)len, version); |
---|
2531 | return false; |
---|
2532 | } |
---|
2533 | |
---|
2534 | LOG_UTPV("recv %s len:%u id:%u", addrfmt(addr, addrbuf), (uint)len, id); |
---|
2535 | |
---|
2536 | const PacketFormat *pf = (PacketFormat*)p; |
---|
2537 | const PacketFormatV1 *pf1 = (PacketFormatV1*)p; |
---|
2538 | |
---|
2539 | if (version == 0) { |
---|
2540 | LOG_UTPV("recv id:%u seq_nr:%u ack_nr:%u", id, (uint)pf->seq_nr, (uint)pf->ack_nr); |
---|
2541 | } else { |
---|
2542 | LOG_UTPV("recv id:%u seq_nr:%u ack_nr:%u", id, (uint)pf1->seq_nr, (uint)pf1->ack_nr); |
---|
2543 | } |
---|
2544 | |
---|
2545 | const byte flags = version == 0 ? pf->flags : pf1->type(); |
---|
2546 | |
---|
2547 | for (size_t i = 0; i < g_utp_sockets.GetCount(); i++) { |
---|
2548 | UTPSocket *conn = g_utp_sockets[i]; |
---|
2549 | //LOG_UTPV("Examining UTPSocket %s for %s and (seed:%u s:%u r:%u) for %u", |
---|
2550 | // addrfmt(conn->addr, addrbuf), addrfmt(addr, addrbuf2), conn->conn_seed, conn->conn_id_send, conn->conn_id_recv, id); |
---|
2551 | if (conn->addr != addr) |
---|
2552 | continue; |
---|
2553 | |
---|
2554 | if (flags == ST_RESET && (conn->conn_id_send == id || conn->conn_id_recv == id)) { |
---|
2555 | LOG_UTPV("0x%08x: recv RST for existing connection", conn); |
---|
2556 | if (!conn->userdata || conn->state == CS_FIN_SENT) { |
---|
2557 | conn->state = CS_DESTROY; |
---|
2558 | } else { |
---|
2559 | conn->state = CS_RESET; |
---|
2560 | } |
---|
2561 | if (conn->userdata) { |
---|
2562 | conn->func.on_overhead(conn->userdata, false, len + conn->get_udp_overhead(), |
---|
2563 | close_overhead); |
---|
2564 | const int err = conn->state == CS_SYN_SENT ? |
---|
2565 | ECONNREFUSED : |
---|
2566 | ECONNRESET; |
---|
2567 | conn->func.on_error(conn->userdata, err); |
---|
2568 | } |
---|
2569 | return true; |
---|
2570 | } else if (flags != ST_SYN && conn->conn_id_recv == id) { |
---|
2571 | LOG_UTPV("0x%08x: recv processing", conn); |
---|
2572 | const size_t read = UTP_ProcessIncoming(conn, buffer, len); |
---|
2573 | if (conn->userdata) { |
---|
2574 | conn->func.on_overhead(conn->userdata, false, |
---|
2575 | (len - read) + conn->get_udp_overhead(), |
---|
2576 | header_overhead); |
---|
2577 | } |
---|
2578 | return true; |
---|
2579 | } |
---|
2580 | } |
---|
2581 | |
---|
2582 | if (flags == ST_RESET) { |
---|
2583 | LOG_UTPV("recv RST for unknown connection"); |
---|
2584 | return true; |
---|
2585 | } |
---|
2586 | |
---|
2587 | const uint32 seq_nr = version == 0 ? pf->seq_nr : pf1->seq_nr; |
---|
2588 | if (flags != ST_SYN) { |
---|
2589 | for (size_t i = 0; i < g_rst_info.GetCount(); i++) { |
---|
2590 | if (g_rst_info[i].connid != id) |
---|
2591 | continue; |
---|
2592 | if (g_rst_info[i].addr != addr) |
---|
2593 | continue; |
---|
2594 | if (seq_nr != g_rst_info[i].ack_nr) |
---|
2595 | continue; |
---|
2596 | g_rst_info[i].timestamp = UTP_GetMilliseconds(); |
---|
2597 | LOG_UTPV("recv not sending RST to non-SYN (stored)"); |
---|
2598 | return true; |
---|
2599 | } |
---|
2600 | if (g_rst_info.GetCount() > RST_INFO_LIMIT) { |
---|
2601 | LOG_UTPV("recv not sending RST to non-SYN (limit at %u stored)", (uint)g_rst_info.GetCount()); |
---|
2602 | return true; |
---|
2603 | } |
---|
2604 | LOG_UTPV("recv send RST to non-SYN (%u stored)", (uint)g_rst_info.GetCount()); |
---|
2605 | RST_Info &r = g_rst_info.Append(); |
---|
2606 | r.addr = addr; |
---|
2607 | r.connid = id; |
---|
2608 | r.ack_nr = seq_nr; |
---|
2609 | r.timestamp = UTP_GetMilliseconds(); |
---|
2610 | |
---|
2611 | UTPSocket::send_rst(send_to_proc, send_to_userdata, addr, id, seq_nr, UTP_Random(), version); |
---|
2612 | return true; |
---|
2613 | } |
---|
2614 | |
---|
2615 | if (incoming_proc) { |
---|
2616 | LOG_UTPV("Incoming connection from %s uTP version:%u", addrfmt(addr, addrbuf), version); |
---|
2617 | |
---|
2618 | // Create a new UTP socket to handle this new connection |
---|
2619 | UTPSocket *conn = UTP_Create(send_to_proc, send_to_userdata, to, tolen); |
---|
2620 | // Need to track this value to be able to detect duplicate CONNECTs |
---|
2621 | conn->conn_seed = id; |
---|
2622 | // This is the value that identifies this connection for them. |
---|
2623 | conn->conn_id_send = id; |
---|
2624 | // This is the value that identifies this connection for us. |
---|
2625 | conn->conn_id_recv = id+1; |
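        // Note: the two directions use adjacent connection IDs. The SYN carried
        // the initiator's receive ID, so we send with `id` and expect packets
        // from the peer to arrive tagged with `id + 1`, mirroring the
        // assignments above.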
        conn->ack_nr = seq_nr;
        conn->seq_nr = UTP_Random();
        conn->fast_resend_seq_nr = conn->seq_nr;

        UTP_SetSockopt(conn, SO_UTPVERSION, version);
        conn->state = CS_CONNECTED;

        const size_t read = UTP_ProcessIncoming(conn, buffer, len, true);

        LOG_UTPV("0x%08x: recv send connect ACK", conn);
        conn->send_ack(true);

        incoming_proc(send_to_userdata, conn);

        // we report overhead after incoming_proc, because the callbacks are set up by then
        if (conn->userdata) {
            // SYN
            conn->func.on_overhead(conn->userdata, false, (len - read) + conn->get_udp_overhead(),
                                   header_overhead);
            // SYNACK
            conn->func.on_overhead(conn->userdata, true, conn->get_overhead(),
                                   ack_overhead);
        }
    }

    return true;
}
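// Usage sketch (not part of the library): a typical receive loop hands every
// datagram from the application's UDP socket to UTP_IsIncomingUTP first, and
// only treats it as non-uTP traffic if that returns false. The argument order
// below follows the definition above and the declaration in utp.h; `udp_sock`,
// `on_send` and `on_incoming_connection` are placeholder names.
//
//    byte buf[8192];
//    sockaddr_storage from;
//    socklen_t fromlen = sizeof(from);
//    const int r = recvfrom(udp_sock, (char*)buf, sizeof(buf), 0,
//                           (sockaddr*)&from, &fromlen);
//    if (r > 0 && !UTP_IsIncomingUTP(&on_incoming_connection, &on_send, NULL,
//                                    buf, (size_t)r, (const sockaddr*)&from, fromlen)) {
//        // not recognized as uTP; let the application handle the datagram itself
//    }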

bool UTP_HandleICMP(const byte* buffer, size_t len, const struct sockaddr *to, socklen_t tolen)
{
    const PackedSockAddr addr((const SOCKADDR_STORAGE*)to, tolen);

    // Want enough of the original packet to read the connection ID
    if (len < sizeof(PacketFormat)) {
        return false;
    }

    const PacketFormat* p = (PacketFormat*)buffer;
    const PacketFormatV1* p1 = (PacketFormatV1*)buffer;

    const byte version = UTP_IsV1(p1);
    const uint32 id = (version == 0) ? p->connid : uint32(p1->connid);

    for (size_t i = 0; i < g_utp_sockets.GetCount(); ++i) {
        UTPSocket *conn = g_utp_sockets[i];
        if (conn->addr == addr &&
            conn->conn_id_recv == id) {
            // Don't pass on errors for idle/closed connections
            if (conn->state != CS_IDLE) {
                // Capture this before the state is overwritten below, so the
                // error code can still distinguish a refused connect.
                const bool was_syn_sent = conn->state == CS_SYN_SENT;
                if (!conn->userdata || conn->state == CS_FIN_SENT) {
                    LOG_UTPV("0x%08x: icmp packet causing socket destruction", conn);
                    conn->state = CS_DESTROY;
                } else {
                    conn->state = CS_RESET;
                }
                if (conn->userdata) {
                    const int err = was_syn_sent ? ECONNREFUSED : ECONNRESET;
                    LOG_UTPV("0x%08x: icmp packet causing error on socket:%d", conn, err);
                    conn->func.on_error(conn->userdata, err);
                }
            }
            return true;
        }
    }
    return false;
}
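// Note: this function only interprets the uTP header echoed back inside an ICMP
// error; obtaining that payload is up to the caller and is platform specific
// (for example IP_RECVERR/MSG_ERRQUEUE on Linux, or a Windows UDP socket
// reporting WSAECONNRESET after an ICMP port unreachable). Those mechanisms are
// mentioned as background only, not as something this file implements.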

// Write bytes to the UTP socket.
// Returns true if the socket is still writable.
bool UTP_Write(UTPSocket *conn, size_t bytes)
{
    assert(conn);

    // Remember the requested byte count for logging; `bytes` is consumed below.
    const size_t param = bytes;

    if (conn->state != CS_CONNECTED) {
        LOG_UTPV("0x%08x: UTP_Write %u bytes = false (not CS_CONNECTED)", conn, (uint)bytes);
        return false;
    }

    g_current_ms = UTP_GetMilliseconds();

    conn->update_send_quota();

    // don't send unless it will all fit in the window
    size_t packet_size = conn->get_packet_size();
    size_t num_to_send = min<size_t>(bytes, packet_size);
    while (conn->is_writable(num_to_send)) {
        // Send an outgoing packet.
        // Also add it to the outgoing queue of packets that have been sent but not yet ACKed.

        if (num_to_send == 0) {
            LOG_UTPV("0x%08x: UTP_Write %u bytes = true", conn, (uint)param);
            return true;
        }
        bytes -= num_to_send;

        LOG_UTPV("0x%08x: Sending packet. seq_nr:%u ack_nr:%u wnd:%u/%u/%u rcv_win:%u size:%u quota:%d cur_window_packets:%u",
                 conn, conn->seq_nr, conn->ack_nr,
                 (uint)(conn->cur_window + num_to_send),
                 (uint)conn->max_window, (uint)conn->max_window_user,
                 (uint)conn->last_rcv_win, num_to_send, conn->send_quota / 100,
                 conn->cur_window_packets);
        conn->write_outgoing_packet(num_to_send, ST_DATA);
        num_to_send = min<size_t>(bytes, packet_size);
    }

    // mark the socket as not being writable.
    conn->state = CS_CONNECTED_FULL;
    LOG_UTPV("0x%08x: UTP_Write %u bytes = false", conn, (uint)bytes);
    return false;
}
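// Note on the write path: UTP_Write() does not take a buffer. It only tells the
// library how many bytes the application has available; write_outgoing_packet()
// is then expected to pull the actual payload through the socket's write
// callback (the on_write member of the callback table registered for this
// socket, per utp.h). A caller typically keeps offering data until UTP_Write()
// returns false and resumes once the library signals the socket is writable
// again. This reading is inferred from the call sites above; treat it as a
// sketch of the contract rather than its definition.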

void UTP_RBDrained(UTPSocket *conn)
{
    assert(conn);

    const size_t rcvwin = conn->get_rcv_window();

    if (rcvwin > conn->last_rcv_win) {
        // If the last advertised window was 0, send an ACK immediately;
        // otherwise just shorten the delayed-ACK timer.
        if (conn->last_rcv_win == 0) {
            conn->send_ack();
        } else {
            conn->ack_time = g_current_ms + min<uint>(conn->ack_time - g_current_ms, DELAYED_ACK_TIME_THRESHOLD);
        }
    }
}
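// Note on UTP_RBDrained(): the application should call this once it has drained
// its receive buffer (i.e. consumed data the library delivered earlier). The
// advertised receive window can then grow again, and if it had previously
// closed to zero a window-update ACK goes out immediately; otherwise the
// delayed-ACK timer is shortened. This is inferred from the code above; consult
// utp.h for the authoritative contract.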

void UTP_CheckTimeouts()
{
    g_current_ms = UTP_GetMilliseconds();

    for (size_t i = 0; i < g_rst_info.GetCount(); i++) {
        if ((int)(g_current_ms - g_rst_info[i].timestamp) >= RST_INFO_TIMEOUT) {
            g_rst_info.MoveUpLast(i);
            i--;
        }
    }
    if (g_rst_info.GetCount() != g_rst_info.GetAlloc()) {
        g_rst_info.Compact();
    }

    for (size_t i = 0; i != g_utp_sockets.GetCount(); i++) {
        UTPSocket *conn = g_utp_sockets[i];
        conn->check_timeouts();

        // Check if the socket has flagged itself for destruction
        if (conn->state == CS_DESTROY) {
            LOG_UTPV("0x%08x: Destroying", conn);
            UTP_Free(conn);
            i--;
        }
    }
}
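// Note: UTP_CheckTimeouts() drives all timer-based work (retransmissions,
// keepalives, expiring the RST dedup cache, and freeing sockets that reached
// CS_DESTROY), so the application must call it periodically from the same
// thread that feeds packets in. The exact interval is up to the embedder; a
// small fraction of the timeout granularity (tens of milliseconds) is a
// reasonable assumption, but check your copy's header or README for the
// recommended value.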

size_t UTP_GetPacketSize(UTPSocket *socket)
{
    return socket->get_packet_size();
}

void UTP_GetPeerName(UTPSocket *conn, struct sockaddr *addr, socklen_t *addrlen)
{
    assert(conn);

    socklen_t len;
    const SOCKADDR_STORAGE sa = conn->addr.get_sockaddr_storage(&len);
    *addrlen = min(len, *addrlen);
    memcpy(addr, &sa, *addrlen);
}
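// Example (sketch): UTP_GetPeerName() mirrors getpeername(); `s` is assumed to
// be a valid UTPSocket*.
//
//    sockaddr_storage ss;
//    socklen_t sslen = sizeof(ss);
//    UTP_GetPeerName(s, (sockaddr*)&ss, &sslen);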

void UTP_GetDelays(UTPSocket *conn, int32 *ours, int32 *theirs, uint32 *age)
{
    assert(conn);

    if (ours) *ours = conn->our_hist.get_value();
    if (theirs) *theirs = conn->their_hist.get_value();
    if (age) *age = g_current_ms - conn->last_measured_delay;
}
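// Note: the two delay values come straight from the per-direction delay
// histories the congestion controller maintains (our_hist/their_hist), and
// `age` is the number of milliseconds since the last delay measurement. The
// unit of the delay values is whatever get_value() returns (microseconds in
// the delay-history implementation, if unchanged); verify against that code
// before relying on it.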

#ifdef _DEBUG
void UTP_GetStats(UTPSocket *conn, UTPStats *stats)
{
    assert(conn);

    *stats = conn->_stats;
}
#endif // _DEBUG

void UTP_GetGlobalStats(UTPGlobalStats *stats)
{
    *stats = _global_stats;
}

// Close the UTP socket.
// It is not valid for the upper layer to refer to the socket after it is closed.
// Data that has already been written will still be delivered, best effort, after the close.
void UTP_Close(UTPSocket *conn)
{
    assert(conn);

    assert(conn->state != CS_DESTROY_DELAY && conn->state != CS_FIN_SENT && conn->state != CS_DESTROY);

    LOG_UTPV("0x%08x: UTP_Close in state:%s", conn, statenames[conn->state]);

    switch(conn->state) {
    case CS_CONNECTED:
    case CS_CONNECTED_FULL:
        conn->state = CS_FIN_SENT;
        conn->write_outgoing_packet(0, ST_FIN);
        break;

    case CS_SYN_SENT:
        conn->rto_timeout = UTP_GetMilliseconds() + min<uint>(conn->rto * 2, 60);
        // fall through
    case CS_GOT_FIN:
        conn->state = CS_DESTROY_DELAY;
        break;

    default:
        conn->state = CS_DESTROY;
        break;
    }
}
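// Note on lifetime: after UTP_Close() the library still owns the socket. It
// keeps the object alive long enough to deliver queued data and the FIN (or to
// time out) and only then destroys it, which is why the caller must not touch
// the UTPSocket* again after closing. The application is normally told the
// socket is about to disappear through the state callback (UTP_STATE_DESTROYING
// in utp.h, assuming that is the constant your copy of the header defines).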