source: trunk/libtransmission/peer-io.c @ 11930

Last change on this file since 11930 was 11930, checked in by jch, 11 years ago

Implement pacing of reads.

This should cause uTP sockets to respect read bandwidth limits. I'm not so
sure about the values we return for the read buffer size -- perhaps we
should allow some slack for network latency?

  • Property svn:keywords set to Date Rev Author Id
File size: 31.9 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 11930 2011-02-18 00:36:09Z jch $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <string.h>
17#include <stdio.h>
18#include <unistd.h>
19
20#ifdef WIN32
21 #include <winsock2.h>
22#else
23 #include <arpa/inet.h> /* inet_ntoa */
24#endif
25
26#include <event2/event.h>
27#include <event2/bufferevent.h>
28#include <libutp/utp.h>
29
30#include "transmission.h"
31#include "session.h"
32#include "bandwidth.h"
33#include "crypto.h"
34#include "list.h"
35#include "net.h"
36#include "peer-common.h" /* MAX_BLOCK_SIZE */
37#include "peer-io.h"
38#include "trevent.h" /* tr_runInEventThread() */
39#include "utils.h"
40
41#define MAGIC_NUMBER 206745
42
43#ifdef WIN32
44 #define EAGAIN       WSAEWOULDBLOCK
45 #define EINTR        WSAEINTR
46 #define EINPROGRESS  WSAEINPROGRESS
47 #define EPIPE        WSAECONNRESET
48#endif
49
50/* The amount of read bufferring that we allow for uTP sockets. */
51
52#define UTP_READ_BUFFER_SIZE (256 * 1024)
53
54static size_t
55guessPacketOverhead( size_t d )
56{
57    /**
58     * http://sd.wareonearth.com/~phil/net/overhead/
59     *
60     * TCP over Ethernet:
61     * Assuming no header compression (e.g. not PPP)
62     * Add 20 IPv4 header or 40 IPv6 header (no options)
63     * Add 20 TCP header
64     * Add 12 bytes optional TCP timestamps
65     * Max TCP Payload data rates over ethernet are thus:
66     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
67     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
68     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
69     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
70     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
71     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
72     */
73    const double assumed_payload_data_rate = 94.0;
74
75    return (unsigned int)( d * ( 100.0 / assumed_payload_data_rate ) - d );
76}
77
78/**
79***
80**/
81
82#define dbgmsg( io, ... ) \
83    do { \
84        if( tr_deepLoggingIsActive( ) ) \
85            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
86    } while( 0 )
87
88struct tr_datatype
89{
90    tr_bool  isPieceData;
91    size_t   length;
92};
93
94
95/***
96****
97***/
98
99static void
100didWriteWrapper( tr_peerIo * io, unsigned int bytes_transferred )
101{
102     while( bytes_transferred && tr_isPeerIo( io ) )
103     {
104        struct tr_datatype * next = io->outbuf_datatypes->data;
105
106        const unsigned int payload = MIN( next->length, bytes_transferred );
107        /* For uTP sockets, the overhead is computed in utp_on_overhead. */
108        const unsigned int overhead =
109            io->socket ? guessPacketOverhead( payload ) : 0;
110        const uint64_t now = tr_sessionGetTimeMsec( io->session );
111
112        tr_bandwidthUsed( &io->bandwidth, TR_UP, payload, next->isPieceData, now );
113
114        if( overhead > 0 )
115            tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, FALSE, now );
116
117        if( io->didWrite )
118            io->didWrite( io, payload, next->isPieceData, io->userData );
119
120        if( tr_isPeerIo( io ) )
121        {
122            bytes_transferred -= payload;
123            next->length -= payload;
124            if( !next->length ) {
125                tr_list_pop_front( &io->outbuf_datatypes );
126                tr_free( next );
127            }
128        }
129    }
130}
131
132static void
133canReadWrapper( tr_peerIo * io )
134{
135    tr_bool err = 0;
136    tr_bool done = 0;
137    tr_session * session;
138
139    dbgmsg( io, "canRead" );
140
141    assert( tr_isPeerIo( io ) );
142    assert( tr_isSession( io->session ) );
143    tr_peerIoRef( io );
144
145    session = io->session;
146
147    /* try to consume the input buffer */
148    if( io->canRead )
149    {
150        const uint64_t now = tr_sessionGetTimeMsec( io->session );
151
152        tr_sessionLock( session );
153
154        while( !done && !err )
155        {
156            size_t piece = 0;
157            const size_t oldLen = evbuffer_get_length( io->inbuf );
158            const int ret = io->canRead( io, io->userData, &piece );
159            const size_t used = oldLen - evbuffer_get_length( io->inbuf );
160            const unsigned int overhead = guessPacketOverhead( used );
161
162            assert( tr_isPeerIo( io ) );
163
164            if( piece || (piece!=used) )
165            {
166                if( piece )
167                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, piece, TRUE, now );
168
169                if( used != piece )
170                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, used - piece, FALSE, now );
171            }
172
173            if( overhead > 0 )
174                tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, FALSE, now );
175
176            switch( ret )
177            {
178                case READ_NOW:
179                    if( evbuffer_get_length( io->inbuf ) )
180                        continue;
181                    done = 1;
182                    break;
183
184                case READ_LATER:
185                    done = 1;
186                    break;
187
188                case READ_ERR:
189                    err = 1;
190                    break;
191            }
192
193            assert( tr_isPeerIo( io ) );
194        }
195
196        tr_sessionUnlock( session );
197    }
198
199    assert( tr_isPeerIo( io ) );
200    tr_peerIoUnref( io );
201}
202
203tr_bool
204tr_isPeerIo( const tr_peerIo * io )
205{
206    return ( io != NULL )
207        && ( io->magicNumber == MAGIC_NUMBER )
208        && ( io->refCount >= 0 )
209        && ( tr_isBandwidth( &io->bandwidth ) )
210        && ( tr_isAddress( &io->addr ) );
211}
212
213static void
214event_read_cb( int fd, short event UNUSED, void * vio )
215{
216    int res;
217    int e;
218    tr_peerIo * io = vio;
219
220    /* Limit the input buffer to 256K, so it doesn't grow too large */
221    unsigned int howmuch;
222    unsigned int curlen;
223    const tr_direction dir = TR_DOWN;
224    const unsigned int max = 256 * 1024;
225
226    assert( tr_isPeerIo( io ) );
227    assert( io->socket >= 0 );
228
229    io->pendingEvents &= ~EV_READ;
230
231    curlen = evbuffer_get_length( io->inbuf );
232    howmuch = curlen >= max ? 0 : max - curlen;
233    howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch );
234
235    dbgmsg( io, "libevent says this peer is ready to read" );
236
237    /* if we don't have any bandwidth left, stop reading */
238    if( howmuch < 1 ) {
239        tr_peerIoSetEnabled( io, dir, FALSE );
240        return;
241    }
242
243    EVUTIL_SET_SOCKET_ERROR( 0 );
244    res = evbuffer_read( io->inbuf, fd, (int)howmuch );
245    e = EVUTIL_SOCKET_ERROR( );
246
247    if( res > 0 )
248    {
249        tr_peerIoSetEnabled( io, dir, TRUE );
250
251        /* Invoke the user callback - must always be called last */
252        canReadWrapper( io );
253    }
254    else
255    {
256        char errstr[512];
257        short what = BEV_EVENT_READING;
258
259        if( res == 0 ) /* EOF */
260            what |= BEV_EVENT_EOF;
261        else if( res == -1 ) {
262            if( e == EAGAIN || e == EINTR ) {
263                tr_peerIoSetEnabled( io, dir, TRUE );
264                return;
265            }
266            what |= BEV_EVENT_ERROR;
267        }
268
269        tr_net_strerror( errstr, sizeof( errstr ), e );
270        dbgmsg( io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
271
272        if( io->gotError != NULL )
273            io->gotError( io, what, io->userData );
274    }
275}
276
277static int
278tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
279{
280    int e;
281    int n;
282    char errstr[256];
283
284    EVUTIL_SET_SOCKET_ERROR( 0 );
285    n = evbuffer_write_atmost( io->outbuf, fd, howmuch );
286    e = EVUTIL_SOCKET_ERROR( );
287    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?tr_net_strerror(errstr,sizeof(errstr),e):"") );
288
289    return n;
290}
291
292static void
293event_write_cb( int fd, short event UNUSED, void * vio )
294{
295    int res = 0;
296    int e;
297    short what = BEV_EVENT_WRITING;
298    tr_peerIo * io = vio;
299    size_t howmuch;
300    const tr_direction dir = TR_UP;
301
302    assert( tr_isPeerIo( io ) );
303    assert( io->socket >= 0 );
304
305    io->pendingEvents &= ~EV_WRITE;
306
307    dbgmsg( io, "libevent says this peer is ready to write" );
308
309    /* Write as much as possible, since the socket is non-blocking, write() will
310     * return if it can't write any more data without blocking */
311    howmuch = tr_bandwidthClamp( &io->bandwidth, dir, evbuffer_get_length( io->outbuf ) );
312
313    /* if we don't have any bandwidth left, stop writing */
314    if( howmuch < 1 ) {
315        tr_peerIoSetEnabled( io, dir, FALSE );
316        return;
317    }
318
319    EVUTIL_SET_SOCKET_ERROR( 0 );
320    res = tr_evbuffer_write( io, fd, howmuch );
321    e = EVUTIL_SOCKET_ERROR( );
322
323    if (res == -1) {
324        if (!e || e == EAGAIN || e == EINTR || e == EINPROGRESS)
325            goto reschedule;
326        /* error case */
327        what |= BEV_EVENT_ERROR;
328    } else if (res == 0) {
329        /* eof case */
330        what |= BEV_EVENT_EOF;
331    }
332    if (res <= 0)
333        goto error;
334
335    if( evbuffer_get_length( io->outbuf ) )
336        tr_peerIoSetEnabled( io, dir, TRUE );
337
338    didWriteWrapper( io, res );
339    return;
340
341 reschedule:
342    if( evbuffer_get_length( io->outbuf ) )
343        tr_peerIoSetEnabled( io, dir, TRUE );
344    return;
345
346 error:
347
348    dbgmsg( io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
349
350    if( io->gotError != NULL )
351        io->gotError( io, what, io->userData );
352}
353
354/**
355***
356**/
357
358static void
359maybeSetCongestionAlgorithm( int socket, const char * algorithm )
360{
361    if( algorithm && *algorithm )
362    {
363        const int rc = tr_netSetCongestionControl( socket, algorithm );
364
365        if( rc < 0 )
366            tr_ninf( "Net", "Can't set congestion control algorithm '%s': %s",
367                     algorithm, tr_strerror( errno ));
368    }
369}
370
371/* UTP callbacks */
372
373static void
374utp_on_read(void *closure, const unsigned char *buf, size_t buflen)
375{
376    int rc;
377    tr_peerIo *io = (tr_peerIo *)closure;
378    assert( tr_isPeerIo( io ) );
379
380    tr_ndbg( "UTP", "On read: %ld", (long)buflen );
381
382    rc = evbuffer_add( io->inbuf, buf, buflen );
383    if( rc < 0 ) {
384        tr_nerr( "UTP", "On read evbuffer_add" );
385        return;
386    }
387
388    tr_peerIoSetEnabled( io, TR_DOWN, TRUE );
389    canReadWrapper( io );
390}
391
392static void
393utp_on_write(void *closure, unsigned char *buf, size_t buflen)
394{
395    tr_peerIo *io = (tr_peerIo *)closure;
396    int rc;
397
398    assert( tr_isPeerIo( io ) );
399    tr_ndbg( "UTP", "On write: %ld", (long)buflen );
400
401    rc = evbuffer_remove( io->outbuf, buf, buflen );
402    if( rc < (long)buflen ) {
403        tr_nerr( "UTP", "Short write: %d < %ld", rc, (long)buflen);
404    }
405}
406
407static size_t
408utp_get_rb_size(void *closure)
409{
410    tr_peerIo *io = (tr_peerIo *)closure;
411    size_t bytes;
412    assert( tr_isPeerIo( io ) );
413
414    if( io->read_enabled )
415        bytes =
416            tr_bandwidthClamp( &io->bandwidth, TR_DOWN, UTP_READ_BUFFER_SIZE );
417    else
418        bytes = 0;
419
420    tr_ndbg( "UTP", "Get RB size %ld", (long)bytes);
421
422    return UTP_READ_BUFFER_SIZE - bytes;
423}
424
425static void
426utp_on_state_change(void *closure, int state)
427{
428    tr_peerIo *io = (tr_peerIo *)closure;
429    assert( tr_isPeerIo( io ) );
430
431    tr_ndbg( "UTP", "On state change: %d", state );
432
433    if( state == UTP_STATE_CONNECT || state == UTP_STATE_WRITABLE ) {
434        size_t count = evbuffer_get_length( io->outbuf );
435        if( count > 0 )
436            UTP_Write( io->utp_socket, count );
437    } else if( state == UTP_STATE_EOF ) {
438        if( io->gotError )
439            io->gotError( io, BEV_EVENT_EOF, io->userData );
440    } else if( state == UTP_STATE_DESTROYING ) {
441        tr_nerr( "UTP", "Impossible state UTP_STATE_DESTROYING" );
442        return;
443    } else {
444        tr_nerr( "UTP", "Unknown state %d", state );
445    }
446}
447
448static void
449utp_on_error(void *closure, int errcode)
450{
451    tr_peerIo *io = (tr_peerIo *)closure;
452    assert( tr_isPeerIo( io ) );
453
454    tr_ndbg( "UTP", "Error callback: %s", tr_strerror( errcode ) );
455
456    if( io->gotError ) {
457        errno = errcode;
458        io->gotError( io, BEV_EVENT_ERROR, io->userData );
459    }
460}
461
462static void
463utp_on_overhead(void *closure, bool send, size_t count, int type)
464{
465    tr_peerIo *io = (tr_peerIo *)closure;
466    assert( tr_isPeerIo( io ) );
467
468    tr_ndbg( "UTP", "On overhead: %d %ld %d", (int)send, (long)count, type );
469    tr_bandwidthUsed( &io->bandwidth, send ? TR_UP : TR_DOWN,
470                      count, FALSE, tr_time_msec() );
471}
472
473static struct UTPFunctionTable utp_function_table = {
474    .on_read = utp_on_read,
475    .on_write = utp_on_write,
476    .get_rb_size = utp_get_rb_size,
477    .on_state = utp_on_state_change,
478    .on_error = utp_on_error,
479    .on_overhead = utp_on_overhead
480};
481
482/* Dummy UTP callbacks. */
483/* We switch a UTP socket to use these after the associated peerIo has been
484   destroyed -- see io_dtor. */
485
486static void
487dummy_read( void * closure UNUSED, const unsigned char *buf UNUSED, size_t buflen UNUSED )
488{
489    /* This cannot happen, as far as I'm aware. */
490    tr_nerr( "UTP", "On_read called on closed socket" );
491
492}
493
494static void
495dummy_write(void * closure UNUSED, unsigned char *buf, size_t buflen)
496{
497    /* This can very well happen if we've shut down a peer connection that
498       had unflushed buffers.  Complain and send zeroes. */
499    tr_ndbg( "UTP", "On_write called on closed socket" );
500    memset( buf, 0, buflen );
501}
502
503static size_t
504dummy_get_rb_size( void * closure UNUSED )
505{
506    return 0;
507}
508
509static void
510dummy_on_state_change(void * closure UNUSED, int state UNUSED )
511{
512    return;
513}
514
515static void
516dummy_on_error( void * closure UNUSED, int errcode UNUSED )
517{
518    return;
519}
520
521static void
522dummy_on_overhead( void *closure UNUSED, bool send UNUSED, size_t count UNUSED, int type UNUSED )
523{
524    return;
525}
526
527static struct UTPFunctionTable dummy_utp_function_table = {
528    .on_read = dummy_read,
529    .on_write = dummy_write,
530    .get_rb_size = dummy_get_rb_size,
531    .on_state = dummy_on_state_change,
532    .on_error = dummy_on_error,
533    .on_overhead = dummy_on_overhead
534};
535
536static tr_peerIo*
537tr_peerIoNew( tr_session       * session,
538              tr_bandwidth     * parent,
539              const tr_address * addr,
540              tr_port            port,
541              const uint8_t    * torrentHash,
542              tr_bool            isIncoming,
543              tr_bool            isSeed,
544              int                socket,
545              struct UTPSocket * utp_socket)
546{
547    tr_peerIo * io;
548
549    assert( session != NULL );
550    assert( session->events != NULL );
551    assert( tr_isBool( isIncoming ) );
552    assert( tr_isBool( isSeed ) );
553    assert( tr_amInEventThread( session ) );
554    assert( (socket < 0) == (utp_socket != NULL) );
555
556    if( socket >= 0 ) {
557        tr_netSetTOS( socket, session->peerSocketTOS );
558        maybeSetCongestionAlgorithm( socket, session->peer_congestion_algorithm );
559    }
560
561    io = tr_new0( tr_peerIo, 1 );
562    io->magicNumber = MAGIC_NUMBER;
563    io->refCount = 1;
564    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
565    io->session = session;
566    io->addr = *addr;
567    io->isSeed = isSeed;
568    io->port = port;
569    io->socket = socket;
570    io->utp_socket = utp_socket;
571    io->isIncoming = isIncoming != 0;
572    io->timeCreated = tr_time( );
573    io->inbuf = evbuffer_new( );
574    io->outbuf = evbuffer_new( );
575    tr_bandwidthConstruct( &io->bandwidth, session, parent );
576    tr_bandwidthSetPeer( &io->bandwidth, io );
577    dbgmsg( io, "bandwidth is %p; its parent is %p", &io->bandwidth, parent );
578
579    if( io->socket >= 0 ) {
580        io->event_read = event_new( session->event_base,
581                                    io->socket, EV_READ, event_read_cb, io );
582        io->event_write = event_new( session->event_base,
583                                     io->socket, EV_WRITE, event_write_cb, io );
584    } else {
585        tr_ndbg( "UTP", "New %s connection",
586                 isIncoming ? "incoming" : "outgoing" );
587        UTP_SetSockopt( utp_socket, SO_RCVBUF, UTP_READ_BUFFER_SIZE );
588        UTP_SetCallbacks( utp_socket,
589                          &utp_function_table,
590                          io );
591       
592    }
593
594    io->write_enabled = 1;
595    io->read_enabled = 1;
596
597    return io;
598}
599
600tr_peerIo*
601tr_peerIoNewIncoming( tr_session        * session,
602                      tr_bandwidth      * parent,
603                      const tr_address  * addr,
604                      tr_port             port,
605                      int                 fd,
606                      struct UTPSocket  * utp_socket )
607{
608    assert( session );
609    assert( tr_isAddress( addr ) );
610
611    return tr_peerIoNew( session, parent, addr, port, NULL, TRUE, FALSE, 
612                         fd, utp_socket );
613}
614
615tr_peerIo*
616tr_peerIoNewOutgoing( tr_session        * session,
617                      tr_bandwidth      * parent,
618                      const tr_address  * addr,
619                      tr_port             port,
620                      const uint8_t     * torrentHash,
621                      tr_bool             isSeed )
622{
623    int fd;
624
625    assert( session );
626    assert( tr_isAddress( addr ) );
627    assert( torrentHash );
628
629    fd = tr_netOpenPeerSocket( session, addr, port, isSeed );
630    dbgmsg( NULL, "tr_netOpenPeerSocket returned fd %d", fd );
631
632    return fd < 0 ? NULL
633                  : tr_peerIoNew( session, parent, addr, port,
634                                  torrentHash, FALSE, isSeed, fd, NULL );
635}
636
637/***
638****
639***/
640
641static void
642event_enable( tr_peerIo * io, short event )
643{
644    assert( tr_amInEventThread( io->session ) );
645    assert( io->session != NULL );
646    assert( io->session->events != NULL );
647
648    if( io->socket < 0 )
649        return;
650
651    assert( io->session->events != NULL );
652    assert( event_initialized( io->event_read ) );
653    assert( event_initialized( io->event_write ) );
654
655    if( ( event & EV_READ ) && ! ( io->pendingEvents & EV_READ ) )
656    {
657        dbgmsg( io, "enabling libevent ready-to-read polling" );
658        event_add( io->event_read, NULL );
659        io->pendingEvents |= EV_READ;
660    }
661
662    if( ( event & EV_WRITE ) && ! ( io->pendingEvents & EV_WRITE ) )
663    {
664        dbgmsg( io, "enabling libevent ready-to-write polling" );
665        event_add( io->event_write, NULL );
666        io->pendingEvents |= EV_WRITE;
667    }
668}
669
670static void
671event_disable( struct tr_peerIo * io, short event )
672{
673    assert( tr_amInEventThread( io->session ) );
674    assert( io->session != NULL );
675
676    if( io->socket < 0 )
677        return;
678
679    assert( io->session->events != NULL );
680    assert( event_initialized( io->event_read ) );
681    assert( event_initialized( io->event_write ) );
682
683    if( ( event & EV_READ ) && ( io->pendingEvents & EV_READ ) )
684    {
685        dbgmsg( io, "disabling libevent ready-to-read polling" );
686        event_del( io->event_read );
687        io->pendingEvents &= ~EV_READ;
688    }
689
690    if( ( event & EV_WRITE ) && ( io->pendingEvents & EV_WRITE ) )
691    {
692        dbgmsg( io, "disabling libevent ready-to-write polling" );
693        event_del( io->event_write );
694        io->pendingEvents &= ~EV_WRITE;
695    }
696}
697
698void
699tr_peerIoSetEnabled( tr_peerIo    * io,
700                     tr_direction   dir,
701                     tr_bool        isEnabled )
702{
703    const short event = dir == TR_UP ? EV_WRITE : EV_READ;
704
705    assert( tr_isPeerIo( io ) );
706    assert( tr_isDirection( dir ) );
707    assert( tr_amInEventThread( io->session ) );
708    assert( io->session->events != NULL );
709
710    if( isEnabled )
711        event_enable( io, event );
712    else
713        event_disable( io, event );
714
715    if( dir == TR_UP )
716        io->write_enabled = isEnabled;
717    else if( dir == TR_DOWN ) {
718        io->read_enabled = isEnabled;
719        if( io->utp_socket && isEnabled )
720            UTP_RBDrained(io->utp_socket);
721    }
722}
723
724/***
725****
726***/
727
728static void
729io_dtor( void * vio )
730{
731    tr_peerIo * io = vio;
732
733    assert( tr_isPeerIo( io ) );
734    assert( tr_amInEventThread( io->session ) );
735    assert( io->session->events != NULL );
736
737    dbgmsg( io, "in tr_peerIo destructor" );
738    event_disable( io, EV_READ | EV_WRITE );
739    tr_bandwidthDestruct( &io->bandwidth );
740    evbuffer_free( io->outbuf );
741    evbuffer_free( io->inbuf );
742    if( io->socket >= 0 ) {
743        event_free( io->event_read );
744        event_free( io->event_write );
745        tr_netClose( io->session, io->socket );
746    }
747    if( io->utp_socket != NULL ) {
748        tr_ndbg( "UTP", "Destroying connection");
749        UTP_SetCallbacks( io->utp_socket,
750                          &dummy_utp_function_table,
751                          NULL );
752        UTP_Close( io->utp_socket );
753    }
754    tr_cryptoFree( io->crypto );
755    tr_list_free( &io->outbuf_datatypes, tr_free );
756
757    memset( io, ~0, sizeof( tr_peerIo ) );
758    tr_free( io );
759}
760
761static void
762tr_peerIoFree( tr_peerIo * io )
763{
764    if( io )
765    {
766        dbgmsg( io, "in tr_peerIoFree" );
767        io->canRead = NULL;
768        io->didWrite = NULL;
769        io->gotError = NULL;
770        tr_runInEventThread( io->session, io_dtor, io );
771    }
772}
773
774void
775tr_peerIoRefImpl( const char * file, int line, tr_peerIo * io )
776{
777    assert( tr_isPeerIo( io ) );
778
779    dbgmsg( io, "%s:%d is incrementing the IO's refcount from %d to %d",
780                file, line, io->refCount, io->refCount+1 );
781
782    ++io->refCount;
783}
784
785void
786tr_peerIoUnrefImpl( const char * file, int line, tr_peerIo * io )
787{
788    assert( tr_isPeerIo( io ) );
789
790    dbgmsg( io, "%s:%d is decrementing the IO's refcount from %d to %d",
791                file, line, io->refCount, io->refCount-1 );
792
793    if( !--io->refCount )
794        tr_peerIoFree( io );
795}
796
797const tr_address*
798tr_peerIoGetAddress( const tr_peerIo * io, tr_port   * port )
799{
800    assert( tr_isPeerIo( io ) );
801
802    if( port )
803        *port = io->port;
804
805    return &io->addr;
806}
807
808const char*
809tr_peerIoAddrStr( const tr_address * addr, tr_port port )
810{
811    static char buf[512];
812
813    if( addr->type == TR_AF_INET )
814        tr_snprintf( buf, sizeof( buf ), "%s:%u", tr_ntop_non_ts( addr ), ntohs( port ) );
815    else
816        tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_ntop_non_ts( addr ), ntohs( port ) );
817    return buf;
818}
819
820const char* tr_peerIoGetAddrStr( const tr_peerIo * io )
821{
822    return tr_isPeerIo( io ) ? tr_peerIoAddrStr( &io->addr, io->port ) : "error";
823}
824
825void
826tr_peerIoSetIOFuncs( tr_peerIo        * io,
827                     tr_can_read_cb     readcb,
828                     tr_did_write_cb    writecb,
829                     tr_net_error_cb    errcb,
830                     void             * userData )
831{
832    io->canRead = readcb;
833    io->didWrite = writecb;
834    io->gotError = errcb;
835    io->userData = userData;
836}
837
838void
839tr_peerIoClear( tr_peerIo * io )
840{
841    tr_peerIoSetIOFuncs( io, NULL, NULL, NULL, NULL );
842    tr_peerIoSetEnabled( io, TR_UP, FALSE );
843    tr_peerIoSetEnabled( io, TR_DOWN, FALSE );
844}
845
846int
847tr_peerIoReconnect( tr_peerIo * io )
848{
849    short int pendingEvents;
850    tr_session * session;
851
852    assert( tr_isPeerIo( io ) );
853    assert( !tr_peerIoIsIncoming( io ) );
854
855    session = tr_peerIoGetSession( io );
856
857    pendingEvents = io->pendingEvents;
858    event_disable( io, EV_READ | EV_WRITE );
859
860    if( io->socket >= 0 ) {
861        tr_netClose( session, io->socket );
862        io->socket = -1;
863    }
864    if( io->utp_socket != NULL ) {
865        UTP_SetCallbacks( io->utp_socket,
866                          &dummy_utp_function_table,
867                          NULL );
868        UTP_Close(io->utp_socket);
869        io->utp_socket = NULL;
870    }
871
872    event_free( io->event_read );
873    event_free( io->event_write );
874    io->socket = tr_netOpenPeerSocket( session, &io->addr, io->port, io->isSeed );
875    io->event_read = event_new( session->event_base, io->socket, EV_READ, event_read_cb, io );
876    io->event_write = event_new( session->event_base, io->socket, EV_WRITE, event_write_cb, io );
877
878    if( io->socket >= 0 )
879    {
880        event_enable( io, pendingEvents );
881        tr_netSetTOS( io->socket, session->peerSocketTOS );
882        maybeSetCongestionAlgorithm( io->socket, session->peer_congestion_algorithm );
883        return 0;
884    }
885
886    return -1;
887}
888
889/**
890***
891**/
892
893void
894tr_peerIoSetTorrentHash( tr_peerIo *     io,
895                         const uint8_t * hash )
896{
897    assert( tr_isPeerIo( io ) );
898
899    tr_cryptoSetTorrentHash( io->crypto, hash );
900}
901
902const uint8_t*
903tr_peerIoGetTorrentHash( tr_peerIo * io )
904{
905    assert( tr_isPeerIo( io ) );
906    assert( io->crypto );
907
908    return tr_cryptoGetTorrentHash( io->crypto );
909}
910
911int
912tr_peerIoHasTorrentHash( const tr_peerIo * io )
913{
914    assert( tr_isPeerIo( io ) );
915    assert( io->crypto );
916
917    return tr_cryptoHasTorrentHash( io->crypto );
918}
919
920/**
921***
922**/
923
924void
925tr_peerIoSetPeersId( tr_peerIo *     io,
926                     const uint8_t * peer_id )
927{
928    assert( tr_isPeerIo( io ) );
929
930    if( ( io->peerIdIsSet = peer_id != NULL ) )
931        memcpy( io->peerId, peer_id, 20 );
932    else
933        memset( io->peerId, 0, 20 );
934}
935
936/**
937***
938**/
939
940static unsigned int
941getDesiredOutputBufferSize( const tr_peerIo * io, uint64_t now )
942{
943    /* this is all kind of arbitrary, but what seems to work well is
944     * being large enough to hold the next 20 seconds' worth of input,
945     * or a few blocks, whichever is bigger.
946     * It's okay to tweak this as needed */
947    const unsigned int currentSpeed_Bps = tr_bandwidthGetPieceSpeed_Bps( &io->bandwidth, now, TR_UP );
948    const unsigned int period = 15u; /* arbitrary */
949    /* the 3 is arbitrary; the .5 is to leave room for messages */
950    static const unsigned int ceiling =  (unsigned int)( MAX_BLOCK_SIZE * 3.5 );
951    return MAX( ceiling, currentSpeed_Bps*period );
952}
953
954size_t
955tr_peerIoGetWriteBufferSpace( const tr_peerIo * io, uint64_t now )
956{
957    const size_t desiredLen = getDesiredOutputBufferSize( io, now );
958    const size_t currentLen = evbuffer_get_length( io->outbuf );
959    size_t freeSpace = 0;
960
961    if( desiredLen > currentLen )
962        freeSpace = desiredLen - currentLen;
963
964    return freeSpace;
965}
966
967/**
968***
969**/
970
971void
972tr_peerIoSetEncryption( tr_peerIo * io, uint32_t encryptionMode )
973{
974    assert( tr_isPeerIo( io ) );
975    assert( encryptionMode == PEER_ENCRYPTION_NONE
976         || encryptionMode == PEER_ENCRYPTION_RC4 );
977
978    io->encryptionMode = encryptionMode;
979}
980
981/**
982***
983**/
984
985static void
986addDatatype( tr_peerIo * io, size_t byteCount, tr_bool isPieceData )
987{
988    struct tr_datatype * d;
989
990    d = tr_new( struct tr_datatype, 1 );
991    d->isPieceData = isPieceData != 0;
992    d->length = byteCount;
993    tr_list_append( &io->outbuf_datatypes, d );
994}
995
996static struct evbuffer_iovec *
997evbuffer_peek_all( struct evbuffer * buf, size_t * setme_vecCount )
998{
999    const size_t byteCount = evbuffer_get_length( buf );
1000    const int vecCount = evbuffer_peek( buf, byteCount, NULL, NULL, 0 );
1001    struct evbuffer_iovec * iovec = tr_new0( struct evbuffer_iovec, vecCount );
1002    const int n = evbuffer_peek( buf, byteCount, NULL, iovec, vecCount );
1003    assert( n == vecCount );
1004    *setme_vecCount = n;
1005    return iovec;
1006}
1007
1008static void
1009maybeEncryptBuffer( tr_peerIo * io, struct evbuffer * buf )
1010{
1011    if( io->encryptionMode == PEER_ENCRYPTION_RC4 )
1012    {
1013        size_t i, n;
1014        struct evbuffer_iovec * iovec = evbuffer_peek_all( buf, &n );
1015
1016        for( i=0; i<n; ++i )
1017            tr_cryptoEncrypt( io->crypto, iovec[i].iov_len, iovec[i].iov_base, iovec[i].iov_base );
1018
1019        tr_free( iovec );
1020    }
1021}
1022
1023void
1024tr_peerIoWriteBuf( tr_peerIo * io, struct evbuffer * buf, tr_bool isPieceData )
1025{
1026    const size_t byteCount = evbuffer_get_length( buf );
1027    maybeEncryptBuffer( io, buf );
1028    evbuffer_add_buffer( io->outbuf, buf );
1029    addDatatype( io, byteCount, isPieceData );
1030    if( io->utp_socket )
1031        UTP_Write( io->utp_socket, byteCount );
1032}
1033
1034void
1035tr_peerIoWriteBytes( tr_peerIo * io, const void * bytes, size_t byteCount, tr_bool isPieceData )
1036{
1037    struct evbuffer * buf = evbuffer_new( );
1038    evbuffer_add( buf, bytes, byteCount );
1039    tr_peerIoWriteBuf( io, buf, isPieceData );
1040    evbuffer_free( buf );
1041}
1042
1043/***
1044****
1045***/
1046
1047void
1048evbuffer_add_uint8( struct evbuffer * outbuf, uint8_t byte )
1049{
1050    evbuffer_add( outbuf, &byte, 1 );
1051}
1052
1053void
1054evbuffer_add_uint16( struct evbuffer * outbuf, uint16_t addme_hs )
1055{
1056    const uint16_t ns = htons( addme_hs );
1057    evbuffer_add( outbuf, &ns, sizeof( ns ) );
1058}
1059
1060void
1061evbuffer_add_uint32( struct evbuffer * outbuf, uint32_t addme_hl )
1062{
1063    const uint32_t nl = htonl( addme_hl );
1064    evbuffer_add( outbuf, &nl, sizeof( nl ) );
1065}
1066
1067/***
1068****
1069***/
1070
1071void
1072tr_peerIoReadBytes( tr_peerIo * io, struct evbuffer * inbuf, void * bytes, size_t byteCount )
1073{
1074    assert( tr_isPeerIo( io ) );
1075    assert( evbuffer_get_length( inbuf )  >= byteCount );
1076
1077    switch( io->encryptionMode )
1078    {
1079        case PEER_ENCRYPTION_NONE:
1080            evbuffer_remove( inbuf, bytes, byteCount );
1081            break;
1082
1083        case PEER_ENCRYPTION_RC4:
1084            evbuffer_remove( inbuf, bytes, byteCount );
1085            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
1086            break;
1087
1088        default:
1089            assert( 0 );
1090    }
1091}
1092
1093void
1094tr_peerIoReadUint16( tr_peerIo        * io,
1095                     struct evbuffer  * inbuf,
1096                     uint16_t         * setme )
1097{
1098    uint16_t tmp;
1099    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
1100    *setme = ntohs( tmp );
1101}
1102
1103void tr_peerIoReadUint32( tr_peerIo        * io,
1104                          struct evbuffer  * inbuf,
1105                          uint32_t         * setme )
1106{
1107    uint32_t tmp;
1108    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
1109    *setme = ntohl( tmp );
1110}
1111
1112void
1113tr_peerIoDrain( tr_peerIo       * io,
1114                struct evbuffer * inbuf,
1115                size_t            byteCount )
1116{
1117    void * buf = tr_sessionGetBuffer( io->session );
1118    const size_t buflen = SESSION_BUFFER_SIZE;
1119
1120    while( byteCount > 0 )
1121    {
1122        const size_t thisPass = MIN( byteCount, buflen );
1123        tr_peerIoReadBytes( io, inbuf, buf, thisPass );
1124        byteCount -= thisPass;
1125    }
1126
1127    tr_sessionReleaseBuffer( io->session );
1128}
1129
1130/***
1131****
1132***/
1133
1134static int
1135tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
1136{
1137    int res = 0;
1138
1139    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch )))
1140    {
1141        int e;
1142
1143        EVUTIL_SET_SOCKET_ERROR( 0 );
1144        res = evbuffer_read( io->inbuf, io->socket, (int)howmuch );
1145        e = EVUTIL_SOCKET_ERROR( );
1146
1147        dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(e):"") );
1148
1149        if( evbuffer_get_length( io->inbuf ) )
1150            canReadWrapper( io );
1151
1152        if( ( res <= 0 ) && ( io->gotError ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1153        {
1154            char errstr[512];
1155            short what = BEV_EVENT_READING | BEV_EVENT_ERROR;
1156            if( res == 0 )
1157                what |= BEV_EVENT_EOF;
1158            tr_net_strerror( errstr, sizeof( errstr ), e );
1159            dbgmsg( io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
1160            io->gotError( io, what, io->userData );
1161        }
1162    }
1163
1164    return res;
1165}
1166
1167static int
1168tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
1169{
1170    int n = 0;
1171
1172    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_UP, howmuch )))
1173    {
1174        int e;
1175        EVUTIL_SET_SOCKET_ERROR( 0 );
1176        n = tr_evbuffer_write( io, io->socket, howmuch );
1177        e = EVUTIL_SOCKET_ERROR( );
1178
1179        if( n > 0 )
1180            didWriteWrapper( io, n );
1181
1182        if( ( n < 0 ) && ( io->gotError ) && e && ( e != EPIPE ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1183        {
1184            char errstr[512];
1185            const short what = BEV_EVENT_WRITING | BEV_EVENT_ERROR;
1186
1187            tr_net_strerror( errstr, sizeof( errstr ), e );
1188            dbgmsg( io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", n, what, e, errstr );
1189
1190            if( io->gotError != NULL )
1191                io->gotError( io, what, io->userData );
1192        }
1193    }
1194
1195    return n;
1196}
1197
1198int
1199tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
1200{
1201    int bytesUsed = 0;
1202
1203    assert( tr_isPeerIo( io ) );
1204    assert( tr_isDirection( dir ) );
1205
1206    if( dir == TR_DOWN )
1207        bytesUsed = tr_peerIoTryRead( io, limit );
1208    else
1209        bytesUsed = tr_peerIoTryWrite( io, limit );
1210
1211    dbgmsg( io, "flushing peer-io, direction %d, limit %zu, bytesUsed %d", (int)dir, limit, bytesUsed );
1212    return bytesUsed;
1213}
1214
1215int
1216tr_peerIoFlushOutgoingProtocolMsgs( tr_peerIo * io )
1217{
1218    size_t byteCount = 0;
1219    tr_list * it;
1220
1221    /* count up how many bytes are used by non-piece-data messages
1222       at the front of our outbound queue */
1223    for( it=io->outbuf_datatypes; it!=NULL; it=it->next )
1224    {
1225        struct tr_datatype * d = it->data;
1226
1227        if( d->isPieceData )
1228            break;
1229
1230        byteCount += d->length;
1231    }
1232
1233    return tr_peerIoFlush( io, TR_UP, byteCount );
1234}
Note: See TracBrowser for help on using the repository browser.