1 | /* |
---|
2 | * This file Copyright (C) Mnemosyne LLC |
---|
3 | * |
---|
4 | * This file is licensed by the GPL version 2. Works owned by the |
---|
5 | * Transmission project are granted a special exemption to clause 2(b) |
---|
6 | * so that the bulk of its code can remain under the MIT license. |
---|
7 | * This exemption does not extend to derived works not owned by |
---|
8 | * the Transmission project. |
---|
9 | * |
---|
10 | * $Id: peer-io.c 12954 2011-10-08 23:53:27Z jordan $ |
---|
11 | */ |
---|
12 | |
---|
13 | #include <assert.h> |
---|
14 | #include <errno.h> |
---|
15 | #include <string.h> |
---|
16 | |
---|
17 | #include <event2/event.h> |
---|
18 | #include <event2/buffer.h> |
---|
19 | #include <event2/bufferevent.h> |
---|
20 | |
---|
21 | #include "transmission.h" |
---|
22 | #include "session.h" |
---|
23 | #include "bandwidth.h" |
---|
24 | #include "crypto.h" |
---|
25 | #include "net.h" |
---|
26 | #include "peer-common.h" /* MAX_BLOCK_SIZE */ |
---|
27 | #include "peer-io.h" |
---|
28 | #include "trevent.h" /* tr_runInEventThread() */ |
---|
29 | #include "tr-utp.h" |
---|
30 | #include "utils.h" |
---|
31 | |
---|
32 | |
---|
33 | #ifdef WIN32 |
---|
34 | #define EAGAIN WSAEWOULDBLOCK |
---|
35 | #define EINTR WSAEINTR |
---|
36 | #define EINPROGRESS WSAEINPROGRESS |
---|
37 | #define EPIPE WSAECONNRESET |
---|
38 | #endif |
---|
39 | |
---|
40 | /* The amount of read buffering that we allow for uTP sockets. */ |
---|
41 | |
---|
42 | #define UTP_READ_BUFFER_SIZE (256 * 1024) |
---|
43 | |
---|
/**
 * Estimate how many bytes of protocol framing accompany `d` bytes of payload.
 *
 * http://sd.wareonearth.com/~phil/net/overhead/
 *
 * TCP over Ethernet:
 * Assuming no header compression (e.g. not PPP)
 * Add 20 IPv4 header or 40 IPv6 header (no options)
 * Add 20 TCP header
 * Add 12 bytes optional TCP timestamps
 * Max TCP Payload data rates over ethernet are thus:
 *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
 *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
 *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
 *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
 *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
 *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, TCP timestamps
 *
 * @param d payload size in bytes
 * @return the estimated overhead in bytes, rounded toward zero
 */
static size_t
guessPacketOverhead( size_t d )
{
    /* percentage of bytes on the wire assumed to be payload */
    const double assumed_payload_data_rate = 94.0;
    const double overhead = d * ( 100.0 / assumed_payload_data_rate ) - d;

    /* Convert directly to the return type. Narrowing the double through
     * unsigned int is undefined once the estimate no longer fits in it. */
    return (size_t) overhead;
}
---|
67 | |
---|
68 | /** |
---|
69 | *** |
---|
70 | **/ |
---|
71 | |
---|
/* Write a deep-log line tagged with the peer's address string.
 * The arguments are only evaluated when deep logging is active. */
#define dbgmsg( io, ... ) \
    do \
    { \
        if( tr_deepLoggingIsActive( ) ) \
            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
    } \
    while( 0 )
---|
77 | |
---|
78 | /** |
---|
79 | *** |
---|
80 | **/ |
---|
81 | |
---|
/**
 * One node in a peer's `outbuf_datatypes` list: a run of `length` bytes
 * together with a flag saying whether those bytes are piece data.
 */
struct tr_datatype
{
    struct tr_datatype * next;  /* following node, or NULL at the tail */
    size_t length;              /* bytes of this run not yet reported as written */
    bool isPieceData;           /* forwarded to tr_bandwidthUsed() and didWrite */
};

/* Free list of recycled nodes; see datatype_new() and datatype_free(). */
static struct tr_datatype * datatype_pool = NULL;

/* The value every node is reset to before datatype_new() hands it out. */
static const struct tr_datatype TR_DATATYPE_INIT = {
    .next = NULL,
    .length = 0,
    .isPieceData = false
};
---|
92 | |
---|
93 | static struct tr_datatype * |
---|
94 | datatype_new( void ) |
---|
95 | { |
---|
96 | struct tr_datatype * ret; |
---|
97 | |
---|
98 | if( datatype_pool == NULL ) |
---|
99 | ret = tr_new( struct tr_datatype, 1 ); |
---|
100 | else { |
---|
101 | ret = datatype_pool; |
---|
102 | datatype_pool = datatype_pool->next; |
---|
103 | } |
---|
104 | |
---|
105 | *ret = TR_DATATYPE_INIT; |
---|
106 | return ret; |
---|
107 | } |
---|
108 | |
---|
109 | static void |
---|
110 | datatype_free( struct tr_datatype * datatype ) |
---|
111 | { |
---|
112 | datatype->next = datatype_pool; |
---|
113 | datatype_pool = datatype; |
---|
114 | } |
---|
115 | |
---|
116 | static void |
---|
117 | peer_io_pull_datatype( tr_peerIo * io ) |
---|
118 | { |
---|
119 | struct tr_datatype * tmp; |
---|
120 | |
---|
121 | if(( tmp = io->outbuf_datatypes )) |
---|
122 | { |
---|
123 | io->outbuf_datatypes = tmp->next; |
---|
124 | datatype_free( tmp ); |
---|
125 | } |
---|
126 | } |
---|
127 | |
---|
128 | static void |
---|
129 | peer_io_push_datatype( tr_peerIo * io, struct tr_datatype * datatype ) |
---|
130 | { |
---|
131 | struct tr_datatype * tmp; |
---|
132 | |
---|
133 | if(( tmp = io->outbuf_datatypes )) { |
---|
134 | while( tmp->next != NULL ) |
---|
135 | tmp = tmp->next; |
---|
136 | tmp->next = datatype; |
---|
137 | } else { |
---|
138 | io->outbuf_datatypes = datatype; |
---|
139 | } |
---|
140 | } |
---|
141 | |
---|
142 | /*** |
---|
143 | **** |
---|
144 | ***/ |
---|
145 | |
---|
146 | static void |
---|
147 | didWriteWrapper( tr_peerIo * io, unsigned int bytes_transferred ) |
---|
148 | { |
---|
149 | while( bytes_transferred && tr_isPeerIo( io ) ) |
---|
150 | { |
---|
151 | struct tr_datatype * next = io->outbuf_datatypes; |
---|
152 | |
---|
153 | const unsigned int payload = MIN( next->length, bytes_transferred ); |
---|
154 | /* For uTP sockets, the overhead is computed in utp_on_overhead. */ |
---|
155 | const unsigned int overhead = |
---|
156 | io->socket ? guessPacketOverhead( payload ) : 0; |
---|
157 | const uint64_t now = tr_time_msec( ); |
---|
158 | |
---|
159 | tr_bandwidthUsed( &io->bandwidth, TR_UP, payload, next->isPieceData, now ); |
---|
160 | |
---|
161 | if( overhead > 0 ) |
---|
162 | tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, false, now ); |
---|
163 | |
---|
164 | if( io->didWrite ) |
---|
165 | io->didWrite( io, payload, next->isPieceData, io->userData ); |
---|
166 | |
---|
167 | if( tr_isPeerIo( io ) ) |
---|
168 | { |
---|
169 | bytes_transferred -= payload; |
---|
170 | next->length -= payload; |
---|
171 | if( !next->length ) |
---|
172 | peer_io_pull_datatype( io ); |
---|
173 | } |
---|
174 | } |
---|
175 | } |
---|
176 | |
---|
177 | static void |
---|
178 | canReadWrapper( tr_peerIo * io ) |
---|
179 | { |
---|
180 | bool err = 0; |
---|
181 | bool done = 0; |
---|
182 | tr_session * session; |
---|
183 | |
---|
184 | dbgmsg( io, "canRead" ); |
---|
185 | |
---|
186 | tr_peerIoRef( io ); |
---|
187 | |
---|
188 | session = io->session; |
---|
189 | |
---|
190 | /* try to consume the input buffer */ |
---|
191 | if( io->canRead ) |
---|
192 | { |
---|
193 | const uint64_t now = tr_time_msec( ); |
---|
194 | |
---|
195 | tr_sessionLock( session ); |
---|
196 | |
---|
197 | while( !done && !err ) |
---|
198 | { |
---|
199 | size_t piece = 0; |
---|
200 | const size_t oldLen = evbuffer_get_length( io->inbuf ); |
---|
201 | const int ret = io->canRead( io, io->userData, &piece ); |
---|
202 | const size_t used = oldLen - evbuffer_get_length( io->inbuf ); |
---|
203 | const unsigned int overhead = guessPacketOverhead( used ); |
---|
204 | |
---|
205 | if( piece || (piece!=used) ) |
---|
206 | { |
---|
207 | if( piece ) |
---|
208 | tr_bandwidthUsed( &io->bandwidth, TR_DOWN, piece, true, now ); |
---|
209 | |
---|
210 | if( used != piece ) |
---|
211 | tr_bandwidthUsed( &io->bandwidth, TR_DOWN, used - piece, false, now ); |
---|
212 | } |
---|
213 | |
---|
214 | if( overhead > 0 ) |
---|
215 | tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, false, now ); |
---|
216 | |
---|
217 | switch( ret ) |
---|
218 | { |
---|
219 | case READ_NOW: |
---|
220 | if( evbuffer_get_length( io->inbuf ) ) |
---|
221 | continue; |
---|
222 | done = 1; |
---|
223 | break; |
---|
224 | |
---|
225 | case READ_LATER: |
---|
226 | done = 1; |
---|
227 | break; |
---|
228 | |
---|
229 | case READ_ERR: |
---|
230 | err = 1; |
---|
231 | break; |
---|
232 | } |
---|
233 | |
---|
234 | assert( tr_isPeerIo( io ) ); |
---|
235 | } |
---|
236 | |
---|
237 | tr_sessionUnlock( session ); |
---|
238 | } |
---|
239 | |
---|
240 | tr_peerIoUnref( io ); |
---|
241 | } |
---|
242 | |
---|
243 | static void |
---|
244 | event_read_cb( int fd, short event UNUSED, void * vio ) |
---|
245 | { |
---|
246 | int res; |
---|
247 | int e; |
---|
248 | tr_peerIo * io = vio; |
---|
249 | |
---|
250 | /* Limit the input buffer to 256K, so it doesn't grow too large */ |
---|
251 | unsigned int howmuch; |
---|
252 | unsigned int curlen; |
---|
253 | const tr_direction dir = TR_DOWN; |
---|
254 | const unsigned int max = 256 * 1024; |
---|
255 | |
---|
256 | assert( tr_isPeerIo( io ) ); |
---|
257 | assert( io->socket >= 0 ); |
---|
258 | |
---|
259 | io->pendingEvents &= ~EV_READ; |
---|
260 | |
---|
261 | curlen = evbuffer_get_length( io->inbuf ); |
---|
262 | howmuch = curlen >= max ? 0 : max - curlen; |
---|
263 | howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch ); |
---|
264 | |
---|
265 | dbgmsg( io, "libevent says this peer is ready to read" ); |
---|
266 | |
---|
267 | /* if we don't have any bandwidth left, stop reading */ |
---|
268 | if( howmuch < 1 ) { |
---|
269 | tr_peerIoSetEnabled( io, dir, false ); |
---|
270 | return; |
---|
271 | } |
---|
272 | |
---|
273 | EVUTIL_SET_SOCKET_ERROR( 0 ); |
---|
274 | res = evbuffer_read( io->inbuf, fd, (int)howmuch ); |
---|
275 | e = EVUTIL_SOCKET_ERROR( ); |
---|
276 | |
---|
277 | if( res > 0 ) |
---|
278 | { |
---|
279 | tr_peerIoSetEnabled( io, dir, true ); |
---|
280 | |
---|
281 | /* Invoke the user callback - must always be called last */ |
---|
282 | canReadWrapper( io ); |
---|
283 | } |
---|
284 | else |
---|
285 | { |
---|
286 | char errstr[512]; |
---|
287 | short what = BEV_EVENT_READING; |
---|
288 | |
---|
289 | if( res == 0 ) /* EOF */ |
---|
290 | what |= BEV_EVENT_EOF; |
---|
291 | else if( res == -1 ) { |
---|
292 | if( e == EAGAIN || e == EINTR ) { |
---|
293 | tr_peerIoSetEnabled( io, dir, true ); |
---|
294 | return; |
---|
295 | } |
---|
296 | what |= BEV_EVENT_ERROR; |
---|
297 | } |
---|
298 | |
---|
299 | dbgmsg( io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)", |
---|
300 | res, what, e, tr_net_strerror( errstr, sizeof( errstr ), e ) ); |
---|
301 | |
---|
302 | if( io->gotError != NULL ) |
---|
303 | io->gotError( io, what, io->userData ); |
---|
304 | } |
---|
305 | } |
---|
306 | |
---|
307 | static int |
---|
308 | tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch ) |
---|
309 | { |
---|
310 | int e; |
---|
311 | int n; |
---|
312 | char errstr[256]; |
---|
313 | |
---|
314 | EVUTIL_SET_SOCKET_ERROR( 0 ); |
---|
315 | n = evbuffer_write_atmost( io->outbuf, fd, howmuch ); |
---|
316 | e = EVUTIL_SOCKET_ERROR( ); |
---|
317 | dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?tr_net_strerror(errstr,sizeof(errstr),e):"") ); |
---|
318 | |
---|
319 | return n; |
---|
320 | } |
---|
321 | |
---|
322 | static void |
---|
323 | event_write_cb( int fd, short event UNUSED, void * vio ) |
---|
324 | { |
---|
325 | int res = 0; |
---|
326 | int e; |
---|
327 | short what = BEV_EVENT_WRITING; |
---|
328 | tr_peerIo * io = vio; |
---|
329 | size_t howmuch; |
---|
330 | const tr_direction dir = TR_UP; |
---|
331 | char errstr[1024]; |
---|
332 | |
---|
333 | assert( tr_isPeerIo( io ) ); |
---|
334 | assert( io->socket >= 0 ); |
---|
335 | |
---|
336 | io->pendingEvents &= ~EV_WRITE; |
---|
337 | |
---|
338 | dbgmsg( io, "libevent says this peer is ready to write" ); |
---|
339 | |
---|
340 | /* Write as much as possible, since the socket is non-blocking, write() will |
---|
341 | * return if it can't write any more data without blocking */ |
---|
342 | howmuch = tr_bandwidthClamp( &io->bandwidth, dir, evbuffer_get_length( io->outbuf ) ); |
---|
343 | |
---|
344 | /* if we don't have any bandwidth left, stop writing */ |
---|
345 | if( howmuch < 1 ) { |
---|
346 | tr_peerIoSetEnabled( io, dir, false ); |
---|
347 | return; |
---|
348 | } |
---|
349 | |
---|
350 | EVUTIL_SET_SOCKET_ERROR( 0 ); |
---|
351 | res = tr_evbuffer_write( io, fd, howmuch ); |
---|
352 | e = EVUTIL_SOCKET_ERROR( ); |
---|
353 | |
---|
354 | if (res == -1) { |
---|
355 | if (!e || e == EAGAIN || e == EINTR || e == EINPROGRESS) |
---|
356 | goto reschedule; |
---|
357 | /* error case */ |
---|
358 | what |= BEV_EVENT_ERROR; |
---|
359 | } else if (res == 0) { |
---|
360 | /* eof case */ |
---|
361 | what |= BEV_EVENT_EOF; |
---|
362 | } |
---|
363 | if (res <= 0) |
---|
364 | goto error; |
---|
365 | |
---|
366 | if( evbuffer_get_length( io->outbuf ) ) |
---|
367 | tr_peerIoSetEnabled( io, dir, true ); |
---|
368 | |
---|
369 | didWriteWrapper( io, res ); |
---|
370 | return; |
---|
371 | |
---|
372 | reschedule: |
---|
373 | if( evbuffer_get_length( io->outbuf ) ) |
---|
374 | tr_peerIoSetEnabled( io, dir, true ); |
---|
375 | return; |
---|
376 | |
---|
377 | error: |
---|
378 | |
---|
379 | tr_net_strerror( errstr, sizeof( errstr ), e ); |
---|
380 | dbgmsg( io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr ); |
---|
381 | |
---|
382 | if( io->gotError != NULL ) |
---|
383 | io->gotError( io, what, io->userData ); |
---|
384 | } |
---|
385 | |
---|
386 | /** |
---|
387 | *** |
---|
388 | **/ |
---|
389 | |
---|
/**
 * Apply a TCP congestion-control algorithm to `socket` when one is named.
 * A NULL or empty `algorithm` is ignored; a failure is only logged.
 */
static void
maybeSetCongestionAlgorithm( int socket, const char * algorithm )
{
    if( algorithm == NULL || *algorithm == '\0' )
        return;

    if( tr_netSetCongestionControl( socket, algorithm ) < 0 )
        tr_ninf( "Net", "Can't set congestion control algorithm '%s': %s",
                 algorithm, tr_strerror( errno ));
}
---|
402 | |
---|
403 | #ifdef WITH_UTP |
---|
404 | /* UTP callbacks */ |
---|
405 | |
---|
406 | static void |
---|
407 | utp_on_read(void *closure, const unsigned char *buf, size_t buflen) |
---|
408 | { |
---|
409 | int rc; |
---|
410 | tr_peerIo *io = closure; |
---|
411 | assert( tr_isPeerIo( io ) ); |
---|
412 | |
---|
413 | rc = evbuffer_add( io->inbuf, buf, buflen ); |
---|
414 | dbgmsg( io, "utp_on_read got %zu bytes", buflen ); |
---|
415 | |
---|
416 | if( rc < 0 ) { |
---|
417 | tr_nerr( "UTP", "On read evbuffer_add" ); |
---|
418 | return; |
---|
419 | } |
---|
420 | |
---|
421 | tr_peerIoSetEnabled( io, TR_DOWN, true ); |
---|
422 | canReadWrapper( io ); |
---|
423 | } |
---|
424 | |
---|
425 | static void |
---|
426 | utp_on_write(void *closure, unsigned char *buf, size_t buflen) |
---|
427 | { |
---|
428 | int rc; |
---|
429 | tr_peerIo *io = closure; |
---|
430 | assert( tr_isPeerIo( io ) ); |
---|
431 | |
---|
432 | rc = evbuffer_remove( io->outbuf, buf, buflen ); |
---|
433 | dbgmsg( io, "utp_on_write sending %zu bytes... evbuffer_remove returned %d", buflen, rc ); |
---|
434 | assert( rc == (int)buflen ); /* if this fails, we've corrupted our bookkeeping somewhere */ |
---|
435 | if( rc < (long)buflen ) { |
---|
436 | tr_nerr( "UTP", "Short write: %d < %ld", rc, (long)buflen); |
---|
437 | } |
---|
438 | |
---|
439 | didWriteWrapper( io, buflen ); |
---|
440 | } |
---|
441 | |
---|
442 | static size_t |
---|
443 | utp_get_rb_size(void *closure) |
---|
444 | { |
---|
445 | size_t bytes; |
---|
446 | tr_peerIo *io = closure; |
---|
447 | assert( tr_isPeerIo( io ) ); |
---|
448 | |
---|
449 | bytes = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, UTP_READ_BUFFER_SIZE ); |
---|
450 | |
---|
451 | dbgmsg( io, "utp_get_rb_size is saying it's ready to read %zu bytes", bytes ); |
---|
452 | return UTP_READ_BUFFER_SIZE - bytes; |
---|
453 | } |
---|
454 | |
---|
455 | static void |
---|
456 | utp_on_state_change(void *closure, int state) |
---|
457 | { |
---|
458 | tr_peerIo *io = closure; |
---|
459 | assert( tr_isPeerIo( io ) ); |
---|
460 | |
---|
461 | if( state == UTP_STATE_CONNECT ) { |
---|
462 | dbgmsg( io, "utp_on_state_change -- changed to connected" ); |
---|
463 | io->utpSupported = true; |
---|
464 | } else if( state == UTP_STATE_WRITABLE ) { |
---|
465 | dbgmsg( io, "utp_on_state_change -- changed to writable" ); |
---|
466 | } else if( state == UTP_STATE_EOF ) { |
---|
467 | if( io->gotError ) |
---|
468 | io->gotError( io, BEV_EVENT_EOF, io->userData ); |
---|
469 | } else if( state == UTP_STATE_DESTROYING ) { |
---|
470 | tr_nerr( "UTP", "Impossible state UTP_STATE_DESTROYING" ); |
---|
471 | return; |
---|
472 | } else { |
---|
473 | tr_nerr( "UTP", "Unknown state %d", state ); |
---|
474 | } |
---|
475 | } |
---|
476 | |
---|
477 | static void |
---|
478 | utp_on_error(void *closure, int errcode) |
---|
479 | { |
---|
480 | tr_peerIo *io = closure; |
---|
481 | assert( tr_isPeerIo( io ) ); |
---|
482 | |
---|
483 | dbgmsg( io, "utp_on_error -- errcode is %d", errcode ); |
---|
484 | |
---|
485 | if( io->gotError ) { |
---|
486 | errno = errcode; |
---|
487 | io->gotError( io, BEV_EVENT_ERROR, io->userData ); |
---|
488 | } |
---|
489 | } |
---|
490 | |
---|
491 | static void |
---|
492 | utp_on_overhead(void *closure, bool send, size_t count, int type UNUSED) |
---|
493 | { |
---|
494 | tr_peerIo *io = closure; |
---|
495 | assert( tr_isPeerIo( io ) ); |
---|
496 | |
---|
497 | dbgmsg( io, "utp_on_overhead -- count is %zu", count ); |
---|
498 | |
---|
499 | tr_bandwidthUsed( &io->bandwidth, send ? TR_UP : TR_DOWN, |
---|
500 | count, false, tr_time_msec() ); |
---|
501 | } |
---|
502 | |
---|
503 | static struct UTPFunctionTable utp_function_table = { |
---|
504 | .on_read = utp_on_read, |
---|
505 | .on_write = utp_on_write, |
---|
506 | .get_rb_size = utp_get_rb_size, |
---|
507 | .on_state = utp_on_state_change, |
---|
508 | .on_error = utp_on_error, |
---|
509 | .on_overhead = utp_on_overhead |
---|
510 | }; |
---|
511 | |
---|
512 | |
---|
513 | /* Dummy UTP callbacks. */ |
---|
514 | /* We switch a UTP socket to use these after the associated peerIo has been |
---|
515 | destroyed -- see io_close_socket(), which io_dtor() calls. */ |
---|
516 | |
---|
517 | static void |
---|
518 | dummy_read( void * closure UNUSED, const unsigned char *buf UNUSED, size_t buflen UNUSED ) |
---|
519 | { |
---|
520 | /* This cannot happen, as far as I'm aware. */ |
---|
521 | tr_nerr( "UTP", "On_read called on closed socket" ); |
---|
522 | |
---|
523 | } |
---|
524 | |
---|
525 | static void |
---|
526 | dummy_write(void * closure UNUSED, unsigned char *buf, size_t buflen) |
---|
527 | { |
---|
528 | /* This can very well happen if we've shut down a peer connection that |
---|
529 | had unflushed buffers. Complain and send zeroes. */ |
---|
530 | tr_ndbg( "UTP", "On_write called on closed socket" ); |
---|
531 | memset( buf, 0, buflen ); |
---|
532 | } |
---|
533 | |
---|
534 | static size_t |
---|
535 | dummy_get_rb_size( void * closure UNUSED ) |
---|
536 | { |
---|
537 | return 0; |
---|
538 | } |
---|
539 | |
---|
540 | static void |
---|
541 | dummy_on_state_change(void * closure UNUSED, int state UNUSED ) |
---|
542 | { |
---|
543 | return; |
---|
544 | } |
---|
545 | |
---|
546 | static void |
---|
547 | dummy_on_error( void * closure UNUSED, int errcode UNUSED ) |
---|
548 | { |
---|
549 | return; |
---|
550 | } |
---|
551 | |
---|
552 | static void |
---|
553 | dummy_on_overhead( void *closure UNUSED, bool send UNUSED, size_t count UNUSED, int type UNUSED ) |
---|
554 | { |
---|
555 | return; |
---|
556 | } |
---|
557 | |
---|
558 | static struct UTPFunctionTable dummy_utp_function_table = { |
---|
559 | .on_read = dummy_read, |
---|
560 | .on_write = dummy_write, |
---|
561 | .get_rb_size = dummy_get_rb_size, |
---|
562 | .on_state = dummy_on_state_change, |
---|
563 | .on_error = dummy_on_error, |
---|
564 | .on_overhead = dummy_on_overhead |
---|
565 | }; |
---|
566 | |
---|
567 | #endif /* #ifdef WITH_UTP */ |
---|
568 | |
---|
569 | static tr_peerIo* |
---|
570 | tr_peerIoNew( tr_session * session, |
---|
571 | tr_bandwidth * parent, |
---|
572 | const tr_address * addr, |
---|
573 | tr_port port, |
---|
574 | const uint8_t * torrentHash, |
---|
575 | bool isIncoming, |
---|
576 | bool isSeed, |
---|
577 | int socket, |
---|
578 | struct UTPSocket * utp_socket) |
---|
579 | { |
---|
580 | tr_peerIo * io; |
---|
581 | |
---|
582 | assert( session != NULL ); |
---|
583 | assert( session->events != NULL ); |
---|
584 | assert( tr_isBool( isIncoming ) ); |
---|
585 | assert( tr_isBool( isSeed ) ); |
---|
586 | assert( tr_amInEventThread( session ) ); |
---|
587 | assert( (socket < 0) == (utp_socket != NULL) ); |
---|
588 | #ifndef WITH_UTP |
---|
589 | assert( socket >= 0 ); |
---|
590 | #endif |
---|
591 | |
---|
592 | if( socket >= 0 ) { |
---|
593 | tr_netSetTOS( socket, session->peerSocketTOS ); |
---|
594 | maybeSetCongestionAlgorithm( socket, session->peer_congestion_algorithm ); |
---|
595 | } |
---|
596 | |
---|
597 | io = tr_new0( tr_peerIo, 1 ); |
---|
598 | io->magicNumber = PEER_IO_MAGIC_NUMBER; |
---|
599 | io->refCount = 1; |
---|
600 | tr_cryptoConstruct( &io->crypto, torrentHash, isIncoming ); |
---|
601 | io->session = session; |
---|
602 | io->addr = *addr; |
---|
603 | io->isSeed = isSeed; |
---|
604 | io->port = port; |
---|
605 | io->socket = socket; |
---|
606 | io->utp_socket = utp_socket; |
---|
607 | io->isIncoming = isIncoming != 0; |
---|
608 | io->timeCreated = tr_time( ); |
---|
609 | io->inbuf = evbuffer_new( ); |
---|
610 | io->outbuf = evbuffer_new( ); |
---|
611 | tr_bandwidthConstruct( &io->bandwidth, session, parent ); |
---|
612 | tr_bandwidthSetPeer( &io->bandwidth, io ); |
---|
613 | dbgmsg( io, "bandwidth is %p; its parent is %p", &io->bandwidth, parent ); |
---|
614 | dbgmsg( io, "socket is %d, utp_socket is %p", socket, utp_socket ); |
---|
615 | |
---|
616 | if( io->socket >= 0 ) { |
---|
617 | io->event_read = event_new( session->event_base, |
---|
618 | io->socket, EV_READ, event_read_cb, io ); |
---|
619 | io->event_write = event_new( session->event_base, |
---|
620 | io->socket, EV_WRITE, event_write_cb, io ); |
---|
621 | } |
---|
622 | #ifdef WITH_UTP |
---|
623 | else { |
---|
624 | UTP_SetSockopt( utp_socket, SO_RCVBUF, UTP_READ_BUFFER_SIZE ); |
---|
625 | dbgmsg( io, "%s", "calling UTP_SetCallbacks &utp_function_table" ); |
---|
626 | UTP_SetCallbacks( utp_socket, |
---|
627 | &utp_function_table, |
---|
628 | io ); |
---|
629 | if( !isIncoming ) { |
---|
630 | dbgmsg( io, "%s", "calling UTP_Connect" ); |
---|
631 | UTP_Connect( utp_socket ); |
---|
632 | } |
---|
633 | } |
---|
634 | #endif |
---|
635 | |
---|
636 | return io; |
---|
637 | } |
---|
638 | |
---|
639 | tr_peerIo* |
---|
640 | tr_peerIoNewIncoming( tr_session * session, |
---|
641 | tr_bandwidth * parent, |
---|
642 | const tr_address * addr, |
---|
643 | tr_port port, |
---|
644 | int fd, |
---|
645 | struct UTPSocket * utp_socket ) |
---|
646 | { |
---|
647 | assert( session ); |
---|
648 | assert( tr_address_is_valid( addr ) ); |
---|
649 | |
---|
650 | return tr_peerIoNew( session, parent, addr, port, NULL, true, false, |
---|
651 | fd, utp_socket ); |
---|
652 | } |
---|
653 | |
---|
654 | tr_peerIo* |
---|
655 | tr_peerIoNewOutgoing( tr_session * session, |
---|
656 | tr_bandwidth * parent, |
---|
657 | const tr_address * addr, |
---|
658 | tr_port port, |
---|
659 | const uint8_t * torrentHash, |
---|
660 | bool isSeed, |
---|
661 | bool utp ) |
---|
662 | { |
---|
663 | int fd = -1; |
---|
664 | struct UTPSocket * utp_socket = NULL; |
---|
665 | |
---|
666 | assert( session ); |
---|
667 | assert( tr_address_is_valid( addr ) ); |
---|
668 | assert( torrentHash ); |
---|
669 | |
---|
670 | if( utp ) |
---|
671 | utp_socket = tr_netOpenPeerUTPSocket( session, addr, port, isSeed ); |
---|
672 | |
---|
673 | if( !utp_socket ) { |
---|
674 | fd = tr_netOpenPeerSocket( session, addr, port, isSeed ); |
---|
675 | dbgmsg( NULL, "tr_netOpenPeerSocket returned fd %d", fd ); |
---|
676 | } |
---|
677 | |
---|
678 | if( fd < 0 && utp_socket == NULL ) |
---|
679 | return NULL; |
---|
680 | |
---|
681 | return tr_peerIoNew( session, parent, addr, port, |
---|
682 | torrentHash, false, isSeed, fd, utp_socket ); |
---|
683 | } |
---|
684 | |
---|
685 | /*** |
---|
686 | **** |
---|
687 | ***/ |
---|
688 | |
---|
689 | static void |
---|
690 | event_enable( tr_peerIo * io, short event ) |
---|
691 | { |
---|
692 | assert( tr_amInEventThread( io->session ) ); |
---|
693 | assert( io->session != NULL ); |
---|
694 | assert( io->session->events != NULL ); |
---|
695 | |
---|
696 | if( io->socket < 0 ) |
---|
697 | return; |
---|
698 | |
---|
699 | assert( io->session->events != NULL ); |
---|
700 | assert( event_initialized( io->event_read ) ); |
---|
701 | assert( event_initialized( io->event_write ) ); |
---|
702 | |
---|
703 | if( ( event & EV_READ ) && ! ( io->pendingEvents & EV_READ ) ) |
---|
704 | { |
---|
705 | dbgmsg( io, "enabling libevent ready-to-read polling" ); |
---|
706 | event_add( io->event_read, NULL ); |
---|
707 | io->pendingEvents |= EV_READ; |
---|
708 | } |
---|
709 | |
---|
710 | if( ( event & EV_WRITE ) && ! ( io->pendingEvents & EV_WRITE ) ) |
---|
711 | { |
---|
712 | dbgmsg( io, "enabling libevent ready-to-write polling" ); |
---|
713 | event_add( io->event_write, NULL ); |
---|
714 | io->pendingEvents |= EV_WRITE; |
---|
715 | } |
---|
716 | } |
---|
717 | |
---|
718 | static void |
---|
719 | event_disable( struct tr_peerIo * io, short event ) |
---|
720 | { |
---|
721 | assert( tr_amInEventThread( io->session ) ); |
---|
722 | assert( io->session != NULL ); |
---|
723 | |
---|
724 | if( io->socket < 0 ) |
---|
725 | return; |
---|
726 | |
---|
727 | assert( io->session->events != NULL ); |
---|
728 | assert( event_initialized( io->event_read ) ); |
---|
729 | assert( event_initialized( io->event_write ) ); |
---|
730 | |
---|
731 | if( ( event & EV_READ ) && ( io->pendingEvents & EV_READ ) ) |
---|
732 | { |
---|
733 | dbgmsg( io, "disabling libevent ready-to-read polling" ); |
---|
734 | event_del( io->event_read ); |
---|
735 | io->pendingEvents &= ~EV_READ; |
---|
736 | } |
---|
737 | |
---|
738 | if( ( event & EV_WRITE ) && ( io->pendingEvents & EV_WRITE ) ) |
---|
739 | { |
---|
740 | dbgmsg( io, "disabling libevent ready-to-write polling" ); |
---|
741 | event_del( io->event_write ); |
---|
742 | io->pendingEvents &= ~EV_WRITE; |
---|
743 | } |
---|
744 | } |
---|
745 | |
---|
746 | void |
---|
747 | tr_peerIoSetEnabled( tr_peerIo * io, |
---|
748 | tr_direction dir, |
---|
749 | bool isEnabled ) |
---|
750 | { |
---|
751 | const short event = dir == TR_UP ? EV_WRITE : EV_READ; |
---|
752 | |
---|
753 | assert( tr_isPeerIo( io ) ); |
---|
754 | assert( tr_isDirection( dir ) ); |
---|
755 | assert( tr_amInEventThread( io->session ) ); |
---|
756 | assert( io->session->events != NULL ); |
---|
757 | |
---|
758 | if( isEnabled ) |
---|
759 | event_enable( io, event ); |
---|
760 | else |
---|
761 | event_disable( io, event ); |
---|
762 | } |
---|
763 | |
---|
764 | /*** |
---|
765 | **** |
---|
766 | ***/ |
---|
767 | static void |
---|
768 | io_close_socket( tr_peerIo * io ) |
---|
769 | { |
---|
770 | if( io->socket >= 0 ) { |
---|
771 | tr_netClose( io->session, io->socket ); |
---|
772 | io->socket = -1; |
---|
773 | } |
---|
774 | |
---|
775 | if( io->event_read != NULL) { |
---|
776 | event_free( io->event_read ); |
---|
777 | io->event_read = NULL; |
---|
778 | } |
---|
779 | |
---|
780 | if( io->event_write != NULL) { |
---|
781 | event_free( io->event_write ); |
---|
782 | io->event_write = NULL; |
---|
783 | } |
---|
784 | |
---|
785 | #ifdef WITH_UTP |
---|
786 | if( io->utp_socket ) { |
---|
787 | UTP_SetCallbacks( io->utp_socket, |
---|
788 | &dummy_utp_function_table, |
---|
789 | NULL ); |
---|
790 | UTP_Close( io->utp_socket ); |
---|
791 | |
---|
792 | io->utp_socket = NULL; |
---|
793 | } |
---|
794 | #endif |
---|
795 | } |
---|
796 | |
---|
797 | static void |
---|
798 | io_dtor( void * vio ) |
---|
799 | { |
---|
800 | tr_peerIo * io = vio; |
---|
801 | |
---|
802 | assert( tr_isPeerIo( io ) ); |
---|
803 | assert( tr_amInEventThread( io->session ) ); |
---|
804 | assert( io->session->events != NULL ); |
---|
805 | |
---|
806 | dbgmsg( io, "in tr_peerIo destructor" ); |
---|
807 | event_disable( io, EV_READ | EV_WRITE ); |
---|
808 | tr_bandwidthDestruct( &io->bandwidth ); |
---|
809 | evbuffer_free( io->outbuf ); |
---|
810 | evbuffer_free( io->inbuf ); |
---|
811 | io_close_socket( io ); |
---|
812 | tr_cryptoDestruct( &io->crypto ); |
---|
813 | |
---|
814 | while( io->outbuf_datatypes != NULL ) |
---|
815 | peer_io_pull_datatype( io ); |
---|
816 | |
---|
817 | memset( io, ~0, sizeof( tr_peerIo ) ); |
---|
818 | tr_free( io ); |
---|
819 | } |
---|
820 | |
---|
821 | static void |
---|
822 | tr_peerIoFree( tr_peerIo * io ) |
---|
823 | { |
---|
824 | if( io ) |
---|
825 | { |
---|
826 | dbgmsg( io, "in tr_peerIoFree" ); |
---|
827 | io->canRead = NULL; |
---|
828 | io->didWrite = NULL; |
---|
829 | io->gotError = NULL; |
---|
830 | tr_runInEventThread( io->session, io_dtor, io ); |
---|
831 | } |
---|
832 | } |
---|
833 | |
---|
834 | void |
---|
835 | tr_peerIoRefImpl( const char * file, int line, tr_peerIo * io ) |
---|
836 | { |
---|
837 | assert( tr_isPeerIo( io ) ); |
---|
838 | |
---|
839 | dbgmsg( io, "%s:%d is incrementing the IO's refcount from %d to %d", |
---|
840 | file, line, io->refCount, io->refCount+1 ); |
---|
841 | |
---|
842 | ++io->refCount; |
---|
843 | } |
---|
844 | |
---|
845 | void |
---|
846 | tr_peerIoUnrefImpl( const char * file, int line, tr_peerIo * io ) |
---|
847 | { |
---|
848 | assert( tr_isPeerIo( io ) ); |
---|
849 | |
---|
850 | dbgmsg( io, "%s:%d is decrementing the IO's refcount from %d to %d", |
---|
851 | file, line, io->refCount, io->refCount-1 ); |
---|
852 | |
---|
853 | if( !--io->refCount ) |
---|
854 | tr_peerIoFree( io ); |
---|
855 | } |
---|
856 | |
---|
857 | const tr_address* |
---|
858 | tr_peerIoGetAddress( const tr_peerIo * io, tr_port * port ) |
---|
859 | { |
---|
860 | assert( tr_isPeerIo( io ) ); |
---|
861 | |
---|
862 | if( port ) |
---|
863 | *port = io->port; |
---|
864 | |
---|
865 | return &io->addr; |
---|
866 | } |
---|
867 | |
---|
868 | const char* |
---|
869 | tr_peerIoAddrStr( const tr_address * addr, tr_port port ) |
---|
870 | { |
---|
871 | static char buf[512]; |
---|
872 | tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_address_to_string( addr ), ntohs( port ) ); |
---|
873 | return buf; |
---|
874 | } |
---|
875 | |
---|
876 | const char* tr_peerIoGetAddrStr( const tr_peerIo * io ) |
---|
877 | { |
---|
878 | return tr_isPeerIo( io ) ? tr_peerIoAddrStr( &io->addr, io->port ) : "error"; |
---|
879 | } |
---|
880 | |
---|
881 | void |
---|
882 | tr_peerIoSetIOFuncs( tr_peerIo * io, |
---|
883 | tr_can_read_cb readcb, |
---|
884 | tr_did_write_cb writecb, |
---|
885 | tr_net_error_cb errcb, |
---|
886 | void * userData ) |
---|
887 | { |
---|
888 | io->canRead = readcb; |
---|
889 | io->didWrite = writecb; |
---|
890 | io->gotError = errcb; |
---|
891 | io->userData = userData; |
---|
892 | } |
---|
893 | |
---|
894 | void |
---|
895 | tr_peerIoClear( tr_peerIo * io ) |
---|
896 | { |
---|
897 | tr_peerIoSetIOFuncs( io, NULL, NULL, NULL, NULL ); |
---|
898 | tr_peerIoSetEnabled( io, TR_UP, false ); |
---|
899 | tr_peerIoSetEnabled( io, TR_DOWN, false ); |
---|
900 | } |
---|
901 | |
---|
902 | int |
---|
903 | tr_peerIoReconnect( tr_peerIo * io ) |
---|
904 | { |
---|
905 | short int pendingEvents; |
---|
906 | tr_session * session; |
---|
907 | |
---|
908 | assert( tr_isPeerIo( io ) ); |
---|
909 | assert( !tr_peerIoIsIncoming( io ) ); |
---|
910 | |
---|
911 | session = tr_peerIoGetSession( io ); |
---|
912 | |
---|
913 | pendingEvents = io->pendingEvents; |
---|
914 | event_disable( io, EV_READ | EV_WRITE ); |
---|
915 | |
---|
916 | io_close_socket( io ); |
---|
917 | |
---|
918 | io->socket = tr_netOpenPeerSocket( session, &io->addr, io->port, io->isSeed ); |
---|
919 | io->event_read = event_new( session->event_base, io->socket, EV_READ, event_read_cb, io ); |
---|
920 | io->event_write = event_new( session->event_base, io->socket, EV_WRITE, event_write_cb, io ); |
---|
921 | |
---|
922 | if( io->socket >= 0 ) |
---|
923 | { |
---|
924 | event_enable( io, pendingEvents ); |
---|
925 | tr_netSetTOS( io->socket, session->peerSocketTOS ); |
---|
926 | maybeSetCongestionAlgorithm( io->socket, session->peer_congestion_algorithm ); |
---|
927 | return 0; |
---|
928 | } |
---|
929 | |
---|
930 | return -1; |
---|
931 | } |
---|
932 | |
---|
933 | /** |
---|
934 | *** |
---|
935 | **/ |
---|
936 | |
---|
937 | void |
---|
938 | tr_peerIoSetTorrentHash( tr_peerIo * io, |
---|
939 | const uint8_t * hash ) |
---|
940 | { |
---|
941 | assert( tr_isPeerIo( io ) ); |
---|
942 | |
---|
943 | tr_cryptoSetTorrentHash( &io->crypto, hash ); |
---|
944 | } |
---|
945 | |
---|
946 | const uint8_t* |
---|
947 | tr_peerIoGetTorrentHash( tr_peerIo * io ) |
---|
948 | { |
---|
949 | assert( tr_isPeerIo( io ) ); |
---|
950 | |
---|
951 | return tr_cryptoGetTorrentHash( &io->crypto ); |
---|
952 | } |
---|
953 | |
---|
954 | int |
---|
955 | tr_peerIoHasTorrentHash( const tr_peerIo * io ) |
---|
956 | { |
---|
957 | assert( tr_isPeerIo( io ) ); |
---|
958 | |
---|
959 | return tr_cryptoHasTorrentHash( &io->crypto ); |
---|
960 | } |
---|
961 | |
---|
962 | /** |
---|
963 | *** |
---|
964 | **/ |
---|
965 | |
---|
966 | void |
---|
967 | tr_peerIoSetPeersId( tr_peerIo * io, const uint8_t * peer_id ) |
---|
968 | { |
---|
969 | assert( tr_isPeerIo( io ) ); |
---|
970 | |
---|
971 | if( ( io->peerIdIsSet = peer_id != NULL ) ) |
---|
972 | memcpy( io->peerId, peer_id, 20 ); |
---|
973 | else |
---|
974 | memset( io->peerId, 0, 20 ); |
---|
975 | } |
---|
976 | |
---|
977 | /** |
---|
978 | *** |
---|
979 | **/ |
---|
980 | |
---|
981 | static unsigned int |
---|
982 | getDesiredOutputBufferSize( const tr_peerIo * io, uint64_t now ) |
---|
983 | { |
---|
984 | /* this is all kind of arbitrary, but what seems to work well is |
---|
985 | * being large enough to hold the next 20 seconds' worth of input, |
---|
986 | * or a few blocks, whichever is bigger. |
---|
987 | * It's okay to tweak this as needed */ |
---|
988 | const unsigned int currentSpeed_Bps = tr_bandwidthGetPieceSpeed_Bps( &io->bandwidth, now, TR_UP ); |
---|
989 | const unsigned int period = 15u; /* arbitrary */ |
---|
990 | /* the 3 is arbitrary; the .5 is to leave room for messages */ |
---|
991 | static const unsigned int ceiling = (unsigned int)( MAX_BLOCK_SIZE * 3.5 ); |
---|
992 | return MAX( ceiling, currentSpeed_Bps*period ); |
---|
993 | } |
---|
994 | |
---|
995 | size_t |
---|
996 | tr_peerIoGetWriteBufferSpace( const tr_peerIo * io, uint64_t now ) |
---|
997 | { |
---|
998 | const size_t desiredLen = getDesiredOutputBufferSize( io, now ); |
---|
999 | const size_t currentLen = evbuffer_get_length( io->outbuf ); |
---|
1000 | size_t freeSpace = 0; |
---|
1001 | |
---|
1002 | if( desiredLen > currentLen ) |
---|
1003 | freeSpace = desiredLen - currentLen; |
---|
1004 | |
---|
1005 | return freeSpace; |
---|
1006 | } |
---|
1007 | |
---|
1008 | /** |
---|
1009 | *** |
---|
1010 | **/ |
---|
1011 | |
---|
1012 | void |
---|
1013 | tr_peerIoSetEncryption( tr_peerIo * io, tr_encryption_type encryption_type ) |
---|
1014 | { |
---|
1015 | assert( tr_isPeerIo( io ) ); |
---|
1016 | assert( encryption_type == PEER_ENCRYPTION_NONE |
---|
1017 | || encryption_type == PEER_ENCRYPTION_RC4 ); |
---|
1018 | |
---|
1019 | io->encryption_type = encryption_type; |
---|
1020 | } |
---|
1021 | |
---|
1022 | /** |
---|
1023 | *** |
---|
1024 | **/ |
---|
1025 | |
---|
1026 | static void |
---|
1027 | addDatatype( tr_peerIo * io, size_t byteCount, bool isPieceData ) |
---|
1028 | { |
---|
1029 | struct tr_datatype * d; |
---|
1030 | d = datatype_new( ); |
---|
1031 | d->isPieceData = isPieceData != 0; |
---|
1032 | d->length = byteCount; |
---|
1033 | peer_io_push_datatype( io, d ); |
---|
1034 | } |
---|
1035 | |
---|
1036 | static void |
---|
1037 | maybeEncryptBuffer( tr_peerIo * io, struct evbuffer * buf ) |
---|
1038 | { |
---|
1039 | if( io->encryption_type == PEER_ENCRYPTION_RC4 ) |
---|
1040 | { |
---|
1041 | struct evbuffer_ptr pos; |
---|
1042 | struct evbuffer_iovec iovec; |
---|
1043 | evbuffer_ptr_set( buf, &pos, 0, EVBUFFER_PTR_SET ); |
---|
1044 | do { |
---|
1045 | evbuffer_peek( buf, -1, &pos, &iovec, 1 ); |
---|
1046 | tr_cryptoEncrypt( &io->crypto, iovec.iov_len, iovec.iov_base, iovec.iov_base ); |
---|
1047 | } while( !evbuffer_ptr_set( buf, &pos, iovec.iov_len, EVBUFFER_PTR_ADD ) ); |
---|
1048 | } |
---|
1049 | } |
---|
1050 | |
---|
1051 | void |
---|
1052 | tr_peerIoWriteBuf( tr_peerIo * io, struct evbuffer * buf, bool isPieceData ) |
---|
1053 | { |
---|
1054 | const size_t byteCount = evbuffer_get_length( buf ); |
---|
1055 | maybeEncryptBuffer( io, buf ); |
---|
1056 | evbuffer_add_buffer( io->outbuf, buf ); |
---|
1057 | addDatatype( io, byteCount, isPieceData ); |
---|
1058 | } |
---|
1059 | |
---|
1060 | void |
---|
1061 | tr_peerIoWriteBytes( tr_peerIo * io, const void * bytes, size_t byteCount, bool isPieceData ) |
---|
1062 | { |
---|
1063 | struct evbuffer_iovec iovec; |
---|
1064 | evbuffer_reserve_space( io->outbuf, byteCount, &iovec, 1 ); |
---|
1065 | |
---|
1066 | iovec.iov_len = byteCount; |
---|
1067 | if( io->encryption_type == PEER_ENCRYPTION_RC4 ) |
---|
1068 | tr_cryptoEncrypt( &io->crypto, iovec.iov_len, bytes, iovec.iov_base ); |
---|
1069 | else |
---|
1070 | memcpy( iovec.iov_base, bytes, iovec.iov_len ); |
---|
1071 | evbuffer_commit_space( io->outbuf, &iovec, 1 ); |
---|
1072 | |
---|
1073 | addDatatype( io, byteCount, isPieceData ); |
---|
1074 | } |
---|
1075 | |
---|
1076 | /*** |
---|
1077 | **** |
---|
1078 | ***/ |
---|
1079 | |
---|
/** Appends a single byte to `outbuf`. */
void
evbuffer_add_uint8( struct evbuffer * outbuf, uint8_t byte )
{
    evbuffer_add( outbuf, &byte, sizeof( byte ) );
}
---|
1085 | |
---|
/** Appends a host-order 16-bit value to `outbuf` in network byte order. */
void
evbuffer_add_uint16( struct evbuffer * outbuf, uint16_t addme_hs )
{
    const uint16_t netOrder = htons( addme_hs );

    evbuffer_add( outbuf, &netOrder, sizeof( uint16_t ) );
}
---|
1092 | |
---|
/** Appends a host-order 32-bit value to `outbuf` in network byte order. */
void
evbuffer_add_uint32( struct evbuffer * outbuf, uint32_t addme_hl )
{
    const uint32_t netOrder = htonl( addme_hl );

    evbuffer_add( outbuf, &netOrder, sizeof( uint32_t ) );
}
---|
1099 | |
---|
/**
 * Appends a 64-bit value to `outbuf` after passing it through tr_htonll().
 */
void
evbuffer_add_uint64( struct evbuffer * outbuf, uint64_t addme_hll )
{
    const uint64_t converted = tr_htonll( addme_hll );

    evbuffer_add( outbuf, &converted, sizeof( uint64_t ) );
}
---|
1106 | |
---|
1107 | /*** |
---|
1108 | **** |
---|
1109 | ***/ |
---|
1110 | |
---|
1111 | void |
---|
1112 | tr_peerIoReadBytesToBuf( tr_peerIo * io, struct evbuffer * inbuf, struct evbuffer * outbuf, size_t byteCount ) |
---|
1113 | { |
---|
1114 | struct evbuffer * tmp; |
---|
1115 | const size_t old_length = evbuffer_get_length( outbuf ); |
---|
1116 | |
---|
1117 | assert( tr_isPeerIo( io ) ); |
---|
1118 | assert( evbuffer_get_length( inbuf ) >= byteCount ); |
---|
1119 | |
---|
1120 | /* append it to outbuf */ |
---|
1121 | tmp = evbuffer_new( ); |
---|
1122 | evbuffer_remove_buffer( inbuf, tmp, byteCount ); |
---|
1123 | evbuffer_add_buffer( outbuf, tmp ); |
---|
1124 | evbuffer_free( tmp ); |
---|
1125 | |
---|
1126 | /* decrypt if needed */ |
---|
1127 | if( io->encryption_type == PEER_ENCRYPTION_RC4 ) { |
---|
1128 | struct evbuffer_ptr pos; |
---|
1129 | struct evbuffer_iovec iovec; |
---|
1130 | evbuffer_ptr_set( outbuf, &pos, old_length, EVBUFFER_PTR_SET ); |
---|
1131 | do { |
---|
1132 | evbuffer_peek( outbuf, byteCount, &pos, &iovec, 1 ); |
---|
1133 | tr_cryptoDecrypt( &io->crypto, iovec.iov_len, iovec.iov_base, iovec.iov_base ); |
---|
1134 | byteCount -= iovec.iov_len; |
---|
1135 | } while( !evbuffer_ptr_set( outbuf, &pos, iovec.iov_len, EVBUFFER_PTR_ADD ) ); |
---|
1136 | } |
---|
1137 | } |
---|
1138 | |
---|
1139 | void |
---|
1140 | tr_peerIoReadBytes( tr_peerIo * io, struct evbuffer * inbuf, void * bytes, size_t byteCount ) |
---|
1141 | { |
---|
1142 | assert( tr_isPeerIo( io ) ); |
---|
1143 | assert( evbuffer_get_length( inbuf ) >= byteCount ); |
---|
1144 | |
---|
1145 | switch( io->encryption_type ) |
---|
1146 | { |
---|
1147 | case PEER_ENCRYPTION_NONE: |
---|
1148 | evbuffer_remove( inbuf, bytes, byteCount ); |
---|
1149 | break; |
---|
1150 | |
---|
1151 | case PEER_ENCRYPTION_RC4: |
---|
1152 | evbuffer_remove( inbuf, bytes, byteCount ); |
---|
1153 | tr_cryptoDecrypt( &io->crypto, byteCount, bytes, bytes ); |
---|
1154 | break; |
---|
1155 | |
---|
1156 | default: |
---|
1157 | assert( 0 ); |
---|
1158 | } |
---|
1159 | } |
---|
1160 | |
---|
1161 | void |
---|
1162 | tr_peerIoReadUint16( tr_peerIo * io, |
---|
1163 | struct evbuffer * inbuf, |
---|
1164 | uint16_t * setme ) |
---|
1165 | { |
---|
1166 | uint16_t tmp; |
---|
1167 | tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) ); |
---|
1168 | *setme = ntohs( tmp ); |
---|
1169 | } |
---|
1170 | |
---|
1171 | void tr_peerIoReadUint32( tr_peerIo * io, |
---|
1172 | struct evbuffer * inbuf, |
---|
1173 | uint32_t * setme ) |
---|
1174 | { |
---|
1175 | uint32_t tmp; |
---|
1176 | tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) ); |
---|
1177 | *setme = ntohl( tmp ); |
---|
1178 | } |
---|
1179 | |
---|
1180 | void |
---|
1181 | tr_peerIoDrain( tr_peerIo * io, |
---|
1182 | struct evbuffer * inbuf, |
---|
1183 | size_t byteCount ) |
---|
1184 | { |
---|
1185 | char buf[4096]; |
---|
1186 | const size_t buflen = sizeof( buf ); |
---|
1187 | |
---|
1188 | while( byteCount > 0 ) |
---|
1189 | { |
---|
1190 | const size_t thisPass = MIN( byteCount, buflen ); |
---|
1191 | tr_peerIoReadBytes( io, inbuf, buf, thisPass ); |
---|
1192 | byteCount -= thisPass; |
---|
1193 | } |
---|
1194 | } |
---|
1195 | |
---|
1196 | /*** |
---|
1197 | **** |
---|
1198 | ***/ |
---|
1199 | |
---|
1200 | static int |
---|
1201 | tr_peerIoTryRead( tr_peerIo * io, size_t howmuch ) |
---|
1202 | { |
---|
1203 | int res = 0; |
---|
1204 | |
---|
1205 | if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch ))) |
---|
1206 | { |
---|
1207 | if( io->utp_socket != NULL ) /* utp peer connection */ |
---|
1208 | { |
---|
1209 | /* UTP_RBDrained notifies libutp that your read buffer is emtpy. |
---|
1210 | * It opens up the congestion window by sending an ACK (soonish) |
---|
1211 | * if one was not going to be sent. */ |
---|
1212 | if( evbuffer_get_length( io->inbuf ) == 0 ) |
---|
1213 | UTP_RBDrained( io->utp_socket ); |
---|
1214 | } |
---|
1215 | else /* tcp peer connection */ |
---|
1216 | { |
---|
1217 | int e; |
---|
1218 | |
---|
1219 | EVUTIL_SET_SOCKET_ERROR( 0 ); |
---|
1220 | res = evbuffer_read( io->inbuf, io->socket, (int)howmuch ); |
---|
1221 | e = EVUTIL_SOCKET_ERROR( ); |
---|
1222 | |
---|
1223 | dbgmsg( io, "read %d from peer (%s)", res, (res==-1?tr_strerror(e):"") ); |
---|
1224 | |
---|
1225 | if( evbuffer_get_length( io->inbuf ) ) |
---|
1226 | canReadWrapper( io ); |
---|
1227 | |
---|
1228 | if( ( res <= 0 ) && ( io->gotError ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) ) |
---|
1229 | { |
---|
1230 | char errstr[512]; |
---|
1231 | short what = BEV_EVENT_READING | BEV_EVENT_ERROR; |
---|
1232 | if( res == 0 ) |
---|
1233 | what |= BEV_EVENT_EOF; |
---|
1234 | dbgmsg( io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", |
---|
1235 | res, what, e, tr_net_strerror( errstr, sizeof( errstr ), e ) ); |
---|
1236 | io->gotError( io, what, io->userData ); |
---|
1237 | } |
---|
1238 | } |
---|
1239 | } |
---|
1240 | |
---|
1241 | return res; |
---|
1242 | } |
---|
1243 | |
---|
1244 | static int |
---|
1245 | tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch ) |
---|
1246 | { |
---|
1247 | int n = 0; |
---|
1248 | const size_t old_len = evbuffer_get_length( io->outbuf ); |
---|
1249 | dbgmsg( io, "in tr_peerIoTryWrite %zu", howmuch ); |
---|
1250 | |
---|
1251 | if( howmuch > old_len ) |
---|
1252 | howmuch = old_len; |
---|
1253 | |
---|
1254 | if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_UP, howmuch ))) |
---|
1255 | { |
---|
1256 | if( io->utp_socket != NULL ) /* utp peer connection */ |
---|
1257 | { |
---|
1258 | const size_t old_len = evbuffer_get_length( io->outbuf ); |
---|
1259 | UTP_Write( io->utp_socket, howmuch ); |
---|
1260 | n = old_len - evbuffer_get_length( io->outbuf ); |
---|
1261 | } |
---|
1262 | else |
---|
1263 | { |
---|
1264 | int e; |
---|
1265 | |
---|
1266 | EVUTIL_SET_SOCKET_ERROR( 0 ); |
---|
1267 | n = tr_evbuffer_write( io, io->socket, howmuch ); |
---|
1268 | e = EVUTIL_SOCKET_ERROR( ); |
---|
1269 | |
---|
1270 | if( n > 0 ) |
---|
1271 | didWriteWrapper( io, n ); |
---|
1272 | |
---|
1273 | if( ( n < 0 ) && ( io->gotError ) && e && ( e != EPIPE ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) ) |
---|
1274 | { |
---|
1275 | char errstr[512]; |
---|
1276 | const short what = BEV_EVENT_WRITING | BEV_EVENT_ERROR; |
---|
1277 | |
---|
1278 | dbgmsg( io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", |
---|
1279 | n, what, e, tr_net_strerror( errstr, sizeof( errstr ), e ) ); |
---|
1280 | |
---|
1281 | if( io->gotError != NULL ) |
---|
1282 | io->gotError( io, what, io->userData ); |
---|
1283 | } |
---|
1284 | } |
---|
1285 | } |
---|
1286 | |
---|
1287 | return n; |
---|
1288 | } |
---|
1289 | |
---|
1290 | int |
---|
1291 | tr_peerIoFlush( tr_peerIo * io, tr_direction dir, size_t limit ) |
---|
1292 | { |
---|
1293 | int bytesUsed = 0; |
---|
1294 | |
---|
1295 | assert( tr_isPeerIo( io ) ); |
---|
1296 | assert( tr_isDirection( dir ) ); |
---|
1297 | |
---|
1298 | if( dir == TR_DOWN ) |
---|
1299 | bytesUsed = tr_peerIoTryRead( io, limit ); |
---|
1300 | else |
---|
1301 | bytesUsed = tr_peerIoTryWrite( io, limit ); |
---|
1302 | |
---|
1303 | dbgmsg( io, "flushing peer-io, direction %d, limit %zu, bytesUsed %d", (int)dir, limit, bytesUsed ); |
---|
1304 | return bytesUsed; |
---|
1305 | } |
---|
1306 | |
---|
1307 | int |
---|
1308 | tr_peerIoFlushOutgoingProtocolMsgs( tr_peerIo * io ) |
---|
1309 | { |
---|
1310 | size_t byteCount = 0; |
---|
1311 | const struct tr_datatype * it; |
---|
1312 | |
---|
1313 | /* count up how many bytes are used by non-piece-data messages |
---|
1314 | at the front of our outbound queue */ |
---|
1315 | for( it=io->outbuf_datatypes; it!=NULL; it=it->next ) |
---|
1316 | if( it->isPieceData ) |
---|
1317 | break; |
---|
1318 | else |
---|
1319 | byteCount += it->length; |
---|
1320 | |
---|
1321 | return tr_peerIoFlush( io, TR_UP, byteCount ); |
---|
1322 | } |
---|