source: trunk/libtransmission/peer-io.c @ 11960

Last change on this file since 11960 was 11960, checked in by jordan, 11 years ago

add configure script switch to enable/disable utp

  • Property svn:keywords set to Date Rev Author Id
File size: 33.0 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 11960 2011-02-18 00:45:44Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <string.h>
17#include <stdio.h>
18#include <unistd.h>
19
20#ifdef WIN32
21 #include <winsock2.h>
22#else
23 #include <arpa/inet.h> /* inet_ntoa */
24#endif
25
26#include <event2/event.h>
27#include <event2/bufferevent.h>
28#include <libutp/utp.h>
29
30#include "transmission.h"
31#include "session.h"
32#include "bandwidth.h"
33#include "crypto.h"
34#include "list.h"
35#include "net.h"
36#include "peer-common.h" /* MAX_BLOCK_SIZE */
37#include "peer-io.h"
38#include "trevent.h" /* tr_runInEventThread() */
39#include "utils.h"
40
41
42#define MAGIC_NUMBER 206745
43
44#ifdef WIN32
45 #define EAGAIN       WSAEWOULDBLOCK
46 #define EINTR        WSAEINTR
47 #define EINPROGRESS  WSAEINPROGRESS
48 #define EPIPE        WSAECONNRESET
49#endif
50
51/* The amount of read bufferring that we allow for uTP sockets. */
52
53#define UTP_READ_BUFFER_SIZE (256 * 1024)
54
55static size_t
56guessPacketOverhead( size_t d )
57{
58    /**
59     * http://sd.wareonearth.com/~phil/net/overhead/
60     *
61     * TCP over Ethernet:
62     * Assuming no header compression (e.g. not PPP)
63     * Add 20 IPv4 header or 40 IPv6 header (no options)
64     * Add 20 TCP header
65     * Add 12 bytes optional TCP timestamps
66     * Max TCP Payload data rates over ethernet are thus:
67     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
68     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
69     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
70     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
71     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
72     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
73     */
74    const double assumed_payload_data_rate = 94.0;
75
76    return (unsigned int)( d * ( 100.0 / assumed_payload_data_rate ) - d );
77}
78
79/**
80***
81**/
82
83#define dbgmsg( io, ... ) \
84    do { \
85        if( tr_deepLoggingIsActive( ) ) \
86            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
87    } while( 0 )
88
89struct tr_datatype
90{
91    tr_bool  isPieceData;
92    size_t   length;
93};
94
95
96/***
97****
98***/
99
100static void
101didWriteWrapper( tr_peerIo * io, unsigned int bytes_transferred )
102{
103     while( bytes_transferred && tr_isPeerIo( io ) )
104     {
105        struct tr_datatype * next = io->outbuf_datatypes->data;
106
107        const unsigned int payload = MIN( next->length, bytes_transferred );
108        /* For uTP sockets, the overhead is computed in utp_on_overhead. */
109        const unsigned int overhead =
110            io->socket ? guessPacketOverhead( payload ) : 0;
111        const uint64_t now = tr_sessionGetTimeMsec( io->session );
112
113        tr_bandwidthUsed( &io->bandwidth, TR_UP, payload, next->isPieceData, now );
114
115        if( overhead > 0 )
116            tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, FALSE, now );
117
118        if( io->didWrite )
119            io->didWrite( io, payload, next->isPieceData, io->userData );
120
121        if( tr_isPeerIo( io ) )
122        {
123            bytes_transferred -= payload;
124            next->length -= payload;
125            if( !next->length ) {
126                tr_list_pop_front( &io->outbuf_datatypes );
127                tr_free( next );
128            }
129        }
130    }
131}
132
133static void
134canReadWrapper( tr_peerIo * io )
135{
136    tr_bool err = 0;
137    tr_bool done = 0;
138    tr_session * session;
139
140    dbgmsg( io, "canRead" );
141
142    assert( tr_isPeerIo( io ) );
143    assert( tr_isSession( io->session ) );
144    tr_peerIoRef( io );
145
146    session = io->session;
147
148    /* try to consume the input buffer */
149    if( io->canRead )
150    {
151        const uint64_t now = tr_sessionGetTimeMsec( io->session );
152
153        tr_sessionLock( session );
154
155        while( !done && !err )
156        {
157            size_t piece = 0;
158            const size_t oldLen = evbuffer_get_length( io->inbuf );
159            const int ret = io->canRead( io, io->userData, &piece );
160            const size_t used = oldLen - evbuffer_get_length( io->inbuf );
161            const unsigned int overhead = guessPacketOverhead( used );
162
163            assert( tr_isPeerIo( io ) );
164
165            if( piece || (piece!=used) )
166            {
167                if( piece )
168                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, piece, TRUE, now );
169
170                if( used != piece )
171                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, used - piece, FALSE, now );
172            }
173
174            if( overhead > 0 )
175                tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, FALSE, now );
176
177            switch( ret )
178            {
179                case READ_NOW:
180                    if( evbuffer_get_length( io->inbuf ) )
181                        continue;
182                    done = 1;
183                    break;
184
185                case READ_LATER:
186                    done = 1;
187                    break;
188
189                case READ_ERR:
190                    err = 1;
191                    break;
192            }
193
194            assert( tr_isPeerIo( io ) );
195        }
196
197        tr_sessionUnlock( session );
198    }
199
200    assert( tr_isPeerIo( io ) );
201    tr_peerIoUnref( io );
202}
203
204tr_bool
205tr_isPeerIo( const tr_peerIo * io )
206{
207    return ( io != NULL )
208        && ( io->magicNumber == MAGIC_NUMBER )
209        && ( io->refCount >= 0 )
210        && ( tr_isBandwidth( &io->bandwidth ) )
211        && ( tr_isAddress( &io->addr ) );
212}
213
214static void
215event_read_cb( int fd, short event UNUSED, void * vio )
216{
217    int res;
218    int e;
219    tr_peerIo * io = vio;
220
221    /* Limit the input buffer to 256K, so it doesn't grow too large */
222    unsigned int howmuch;
223    unsigned int curlen;
224    const tr_direction dir = TR_DOWN;
225    const unsigned int max = 256 * 1024;
226
227    assert( tr_isPeerIo( io ) );
228    assert( io->socket >= 0 );
229
230    io->pendingEvents &= ~EV_READ;
231
232    curlen = evbuffer_get_length( io->inbuf );
233    howmuch = curlen >= max ? 0 : max - curlen;
234    howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch );
235
236    dbgmsg( io, "libevent says this peer is ready to read" );
237
238    /* if we don't have any bandwidth left, stop reading */
239    if( howmuch < 1 ) {
240        tr_peerIoSetEnabled( io, dir, FALSE );
241        return;
242    }
243
244    EVUTIL_SET_SOCKET_ERROR( 0 );
245    res = evbuffer_read( io->inbuf, fd, (int)howmuch );
246    e = EVUTIL_SOCKET_ERROR( );
247
248    if( res > 0 )
249    {
250        tr_peerIoSetEnabled( io, dir, TRUE );
251
252        /* Invoke the user callback - must always be called last */
253        canReadWrapper( io );
254    }
255    else
256    {
257        char errstr[512];
258        short what = BEV_EVENT_READING;
259
260        if( res == 0 ) /* EOF */
261            what |= BEV_EVENT_EOF;
262        else if( res == -1 ) {
263            if( e == EAGAIN || e == EINTR ) {
264                tr_peerIoSetEnabled( io, dir, TRUE );
265                return;
266            }
267            what |= BEV_EVENT_ERROR;
268        }
269
270        tr_net_strerror( errstr, sizeof( errstr ), e );
271        dbgmsg( io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
272
273        if( io->gotError != NULL )
274            io->gotError( io, what, io->userData );
275    }
276}
277
278static int
279tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
280{
281    int e;
282    int n;
283    char errstr[256];
284
285    EVUTIL_SET_SOCKET_ERROR( 0 );
286    n = evbuffer_write_atmost( io->outbuf, fd, howmuch );
287    e = EVUTIL_SOCKET_ERROR( );
288    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?tr_net_strerror(errstr,sizeof(errstr),e):"") );
289
290    return n;
291}
292
293static void
294event_write_cb( int fd, short event UNUSED, void * vio )
295{
296    int res = 0;
297    int e;
298    short what = BEV_EVENT_WRITING;
299    tr_peerIo * io = vio;
300    size_t howmuch;
301    const tr_direction dir = TR_UP;
302
303    assert( tr_isPeerIo( io ) );
304    assert( io->socket >= 0 );
305
306    io->pendingEvents &= ~EV_WRITE;
307
308    dbgmsg( io, "libevent says this peer is ready to write" );
309
310    /* Write as much as possible, since the socket is non-blocking, write() will
311     * return if it can't write any more data without blocking */
312    howmuch = tr_bandwidthClamp( &io->bandwidth, dir, evbuffer_get_length( io->outbuf ) );
313
314    /* if we don't have any bandwidth left, stop writing */
315    if( howmuch < 1 ) {
316        tr_peerIoSetEnabled( io, dir, FALSE );
317        return;
318    }
319
320    EVUTIL_SET_SOCKET_ERROR( 0 );
321    res = tr_evbuffer_write( io, fd, howmuch );
322    e = EVUTIL_SOCKET_ERROR( );
323
324    if (res == -1) {
325        if (!e || e == EAGAIN || e == EINTR || e == EINPROGRESS)
326            goto reschedule;
327        /* error case */
328        what |= BEV_EVENT_ERROR;
329    } else if (res == 0) {
330        /* eof case */
331        what |= BEV_EVENT_EOF;
332    }
333    if (res <= 0)
334        goto error;
335
336    if( evbuffer_get_length( io->outbuf ) )
337        tr_peerIoSetEnabled( io, dir, TRUE );
338
339    didWriteWrapper( io, res );
340    return;
341
342 reschedule:
343    if( evbuffer_get_length( io->outbuf ) )
344        tr_peerIoSetEnabled( io, dir, TRUE );
345    return;
346
347 error:
348
349    dbgmsg( io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
350
351    if( io->gotError != NULL )
352        io->gotError( io, what, io->userData );
353}
354
355/**
356***
357**/
358
359static void
360maybeSetCongestionAlgorithm( int socket, const char * algorithm )
361{
362    if( algorithm && *algorithm )
363    {
364        const int rc = tr_netSetCongestionControl( socket, algorithm );
365
366        if( rc < 0 )
367            tr_ninf( "Net", "Can't set congestion control algorithm '%s': %s",
368                     algorithm, tr_strerror( errno ));
369    }
370}
371
372#ifdef WITH_UTP
373/* UTP callbacks */
374
375static void
376utp_on_read(void *closure, const unsigned char *buf, size_t buflen)
377{
378    int rc;
379    tr_peerIo *io = (tr_peerIo *)closure;
380    assert( tr_isPeerIo( io ) );
381
382    rc = evbuffer_add( io->inbuf, buf, buflen );
383    dbgmsg( io, "utp_on_read got %zu bytes", buflen );
384
385    if( rc < 0 ) {
386        tr_nerr( "UTP", "On read evbuffer_add" );
387        return;
388    }
389
390    tr_peerIoSetEnabled( io, TR_DOWN, TRUE );
391    canReadWrapper( io );
392}
393
394static void
395utp_on_write(void *closure, unsigned char *buf, size_t buflen)
396{
397    tr_peerIo *io = (tr_peerIo *)closure;
398    int rc;
399    assert( tr_isPeerIo( io ) );
400
401    rc = evbuffer_remove( io->outbuf, buf, buflen );
402    dbgmsg( io, "utp_on_write sending %zu bytes... evbuffer_remove returned %d", buflen, rc );
403    assert( rc == (int)buflen ); /* if this fails, we've corrupted our bookkeeping somewhere */
404    if( rc < (long)buflen ) {
405        tr_nerr( "UTP", "Short write: %d < %ld", rc, (long)buflen);
406    }
407
408    didWriteWrapper( io, buflen );
409}
410
411static size_t
412utp_get_rb_size(void *closure)
413{
414    tr_peerIo *io = (tr_peerIo *)closure;
415    size_t bytes;
416    assert( tr_isPeerIo( io ) );
417
418    bytes = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, UTP_READ_BUFFER_SIZE );
419
420    dbgmsg( io, "utp_get_rb_size is saying it's ready to read %zu bytes", bytes );
421    return UTP_READ_BUFFER_SIZE - bytes;
422}
423
424static void
425utp_on_state_change(void *closure, int state)
426{
427    tr_peerIo *io = (tr_peerIo *)closure;
428    assert( tr_isPeerIo( io ) );
429
430    if( state == UTP_STATE_CONNECT ) {
431        dbgmsg( io, "utp_on_state_change -- changed to connected" );
432        io->utpSupported = TRUE;
433    } else if( state == UTP_STATE_WRITABLE ) {
434        dbgmsg( io, "utp_on_state_change -- changed to writable" );
435    } else if( state == UTP_STATE_EOF ) {
436        if( io->gotError )
437            io->gotError( io, BEV_EVENT_EOF, io->userData );
438    } else if( state == UTP_STATE_DESTROYING ) {
439        tr_nerr( "UTP", "Impossible state UTP_STATE_DESTROYING" );
440        return;
441    } else {
442        tr_nerr( "UTP", "Unknown state %d", state );
443    }
444}
445
446static void
447utp_on_error(void *closure, int errcode)
448{
449    tr_peerIo *io = (tr_peerIo *)closure;
450    assert( tr_isPeerIo( io ) );
451
452    dbgmsg( io, "utp_on_error -- errcode is %d", errcode );
453
454    if( io->gotError ) {
455        errno = errcode;
456        io->gotError( io, BEV_EVENT_ERROR, io->userData );
457    }
458}
459
460static void
461utp_on_overhead(void *closure, bool send, size_t count, int type UNUSED)
462{
463    tr_peerIo *io = (tr_peerIo *)closure;
464    assert( tr_isPeerIo( io ) );
465
466    dbgmsg( io, "utp_on_overhead -- count is %zu", count );
467
468    tr_bandwidthUsed( &io->bandwidth, send ? TR_UP : TR_DOWN,
469                      count, FALSE, tr_time_msec() );
470}
471
472static struct UTPFunctionTable utp_function_table = {
473    .on_read = utp_on_read,
474    .on_write = utp_on_write,
475    .get_rb_size = utp_get_rb_size,
476    .on_state = utp_on_state_change,
477    .on_error = utp_on_error,
478    .on_overhead = utp_on_overhead
479};
480
481
482/* Dummy UTP callbacks. */
483/* We switch a UTP socket to use these after the associated peerIo has been
484   destroyed -- see io_dtor. */
485
486static void
487dummy_read( void * closure UNUSED, const unsigned char *buf UNUSED, size_t buflen UNUSED )
488{
489    /* This cannot happen, as far as I'm aware. */
490    tr_nerr( "UTP", "On_read called on closed socket" );
491
492}
493
494static void
495dummy_write(void * closure UNUSED, unsigned char *buf, size_t buflen)
496{
497    /* This can very well happen if we've shut down a peer connection that
498       had unflushed buffers.  Complain and send zeroes. */
499    tr_ndbg( "UTP", "On_write called on closed socket" );
500    memset( buf, 0, buflen );
501}
502
503static size_t
504dummy_get_rb_size( void * closure UNUSED )
505{
506    return 0;
507}
508
509static void
510dummy_on_state_change(void * closure UNUSED, int state UNUSED )
511{
512    return;
513}
514
515static void
516dummy_on_error( void * closure UNUSED, int errcode UNUSED )
517{
518    return;
519}
520
521static void
522dummy_on_overhead( void *closure UNUSED, bool send UNUSED, size_t count UNUSED, int type UNUSED )
523{
524    return;
525}
526
527static struct UTPFunctionTable dummy_utp_function_table = {
528    .on_read = dummy_read,
529    .on_write = dummy_write,
530    .get_rb_size = dummy_get_rb_size,
531    .on_state = dummy_on_state_change,
532    .on_error = dummy_on_error,
533    .on_overhead = dummy_on_overhead
534};
535
536#endif /* #ifdef WITH_UTP */
537
538static tr_peerIo*
539tr_peerIoNew( tr_session       * session,
540              tr_bandwidth     * parent,
541              const tr_address * addr,
542              tr_port            port,
543              const uint8_t    * torrentHash,
544              tr_bool            isIncoming,
545              tr_bool            isSeed,
546              int                socket,
547              struct UTPSocket * utp_socket)
548{
549    tr_peerIo * io;
550
551    assert( session != NULL );
552    assert( session->events != NULL );
553    assert( tr_isBool( isIncoming ) );
554    assert( tr_isBool( isSeed ) );
555    assert( tr_amInEventThread( session ) );
556    assert( (socket < 0) == (utp_socket != NULL) );
557#ifndef WITH_UTP
558    assert( socket >= 0 );
559#endif
560
561    if( socket >= 0 ) {
562        tr_netSetTOS( socket, session->peerSocketTOS );
563        maybeSetCongestionAlgorithm( socket, session->peer_congestion_algorithm );
564    }
565
566    io = tr_new0( tr_peerIo, 1 );
567    io->magicNumber = MAGIC_NUMBER;
568    io->refCount = 1;
569    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
570    io->session = session;
571    io->addr = *addr;
572    io->isSeed = isSeed;
573    io->port = port;
574    io->socket = socket;
575    io->utp_socket = utp_socket;
576    io->isIncoming = isIncoming != 0;
577    io->timeCreated = tr_time( );
578    io->inbuf = evbuffer_new( );
579    io->outbuf = evbuffer_new( );
580    tr_bandwidthConstruct( &io->bandwidth, session, parent );
581    tr_bandwidthSetPeer( &io->bandwidth, io );
582    dbgmsg( io, "bandwidth is %p; its parent is %p", &io->bandwidth, parent );
583    dbgmsg( io, "socket is %d, utp_socket is %p", socket, utp_socket );
584
585    if( io->socket >= 0 ) {
586        io->event_read = event_new( session->event_base,
587                                    io->socket, EV_READ, event_read_cb, io );
588        io->event_write = event_new( session->event_base,
589                                     io->socket, EV_WRITE, event_write_cb, io );
590    }
591#ifdef WITH_UTP
592    else {
593        UTP_SetSockopt( utp_socket, SO_RCVBUF, UTP_READ_BUFFER_SIZE );
594        dbgmsg( io, "%s", "calling UTP_SetCallbacks &utp_function_table" );
595        UTP_SetCallbacks( utp_socket,
596                          &utp_function_table,
597                          io );
598        if( !isIncoming ) {
599            dbgmsg( io, "%s", "calling UTP_Connect" );
600            UTP_Connect( utp_socket );
601        }
602    }
603#endif
604
605    return io;
606}
607
608tr_peerIo*
609tr_peerIoNewIncoming( tr_session        * session,
610                      tr_bandwidth      * parent,
611                      const tr_address  * addr,
612                      tr_port             port,
613                      int                 fd,
614                      struct UTPSocket  * utp_socket )
615{
616    assert( session );
617    assert( tr_isAddress( addr ) );
618
619    return tr_peerIoNew( session, parent, addr, port, NULL, TRUE, FALSE, 
620                         fd, utp_socket );
621}
622
623tr_peerIo*
624tr_peerIoNewOutgoing( tr_session        * session,
625                      tr_bandwidth      * parent,
626                      const tr_address  * addr,
627                      tr_port             port,
628                      const uint8_t     * torrentHash,
629                      tr_bool             isSeed,
630                      tr_bool             utp UNUSED )
631{
632    int fd = -1;
633    struct UTPSocket *utp_socket = NULL;
634
635    assert( session );
636    assert( tr_isAddress( addr ) );
637    assert( torrentHash );
638
639#ifdef WITH_UTP
640    if( utp )
641        utp_socket = tr_netOpenPeerUTPSocket( session, addr, port, isSeed );
642#endif
643    if( !utp_socket ) {
644        fd = tr_netOpenPeerSocket( session, addr, port, isSeed );
645        dbgmsg( NULL, "tr_netOpenPeerSocket returned fd %d", fd );
646    }
647
648    if( fd < 0 && utp_socket == NULL )
649        return NULL;
650
651    return tr_peerIoNew( session, parent, addr, port,
652                         torrentHash, FALSE, isSeed, fd, utp_socket );
653}
654
655/***
656****
657***/
658
659static void
660event_enable( tr_peerIo * io, short event )
661{
662    assert( tr_amInEventThread( io->session ) );
663    assert( io->session != NULL );
664    assert( io->session->events != NULL );
665
666    if( io->socket < 0 )
667        return;
668
669    assert( io->session->events != NULL );
670    assert( event_initialized( io->event_read ) );
671    assert( event_initialized( io->event_write ) );
672
673    if( ( event & EV_READ ) && ! ( io->pendingEvents & EV_READ ) )
674    {
675        dbgmsg( io, "enabling libevent ready-to-read polling" );
676        event_add( io->event_read, NULL );
677        io->pendingEvents |= EV_READ;
678    }
679
680    if( ( event & EV_WRITE ) && ! ( io->pendingEvents & EV_WRITE ) )
681    {
682        dbgmsg( io, "enabling libevent ready-to-write polling" );
683        event_add( io->event_write, NULL );
684        io->pendingEvents |= EV_WRITE;
685    }
686}
687
688static void
689event_disable( struct tr_peerIo * io, short event )
690{
691    assert( tr_amInEventThread( io->session ) );
692    assert( io->session != NULL );
693
694    if( io->socket < 0 )
695        return;
696
697    assert( io->session->events != NULL );
698    assert( event_initialized( io->event_read ) );
699    assert( event_initialized( io->event_write ) );
700
701    if( ( event & EV_READ ) && ( io->pendingEvents & EV_READ ) )
702    {
703        dbgmsg( io, "disabling libevent ready-to-read polling" );
704        event_del( io->event_read );
705        io->pendingEvents &= ~EV_READ;
706    }
707
708    if( ( event & EV_WRITE ) && ( io->pendingEvents & EV_WRITE ) )
709    {
710        dbgmsg( io, "disabling libevent ready-to-write polling" );
711        event_del( io->event_write );
712        io->pendingEvents &= ~EV_WRITE;
713    }
714}
715
716void
717tr_peerIoSetEnabled( tr_peerIo    * io,
718                     tr_direction   dir,
719                     tr_bool        isEnabled )
720{
721    const short event = dir == TR_UP ? EV_WRITE : EV_READ;
722
723    assert( tr_isPeerIo( io ) );
724    assert( tr_isDirection( dir ) );
725    assert( tr_amInEventThread( io->session ) );
726    assert( io->session->events != NULL );
727
728    if( isEnabled )
729        event_enable( io, event );
730    else
731        event_disable( io, event );
732}
733
734/***
735****
736***/
737static void
738io_close_socket( tr_peerIo * io )
739{
740    if( io->socket >= 0 ) {
741        tr_netClose( io->session, io->socket );
742        io->socket = -1;
743        event_free( io->event_read );
744        event_free( io->event_write );
745    }
746
747#ifdef WITH_UTP
748    if( io->utp_socket ) {
749        UTP_SetCallbacks( io->utp_socket,
750                          &dummy_utp_function_table,
751                          NULL );
752        UTP_Close( io->utp_socket );
753
754        io->utp_socket = NULL;
755    }
756#endif
757}
758
759static void
760io_dtor( void * vio )
761{
762    tr_peerIo * io = vio;
763
764    assert( tr_isPeerIo( io ) );
765    assert( tr_amInEventThread( io->session ) );
766    assert( io->session->events != NULL );
767
768    dbgmsg( io, "in tr_peerIo destructor" );
769    event_disable( io, EV_READ | EV_WRITE );
770    tr_bandwidthDestruct( &io->bandwidth );
771    evbuffer_free( io->outbuf );
772    evbuffer_free( io->inbuf );
773    io_close_socket( io );
774    tr_cryptoFree( io->crypto );
775    tr_list_free( &io->outbuf_datatypes, tr_free );
776
777    memset( io, ~0, sizeof( tr_peerIo ) );
778    tr_free( io );
779}
780
781static void
782tr_peerIoFree( tr_peerIo * io )
783{
784    if( io )
785    {
786        dbgmsg( io, "in tr_peerIoFree" );
787        io->canRead = NULL;
788        io->didWrite = NULL;
789        io->gotError = NULL;
790        tr_runInEventThread( io->session, io_dtor, io );
791    }
792}
793
794void
795tr_peerIoRefImpl( const char * file, int line, tr_peerIo * io )
796{
797    assert( tr_isPeerIo( io ) );
798
799    dbgmsg( io, "%s:%d is incrementing the IO's refcount from %d to %d",
800                file, line, io->refCount, io->refCount+1 );
801
802    ++io->refCount;
803}
804
805void
806tr_peerIoUnrefImpl( const char * file, int line, tr_peerIo * io )
807{
808    assert( tr_isPeerIo( io ) );
809
810    dbgmsg( io, "%s:%d is decrementing the IO's refcount from %d to %d",
811                file, line, io->refCount, io->refCount-1 );
812
813    if( !--io->refCount )
814        tr_peerIoFree( io );
815}
816
817const tr_address*
818tr_peerIoGetAddress( const tr_peerIo * io, tr_port   * port )
819{
820    assert( tr_isPeerIo( io ) );
821
822    if( port )
823        *port = io->port;
824
825    return &io->addr;
826}
827
828const char*
829tr_peerIoAddrStr( const tr_address * addr, tr_port port )
830{
831    static char buf[512];
832
833    if( addr->type == TR_AF_INET )
834        tr_snprintf( buf, sizeof( buf ), "%s:%u", tr_ntop_non_ts( addr ), ntohs( port ) );
835    else
836        tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_ntop_non_ts( addr ), ntohs( port ) );
837    return buf;
838}
839
840const char* tr_peerIoGetAddrStr( const tr_peerIo * io )
841{
842    return tr_isPeerIo( io ) ? tr_peerIoAddrStr( &io->addr, io->port ) : "error";
843}
844
845void
846tr_peerIoSetIOFuncs( tr_peerIo        * io,
847                     tr_can_read_cb     readcb,
848                     tr_did_write_cb    writecb,
849                     tr_net_error_cb    errcb,
850                     void             * userData )
851{
852    io->canRead = readcb;
853    io->didWrite = writecb;
854    io->gotError = errcb;
855    io->userData = userData;
856}
857
858void
859tr_peerIoClear( tr_peerIo * io )
860{
861    tr_peerIoSetIOFuncs( io, NULL, NULL, NULL, NULL );
862    tr_peerIoSetEnabled( io, TR_UP, FALSE );
863    tr_peerIoSetEnabled( io, TR_DOWN, FALSE );
864}
865
866int
867tr_peerIoReconnect( tr_peerIo * io )
868{
869    short int pendingEvents;
870    tr_session * session;
871
872    assert( tr_isPeerIo( io ) );
873    assert( !tr_peerIoIsIncoming( io ) );
874
875    session = tr_peerIoGetSession( io );
876
877    pendingEvents = io->pendingEvents;
878    event_disable( io, EV_READ | EV_WRITE );
879
880    io_close_socket( io );
881
882    io->socket = tr_netOpenPeerSocket( session, &io->addr, io->port, io->isSeed );
883    io->event_read = event_new( session->event_base, io->socket, EV_READ, event_read_cb, io );
884    io->event_write = event_new( session->event_base, io->socket, EV_WRITE, event_write_cb, io );
885
886    if( io->socket >= 0 )
887    {
888        event_enable( io, pendingEvents );
889        tr_netSetTOS( io->socket, session->peerSocketTOS );
890        maybeSetCongestionAlgorithm( io->socket, session->peer_congestion_algorithm );
891        return 0;
892    }
893
894    return -1;
895}
896
897/**
898***
899**/
900
901void
902tr_peerIoSetTorrentHash( tr_peerIo *     io,
903                         const uint8_t * hash )
904{
905    assert( tr_isPeerIo( io ) );
906
907    tr_cryptoSetTorrentHash( io->crypto, hash );
908}
909
910const uint8_t*
911tr_peerIoGetTorrentHash( tr_peerIo * io )
912{
913    assert( tr_isPeerIo( io ) );
914    assert( io->crypto );
915
916    return tr_cryptoGetTorrentHash( io->crypto );
917}
918
919int
920tr_peerIoHasTorrentHash( const tr_peerIo * io )
921{
922    assert( tr_isPeerIo( io ) );
923    assert( io->crypto );
924
925    return tr_cryptoHasTorrentHash( io->crypto );
926}
927
928/**
929***
930**/
931
932void
933tr_peerIoSetPeersId( tr_peerIo *     io,
934                     const uint8_t * peer_id )
935{
936    assert( tr_isPeerIo( io ) );
937
938    if( ( io->peerIdIsSet = peer_id != NULL ) )
939        memcpy( io->peerId, peer_id, 20 );
940    else
941        memset( io->peerId, 0, 20 );
942}
943
944/**
945***
946**/
947
948static unsigned int
949getDesiredOutputBufferSize( const tr_peerIo * io, uint64_t now )
950{
951    /* this is all kind of arbitrary, but what seems to work well is
952     * being large enough to hold the next 20 seconds' worth of input,
953     * or a few blocks, whichever is bigger.
954     * It's okay to tweak this as needed */
955    const unsigned int currentSpeed_Bps = tr_bandwidthGetPieceSpeed_Bps( &io->bandwidth, now, TR_UP );
956    const unsigned int period = 15u; /* arbitrary */
957    /* the 3 is arbitrary; the .5 is to leave room for messages */
958    static const unsigned int ceiling =  (unsigned int)( MAX_BLOCK_SIZE * 3.5 );
959    return MAX( ceiling, currentSpeed_Bps*period );
960}
961
962size_t
963tr_peerIoGetWriteBufferSpace( const tr_peerIo * io, uint64_t now )
964{
965    const size_t desiredLen = getDesiredOutputBufferSize( io, now );
966    const size_t currentLen = evbuffer_get_length( io->outbuf );
967    size_t freeSpace = 0;
968
969    if( desiredLen > currentLen )
970        freeSpace = desiredLen - currentLen;
971
972    return freeSpace;
973}
974
975/**
976***
977**/
978
979void
980tr_peerIoSetEncryption( tr_peerIo * io, uint32_t encryptionMode )
981{
982    assert( tr_isPeerIo( io ) );
983    assert( encryptionMode == PEER_ENCRYPTION_NONE
984         || encryptionMode == PEER_ENCRYPTION_RC4 );
985
986    io->encryptionMode = encryptionMode;
987}
988
989/**
990***
991**/
992
993static void
994addDatatype( tr_peerIo * io, size_t byteCount, tr_bool isPieceData )
995{
996    struct tr_datatype * d;
997
998    d = tr_new( struct tr_datatype, 1 );
999    d->isPieceData = isPieceData != 0;
1000    d->length = byteCount;
1001    tr_list_append( &io->outbuf_datatypes, d );
1002}
1003
1004static struct evbuffer_iovec *
1005evbuffer_peek_all( struct evbuffer * buf, size_t * setme_vecCount )
1006{
1007    const size_t byteCount = evbuffer_get_length( buf );
1008    const int vecCount = evbuffer_peek( buf, byteCount, NULL, NULL, 0 );
1009    struct evbuffer_iovec * iovec = tr_new0( struct evbuffer_iovec, vecCount );
1010    const int n = evbuffer_peek( buf, byteCount, NULL, iovec, vecCount );
1011    assert( n == vecCount );
1012    *setme_vecCount = n;
1013    return iovec;
1014}
1015
1016static void
1017maybeEncryptBuffer( tr_peerIo * io, struct evbuffer * buf )
1018{
1019    if( io->encryptionMode == PEER_ENCRYPTION_RC4 )
1020    {
1021        size_t i, n;
1022        struct evbuffer_iovec * iovec = evbuffer_peek_all( buf, &n );
1023
1024        for( i=0; i<n; ++i )
1025            tr_cryptoEncrypt( io->crypto, iovec[i].iov_len, iovec[i].iov_base, iovec[i].iov_base );
1026
1027        tr_free( iovec );
1028    }
1029}
1030
1031void
1032tr_peerIoWriteBuf( tr_peerIo * io, struct evbuffer * buf, tr_bool isPieceData )
1033{
1034    const size_t byteCount = evbuffer_get_length( buf );
1035    maybeEncryptBuffer( io, buf );
1036    evbuffer_add_buffer( io->outbuf, buf );
1037    addDatatype( io, byteCount, isPieceData );
1038}
1039
1040void
1041tr_peerIoWriteBytes( tr_peerIo * io, const void * bytes, size_t byteCount, tr_bool isPieceData )
1042{
1043    struct evbuffer * buf = evbuffer_new( );
1044    evbuffer_add( buf, bytes, byteCount );
1045    tr_peerIoWriteBuf( io, buf, isPieceData );
1046    evbuffer_free( buf );
1047}
1048
1049/***
1050****
1051***/
1052
1053void
1054evbuffer_add_uint8( struct evbuffer * outbuf, uint8_t byte )
1055{
1056    evbuffer_add( outbuf, &byte, 1 );
1057}
1058
1059void
1060evbuffer_add_uint16( struct evbuffer * outbuf, uint16_t addme_hs )
1061{
1062    const uint16_t ns = htons( addme_hs );
1063    evbuffer_add( outbuf, &ns, sizeof( ns ) );
1064}
1065
1066void
1067evbuffer_add_uint32( struct evbuffer * outbuf, uint32_t addme_hl )
1068{
1069    const uint32_t nl = htonl( addme_hl );
1070    evbuffer_add( outbuf, &nl, sizeof( nl ) );
1071}
1072
1073/***
1074****
1075***/
1076
1077void
1078tr_peerIoReadBytes( tr_peerIo * io, struct evbuffer * inbuf, void * bytes, size_t byteCount )
1079{
1080    assert( tr_isPeerIo( io ) );
1081    assert( evbuffer_get_length( inbuf )  >= byteCount );
1082
1083    switch( io->encryptionMode )
1084    {
1085        case PEER_ENCRYPTION_NONE:
1086            evbuffer_remove( inbuf, bytes, byteCount );
1087            break;
1088
1089        case PEER_ENCRYPTION_RC4:
1090            evbuffer_remove( inbuf, bytes, byteCount );
1091            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
1092            break;
1093
1094        default:
1095            assert( 0 );
1096    }
1097}
1098
1099void
1100tr_peerIoReadUint16( tr_peerIo        * io,
1101                     struct evbuffer  * inbuf,
1102                     uint16_t         * setme )
1103{
1104    uint16_t tmp;
1105    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
1106    *setme = ntohs( tmp );
1107}
1108
1109void tr_peerIoReadUint32( tr_peerIo        * io,
1110                          struct evbuffer  * inbuf,
1111                          uint32_t         * setme )
1112{
1113    uint32_t tmp;
1114    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
1115    *setme = ntohl( tmp );
1116}
1117
1118void
1119tr_peerIoDrain( tr_peerIo       * io,
1120                struct evbuffer * inbuf,
1121                size_t            byteCount )
1122{
1123    void * buf = tr_sessionGetBuffer( io->session );
1124    const size_t buflen = SESSION_BUFFER_SIZE;
1125
1126    while( byteCount > 0 )
1127    {
1128        const size_t thisPass = MIN( byteCount, buflen );
1129        tr_peerIoReadBytes( io, inbuf, buf, thisPass );
1130        byteCount -= thisPass;
1131    }
1132
1133    tr_sessionReleaseBuffer( io->session );
1134}
1135
1136/***
1137****
1138***/
1139
1140static int
1141tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
1142{
1143    int res = 0;
1144
1145    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch )))
1146    {
1147#ifdef WITH_UTP
1148        if( io->utp_socket != NULL ) /* utp peer connection */
1149        {
1150            /* UTP_RBDrained notifies libutp that your read buffer is emtpy.
1151             * It opens up the congestion window by sending an ACK (soonish)
1152             * if one was not going to be sent. */
1153            if( evbuffer_get_length( io->inbuf ) == 0 )
1154                UTP_RBDrained( io->utp_socket );
1155        }
1156        else /* tcp peer connection */
1157#endif
1158        {
1159            int e;
1160
1161            EVUTIL_SET_SOCKET_ERROR( 0 );
1162            res = evbuffer_read( io->inbuf, io->socket, (int)howmuch );
1163            e = EVUTIL_SOCKET_ERROR( );
1164
1165            dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(e):"") );
1166
1167            if( evbuffer_get_length( io->inbuf ) )
1168                canReadWrapper( io );
1169
1170            if( ( res <= 0 ) && ( io->gotError ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1171            {
1172                char errstr[512];
1173                short what = BEV_EVENT_READING | BEV_EVENT_ERROR;
1174                if( res == 0 )
1175                    what |= BEV_EVENT_EOF;
1176                tr_net_strerror( errstr, sizeof( errstr ), e );
1177                dbgmsg( io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
1178                io->gotError( io, what, io->userData );
1179            }
1180        }
1181    }
1182
1183    return res;
1184}
1185
1186static int
1187tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
1188{
1189    int n = 0;
1190    const size_t old_len = evbuffer_get_length( io->outbuf );
1191    dbgmsg( io, "in tr_peerIoTryWrite %zu", howmuch );
1192
1193    if( howmuch > old_len )
1194        howmuch = old_len;
1195
1196    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_UP, howmuch )))
1197    {
1198#ifdef WITH_UTP
1199        if( io->utp_socket != NULL ) /* utp peer connection */
1200        {
1201            const size_t old_len = evbuffer_get_length( io->outbuf );
1202            UTP_Write( io->utp_socket, howmuch );
1203            n = old_len - evbuffer_get_length( io->outbuf );
1204        }
1205        else
1206#endif
1207        {
1208            int e;
1209
1210            EVUTIL_SET_SOCKET_ERROR( 0 );
1211            n = tr_evbuffer_write( io, io->socket, howmuch );
1212            e = EVUTIL_SOCKET_ERROR( );
1213
1214            if( n > 0 )
1215                didWriteWrapper( io, n );
1216
1217            if( ( n < 0 ) && ( io->gotError ) && e && ( e != EPIPE ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1218            {
1219                char errstr[512];
1220                const short what = BEV_EVENT_WRITING | BEV_EVENT_ERROR;
1221
1222                tr_net_strerror( errstr, sizeof( errstr ), e );
1223                dbgmsg( io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", n, what, e, errstr );
1224
1225                if( io->gotError != NULL )
1226                    io->gotError( io, what, io->userData );
1227            }
1228        }
1229    }
1230
1231    return n;
1232}
1233
1234int
1235tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
1236{
1237    int bytesUsed = 0;
1238
1239    assert( tr_isPeerIo( io ) );
1240    assert( tr_isDirection( dir ) );
1241
1242    if( dir == TR_DOWN )
1243        bytesUsed = tr_peerIoTryRead( io, limit );
1244    else
1245        bytesUsed = tr_peerIoTryWrite( io, limit );
1246
1247    dbgmsg( io, "flushing peer-io, direction %d, limit %zu, bytesUsed %d", (int)dir, limit, bytesUsed );
1248    return bytesUsed;
1249}
1250
1251int
1252tr_peerIoFlushOutgoingProtocolMsgs( tr_peerIo * io )
1253{
1254    size_t byteCount = 0;
1255    tr_list * it;
1256
1257    /* count up how many bytes are used by non-piece-data messages
1258       at the front of our outbound queue */
1259    for( it=io->outbuf_datatypes; it!=NULL; it=it->next )
1260    {
1261        struct tr_datatype * d = it->data;
1262
1263        if( d->isPieceData )
1264            break;
1265
1266        byteCount += d->length;
1267    }
1268
1269    return tr_peerIoFlush( io, TR_UP, byteCount );
1270}
Note: See TracBrowser for help on using the repository browser.