source: trunk/libtransmission/peer-io.c @ 11945

Last change on this file since 11945 was 11945, checked in by jordan, 11 years ago

if we successfully finish a handshake using uTP, mark the peer as supporting uTP

  • Property svn:keywords set to Date Rev Author Id
File size: 33.0 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 11945 2011-02-18 00:41:06Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <string.h>
17#include <stdio.h>
18#include <unistd.h>
19
20#ifdef WIN32
21 #include <winsock2.h>
22#else
23 #include <arpa/inet.h> /* inet_ntoa */
24#endif
25
26#include <event2/event.h>
27#include <event2/bufferevent.h>
28#include <libutp/utp.h>
29
30#include "transmission.h"
31#include "session.h"
32#include "bandwidth.h"
33#include "crypto.h"
34#include "list.h"
35#include "net.h"
36#include "peer-common.h" /* MAX_BLOCK_SIZE */
37#include "peer-io.h"
38#include "trevent.h" /* tr_runInEventThread() */
39#include "utils.h"
40
41#define MAGIC_NUMBER 206745
42
43#ifdef WIN32
44 #define EAGAIN       WSAEWOULDBLOCK
45 #define EINTR        WSAEINTR
46 #define EINPROGRESS  WSAEINPROGRESS
47 #define EPIPE        WSAECONNRESET
48#endif
49
50/* The amount of read bufferring that we allow for uTP sockets. */
51
52#define UTP_READ_BUFFER_SIZE (256 * 1024)
53
54static size_t
55guessPacketOverhead( size_t d )
56{
57    /**
58     * http://sd.wareonearth.com/~phil/net/overhead/
59     *
60     * TCP over Ethernet:
61     * Assuming no header compression (e.g. not PPP)
62     * Add 20 IPv4 header or 40 IPv6 header (no options)
63     * Add 20 TCP header
64     * Add 12 bytes optional TCP timestamps
65     * Max TCP Payload data rates over ethernet are thus:
66     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
67     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
68     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
69     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
70     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
71     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
72     */
73    const double assumed_payload_data_rate = 94.0;
74
75    return (unsigned int)( d * ( 100.0 / assumed_payload_data_rate ) - d );
76}
77
78/**
79***
80**/
81
82#define dbgmsg( io, ... ) \
83    do { \
84        if( tr_deepLoggingIsActive( ) ) \
85            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
86    } while( 0 )
87
88struct tr_datatype
89{
90    tr_bool  isPieceData;
91    size_t   length;
92};
93
94
95/***
96****
97***/
98
99static void
100didWriteWrapper( tr_peerIo * io, unsigned int bytes_transferred )
101{
102     while( bytes_transferred && tr_isPeerIo( io ) )
103     {
104        struct tr_datatype * next = io->outbuf_datatypes->data;
105
106        const unsigned int payload = MIN( next->length, bytes_transferred );
107        /* For uTP sockets, the overhead is computed in utp_on_overhead. */
108        const unsigned int overhead =
109            io->socket ? guessPacketOverhead( payload ) : 0;
110        const uint64_t now = tr_sessionGetTimeMsec( io->session );
111
112        tr_bandwidthUsed( &io->bandwidth, TR_UP, payload, next->isPieceData, now );
113
114        if( overhead > 0 )
115            tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, FALSE, now );
116
117        if( io->didWrite )
118            io->didWrite( io, payload, next->isPieceData, io->userData );
119
120        if( tr_isPeerIo( io ) )
121        {
122            bytes_transferred -= payload;
123            next->length -= payload;
124            if( !next->length ) {
125                tr_list_pop_front( &io->outbuf_datatypes );
126                tr_free( next );
127            }
128        }
129    }
130}
131
132static void
133canReadWrapper( tr_peerIo * io )
134{
135    tr_bool err = 0;
136    tr_bool done = 0;
137    tr_session * session;
138
139    dbgmsg( io, "canRead" );
140
141    assert( tr_isPeerIo( io ) );
142    assert( tr_isSession( io->session ) );
143    tr_peerIoRef( io );
144
145    session = io->session;
146
147    /* try to consume the input buffer */
148    if( io->canRead )
149    {
150        const uint64_t now = tr_sessionGetTimeMsec( io->session );
151
152        tr_sessionLock( session );
153
154        while( !done && !err )
155        {
156            size_t piece = 0;
157            const size_t oldLen = evbuffer_get_length( io->inbuf );
158            const int ret = io->canRead( io, io->userData, &piece );
159            const size_t used = oldLen - evbuffer_get_length( io->inbuf );
160            const unsigned int overhead = guessPacketOverhead( used );
161
162            assert( tr_isPeerIo( io ) );
163
164            if( piece || (piece!=used) )
165            {
166                if( piece )
167                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, piece, TRUE, now );
168
169                if( used != piece )
170                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, used - piece, FALSE, now );
171            }
172
173            if( overhead > 0 )
174                tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, FALSE, now );
175
176            switch( ret )
177            {
178                case READ_NOW:
179                    if( evbuffer_get_length( io->inbuf ) )
180                        continue;
181                    done = 1;
182                    break;
183
184                case READ_LATER:
185                    done = 1;
186                    break;
187
188                case READ_ERR:
189                    err = 1;
190                    break;
191            }
192
193            assert( tr_isPeerIo( io ) );
194        }
195
196        tr_sessionUnlock( session );
197    }
198
199    assert( tr_isPeerIo( io ) );
200    tr_peerIoUnref( io );
201}
202
203tr_bool
204tr_isPeerIo( const tr_peerIo * io )
205{
206    return ( io != NULL )
207        && ( io->magicNumber == MAGIC_NUMBER )
208        && ( io->refCount >= 0 )
209        && ( tr_isBandwidth( &io->bandwidth ) )
210        && ( tr_isAddress( &io->addr ) );
211}
212
213static void
214event_read_cb( int fd, short event UNUSED, void * vio )
215{
216    int res;
217    int e;
218    tr_peerIo * io = vio;
219
220    /* Limit the input buffer to 256K, so it doesn't grow too large */
221    unsigned int howmuch;
222    unsigned int curlen;
223    const tr_direction dir = TR_DOWN;
224    const unsigned int max = 256 * 1024;
225
226    assert( tr_isPeerIo( io ) );
227    assert( io->socket >= 0 );
228
229    io->pendingEvents &= ~EV_READ;
230
231    curlen = evbuffer_get_length( io->inbuf );
232    howmuch = curlen >= max ? 0 : max - curlen;
233    howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch );
234
235    dbgmsg( io, "libevent says this peer is ready to read" );
236
237    /* if we don't have any bandwidth left, stop reading */
238    if( howmuch < 1 ) {
239        tr_peerIoSetEnabled( io, dir, FALSE );
240        return;
241    }
242
243    EVUTIL_SET_SOCKET_ERROR( 0 );
244    res = evbuffer_read( io->inbuf, fd, (int)howmuch );
245    e = EVUTIL_SOCKET_ERROR( );
246
247    if( res > 0 )
248    {
249        tr_peerIoSetEnabled( io, dir, TRUE );
250
251        /* Invoke the user callback - must always be called last */
252        canReadWrapper( io );
253    }
254    else
255    {
256        char errstr[512];
257        short what = BEV_EVENT_READING;
258
259        if( res == 0 ) /* EOF */
260            what |= BEV_EVENT_EOF;
261        else if( res == -1 ) {
262            if( e == EAGAIN || e == EINTR ) {
263                tr_peerIoSetEnabled( io, dir, TRUE );
264                return;
265            }
266            what |= BEV_EVENT_ERROR;
267        }
268
269        tr_net_strerror( errstr, sizeof( errstr ), e );
270        dbgmsg( io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
271
272        if( io->gotError != NULL )
273            io->gotError( io, what, io->userData );
274    }
275}
276
277static int
278tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
279{
280    int e;
281    int n;
282    char errstr[256];
283
284    EVUTIL_SET_SOCKET_ERROR( 0 );
285    n = evbuffer_write_atmost( io->outbuf, fd, howmuch );
286    e = EVUTIL_SOCKET_ERROR( );
287    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?tr_net_strerror(errstr,sizeof(errstr),e):"") );
288
289    return n;
290}
291
292static void
293event_write_cb( int fd, short event UNUSED, void * vio )
294{
295    int res = 0;
296    int e;
297    short what = BEV_EVENT_WRITING;
298    tr_peerIo * io = vio;
299    size_t howmuch;
300    const tr_direction dir = TR_UP;
301
302    assert( tr_isPeerIo( io ) );
303    assert( io->socket >= 0 );
304
305    io->pendingEvents &= ~EV_WRITE;
306
307    dbgmsg( io, "libevent says this peer is ready to write" );
308
309    /* Write as much as possible, since the socket is non-blocking, write() will
310     * return if it can't write any more data without blocking */
311    howmuch = tr_bandwidthClamp( &io->bandwidth, dir, evbuffer_get_length( io->outbuf ) );
312
313    /* if we don't have any bandwidth left, stop writing */
314    if( howmuch < 1 ) {
315        tr_peerIoSetEnabled( io, dir, FALSE );
316        return;
317    }
318
319    EVUTIL_SET_SOCKET_ERROR( 0 );
320    res = tr_evbuffer_write( io, fd, howmuch );
321    e = EVUTIL_SOCKET_ERROR( );
322
323    if (res == -1) {
324        if (!e || e == EAGAIN || e == EINTR || e == EINPROGRESS)
325            goto reschedule;
326        /* error case */
327        what |= BEV_EVENT_ERROR;
328    } else if (res == 0) {
329        /* eof case */
330        what |= BEV_EVENT_EOF;
331    }
332    if (res <= 0)
333        goto error;
334
335    if( evbuffer_get_length( io->outbuf ) )
336        tr_peerIoSetEnabled( io, dir, TRUE );
337
338    didWriteWrapper( io, res );
339    return;
340
341 reschedule:
342    if( evbuffer_get_length( io->outbuf ) )
343        tr_peerIoSetEnabled( io, dir, TRUE );
344    return;
345
346 error:
347
348    dbgmsg( io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
349
350    if( io->gotError != NULL )
351        io->gotError( io, what, io->userData );
352}
353
354/**
355***
356**/
357
358static void
359maybeSetCongestionAlgorithm( int socket, const char * algorithm )
360{
361    if( algorithm && *algorithm )
362    {
363        const int rc = tr_netSetCongestionControl( socket, algorithm );
364
365        if( rc < 0 )
366            tr_ninf( "Net", "Can't set congestion control algorithm '%s': %s",
367                     algorithm, tr_strerror( errno ));
368    }
369}
370
371/* UTP callbacks */
372
373static void
374utp_on_read(void *closure, const unsigned char *buf, size_t buflen)
375{
376    int rc;
377    tr_peerIo *io = (tr_peerIo *)closure;
378    assert( tr_isPeerIo( io ) );
379
380    rc = evbuffer_add( io->inbuf, buf, buflen );
381    dbgmsg( io, "utp_on_read got %zu bytes", buflen );
382
383    if( rc < 0 ) {
384        tr_nerr( "UTP", "On read evbuffer_add" );
385        return;
386    }
387
388    tr_peerIoSetEnabled( io, TR_DOWN, TRUE );
389    canReadWrapper( io );
390}
391
392static void
393utp_on_write(void *closure, unsigned char *buf, size_t buflen)
394{
395    tr_peerIo *io = (tr_peerIo *)closure;
396    int rc;
397    assert( tr_isPeerIo( io ) );
398
399    rc = evbuffer_remove( io->outbuf, buf, buflen );
400    dbgmsg( io, "utp_on_write sending %zu bytes... evbuffer_remove returned %d", buflen, rc );
401    assert( rc == (int)buflen ); /* if this fails, we've corrupted our bookkeeping somewhere */
402    if( rc < (long)buflen ) {
403        tr_nerr( "UTP", "Short write: %d < %ld", rc, (long)buflen);
404    }
405
406    didWriteWrapper( io, buflen );
407}
408
409static size_t
410utp_get_rb_size(void *closure)
411{
412    tr_peerIo *io = (tr_peerIo *)closure;
413    size_t bytes;
414    assert( tr_isPeerIo( io ) );
415
416    bytes = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, UTP_READ_BUFFER_SIZE );
417
418    dbgmsg( io, "utp_get_rb_size is saying it's ready to read %zu bytes", bytes );
419    return UTP_READ_BUFFER_SIZE - bytes;
420}
421
422static void
423utp_on_state_change(void *closure, int state)
424{
425    tr_peerIo *io = (tr_peerIo *)closure;
426    assert( tr_isPeerIo( io ) );
427
428    if( state == UTP_STATE_CONNECT ) {
429        dbgmsg( io, "utp_on_state_change -- changed to connected" );
430        io->dhtSupported = TRUE;
431    } else if( state == UTP_STATE_WRITABLE ) {
432        dbgmsg( io, "utp_on_state_change -- changed to writable" );
433    } else if( state == UTP_STATE_EOF ) {
434        if( io->gotError )
435            io->gotError( io, BEV_EVENT_EOF, io->userData );
436    } else if( state == UTP_STATE_DESTROYING ) {
437        tr_nerr( "UTP", "Impossible state UTP_STATE_DESTROYING" );
438        return;
439    } else {
440        tr_nerr( "UTP", "Unknown state %d", state );
441    }
442}
443
444static void
445utp_on_error(void *closure, int errcode)
446{
447    tr_peerIo *io = (tr_peerIo *)closure;
448    assert( tr_isPeerIo( io ) );
449
450    dbgmsg( io, "utp_on_error -- errcode is %d", errcode );
451
452    if( io->gotError ) {
453        errno = errcode;
454        io->gotError( io, BEV_EVENT_ERROR, io->userData );
455    }
456}
457
458static void
459utp_on_overhead(void *closure, bool send, size_t count, int type UNUSED)
460{
461    tr_peerIo *io = (tr_peerIo *)closure;
462    assert( tr_isPeerIo( io ) );
463
464    dbgmsg( io, "utp_on_overhead -- count is %zu", count );
465
466    tr_bandwidthUsed( &io->bandwidth, send ? TR_UP : TR_DOWN,
467                      count, FALSE, tr_time_msec() );
468}
469
470static struct UTPFunctionTable utp_function_table = {
471    .on_read = utp_on_read,
472    .on_write = utp_on_write,
473    .get_rb_size = utp_get_rb_size,
474    .on_state = utp_on_state_change,
475    .on_error = utp_on_error,
476    .on_overhead = utp_on_overhead
477};
478
479/* Dummy UTP callbacks. */
480/* We switch a UTP socket to use these after the associated peerIo has been
481   destroyed -- see io_dtor. */
482
483static void
484dummy_read( void * closure UNUSED, const unsigned char *buf UNUSED, size_t buflen UNUSED )
485{
486    /* This cannot happen, as far as I'm aware. */
487    tr_nerr( "UTP", "On_read called on closed socket" );
488
489}
490
491static void
492dummy_write(void * closure UNUSED, unsigned char *buf, size_t buflen)
493{
494    /* This can very well happen if we've shut down a peer connection that
495       had unflushed buffers.  Complain and send zeroes. */
496    tr_ndbg( "UTP", "On_write called on closed socket" );
497    memset( buf, 0, buflen );
498}
499
500static size_t
501dummy_get_rb_size( void * closure UNUSED )
502{
503    return 0;
504}
505
506static void
507dummy_on_state_change(void * closure UNUSED, int state UNUSED )
508{
509    return;
510}
511
512static void
513dummy_on_error( void * closure UNUSED, int errcode UNUSED )
514{
515    return;
516}
517
518static void
519dummy_on_overhead( void *closure UNUSED, bool send UNUSED, size_t count UNUSED, int type UNUSED )
520{
521    return;
522}
523
524static struct UTPFunctionTable dummy_utp_function_table = {
525    .on_read = dummy_read,
526    .on_write = dummy_write,
527    .get_rb_size = dummy_get_rb_size,
528    .on_state = dummy_on_state_change,
529    .on_error = dummy_on_error,
530    .on_overhead = dummy_on_overhead
531};
532
533static tr_peerIo*
534tr_peerIoNew( tr_session       * session,
535              tr_bandwidth     * parent,
536              const tr_address * addr,
537              tr_port            port,
538              const uint8_t    * torrentHash,
539              tr_bool            isIncoming,
540              tr_bool            isSeed,
541              int                socket,
542              struct UTPSocket * utp_socket)
543{
544    tr_peerIo * io;
545
546    assert( session != NULL );
547    assert( session->events != NULL );
548    assert( tr_isBool( isIncoming ) );
549    assert( tr_isBool( isSeed ) );
550    assert( tr_amInEventThread( session ) );
551    assert( (socket < 0) == (utp_socket != NULL) );
552
553    if( socket >= 0 ) {
554        tr_netSetTOS( socket, session->peerSocketTOS );
555        maybeSetCongestionAlgorithm( socket, session->peer_congestion_algorithm );
556    }
557
558    io = tr_new0( tr_peerIo, 1 );
559    io->magicNumber = MAGIC_NUMBER;
560    io->refCount = 1;
561    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
562    io->session = session;
563    io->addr = *addr;
564    io->isSeed = isSeed;
565    io->port = port;
566    io->socket = socket;
567    io->utp_socket = utp_socket;
568    io->isIncoming = isIncoming != 0;
569    io->timeCreated = tr_time( );
570    io->inbuf = evbuffer_new( );
571    io->outbuf = evbuffer_new( );
572    tr_bandwidthConstruct( &io->bandwidth, session, parent );
573    tr_bandwidthSetPeer( &io->bandwidth, io );
574    dbgmsg( io, "bandwidth is %p; its parent is %p", &io->bandwidth, parent );
575    dbgmsg( io, "socket is %d, utp_socket is %p", socket, utp_socket );
576
577    if( io->socket >= 0 ) {
578        io->event_read = event_new( session->event_base,
579                                    io->socket, EV_READ, event_read_cb, io );
580        io->event_write = event_new( session->event_base,
581                                     io->socket, EV_WRITE, event_write_cb, io );
582    } else {
583        UTP_SetSockopt( utp_socket, SO_RCVBUF, UTP_READ_BUFFER_SIZE );
584        dbgmsg( io, "%s", "calling UTP_SetCallbacks &utp_function_table" );
585        UTP_SetCallbacks( utp_socket,
586                          &utp_function_table,
587                          io );
588        if( !isIncoming ) {
589            dbgmsg( io, "%s", "calling UTP_Connect" );
590            UTP_Connect( utp_socket );
591        }
592    }
593
594    return io;
595}
596
597tr_peerIo*
598tr_peerIoNewIncoming( tr_session        * session,
599                      tr_bandwidth      * parent,
600                      const tr_address  * addr,
601                      tr_port             port,
602                      int                 fd,
603                      struct UTPSocket  * utp_socket )
604{
605    assert( session );
606    assert( tr_isAddress( addr ) );
607
608    return tr_peerIoNew( session, parent, addr, port, NULL, TRUE, FALSE, 
609                         fd, utp_socket );
610}
611
612tr_peerIo*
613tr_peerIoNewOutgoing( tr_session        * session,
614                      tr_bandwidth      * parent,
615                      const tr_address  * addr,
616                      tr_port             port,
617                      const uint8_t     * torrentHash,
618                      tr_bool             isSeed,
619                      tr_bool             utp )
620{
621    int fd = -1;
622    struct UTPSocket *utp_socket = NULL;
623
624    assert( session );
625    assert( tr_isAddress( addr ) );
626    assert( torrentHash );
627
628    if( !utp ) {
629        fd = tr_netOpenPeerSocket( session, addr, port, isSeed );
630        dbgmsg( NULL, "tr_netOpenPeerSocket returned fd %d", fd );
631    } else {
632        utp_socket =
633            tr_netOpenPeerUTPSocket( session, addr, port, isSeed );
634    }
635
636    if( fd < 0 && utp_socket == NULL )
637        return NULL;
638
639    return tr_peerIoNew( session, parent, addr, port,
640                         torrentHash, FALSE, isSeed, fd, utp_socket );
641}
642
643/***
644****
645***/
646
647static void
648event_enable( tr_peerIo * io, short event )
649{
650    assert( tr_amInEventThread( io->session ) );
651    assert( io->session != NULL );
652    assert( io->session->events != NULL );
653
654    if( io->socket < 0 )
655        return;
656
657    assert( io->session->events != NULL );
658    assert( event_initialized( io->event_read ) );
659    assert( event_initialized( io->event_write ) );
660
661    if( ( event & EV_READ ) && ! ( io->pendingEvents & EV_READ ) )
662    {
663        dbgmsg( io, "enabling libevent ready-to-read polling" );
664        event_add( io->event_read, NULL );
665        io->pendingEvents |= EV_READ;
666    }
667
668    if( ( event & EV_WRITE ) && ! ( io->pendingEvents & EV_WRITE ) )
669    {
670        dbgmsg( io, "enabling libevent ready-to-write polling" );
671        event_add( io->event_write, NULL );
672        io->pendingEvents |= EV_WRITE;
673    }
674}
675
676static void
677event_disable( struct tr_peerIo * io, short event )
678{
679    assert( tr_amInEventThread( io->session ) );
680    assert( io->session != NULL );
681
682    if( io->socket < 0 )
683        return;
684
685    assert( io->session->events != NULL );
686    assert( event_initialized( io->event_read ) );
687    assert( event_initialized( io->event_write ) );
688
689    if( ( event & EV_READ ) && ( io->pendingEvents & EV_READ ) )
690    {
691        dbgmsg( io, "disabling libevent ready-to-read polling" );
692        event_del( io->event_read );
693        io->pendingEvents &= ~EV_READ;
694    }
695
696    if( ( event & EV_WRITE ) && ( io->pendingEvents & EV_WRITE ) )
697    {
698        dbgmsg( io, "disabling libevent ready-to-write polling" );
699        event_del( io->event_write );
700        io->pendingEvents &= ~EV_WRITE;
701    }
702}
703
704void
705tr_peerIoSetEnabled( tr_peerIo    * io,
706                     tr_direction   dir,
707                     tr_bool        isEnabled )
708{
709    const short event = dir == TR_UP ? EV_WRITE : EV_READ;
710
711    assert( tr_isPeerIo( io ) );
712    assert( tr_isDirection( dir ) );
713    assert( tr_amInEventThread( io->session ) );
714    assert( io->session->events != NULL );
715
716    if( isEnabled )
717        event_enable( io, event );
718    else
719        event_disable( io, event );
720}
721
722/***
723****
724***/
725
726static void
727io_dtor( void * vio )
728{
729    tr_peerIo * io = vio;
730
731    assert( tr_isPeerIo( io ) );
732    assert( tr_amInEventThread( io->session ) );
733    assert( io->session->events != NULL );
734
735    dbgmsg( io, "in tr_peerIo destructor" );
736    event_disable( io, EV_READ | EV_WRITE );
737    tr_bandwidthDestruct( &io->bandwidth );
738    evbuffer_free( io->outbuf );
739    evbuffer_free( io->inbuf );
740    if( io->socket >= 0 ) {
741        event_free( io->event_read );
742        event_free( io->event_write );
743        tr_netClose( io->session, io->socket );
744    }
745    if( io->utp_socket != NULL ) {
746        UTP_SetCallbacks( io->utp_socket,
747                          &dummy_utp_function_table,
748                          NULL );
749        UTP_Close( io->utp_socket );
750    }
751    tr_cryptoFree( io->crypto );
752    tr_list_free( &io->outbuf_datatypes, tr_free );
753
754    memset( io, ~0, sizeof( tr_peerIo ) );
755    tr_free( io );
756}
757
758static void
759tr_peerIoFree( tr_peerIo * io )
760{
761    if( io )
762    {
763        dbgmsg( io, "in tr_peerIoFree" );
764        io->canRead = NULL;
765        io->didWrite = NULL;
766        io->gotError = NULL;
767        tr_runInEventThread( io->session, io_dtor, io );
768    }
769}
770
771void
772tr_peerIoRefImpl( const char * file, int line, tr_peerIo * io )
773{
774    assert( tr_isPeerIo( io ) );
775
776    dbgmsg( io, "%s:%d is incrementing the IO's refcount from %d to %d",
777                file, line, io->refCount, io->refCount+1 );
778
779    ++io->refCount;
780}
781
782void
783tr_peerIoUnrefImpl( const char * file, int line, tr_peerIo * io )
784{
785    assert( tr_isPeerIo( io ) );
786
787    dbgmsg( io, "%s:%d is decrementing the IO's refcount from %d to %d",
788                file, line, io->refCount, io->refCount-1 );
789
790    if( !--io->refCount )
791        tr_peerIoFree( io );
792}
793
794const tr_address*
795tr_peerIoGetAddress( const tr_peerIo * io, tr_port   * port )
796{
797    assert( tr_isPeerIo( io ) );
798
799    if( port )
800        *port = io->port;
801
802    return &io->addr;
803}
804
805const char*
806tr_peerIoAddrStr( const tr_address * addr, tr_port port )
807{
808    static char buf[512];
809
810    if( addr->type == TR_AF_INET )
811        tr_snprintf( buf, sizeof( buf ), "%s:%u", tr_ntop_non_ts( addr ), ntohs( port ) );
812    else
813        tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_ntop_non_ts( addr ), ntohs( port ) );
814    return buf;
815}
816
817const char* tr_peerIoGetAddrStr( const tr_peerIo * io )
818{
819    return tr_isPeerIo( io ) ? tr_peerIoAddrStr( &io->addr, io->port ) : "error";
820}
821
822void
823tr_peerIoSetIOFuncs( tr_peerIo        * io,
824                     tr_can_read_cb     readcb,
825                     tr_did_write_cb    writecb,
826                     tr_net_error_cb    errcb,
827                     void             * userData )
828{
829    io->canRead = readcb;
830    io->didWrite = writecb;
831    io->gotError = errcb;
832    io->userData = userData;
833}
834
835void
836tr_peerIoClear( tr_peerIo * io )
837{
838    tr_peerIoSetIOFuncs( io, NULL, NULL, NULL, NULL );
839    tr_peerIoSetEnabled( io, TR_UP, FALSE );
840    tr_peerIoSetEnabled( io, TR_DOWN, FALSE );
841}
842
843int
844tr_peerIoReconnect( tr_peerIo * io )
845{
846    short int pendingEvents;
847    tr_session * session;
848
849    assert( tr_isPeerIo( io ) );
850    assert( !tr_peerIoIsIncoming( io ) );
851
852    session = tr_peerIoGetSession( io );
853
854    pendingEvents = io->pendingEvents;
855    event_disable( io, EV_READ | EV_WRITE );
856
857    if( io->socket >= 0 ) {
858        tr_netClose( session, io->socket );
859        io->socket = -1;
860        event_free( io->event_read );
861        event_free( io->event_write );
862    }
863    if( io->utp_socket != NULL ) {
864        UTP_SetCallbacks( io->utp_socket,
865                          &dummy_utp_function_table,
866                          NULL );
867        UTP_Close(io->utp_socket);
868        io->utp_socket = NULL;
869    }
870
871    io->socket = tr_netOpenPeerSocket( session, &io->addr, io->port, io->isSeed );
872    io->event_read = event_new( session->event_base, io->socket, EV_READ, event_read_cb, io );
873    io->event_write = event_new( session->event_base, io->socket, EV_WRITE, event_write_cb, io );
874
875    if( io->socket >= 0 )
876    {
877        event_enable( io, pendingEvents );
878        tr_netSetTOS( io->socket, session->peerSocketTOS );
879        maybeSetCongestionAlgorithm( io->socket, session->peer_congestion_algorithm );
880        return 0;
881    }
882
883    return -1;
884}
885
886/**
887***
888**/
889
890void
891tr_peerIoSetTorrentHash( tr_peerIo *     io,
892                         const uint8_t * hash )
893{
894    assert( tr_isPeerIo( io ) );
895
896    tr_cryptoSetTorrentHash( io->crypto, hash );
897}
898
899const uint8_t*
900tr_peerIoGetTorrentHash( tr_peerIo * io )
901{
902    assert( tr_isPeerIo( io ) );
903    assert( io->crypto );
904
905    return tr_cryptoGetTorrentHash( io->crypto );
906}
907
908int
909tr_peerIoHasTorrentHash( const tr_peerIo * io )
910{
911    assert( tr_isPeerIo( io ) );
912    assert( io->crypto );
913
914    return tr_cryptoHasTorrentHash( io->crypto );
915}
916
917/**
918***
919**/
920
921void
922tr_peerIoSetPeersId( tr_peerIo *     io,
923                     const uint8_t * peer_id )
924{
925    assert( tr_isPeerIo( io ) );
926
927    if( ( io->peerIdIsSet = peer_id != NULL ) )
928        memcpy( io->peerId, peer_id, 20 );
929    else
930        memset( io->peerId, 0, 20 );
931}
932
933/**
934***
935**/
936
937static unsigned int
938getDesiredOutputBufferSize( const tr_peerIo * io, uint64_t now )
939{
940    /* this is all kind of arbitrary, but what seems to work well is
941     * being large enough to hold the next 20 seconds' worth of input,
942     * or a few blocks, whichever is bigger.
943     * It's okay to tweak this as needed */
944    const unsigned int currentSpeed_Bps = tr_bandwidthGetPieceSpeed_Bps( &io->bandwidth, now, TR_UP );
945    const unsigned int period = 15u; /* arbitrary */
946    /* the 3 is arbitrary; the .5 is to leave room for messages */
947    static const unsigned int ceiling =  (unsigned int)( MAX_BLOCK_SIZE * 3.5 );
948    return MAX( ceiling, currentSpeed_Bps*period );
949}
950
951size_t
952tr_peerIoGetWriteBufferSpace( const tr_peerIo * io, uint64_t now )
953{
954    const size_t desiredLen = getDesiredOutputBufferSize( io, now );
955    const size_t currentLen = evbuffer_get_length( io->outbuf );
956    size_t freeSpace = 0;
957
958    if( desiredLen > currentLen )
959        freeSpace = desiredLen - currentLen;
960
961    return freeSpace;
962}
963
964/**
965***
966**/
967
968void
969tr_peerIoSetEncryption( tr_peerIo * io, uint32_t encryptionMode )
970{
971    assert( tr_isPeerIo( io ) );
972    assert( encryptionMode == PEER_ENCRYPTION_NONE
973         || encryptionMode == PEER_ENCRYPTION_RC4 );
974
975    io->encryptionMode = encryptionMode;
976}
977
978/**
979***
980**/
981
982static void
983addDatatype( tr_peerIo * io, size_t byteCount, tr_bool isPieceData )
984{
985    struct tr_datatype * d;
986
987    d = tr_new( struct tr_datatype, 1 );
988    d->isPieceData = isPieceData != 0;
989    d->length = byteCount;
990    tr_list_append( &io->outbuf_datatypes, d );
991}
992
993static struct evbuffer_iovec *
994evbuffer_peek_all( struct evbuffer * buf, size_t * setme_vecCount )
995{
996    const size_t byteCount = evbuffer_get_length( buf );
997    const int vecCount = evbuffer_peek( buf, byteCount, NULL, NULL, 0 );
998    struct evbuffer_iovec * iovec = tr_new0( struct evbuffer_iovec, vecCount );
999    const int n = evbuffer_peek( buf, byteCount, NULL, iovec, vecCount );
1000    assert( n == vecCount );
1001    *setme_vecCount = n;
1002    return iovec;
1003}
1004
1005static void
1006maybeEncryptBuffer( tr_peerIo * io, struct evbuffer * buf )
1007{
1008    if( io->encryptionMode == PEER_ENCRYPTION_RC4 )
1009    {
1010        size_t i, n;
1011        struct evbuffer_iovec * iovec = evbuffer_peek_all( buf, &n );
1012
1013        for( i=0; i<n; ++i )
1014            tr_cryptoEncrypt( io->crypto, iovec[i].iov_len, iovec[i].iov_base, iovec[i].iov_base );
1015
1016        tr_free( iovec );
1017    }
1018}
1019
1020void
1021tr_peerIoWriteBuf( tr_peerIo * io, struct evbuffer * buf, tr_bool isPieceData )
1022{
1023    const size_t byteCount = evbuffer_get_length( buf );
1024    maybeEncryptBuffer( io, buf );
1025    evbuffer_add_buffer( io->outbuf, buf );
1026    addDatatype( io, byteCount, isPieceData );
1027}
1028
1029void
1030tr_peerIoWriteBytes( tr_peerIo * io, const void * bytes, size_t byteCount, tr_bool isPieceData )
1031{
1032    struct evbuffer * buf = evbuffer_new( );
1033    evbuffer_add( buf, bytes, byteCount );
1034    tr_peerIoWriteBuf( io, buf, isPieceData );
1035    evbuffer_free( buf );
1036}
1037
1038/***
1039****
1040***/
1041
1042void
1043evbuffer_add_uint8( struct evbuffer * outbuf, uint8_t byte )
1044{
1045    evbuffer_add( outbuf, &byte, 1 );
1046}
1047
1048void
1049evbuffer_add_uint16( struct evbuffer * outbuf, uint16_t addme_hs )
1050{
1051    const uint16_t ns = htons( addme_hs );
1052    evbuffer_add( outbuf, &ns, sizeof( ns ) );
1053}
1054
1055void
1056evbuffer_add_uint32( struct evbuffer * outbuf, uint32_t addme_hl )
1057{
1058    const uint32_t nl = htonl( addme_hl );
1059    evbuffer_add( outbuf, &nl, sizeof( nl ) );
1060}
1061
1062/***
1063****
1064***/
1065
1066void
1067tr_peerIoReadBytes( tr_peerIo * io, struct evbuffer * inbuf, void * bytes, size_t byteCount )
1068{
1069    assert( tr_isPeerIo( io ) );
1070    assert( evbuffer_get_length( inbuf )  >= byteCount );
1071
1072    switch( io->encryptionMode )
1073    {
1074        case PEER_ENCRYPTION_NONE:
1075            evbuffer_remove( inbuf, bytes, byteCount );
1076            break;
1077
1078        case PEER_ENCRYPTION_RC4:
1079            evbuffer_remove( inbuf, bytes, byteCount );
1080            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
1081            break;
1082
1083        default:
1084            assert( 0 );
1085    }
1086}
1087
1088void
1089tr_peerIoReadUint16( tr_peerIo        * io,
1090                     struct evbuffer  * inbuf,
1091                     uint16_t         * setme )
1092{
1093    uint16_t tmp;
1094    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
1095    *setme = ntohs( tmp );
1096}
1097
1098void tr_peerIoReadUint32( tr_peerIo        * io,
1099                          struct evbuffer  * inbuf,
1100                          uint32_t         * setme )
1101{
1102    uint32_t tmp;
1103    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
1104    *setme = ntohl( tmp );
1105}
1106
1107void
1108tr_peerIoDrain( tr_peerIo       * io,
1109                struct evbuffer * inbuf,
1110                size_t            byteCount )
1111{
1112    void * buf = tr_sessionGetBuffer( io->session );
1113    const size_t buflen = SESSION_BUFFER_SIZE;
1114
1115    while( byteCount > 0 )
1116    {
1117        const size_t thisPass = MIN( byteCount, buflen );
1118        tr_peerIoReadBytes( io, inbuf, buf, thisPass );
1119        byteCount -= thisPass;
1120    }
1121
1122    tr_sessionReleaseBuffer( io->session );
1123}
1124
1125/***
1126****
1127***/
1128
1129static int
1130tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
1131{
1132    int res = 0;
1133
1134    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch )))
1135    {
1136        if( io->utp_socket != NULL ) /* utp peer connection */
1137        {
1138            /* UTP_RBDrained notifies libutp that your read buffer is emtpy.
1139             * It opens up the congestion window by sending an ACK (soonish)
1140             * if one was not going to be sent. */
1141            if( evbuffer_get_length( io->inbuf ) == 0 )
1142                UTP_RBDrained( io->utp_socket );
1143        }
1144        else /* tcp peer connection */
1145        {
1146            int e;
1147
1148            EVUTIL_SET_SOCKET_ERROR( 0 );
1149            res = evbuffer_read( io->inbuf, io->socket, (int)howmuch );
1150            e = EVUTIL_SOCKET_ERROR( );
1151
1152            dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(e):"") );
1153
1154            if( evbuffer_get_length( io->inbuf ) )
1155                canReadWrapper( io );
1156
1157            if( ( res <= 0 ) && ( io->gotError ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1158            {
1159                char errstr[512];
1160                short what = BEV_EVENT_READING | BEV_EVENT_ERROR;
1161                if( res == 0 )
1162                    what |= BEV_EVENT_EOF;
1163                tr_net_strerror( errstr, sizeof( errstr ), e );
1164                dbgmsg( io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
1165                io->gotError( io, what, io->userData );
1166            }
1167        }
1168    }
1169
1170    return res;
1171}
1172
1173static int
1174tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
1175{
1176    int n = 0;
1177    const size_t old_len = evbuffer_get_length( io->outbuf );
1178    dbgmsg( io, "in tr_peerIoTryWrite %zu", howmuch );
1179
1180    if( howmuch > old_len )
1181        howmuch = old_len;
1182
1183    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_UP, howmuch )))
1184    {
1185        if( io->utp_socket != NULL ) /* utp peer connection */
1186        {
1187            const size_t old_len = evbuffer_get_length( io->outbuf );
1188            UTP_Write( io->utp_socket, howmuch );
1189            n = old_len - evbuffer_get_length( io->outbuf );
1190        }
1191        else
1192        {
1193            int e;
1194
1195            EVUTIL_SET_SOCKET_ERROR( 0 );
1196            n = tr_evbuffer_write( io, io->socket, howmuch );
1197            e = EVUTIL_SOCKET_ERROR( );
1198
1199            if( n > 0 )
1200                didWriteWrapper( io, n );
1201
1202            if( ( n < 0 ) && ( io->gotError ) && e && ( e != EPIPE ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1203            {
1204                char errstr[512];
1205                const short what = BEV_EVENT_WRITING | BEV_EVENT_ERROR;
1206
1207                tr_net_strerror( errstr, sizeof( errstr ), e );
1208                dbgmsg( io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", n, what, e, errstr );
1209
1210                if( io->gotError != NULL )
1211                    io->gotError( io, what, io->userData );
1212            }
1213        }
1214    }
1215
1216    return n;
1217}
1218
1219int
1220tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
1221{
1222    int bytesUsed = 0;
1223
1224    assert( tr_isPeerIo( io ) );
1225    assert( tr_isDirection( dir ) );
1226
1227    if( dir == TR_DOWN )
1228        bytesUsed = tr_peerIoTryRead( io, limit );
1229    else
1230        bytesUsed = tr_peerIoTryWrite( io, limit );
1231
1232    dbgmsg( io, "flushing peer-io, direction %d, limit %zu, bytesUsed %d", (int)dir, limit, bytesUsed );
1233    return bytesUsed;
1234}
1235
1236int
1237tr_peerIoFlushOutgoingProtocolMsgs( tr_peerIo * io )
1238{
1239    size_t byteCount = 0;
1240    tr_list * it;
1241
1242    /* count up how many bytes are used by non-piece-data messages
1243       at the front of our outbound queue */
1244    for( it=io->outbuf_datatypes; it!=NULL; it=it->next )
1245    {
1246        struct tr_datatype * d = it->data;
1247
1248        if( d->isPieceData )
1249            break;
1250
1251        byteCount += d->length;
1252    }
1253
1254    return tr_peerIoFlush( io, TR_UP, byteCount );
1255}
Note: See TracBrowser for help on using the repository browser.