source: trunk/libtransmission/peer-io.c @ 12305

Last change on this file since 12305 was 12305, checked in by jordan, 11 years ago

(trunk libT) more heap pruning: avoid an unnecessary malloc() + free() when encrypting outbound messages

  • Property svn:keywords set to Date Rev Author Id
File size: 32.9 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 12305 2011-04-04 16:53:15Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h>
16
17#include <event2/event.h>
18#include <event2/buffer.h>
19#include <event2/bufferevent.h>
20
21#include <libutp/utp.h>
22
23#include "transmission.h"
24#include "session.h"
25#include "bandwidth.h"
26#include "crypto.h"
27#include "list.h"
28#include "net.h"
29#include "peer-common.h" /* MAX_BLOCK_SIZE */
30#include "peer-io.h"
31#include "trevent.h" /* tr_runInEventThread() */
32#include "utils.h"
33
34
35#ifdef WIN32
36 #define EAGAIN       WSAEWOULDBLOCK
37 #define EINTR        WSAEINTR
38 #define EINPROGRESS  WSAEINPROGRESS
39 #define EPIPE        WSAECONNRESET
40#endif
41
42/* The amount of read bufferring that we allow for uTP sockets. */
43
44#define UTP_READ_BUFFER_SIZE (256 * 1024)
45
46static size_t
47guessPacketOverhead( size_t d )
48{
49    /**
50     * http://sd.wareonearth.com/~phil/net/overhead/
51     *
52     * TCP over Ethernet:
53     * Assuming no header compression (e.g. not PPP)
54     * Add 20 IPv4 header or 40 IPv6 header (no options)
55     * Add 20 TCP header
56     * Add 12 bytes optional TCP timestamps
57     * Max TCP Payload data rates over ethernet are thus:
58     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
59     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
60     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
61     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
62     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
63     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
64     */
65    const double assumed_payload_data_rate = 94.0;
66
67    return (unsigned int)( d * ( 100.0 / assumed_payload_data_rate ) - d );
68}
69
70/**
71***
72**/
73
74#define dbgmsg( io, ... ) \
75    do { \
76        if( tr_deepLoggingIsActive( ) ) \
77            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
78    } while( 0 )
79
80struct tr_datatype
81{
82    bool    isPieceData;
83    size_t  length;
84};
85
86
87/***
88****
89***/
90
91static void
92didWriteWrapper( tr_peerIo * io, unsigned int bytes_transferred )
93{
94     while( bytes_transferred && tr_isPeerIo( io ) )
95     {
96        struct tr_datatype * next = io->outbuf_datatypes->data;
97
98        const unsigned int payload = MIN( next->length, bytes_transferred );
99        /* For uTP sockets, the overhead is computed in utp_on_overhead. */
100        const unsigned int overhead =
101            io->socket ? guessPacketOverhead( payload ) : 0;
102        const uint64_t now = tr_time_msec( );
103
104        tr_bandwidthUsed( &io->bandwidth, TR_UP, payload, next->isPieceData, now );
105
106        if( overhead > 0 )
107            tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, false, now );
108
109        if( io->didWrite )
110            io->didWrite( io, payload, next->isPieceData, io->userData );
111
112        if( tr_isPeerIo( io ) )
113        {
114            bytes_transferred -= payload;
115            next->length -= payload;
116            if( !next->length ) {
117                tr_list_pop_front( &io->outbuf_datatypes );
118                tr_free( next );
119            }
120        }
121    }
122}
123
124static void
125canReadWrapper( tr_peerIo * io )
126{
127    bool err = 0;
128    bool done = 0;
129    tr_session * session;
130
131    dbgmsg( io, "canRead" );
132
133    tr_peerIoRef( io );
134
135    session = io->session;
136
137    /* try to consume the input buffer */
138    if( io->canRead )
139    {
140        const uint64_t now = tr_time_msec( );
141
142        tr_sessionLock( session );
143
144        while( !done && !err )
145        {
146            size_t piece = 0;
147            const size_t oldLen = evbuffer_get_length( io->inbuf );
148            const int ret = io->canRead( io, io->userData, &piece );
149            const size_t used = oldLen - evbuffer_get_length( io->inbuf );
150            const unsigned int overhead = guessPacketOverhead( used );
151
152            if( piece || (piece!=used) )
153            {
154                if( piece )
155                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, piece, true, now );
156
157                if( used != piece )
158                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, used - piece, false, now );
159            }
160
161            if( overhead > 0 )
162                tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, false, now );
163
164            switch( ret )
165            {
166                case READ_NOW:
167                    if( evbuffer_get_length( io->inbuf ) )
168                        continue;
169                    done = 1;
170                    break;
171
172                case READ_LATER:
173                    done = 1;
174                    break;
175
176                case READ_ERR:
177                    err = 1;
178                    break;
179            }
180
181            assert( tr_isPeerIo( io ) );
182        }
183
184        tr_sessionUnlock( session );
185    }
186
187    tr_peerIoUnref( io );
188}
189
190static void
191event_read_cb( int fd, short event UNUSED, void * vio )
192{
193    int res;
194    int e;
195    tr_peerIo * io = vio;
196
197    /* Limit the input buffer to 256K, so it doesn't grow too large */
198    unsigned int howmuch;
199    unsigned int curlen;
200    const tr_direction dir = TR_DOWN;
201    const unsigned int max = 256 * 1024;
202
203    assert( tr_isPeerIo( io ) );
204    assert( io->socket >= 0 );
205
206    io->pendingEvents &= ~EV_READ;
207
208    curlen = evbuffer_get_length( io->inbuf );
209    howmuch = curlen >= max ? 0 : max - curlen;
210    howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch );
211
212    dbgmsg( io, "libevent says this peer is ready to read" );
213
214    /* if we don't have any bandwidth left, stop reading */
215    if( howmuch < 1 ) {
216        tr_peerIoSetEnabled( io, dir, false );
217        return;
218    }
219
220    EVUTIL_SET_SOCKET_ERROR( 0 );
221    res = evbuffer_read( io->inbuf, fd, (int)howmuch );
222    e = EVUTIL_SOCKET_ERROR( );
223
224    if( res > 0 )
225    {
226        tr_peerIoSetEnabled( io, dir, true );
227
228        /* Invoke the user callback - must always be called last */
229        canReadWrapper( io );
230    }
231    else
232    {
233        char errstr[512];
234        short what = BEV_EVENT_READING;
235
236        if( res == 0 ) /* EOF */
237            what |= BEV_EVENT_EOF;
238        else if( res == -1 ) {
239            if( e == EAGAIN || e == EINTR ) {
240                tr_peerIoSetEnabled( io, dir, true );
241                return;
242            }
243            what |= BEV_EVENT_ERROR;
244        }
245
246        tr_net_strerror( errstr, sizeof( errstr ), e );
247        dbgmsg( io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
248
249        if( io->gotError != NULL )
250            io->gotError( io, what, io->userData );
251    }
252}
253
254static int
255tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
256{
257    int e;
258    int n;
259    char errstr[256];
260
261    EVUTIL_SET_SOCKET_ERROR( 0 );
262    n = evbuffer_write_atmost( io->outbuf, fd, howmuch );
263    e = EVUTIL_SOCKET_ERROR( );
264    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?tr_net_strerror(errstr,sizeof(errstr),e):"") );
265
266    return n;
267}
268
269static void
270event_write_cb( int fd, short event UNUSED, void * vio )
271{
272    int res = 0;
273    int e;
274    short what = BEV_EVENT_WRITING;
275    tr_peerIo * io = vio;
276    size_t howmuch;
277    const tr_direction dir = TR_UP;
278
279    assert( tr_isPeerIo( io ) );
280    assert( io->socket >= 0 );
281
282    io->pendingEvents &= ~EV_WRITE;
283
284    dbgmsg( io, "libevent says this peer is ready to write" );
285
286    /* Write as much as possible, since the socket is non-blocking, write() will
287     * return if it can't write any more data without blocking */
288    howmuch = tr_bandwidthClamp( &io->bandwidth, dir, evbuffer_get_length( io->outbuf ) );
289
290    /* if we don't have any bandwidth left, stop writing */
291    if( howmuch < 1 ) {
292        tr_peerIoSetEnabled( io, dir, false );
293        return;
294    }
295
296    EVUTIL_SET_SOCKET_ERROR( 0 );
297    res = tr_evbuffer_write( io, fd, howmuch );
298    e = EVUTIL_SOCKET_ERROR( );
299
300    if (res == -1) {
301        if (!e || e == EAGAIN || e == EINTR || e == EINPROGRESS)
302            goto reschedule;
303        /* error case */
304        what |= BEV_EVENT_ERROR;
305    } else if (res == 0) {
306        /* eof case */
307        what |= BEV_EVENT_EOF;
308    }
309    if (res <= 0)
310        goto error;
311
312    if( evbuffer_get_length( io->outbuf ) )
313        tr_peerIoSetEnabled( io, dir, true );
314
315    didWriteWrapper( io, res );
316    return;
317
318 reschedule:
319    if( evbuffer_get_length( io->outbuf ) )
320        tr_peerIoSetEnabled( io, dir, true );
321    return;
322
323 error:
324
325    dbgmsg( io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
326
327    if( io->gotError != NULL )
328        io->gotError( io, what, io->userData );
329}
330
331/**
332***
333**/
334
335static void
336maybeSetCongestionAlgorithm( int socket, const char * algorithm )
337{
338    if( algorithm && *algorithm )
339    {
340        const int rc = tr_netSetCongestionControl( socket, algorithm );
341
342        if( rc < 0 )
343            tr_ninf( "Net", "Can't set congestion control algorithm '%s': %s",
344                     algorithm, tr_strerror( errno ));
345    }
346}
347
348#ifdef WITH_UTP
349/* UTP callbacks */
350
351static void
352utp_on_read(void *closure, const unsigned char *buf, size_t buflen)
353{
354    int rc;
355    tr_peerIo *io = closure;
356    assert( tr_isPeerIo( io ) );
357
358    rc = evbuffer_add( io->inbuf, buf, buflen );
359    dbgmsg( io, "utp_on_read got %zu bytes", buflen );
360
361    if( rc < 0 ) {
362        tr_nerr( "UTP", "On read evbuffer_add" );
363        return;
364    }
365
366    tr_peerIoSetEnabled( io, TR_DOWN, true );
367    canReadWrapper( io );
368}
369
370static void
371utp_on_write(void *closure, unsigned char *buf, size_t buflen)
372{
373    int rc;
374    tr_peerIo *io = closure;
375    assert( tr_isPeerIo( io ) );
376
377    rc = evbuffer_remove( io->outbuf, buf, buflen );
378    dbgmsg( io, "utp_on_write sending %zu bytes... evbuffer_remove returned %d", buflen, rc );
379    assert( rc == (int)buflen ); /* if this fails, we've corrupted our bookkeeping somewhere */
380    if( rc < (long)buflen ) {
381        tr_nerr( "UTP", "Short write: %d < %ld", rc, (long)buflen);
382    }
383
384    didWriteWrapper( io, buflen );
385}
386
387static size_t
388utp_get_rb_size(void *closure)
389{
390    size_t bytes;
391    tr_peerIo *io = closure;
392    assert( tr_isPeerIo( io ) );
393
394    bytes = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, UTP_READ_BUFFER_SIZE );
395
396    dbgmsg( io, "utp_get_rb_size is saying it's ready to read %zu bytes", bytes );
397    return UTP_READ_BUFFER_SIZE - bytes;
398}
399
400static void
401utp_on_state_change(void *closure, int state)
402{
403    tr_peerIo *io = closure;
404    assert( tr_isPeerIo( io ) );
405
406    if( state == UTP_STATE_CONNECT ) {
407        dbgmsg( io, "utp_on_state_change -- changed to connected" );
408        io->utpSupported = true;
409    } else if( state == UTP_STATE_WRITABLE ) {
410        dbgmsg( io, "utp_on_state_change -- changed to writable" );
411    } else if( state == UTP_STATE_EOF ) {
412        if( io->gotError )
413            io->gotError( io, BEV_EVENT_EOF, io->userData );
414    } else if( state == UTP_STATE_DESTROYING ) {
415        tr_nerr( "UTP", "Impossible state UTP_STATE_DESTROYING" );
416        return;
417    } else {
418        tr_nerr( "UTP", "Unknown state %d", state );
419    }
420}
421
422static void
423utp_on_error(void *closure, int errcode)
424{
425    tr_peerIo *io = closure;
426    assert( tr_isPeerIo( io ) );
427
428    dbgmsg( io, "utp_on_error -- errcode is %d", errcode );
429
430    if( io->gotError ) {
431        errno = errcode;
432        io->gotError( io, BEV_EVENT_ERROR, io->userData );
433    }
434}
435
436static void
437utp_on_overhead(void *closure, bool send, size_t count, int type UNUSED)
438{
439    tr_peerIo *io = closure;
440    assert( tr_isPeerIo( io ) );
441
442    dbgmsg( io, "utp_on_overhead -- count is %zu", count );
443
444    tr_bandwidthUsed( &io->bandwidth, send ? TR_UP : TR_DOWN,
445                      count, false, tr_time_msec() );
446}
447
448static struct UTPFunctionTable utp_function_table = {
449    .on_read = utp_on_read,
450    .on_write = utp_on_write,
451    .get_rb_size = utp_get_rb_size,
452    .on_state = utp_on_state_change,
453    .on_error = utp_on_error,
454    .on_overhead = utp_on_overhead
455};
456
457
458/* Dummy UTP callbacks. */
459/* We switch a UTP socket to use these after the associated peerIo has been
460   destroyed -- see io_dtor. */
461
462static void
463dummy_read( void * closure UNUSED, const unsigned char *buf UNUSED, size_t buflen UNUSED )
464{
465    /* This cannot happen, as far as I'm aware. */
466    tr_nerr( "UTP", "On_read called on closed socket" );
467
468}
469
470static void
471dummy_write(void * closure UNUSED, unsigned char *buf, size_t buflen)
472{
473    /* This can very well happen if we've shut down a peer connection that
474       had unflushed buffers.  Complain and send zeroes. */
475    tr_ndbg( "UTP", "On_write called on closed socket" );
476    memset( buf, 0, buflen );
477}
478
479static size_t
480dummy_get_rb_size( void * closure UNUSED )
481{
482    return 0;
483}
484
485static void
486dummy_on_state_change(void * closure UNUSED, int state UNUSED )
487{
488    return;
489}
490
491static void
492dummy_on_error( void * closure UNUSED, int errcode UNUSED )
493{
494    return;
495}
496
497static void
498dummy_on_overhead( void *closure UNUSED, bool send UNUSED, size_t count UNUSED, int type UNUSED )
499{
500    return;
501}
502
503static struct UTPFunctionTable dummy_utp_function_table = {
504    .on_read = dummy_read,
505    .on_write = dummy_write,
506    .get_rb_size = dummy_get_rb_size,
507    .on_state = dummy_on_state_change,
508    .on_error = dummy_on_error,
509    .on_overhead = dummy_on_overhead
510};
511
512#endif /* #ifdef WITH_UTP */
513
514static tr_peerIo*
515tr_peerIoNew( tr_session       * session,
516              tr_bandwidth     * parent,
517              const tr_address * addr,
518              tr_port            port,
519              const uint8_t    * torrentHash,
520              bool               isIncoming,
521              bool               isSeed,
522              int                socket,
523              struct UTPSocket * utp_socket)
524{
525    tr_peerIo * io;
526
527    assert( session != NULL );
528    assert( session->events != NULL );
529    assert( tr_isBool( isIncoming ) );
530    assert( tr_isBool( isSeed ) );
531    assert( tr_amInEventThread( session ) );
532    assert( (socket < 0) == (utp_socket != NULL) );
533#ifndef WITH_UTP
534    assert( socket >= 0 );
535#endif
536
537    if( socket >= 0 ) {
538        tr_netSetTOS( socket, session->peerSocketTOS );
539        maybeSetCongestionAlgorithm( socket, session->peer_congestion_algorithm );
540    }
541
542    io = tr_new0( tr_peerIo, 1 );
543    io->magicNumber = PEER_IO_MAGIC_NUMBER;
544    io->refCount = 1;
545    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
546    io->session = session;
547    io->addr = *addr;
548    io->isSeed = isSeed;
549    io->port = port;
550    io->socket = socket;
551    io->utp_socket = utp_socket;
552    io->isIncoming = isIncoming != 0;
553    io->timeCreated = tr_time( );
554    io->inbuf = evbuffer_new( );
555    io->outbuf = evbuffer_new( );
556    tr_bandwidthConstruct( &io->bandwidth, session, parent );
557    tr_bandwidthSetPeer( &io->bandwidth, io );
558    dbgmsg( io, "bandwidth is %p; its parent is %p", &io->bandwidth, parent );
559    dbgmsg( io, "socket is %d, utp_socket is %p", socket, utp_socket );
560
561    if( io->socket >= 0 ) {
562        io->event_read = event_new( session->event_base,
563                                    io->socket, EV_READ, event_read_cb, io );
564        io->event_write = event_new( session->event_base,
565                                     io->socket, EV_WRITE, event_write_cb, io );
566    }
567#ifdef WITH_UTP
568    else {
569        UTP_SetSockopt( utp_socket, SO_RCVBUF, UTP_READ_BUFFER_SIZE );
570        dbgmsg( io, "%s", "calling UTP_SetCallbacks &utp_function_table" );
571        UTP_SetCallbacks( utp_socket,
572                          &utp_function_table,
573                          io );
574        if( !isIncoming ) {
575            dbgmsg( io, "%s", "calling UTP_Connect" );
576            UTP_Connect( utp_socket );
577        }
578    }
579#endif
580
581    return io;
582}
583
584tr_peerIo*
585tr_peerIoNewIncoming( tr_session        * session,
586                      tr_bandwidth      * parent,
587                      const tr_address  * addr,
588                      tr_port             port,
589                      int                 fd,
590                      struct UTPSocket  * utp_socket )
591{
592    assert( session );
593    assert( tr_address_is_valid( addr ) );
594
595    return tr_peerIoNew( session, parent, addr, port, NULL, true, false,
596                         fd, utp_socket );
597}
598
599tr_peerIo*
600tr_peerIoNewOutgoing( tr_session        * session,
601                      tr_bandwidth      * parent,
602                      const tr_address  * addr,
603                      tr_port             port,
604                      const uint8_t     * torrentHash,
605                      bool                isSeed,
606                      bool                utp )
607{
608    int fd = -1;
609    struct UTPSocket * utp_socket = NULL;
610
611    assert( session );
612    assert( tr_address_is_valid( addr ) );
613    assert( torrentHash );
614
615    if( utp )
616        utp_socket = tr_netOpenPeerUTPSocket( session, addr, port, isSeed );
617
618    if( !utp_socket ) {
619        fd = tr_netOpenPeerSocket( session, addr, port, isSeed );
620        dbgmsg( NULL, "tr_netOpenPeerSocket returned fd %d", fd );
621    }
622
623    if( fd < 0 && utp_socket == NULL )
624        return NULL;
625
626    return tr_peerIoNew( session, parent, addr, port,
627                         torrentHash, false, isSeed, fd, utp_socket );
628}
629
630/***
631****
632***/
633
634static void
635event_enable( tr_peerIo * io, short event )
636{
637    assert( tr_amInEventThread( io->session ) );
638    assert( io->session != NULL );
639    assert( io->session->events != NULL );
640
641    if( io->socket < 0 )
642        return;
643
644    assert( io->session->events != NULL );
645    assert( event_initialized( io->event_read ) );
646    assert( event_initialized( io->event_write ) );
647
648    if( ( event & EV_READ ) && ! ( io->pendingEvents & EV_READ ) )
649    {
650        dbgmsg( io, "enabling libevent ready-to-read polling" );
651        event_add( io->event_read, NULL );
652        io->pendingEvents |= EV_READ;
653    }
654
655    if( ( event & EV_WRITE ) && ! ( io->pendingEvents & EV_WRITE ) )
656    {
657        dbgmsg( io, "enabling libevent ready-to-write polling" );
658        event_add( io->event_write, NULL );
659        io->pendingEvents |= EV_WRITE;
660    }
661}
662
663static void
664event_disable( struct tr_peerIo * io, short event )
665{
666    assert( tr_amInEventThread( io->session ) );
667    assert( io->session != NULL );
668
669    if( io->socket < 0 )
670        return;
671
672    assert( io->session->events != NULL );
673    assert( event_initialized( io->event_read ) );
674    assert( event_initialized( io->event_write ) );
675
676    if( ( event & EV_READ ) && ( io->pendingEvents & EV_READ ) )
677    {
678        dbgmsg( io, "disabling libevent ready-to-read polling" );
679        event_del( io->event_read );
680        io->pendingEvents &= ~EV_READ;
681    }
682
683    if( ( event & EV_WRITE ) && ( io->pendingEvents & EV_WRITE ) )
684    {
685        dbgmsg( io, "disabling libevent ready-to-write polling" );
686        event_del( io->event_write );
687        io->pendingEvents &= ~EV_WRITE;
688    }
689}
690
691void
692tr_peerIoSetEnabled( tr_peerIo    * io,
693                     tr_direction   dir,
694                     bool           isEnabled )
695{
696    const short event = dir == TR_UP ? EV_WRITE : EV_READ;
697
698    assert( tr_isPeerIo( io ) );
699    assert( tr_isDirection( dir ) );
700    assert( tr_amInEventThread( io->session ) );
701    assert( io->session->events != NULL );
702
703    if( isEnabled )
704        event_enable( io, event );
705    else
706        event_disable( io, event );
707}
708
709/***
710****
711***/
712static void
713io_close_socket( tr_peerIo * io )
714{
715    if( io->socket >= 0 ) {
716        tr_netClose( io->session, io->socket );
717        io->socket = -1;
718    }
719
720    if( io->event_read != NULL) {
721        event_free( io->event_read );
722        io->event_read = NULL;
723    }
724
725    if( io->event_write != NULL) {
726        event_free( io->event_write );
727        io->event_write = NULL;
728    }
729
730#ifdef WITH_UTP
731    if( io->utp_socket ) {
732        UTP_SetCallbacks( io->utp_socket,
733                          &dummy_utp_function_table,
734                          NULL );
735        UTP_Close( io->utp_socket );
736
737        io->utp_socket = NULL;
738    }
739#endif
740}
741
742static void
743io_dtor( void * vio )
744{
745    tr_peerIo * io = vio;
746
747    assert( tr_isPeerIo( io ) );
748    assert( tr_amInEventThread( io->session ) );
749    assert( io->session->events != NULL );
750
751    dbgmsg( io, "in tr_peerIo destructor" );
752    event_disable( io, EV_READ | EV_WRITE );
753    tr_bandwidthDestruct( &io->bandwidth );
754    evbuffer_free( io->outbuf );
755    evbuffer_free( io->inbuf );
756    io_close_socket( io );
757    tr_cryptoFree( io->crypto );
758    tr_list_free( &io->outbuf_datatypes, tr_free );
759
760    memset( io, ~0, sizeof( tr_peerIo ) );
761    tr_free( io );
762}
763
764static void
765tr_peerIoFree( tr_peerIo * io )
766{
767    if( io )
768    {
769        dbgmsg( io, "in tr_peerIoFree" );
770        io->canRead = NULL;
771        io->didWrite = NULL;
772        io->gotError = NULL;
773        tr_runInEventThread( io->session, io_dtor, io );
774    }
775}
776
777void
778tr_peerIoRefImpl( const char * file, int line, tr_peerIo * io )
779{
780    assert( tr_isPeerIo( io ) );
781
782    dbgmsg( io, "%s:%d is incrementing the IO's refcount from %d to %d",
783                file, line, io->refCount, io->refCount+1 );
784
785    ++io->refCount;
786}
787
788void
789tr_peerIoUnrefImpl( const char * file, int line, tr_peerIo * io )
790{
791    assert( tr_isPeerIo( io ) );
792
793    dbgmsg( io, "%s:%d is decrementing the IO's refcount from %d to %d",
794                file, line, io->refCount, io->refCount-1 );
795
796    if( !--io->refCount )
797        tr_peerIoFree( io );
798}
799
800const tr_address*
801tr_peerIoGetAddress( const tr_peerIo * io, tr_port   * port )
802{
803    assert( tr_isPeerIo( io ) );
804
805    if( port )
806        *port = io->port;
807
808    return &io->addr;
809}
810
811const char*
812tr_peerIoAddrStr( const tr_address * addr, tr_port port )
813{
814    static char buf[512];
815    tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_address_to_string( addr ), ntohs( port ) );
816    return buf;
817}
818
819const char* tr_peerIoGetAddrStr( const tr_peerIo * io )
820{
821    return tr_isPeerIo( io ) ? tr_peerIoAddrStr( &io->addr, io->port ) : "error";
822}
823
824void
825tr_peerIoSetIOFuncs( tr_peerIo        * io,
826                     tr_can_read_cb     readcb,
827                     tr_did_write_cb    writecb,
828                     tr_net_error_cb    errcb,
829                     void             * userData )
830{
831    io->canRead = readcb;
832    io->didWrite = writecb;
833    io->gotError = errcb;
834    io->userData = userData;
835}
836
837void
838tr_peerIoClear( tr_peerIo * io )
839{
840    tr_peerIoSetIOFuncs( io, NULL, NULL, NULL, NULL );
841    tr_peerIoSetEnabled( io, TR_UP, false );
842    tr_peerIoSetEnabled( io, TR_DOWN, false );
843}
844
845int
846tr_peerIoReconnect( tr_peerIo * io )
847{
848    short int pendingEvents;
849    tr_session * session;
850
851    assert( tr_isPeerIo( io ) );
852    assert( !tr_peerIoIsIncoming( io ) );
853
854    session = tr_peerIoGetSession( io );
855
856    pendingEvents = io->pendingEvents;
857    event_disable( io, EV_READ | EV_WRITE );
858
859    io_close_socket( io );
860
861    io->socket = tr_netOpenPeerSocket( session, &io->addr, io->port, io->isSeed );
862    io->event_read = event_new( session->event_base, io->socket, EV_READ, event_read_cb, io );
863    io->event_write = event_new( session->event_base, io->socket, EV_WRITE, event_write_cb, io );
864
865    if( io->socket >= 0 )
866    {
867        event_enable( io, pendingEvents );
868        tr_netSetTOS( io->socket, session->peerSocketTOS );
869        maybeSetCongestionAlgorithm( io->socket, session->peer_congestion_algorithm );
870        return 0;
871    }
872
873    return -1;
874}
875
876/**
877***
878**/
879
880void
881tr_peerIoSetTorrentHash( tr_peerIo *     io,
882                         const uint8_t * hash )
883{
884    assert( tr_isPeerIo( io ) );
885
886    tr_cryptoSetTorrentHash( io->crypto, hash );
887}
888
889const uint8_t*
890tr_peerIoGetTorrentHash( tr_peerIo * io )
891{
892    assert( tr_isPeerIo( io ) );
893    assert( io->crypto );
894
895    return tr_cryptoGetTorrentHash( io->crypto );
896}
897
898int
899tr_peerIoHasTorrentHash( const tr_peerIo * io )
900{
901    assert( tr_isPeerIo( io ) );
902    assert( io->crypto );
903
904    return tr_cryptoHasTorrentHash( io->crypto );
905}
906
907/**
908***
909**/
910
911void
912tr_peerIoSetPeersId( tr_peerIo *     io,
913                     const uint8_t * peer_id )
914{
915    assert( tr_isPeerIo( io ) );
916
917    if( ( io->peerIdIsSet = peer_id != NULL ) )
918        memcpy( io->peerId, peer_id, 20 );
919    else
920        memset( io->peerId, 0, 20 );
921}
922
923/**
924***
925**/
926
927static unsigned int
928getDesiredOutputBufferSize( const tr_peerIo * io, uint64_t now )
929{
930    /* this is all kind of arbitrary, but what seems to work well is
931     * being large enough to hold the next 20 seconds' worth of input,
932     * or a few blocks, whichever is bigger.
933     * It's okay to tweak this as needed */
934    const unsigned int currentSpeed_Bps = tr_bandwidthGetPieceSpeed_Bps( &io->bandwidth, now, TR_UP );
935    const unsigned int period = 15u; /* arbitrary */
936    /* the 3 is arbitrary; the .5 is to leave room for messages */
937    static const unsigned int ceiling =  (unsigned int)( MAX_BLOCK_SIZE * 3.5 );
938    return MAX( ceiling, currentSpeed_Bps*period );
939}
940
941size_t
942tr_peerIoGetWriteBufferSpace( const tr_peerIo * io, uint64_t now )
943{
944    const size_t desiredLen = getDesiredOutputBufferSize( io, now );
945    const size_t currentLen = evbuffer_get_length( io->outbuf );
946    size_t freeSpace = 0;
947
948    if( desiredLen > currentLen )
949        freeSpace = desiredLen - currentLen;
950
951    return freeSpace;
952}
953
954/**
955***
956**/
957
958void
959tr_peerIoSetEncryption( tr_peerIo * io, tr_encryption_type encryption_type )
960{
961    assert( tr_isPeerIo( io ) );
962    assert( encryption_type == PEER_ENCRYPTION_NONE
963         || encryption_type == PEER_ENCRYPTION_RC4 );
964
965    io->encryption_type = encryption_type;
966}
967
968/**
969***
970**/
971
972static void
973addDatatype( tr_peerIo * io, size_t byteCount, bool isPieceData )
974{
975    struct tr_datatype * d;
976
977    d = tr_new( struct tr_datatype, 1 );
978    d->isPieceData = isPieceData != 0;
979    d->length = byteCount;
980    tr_list_append( &io->outbuf_datatypes, d );
981}
982
983static void
984maybeEncryptBuffer( tr_peerIo * io, struct evbuffer * buf )
985{
986    if( io->encryption_type == PEER_ENCRYPTION_RC4 )
987    {
988        struct evbuffer_ptr pos;
989        struct evbuffer_iovec iovec;
990        evbuffer_ptr_set( buf, &pos, 0, EVBUFFER_PTR_SET );
991        do {
992            evbuffer_peek( buf, -1, &pos, &iovec, 1 );
993            tr_cryptoEncrypt( io->crypto, iovec.iov_len, iovec.iov_base, iovec.iov_base );
994        } while( !evbuffer_ptr_set( buf, &pos, iovec.iov_len, EVBUFFER_PTR_ADD ) );
995    }
996}
997
998void
999tr_peerIoWriteBuf( tr_peerIo * io, struct evbuffer * buf, bool isPieceData )
1000{
1001    const size_t byteCount = evbuffer_get_length( buf );
1002    maybeEncryptBuffer( io, buf );
1003    evbuffer_add_buffer( io->outbuf, buf );
1004    addDatatype( io, byteCount, isPieceData );
1005}
1006
1007void
1008tr_peerIoWriteBytes( tr_peerIo * io, const void * bytes, size_t byteCount, bool isPieceData )
1009{
1010    struct evbuffer * buf = evbuffer_new( );
1011    evbuffer_add( buf, bytes, byteCount );
1012    tr_peerIoWriteBuf( io, buf, isPieceData );
1013    evbuffer_free( buf );
1014}
1015
1016/***
1017****
1018***/
1019
1020void
1021evbuffer_add_uint8( struct evbuffer * outbuf, uint8_t byte )
1022{
1023    evbuffer_add( outbuf, &byte, 1 );
1024}
1025
1026void
1027evbuffer_add_uint16( struct evbuffer * outbuf, uint16_t addme_hs )
1028{
1029    const uint16_t ns = htons( addme_hs );
1030    evbuffer_add( outbuf, &ns, sizeof( ns ) );
1031}
1032
1033void
1034evbuffer_add_uint32( struct evbuffer * outbuf, uint32_t addme_hl )
1035{
1036    const uint32_t nl = htonl( addme_hl );
1037    evbuffer_add( outbuf, &nl, sizeof( nl ) );
1038}
1039
1040void
1041evbuffer_add_uint64( struct evbuffer * outbuf, uint64_t addme_hll )
1042{
1043    const uint64_t nll = tr_htonll( addme_hll );
1044    evbuffer_add( outbuf, &nll, sizeof( nll ) );
1045}
1046
1047/***
1048****
1049***/
1050
1051void
1052tr_peerIoReadBytesToBuf( tr_peerIo * io, struct evbuffer * inbuf, struct evbuffer * outbuf, size_t byteCount )
1053{
1054    const size_t old_length = evbuffer_get_length( outbuf );
1055
1056    assert( tr_isPeerIo( io ) );
1057    assert( evbuffer_get_length( inbuf )  >= byteCount );
1058
1059    /* append it to outbuf */
1060    evbuffer_remove_buffer( inbuf, outbuf, byteCount );
1061
1062    /* decrypt if needed */
1063    if( io->encryption_type == PEER_ENCRYPTION_RC4 ) {
1064        struct evbuffer_ptr pos;
1065        struct evbuffer_iovec iovec;
1066        evbuffer_ptr_set( outbuf, &pos, old_length, EVBUFFER_PTR_SET );
1067        do {
1068           evbuffer_peek( outbuf, byteCount, &pos, &iovec, 1 );
1069            tr_cryptoDecrypt( io->crypto, iovec.iov_len, iovec.iov_base, iovec.iov_base );
1070        } while( !evbuffer_ptr_set( outbuf, &pos, iovec.iov_len, EVBUFFER_PTR_ADD ) );
1071    }
1072}
1073
1074void
1075tr_peerIoReadBytes( tr_peerIo * io, struct evbuffer * inbuf, void * bytes, size_t byteCount )
1076{
1077    assert( tr_isPeerIo( io ) );
1078    assert( evbuffer_get_length( inbuf )  >= byteCount );
1079
1080    switch( io->encryption_type )
1081    {
1082        case PEER_ENCRYPTION_NONE:
1083            evbuffer_remove( inbuf, bytes, byteCount );
1084            break;
1085
1086        case PEER_ENCRYPTION_RC4:
1087            evbuffer_remove( inbuf, bytes, byteCount );
1088            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
1089            break;
1090
1091        default:
1092            assert( 0 );
1093    }
1094}
1095
1096void
1097tr_peerIoReadUint16( tr_peerIo        * io,
1098                     struct evbuffer  * inbuf,
1099                     uint16_t         * setme )
1100{
1101    uint16_t tmp;
1102    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
1103    *setme = ntohs( tmp );
1104}
1105
1106void tr_peerIoReadUint32( tr_peerIo        * io,
1107                          struct evbuffer  * inbuf,
1108                          uint32_t         * setme )
1109{
1110    uint32_t tmp;
1111    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
1112    *setme = ntohl( tmp );
1113}
1114
1115void
1116tr_peerIoDrain( tr_peerIo       * io,
1117                struct evbuffer * inbuf,
1118                size_t            byteCount )
1119{
1120    char buf[4096];
1121    const size_t buflen = sizeof( buf );
1122
1123    while( byteCount > 0 )
1124    {
1125        const size_t thisPass = MIN( byteCount, buflen );
1126        tr_peerIoReadBytes( io, inbuf, buf, thisPass );
1127        byteCount -= thisPass;
1128    }
1129}
1130
1131/***
1132****
1133***/
1134
1135static int
1136tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
1137{
1138    int res = 0;
1139
1140    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch )))
1141    {
1142        if( io->utp_socket != NULL ) /* utp peer connection */
1143        {
1144            /* UTP_RBDrained notifies libutp that your read buffer is emtpy.
1145             * It opens up the congestion window by sending an ACK (soonish)
1146             * if one was not going to be sent. */
1147            if( evbuffer_get_length( io->inbuf ) == 0 )
1148                UTP_RBDrained( io->utp_socket );
1149        }
1150        else /* tcp peer connection */
1151        {
1152            int e;
1153
1154            EVUTIL_SET_SOCKET_ERROR( 0 );
1155            res = evbuffer_read( io->inbuf, io->socket, (int)howmuch );
1156            e = EVUTIL_SOCKET_ERROR( );
1157
1158            dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(e):"") );
1159
1160            if( evbuffer_get_length( io->inbuf ) )
1161                canReadWrapper( io );
1162
1163            if( ( res <= 0 ) && ( io->gotError ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1164            {
1165                char errstr[512];
1166                short what = BEV_EVENT_READING | BEV_EVENT_ERROR;
1167                if( res == 0 )
1168                    what |= BEV_EVENT_EOF;
1169                tr_net_strerror( errstr, sizeof( errstr ), e );
1170                dbgmsg( io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
1171                io->gotError( io, what, io->userData );
1172            }
1173        }
1174    }
1175
1176    return res;
1177}
1178
1179static int
1180tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
1181{
1182    int n = 0;
1183    const size_t old_len = evbuffer_get_length( io->outbuf );
1184    dbgmsg( io, "in tr_peerIoTryWrite %zu", howmuch );
1185
1186    if( howmuch > old_len )
1187        howmuch = old_len;
1188
1189    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_UP, howmuch )))
1190    {
1191        if( io->utp_socket != NULL ) /* utp peer connection */
1192        {
1193            const size_t old_len = evbuffer_get_length( io->outbuf );
1194            UTP_Write( io->utp_socket, howmuch );
1195            n = old_len - evbuffer_get_length( io->outbuf );
1196        }
1197        else
1198        {
1199            int e;
1200
1201            EVUTIL_SET_SOCKET_ERROR( 0 );
1202            n = tr_evbuffer_write( io, io->socket, howmuch );
1203            e = EVUTIL_SOCKET_ERROR( );
1204
1205            if( n > 0 )
1206                didWriteWrapper( io, n );
1207
1208            if( ( n < 0 ) && ( io->gotError ) && e && ( e != EPIPE ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1209            {
1210                char errstr[512];
1211                const short what = BEV_EVENT_WRITING | BEV_EVENT_ERROR;
1212
1213                tr_net_strerror( errstr, sizeof( errstr ), e );
1214                dbgmsg( io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", n, what, e, errstr );
1215
1216                if( io->gotError != NULL )
1217                    io->gotError( io, what, io->userData );
1218            }
1219        }
1220    }
1221
1222    return n;
1223}
1224
1225int
1226tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
1227{
1228    int bytesUsed = 0;
1229
1230    assert( tr_isPeerIo( io ) );
1231    assert( tr_isDirection( dir ) );
1232
1233    if( dir == TR_DOWN )
1234        bytesUsed = tr_peerIoTryRead( io, limit );
1235    else
1236        bytesUsed = tr_peerIoTryWrite( io, limit );
1237
1238    dbgmsg( io, "flushing peer-io, direction %d, limit %zu, bytesUsed %d", (int)dir, limit, bytesUsed );
1239    return bytesUsed;
1240}
1241
1242int
1243tr_peerIoFlushOutgoingProtocolMsgs( tr_peerIo * io )
1244{
1245    size_t byteCount = 0;
1246    tr_list * it;
1247
1248    /* count up how many bytes are used by non-piece-data messages
1249       at the front of our outbound queue */
1250    for( it=io->outbuf_datatypes; it!=NULL; it=it->next )
1251    {
1252        struct tr_datatype * d = it->data;
1253
1254        if( d->isPieceData )
1255            break;
1256
1257        byteCount += d->length;
1258    }
1259
1260    return tr_peerIoFlush( io, TR_UP, byteCount );
1261}
Note: See TracBrowser for help on using the repository browser.