source: trunk/libtransmission/peer-io.c @ 11927

Last change on this file since 11927 was 11927, checked in by jch, 11 years ago

Compute accurate overhead for uTP packets.

  • Property svn:keywords set to Date Rev Author Id
File size: 31.2 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 11927 2011-02-18 00:36:02Z jch $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <string.h>
17#include <stdio.h>
18#include <unistd.h>
19
20#ifdef WIN32
21 #include <winsock2.h>
22#else
23 #include <arpa/inet.h> /* inet_ntoa */
24#endif
25
26#include <event2/event.h>
27#include <event2/bufferevent.h>
28#include <libutp/utp.h>
29
30#include "transmission.h"
31#include "session.h"
32#include "bandwidth.h"
33#include "crypto.h"
34#include "list.h"
35#include "net.h"
36#include "peer-common.h" /* MAX_BLOCK_SIZE */
37#include "peer-io.h"
38#include "trevent.h" /* tr_runInEventThread() */
39#include "utils.h"
40
41#define MAGIC_NUMBER 206745
42
43#ifdef WIN32
44 #define EAGAIN       WSAEWOULDBLOCK
45 #define EINTR        WSAEINTR
46 #define EINPROGRESS  WSAEINPROGRESS
47 #define EPIPE        WSAECONNRESET
48#endif
49
50static size_t
51guessPacketOverhead( size_t d )
52{
53    /**
54     * http://sd.wareonearth.com/~phil/net/overhead/
55     *
56     * TCP over Ethernet:
57     * Assuming no header compression (e.g. not PPP)
58     * Add 20 IPv4 header or 40 IPv6 header (no options)
59     * Add 20 TCP header
60     * Add 12 bytes optional TCP timestamps
61     * Max TCP Payload data rates over ethernet are thus:
62     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
63     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
64     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
65     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
66     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
67     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
68     */
69    const double assumed_payload_data_rate = 94.0;
70
71    return (unsigned int)( d * ( 100.0 / assumed_payload_data_rate ) - d );
72}
73
74/**
75***
76**/
77
78#define dbgmsg( io, ... ) \
79    do { \
80        if( tr_deepLoggingIsActive( ) ) \
81            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
82    } while( 0 )
83
84struct tr_datatype
85{
86    tr_bool  isPieceData;
87    size_t   length;
88};
89
90
91/***
92****
93***/
94
95static void
96didWriteWrapper( tr_peerIo * io, unsigned int bytes_transferred )
97{
98     while( bytes_transferred && tr_isPeerIo( io ) )
99     {
100        struct tr_datatype * next = io->outbuf_datatypes->data;
101
102        const unsigned int payload = MIN( next->length, bytes_transferred );
103        /* For uTP sockets, the overhead is computed in utp_on_overhead. */
104        const unsigned int overhead =
105            io->socket ? guessPacketOverhead( payload ) : 0;
106        const uint64_t now = tr_sessionGetTimeMsec( io->session );
107
108        tr_bandwidthUsed( &io->bandwidth, TR_UP, payload, next->isPieceData, now );
109
110        if( overhead > 0 )
111            tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, FALSE, now );
112
113        if( io->didWrite )
114            io->didWrite( io, payload, next->isPieceData, io->userData );
115
116        if( tr_isPeerIo( io ) )
117        {
118            bytes_transferred -= payload;
119            next->length -= payload;
120            if( !next->length ) {
121                tr_list_pop_front( &io->outbuf_datatypes );
122                tr_free( next );
123            }
124        }
125    }
126}
127
128static void
129canReadWrapper( tr_peerIo * io )
130{
131    tr_bool err = 0;
132    tr_bool done = 0;
133    tr_session * session;
134
135    dbgmsg( io, "canRead" );
136
137    assert( tr_isPeerIo( io ) );
138    assert( tr_isSession( io->session ) );
139    tr_peerIoRef( io );
140
141    session = io->session;
142
143    /* try to consume the input buffer */
144    if( io->canRead )
145    {
146        const uint64_t now = tr_sessionGetTimeMsec( io->session );
147
148        tr_sessionLock( session );
149
150        while( !done && !err )
151        {
152            size_t piece = 0;
153            const size_t oldLen = evbuffer_get_length( io->inbuf );
154            const int ret = io->canRead( io, io->userData, &piece );
155            const size_t used = oldLen - evbuffer_get_length( io->inbuf );
156            const unsigned int overhead = guessPacketOverhead( used );
157
158            assert( tr_isPeerIo( io ) );
159
160            if( piece || (piece!=used) )
161            {
162                if( piece )
163                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, piece, TRUE, now );
164
165                if( used != piece )
166                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, used - piece, FALSE, now );
167            }
168
169            if( overhead > 0 )
170                tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, FALSE, now );
171
172            switch( ret )
173            {
174                case READ_NOW:
175                    if( evbuffer_get_length( io->inbuf ) )
176                        continue;
177                    done = 1;
178                    break;
179
180                case READ_LATER:
181                    done = 1;
182                    break;
183
184                case READ_ERR:
185                    err = 1;
186                    break;
187            }
188
189            assert( tr_isPeerIo( io ) );
190        }
191
192        tr_sessionUnlock( session );
193    }
194
195    assert( tr_isPeerIo( io ) );
196    tr_peerIoUnref( io );
197}
198
199tr_bool
200tr_isPeerIo( const tr_peerIo * io )
201{
202    return ( io != NULL )
203        && ( io->magicNumber == MAGIC_NUMBER )
204        && ( io->refCount >= 0 )
205        && ( tr_isBandwidth( &io->bandwidth ) )
206        && ( tr_isAddress( &io->addr ) );
207}
208
209static void
210event_read_cb( int fd, short event UNUSED, void * vio )
211{
212    int res;
213    int e;
214    tr_peerIo * io = vio;
215
216    /* Limit the input buffer to 256K, so it doesn't grow too large */
217    unsigned int howmuch;
218    unsigned int curlen;
219    const tr_direction dir = TR_DOWN;
220    const unsigned int max = 256 * 1024;
221
222    assert( tr_isPeerIo( io ) );
223    assert( io->socket >= 0 );
224
225    io->pendingEvents &= ~EV_READ;
226
227    curlen = evbuffer_get_length( io->inbuf );
228    howmuch = curlen >= max ? 0 : max - curlen;
229    howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch );
230
231    dbgmsg( io, "libevent says this peer is ready to read" );
232
233    /* if we don't have any bandwidth left, stop reading */
234    if( howmuch < 1 ) {
235        tr_peerIoSetEnabled( io, dir, FALSE );
236        return;
237    }
238
239    EVUTIL_SET_SOCKET_ERROR( 0 );
240    res = evbuffer_read( io->inbuf, fd, (int)howmuch );
241    e = EVUTIL_SOCKET_ERROR( );
242
243    if( res > 0 )
244    {
245        tr_peerIoSetEnabled( io, dir, TRUE );
246
247        /* Invoke the user callback - must always be called last */
248        canReadWrapper( io );
249    }
250    else
251    {
252        char errstr[512];
253        short what = BEV_EVENT_READING;
254
255        if( res == 0 ) /* EOF */
256            what |= BEV_EVENT_EOF;
257        else if( res == -1 ) {
258            if( e == EAGAIN || e == EINTR ) {
259                tr_peerIoSetEnabled( io, dir, TRUE );
260                return;
261            }
262            what |= BEV_EVENT_ERROR;
263        }
264
265        tr_net_strerror( errstr, sizeof( errstr ), e );
266        dbgmsg( io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
267
268        if( io->gotError != NULL )
269            io->gotError( io, what, io->userData );
270    }
271}
272
273static int
274tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
275{
276    int e;
277    int n;
278    char errstr[256];
279
280    EVUTIL_SET_SOCKET_ERROR( 0 );
281    n = evbuffer_write_atmost( io->outbuf, fd, howmuch );
282    e = EVUTIL_SOCKET_ERROR( );
283    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?tr_net_strerror(errstr,sizeof(errstr),e):"") );
284
285    return n;
286}
287
288static void
289event_write_cb( int fd, short event UNUSED, void * vio )
290{
291    int res = 0;
292    int e;
293    short what = BEV_EVENT_WRITING;
294    tr_peerIo * io = vio;
295    size_t howmuch;
296    const tr_direction dir = TR_UP;
297
298    assert( tr_isPeerIo( io ) );
299    assert( io->socket >= 0 );
300
301    io->pendingEvents &= ~EV_WRITE;
302
303    dbgmsg( io, "libevent says this peer is ready to write" );
304
305    /* Write as much as possible, since the socket is non-blocking, write() will
306     * return if it can't write any more data without blocking */
307    howmuch = tr_bandwidthClamp( &io->bandwidth, dir, evbuffer_get_length( io->outbuf ) );
308
309    /* if we don't have any bandwidth left, stop writing */
310    if( howmuch < 1 ) {
311        tr_peerIoSetEnabled( io, dir, FALSE );
312        return;
313    }
314
315    EVUTIL_SET_SOCKET_ERROR( 0 );
316    res = tr_evbuffer_write( io, fd, howmuch );
317    e = EVUTIL_SOCKET_ERROR( );
318
319    if (res == -1) {
320        if (!e || e == EAGAIN || e == EINTR || e == EINPROGRESS)
321            goto reschedule;
322        /* error case */
323        what |= BEV_EVENT_ERROR;
324    } else if (res == 0) {
325        /* eof case */
326        what |= BEV_EVENT_EOF;
327    }
328    if (res <= 0)
329        goto error;
330
331    if( evbuffer_get_length( io->outbuf ) )
332        tr_peerIoSetEnabled( io, dir, TRUE );
333
334    didWriteWrapper( io, res );
335    return;
336
337 reschedule:
338    if( evbuffer_get_length( io->outbuf ) )
339        tr_peerIoSetEnabled( io, dir, TRUE );
340    return;
341
342 error:
343
344    dbgmsg( io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
345
346    if( io->gotError != NULL )
347        io->gotError( io, what, io->userData );
348}
349
350/**
351***
352**/
353
354static void
355maybeSetCongestionAlgorithm( int socket, const char * algorithm )
356{
357    if( algorithm && *algorithm )
358    {
359        const int rc = tr_netSetCongestionControl( socket, algorithm );
360
361        if( rc < 0 )
362            tr_ninf( "Net", "Can't set congestion control algorithm '%s': %s",
363                     algorithm, tr_strerror( errno ));
364    }
365}
366
367/* UTP callbacks */
368
369static void
370utp_on_read(void *closure, const unsigned char *buf, size_t buflen)
371{
372    int rc;
373    tr_peerIo *io = (tr_peerIo *)closure;
374    assert( tr_isPeerIo( io ) );
375
376    tr_ndbg( "UTP", "On read: %ld", (long)buflen );
377
378    rc = evbuffer_add( io->inbuf, buf, buflen );
379    if( rc < 0 ) {
380        tr_nerr( "UTP", "On read evbuffer_add" );
381        return;
382    }
383
384    tr_peerIoSetEnabled( io, TR_DOWN, TRUE );
385    canReadWrapper( io );
386}
387
388static void
389utp_on_write(void *closure, unsigned char *buf, size_t buflen)
390{
391    tr_peerIo *io = (tr_peerIo *)closure;
392    int rc;
393
394    assert( tr_isPeerIo( io ) );
395    tr_ndbg( "UTP", "On write: %ld", (long)buflen );
396
397    rc = evbuffer_remove( io->outbuf, buf, buflen );
398    if( rc < (long)buflen ) {
399        tr_nerr( "UTP", "Short write: %d < %ld", rc, (long)buflen);
400    }
401}
402
403static size_t
404utp_get_rb_size(void *closure)
405{
406    tr_peerIo *io = (tr_peerIo *)closure;
407    assert( tr_isPeerIo( io ) );
408
409    tr_ndbg( "UTP", "Get RB size" );
410    return 0;
411}
412
413static void
414utp_on_state_change(void *closure, int state)
415{
416    tr_peerIo *io = (tr_peerIo *)closure;
417    assert( tr_isPeerIo( io ) );
418
419    tr_ndbg( "UTP", "On state change: %d", state );
420
421    if( state == UTP_STATE_CONNECT || state == UTP_STATE_WRITABLE ) {
422        size_t count = evbuffer_get_length( io->outbuf );
423        if( count > 0 )
424            UTP_Write( io->utp_socket, count );
425    } else if( state == UTP_STATE_EOF ) {
426        if( io->gotError )
427            io->gotError( io, BEV_EVENT_EOF, io->userData );
428    } else if( state == UTP_STATE_DESTROYING ) {
429        tr_nerr( "UTP", "Impossible state UTP_STATE_DESTROYING" );
430        return;
431    } else {
432        tr_nerr( "UTP", "Unknown state %d", state );
433    }
434}
435
436static void
437utp_on_error(void *closure, int errcode)
438{
439    tr_peerIo *io = (tr_peerIo *)closure;
440    assert( tr_isPeerIo( io ) );
441
442    tr_ndbg( "UTP", "Error callback: %s", tr_strerror( errcode ) );
443
444    if( io->gotError ) {
445        errno = errcode;
446        io->gotError( io, BEV_EVENT_ERROR, io->userData );
447    }
448}
449
450static void
451utp_on_overhead(void *closure, bool send, size_t count, int type)
452{
453    tr_peerIo *io = (tr_peerIo *)closure;
454    assert( tr_isPeerIo( io ) );
455
456    tr_ndbg( "UTP", "On overhead: %d %ld %d", (int)send, (long)count, type );
457    tr_bandwidthUsed( &io->bandwidth, send ? TR_UP : TR_DOWN,
458                      count, FALSE, tr_time_msec() );
459}
460
461static struct UTPFunctionTable utp_function_table = {
462    .on_read = utp_on_read,
463    .on_write = utp_on_write,
464    .get_rb_size = utp_get_rb_size,
465    .on_state = utp_on_state_change,
466    .on_error = utp_on_error,
467    .on_overhead = utp_on_overhead
468};
469
470/* Dummy UTP callbacks. */
471/* We switch a UTP socket to use these after the associated peerIo has been
472   destroyed -- see io_dtor. */
473
474static void
475dummy_read( void * closure UNUSED, const unsigned char *buf UNUSED, size_t buflen UNUSED )
476{
477    /* This cannot happen, as far as I'm aware. */
478    tr_nerr( "UTP", "On_read called on closed socket" );
479
480}
481
482static void
483dummy_write(void * closure UNUSED, unsigned char *buf, size_t buflen)
484{
485    /* This can very well happen if we've shut down a peer connection that
486       had unflushed buffers.  Complain and send zeroes. */
487    tr_ndbg( "UTP", "On_write called on closed socket" );
488    memset( buf, 0, buflen );
489}
490
491static size_t
492dummy_get_rb_size( void * closure UNUSED )
493{
494    return 0;
495}
496
497static void
498dummy_on_state_change(void * closure UNUSED, int state UNUSED )
499{
500    return;
501}
502
503static void
504dummy_on_error( void * closure UNUSED, int errcode UNUSED )
505{
506    return;
507}
508
509static void
510dummy_on_overhead( void *closure UNUSED, bool send UNUSED, size_t count UNUSED, int type UNUSED )
511{
512    return;
513}
514
515static struct UTPFunctionTable dummy_utp_function_table = {
516    .on_read = dummy_read,
517    .on_write = dummy_write,
518    .get_rb_size = dummy_get_rb_size,
519    .on_state = dummy_on_state_change,
520    .on_error = dummy_on_error,
521    .on_overhead = dummy_on_overhead
522};
523
524static tr_peerIo*
525tr_peerIoNew( tr_session       * session,
526              tr_bandwidth     * parent,
527              const tr_address * addr,
528              tr_port            port,
529              const uint8_t    * torrentHash,
530              tr_bool            isIncoming,
531              tr_bool            isSeed,
532              int                socket,
533              struct UTPSocket * utp_socket)
534{
535    tr_peerIo * io;
536
537    assert( session != NULL );
538    assert( session->events != NULL );
539    assert( tr_isBool( isIncoming ) );
540    assert( tr_isBool( isSeed ) );
541    assert( tr_amInEventThread( session ) );
542    assert( (socket < 0) == (utp_socket != NULL) );
543
544    if( socket >= 0 ) {
545        tr_netSetTOS( socket, session->peerSocketTOS );
546        maybeSetCongestionAlgorithm( socket, session->peer_congestion_algorithm );
547    }
548
549    io = tr_new0( tr_peerIo, 1 );
550    io->magicNumber = MAGIC_NUMBER;
551    io->refCount = 1;
552    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
553    io->session = session;
554    io->addr = *addr;
555    io->isSeed = isSeed;
556    io->port = port;
557    io->socket = socket;
558    io->utp_socket = utp_socket;
559    io->isIncoming = isIncoming != 0;
560    io->timeCreated = tr_time( );
561    io->inbuf = evbuffer_new( );
562    io->outbuf = evbuffer_new( );
563    tr_bandwidthConstruct( &io->bandwidth, session, parent );
564    tr_bandwidthSetPeer( &io->bandwidth, io );
565    dbgmsg( io, "bandwidth is %p; its parent is %p", &io->bandwidth, parent );
566
567    if( io->socket >= 0 ) {
568        io->event_read = event_new( session->event_base,
569                                    io->socket, EV_READ, event_read_cb, io );
570        io->event_write = event_new( session->event_base,
571                                     io->socket, EV_WRITE, event_write_cb, io );
572    } else {
573        tr_ndbg( "UTP", "New %s connection",
574                 isIncoming ? "incoming" : "outgoing" );
575        UTP_SetCallbacks( utp_socket,
576                          &utp_function_table,
577                          io );
578    }
579
580    return io;
581}
582
583tr_peerIo*
584tr_peerIoNewIncoming( tr_session        * session,
585                      tr_bandwidth      * parent,
586                      const tr_address  * addr,
587                      tr_port             port,
588                      int                 fd,
589                      struct UTPSocket  * utp_socket )
590{
591    assert( session );
592    assert( tr_isAddress( addr ) );
593
594    return tr_peerIoNew( session, parent, addr, port, NULL, TRUE, FALSE, 
595                         fd, utp_socket );
596}
597
598tr_peerIo*
599tr_peerIoNewOutgoing( tr_session        * session,
600                      tr_bandwidth      * parent,
601                      const tr_address  * addr,
602                      tr_port             port,
603                      const uint8_t     * torrentHash,
604                      tr_bool             isSeed )
605{
606    int fd;
607
608    assert( session );
609    assert( tr_isAddress( addr ) );
610    assert( torrentHash );
611
612    fd = tr_netOpenPeerSocket( session, addr, port, isSeed );
613    dbgmsg( NULL, "tr_netOpenPeerSocket returned fd %d", fd );
614
615    return fd < 0 ? NULL
616                  : tr_peerIoNew( session, parent, addr, port,
617                                  torrentHash, FALSE, isSeed, fd, NULL );
618}
619
620/***
621****
622***/
623
624static void
625event_enable( tr_peerIo * io, short event )
626{
627    assert( tr_amInEventThread( io->session ) );
628    assert( io->session != NULL );
629    assert( io->session->events != NULL );
630
631    if( io->socket < 0 )
632        return;
633
634    assert( io->session->events != NULL );
635    assert( event_initialized( io->event_read ) );
636    assert( event_initialized( io->event_write ) );
637
638    if( ( event & EV_READ ) && ! ( io->pendingEvents & EV_READ ) )
639    {
640        dbgmsg( io, "enabling libevent ready-to-read polling" );
641        event_add( io->event_read, NULL );
642        io->pendingEvents |= EV_READ;
643    }
644
645    if( ( event & EV_WRITE ) && ! ( io->pendingEvents & EV_WRITE ) )
646    {
647        dbgmsg( io, "enabling libevent ready-to-write polling" );
648        event_add( io->event_write, NULL );
649        io->pendingEvents |= EV_WRITE;
650    }
651}
652
653static void
654event_disable( struct tr_peerIo * io, short event )
655{
656    assert( tr_amInEventThread( io->session ) );
657    assert( io->session != NULL );
658
659    if( io->socket < 0 )
660        return;
661
662    assert( io->session->events != NULL );
663    assert( event_initialized( io->event_read ) );
664    assert( event_initialized( io->event_write ) );
665
666    if( ( event & EV_READ ) && ( io->pendingEvents & EV_READ ) )
667    {
668        dbgmsg( io, "disabling libevent ready-to-read polling" );
669        event_del( io->event_read );
670        io->pendingEvents &= ~EV_READ;
671    }
672
673    if( ( event & EV_WRITE ) && ( io->pendingEvents & EV_WRITE ) )
674    {
675        dbgmsg( io, "disabling libevent ready-to-write polling" );
676        event_del( io->event_write );
677        io->pendingEvents &= ~EV_WRITE;
678    }
679}
680
681void
682tr_peerIoSetEnabled( tr_peerIo    * io,
683                     tr_direction   dir,
684                     tr_bool        isEnabled )
685{
686    const short event = dir == TR_UP ? EV_WRITE : EV_READ;
687
688    assert( tr_isPeerIo( io ) );
689    assert( tr_isDirection( dir ) );
690    assert( tr_amInEventThread( io->session ) );
691    assert( io->session->events != NULL );
692
693    if( isEnabled )
694        event_enable( io, event );
695    else
696        event_disable( io, event );
697}
698
699/***
700****
701***/
702
703static void
704io_dtor( void * vio )
705{
706    tr_peerIo * io = vio;
707
708    assert( tr_isPeerIo( io ) );
709    assert( tr_amInEventThread( io->session ) );
710    assert( io->session->events != NULL );
711
712    dbgmsg( io, "in tr_peerIo destructor" );
713    event_disable( io, EV_READ | EV_WRITE );
714    tr_bandwidthDestruct( &io->bandwidth );
715    evbuffer_free( io->outbuf );
716    evbuffer_free( io->inbuf );
717    if( io->socket >= 0 ) {
718        event_free( io->event_read );
719        event_free( io->event_write );
720        tr_netClose( io->session, io->socket );
721    }
722    if( io->utp_socket != NULL ) {
723        tr_ndbg( "UTP", "Destroying connection");
724        UTP_SetCallbacks( io->utp_socket,
725                          &dummy_utp_function_table,
726                          NULL );
727        UTP_Close( io->utp_socket );
728    }
729    tr_cryptoFree( io->crypto );
730    tr_list_free( &io->outbuf_datatypes, tr_free );
731
732    memset( io, ~0, sizeof( tr_peerIo ) );
733    tr_free( io );
734}
735
736static void
737tr_peerIoFree( tr_peerIo * io )
738{
739    if( io )
740    {
741        dbgmsg( io, "in tr_peerIoFree" );
742        io->canRead = NULL;
743        io->didWrite = NULL;
744        io->gotError = NULL;
745        tr_runInEventThread( io->session, io_dtor, io );
746    }
747}
748
749void
750tr_peerIoRefImpl( const char * file, int line, tr_peerIo * io )
751{
752    assert( tr_isPeerIo( io ) );
753
754    dbgmsg( io, "%s:%d is incrementing the IO's refcount from %d to %d",
755                file, line, io->refCount, io->refCount+1 );
756
757    ++io->refCount;
758}
759
760void
761tr_peerIoUnrefImpl( const char * file, int line, tr_peerIo * io )
762{
763    assert( tr_isPeerIo( io ) );
764
765    dbgmsg( io, "%s:%d is decrementing the IO's refcount from %d to %d",
766                file, line, io->refCount, io->refCount-1 );
767
768    if( !--io->refCount )
769        tr_peerIoFree( io );
770}
771
772const tr_address*
773tr_peerIoGetAddress( const tr_peerIo * io, tr_port   * port )
774{
775    assert( tr_isPeerIo( io ) );
776
777    if( port )
778        *port = io->port;
779
780    return &io->addr;
781}
782
783const char*
784tr_peerIoAddrStr( const tr_address * addr, tr_port port )
785{
786    static char buf[512];
787
788    if( addr->type == TR_AF_INET )
789        tr_snprintf( buf, sizeof( buf ), "%s:%u", tr_ntop_non_ts( addr ), ntohs( port ) );
790    else
791        tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_ntop_non_ts( addr ), ntohs( port ) );
792    return buf;
793}
794
795const char* tr_peerIoGetAddrStr( const tr_peerIo * io )
796{
797    return tr_isPeerIo( io ) ? tr_peerIoAddrStr( &io->addr, io->port ) : "error";
798}
799
800void
801tr_peerIoSetIOFuncs( tr_peerIo        * io,
802                     tr_can_read_cb     readcb,
803                     tr_did_write_cb    writecb,
804                     tr_net_error_cb    errcb,
805                     void             * userData )
806{
807    io->canRead = readcb;
808    io->didWrite = writecb;
809    io->gotError = errcb;
810    io->userData = userData;
811}
812
813void
814tr_peerIoClear( tr_peerIo * io )
815{
816    tr_peerIoSetIOFuncs( io, NULL, NULL, NULL, NULL );
817    tr_peerIoSetEnabled( io, TR_UP, FALSE );
818    tr_peerIoSetEnabled( io, TR_DOWN, FALSE );
819}
820
821int
822tr_peerIoReconnect( tr_peerIo * io )
823{
824    short int pendingEvents;
825    tr_session * session;
826
827    assert( tr_isPeerIo( io ) );
828    assert( !tr_peerIoIsIncoming( io ) );
829
830    session = tr_peerIoGetSession( io );
831
832    pendingEvents = io->pendingEvents;
833    event_disable( io, EV_READ | EV_WRITE );
834
835    if( io->socket >= 0 ) {
836        tr_netClose( session, io->socket );
837        io->socket = -1;
838    }
839    if( io->utp_socket != NULL ) {
840        UTP_SetCallbacks( io->utp_socket,
841                          &dummy_utp_function_table,
842                          NULL );
843        UTP_Close(io->utp_socket);
844        io->utp_socket = NULL;
845    }
846
847    event_free( io->event_read );
848    event_free( io->event_write );
849    io->socket = tr_netOpenPeerSocket( session, &io->addr, io->port, io->isSeed );
850    io->event_read = event_new( session->event_base, io->socket, EV_READ, event_read_cb, io );
851    io->event_write = event_new( session->event_base, io->socket, EV_WRITE, event_write_cb, io );
852
853    if( io->socket >= 0 )
854    {
855        event_enable( io, pendingEvents );
856        tr_netSetTOS( io->socket, session->peerSocketTOS );
857        maybeSetCongestionAlgorithm( io->socket, session->peer_congestion_algorithm );
858        return 0;
859    }
860
861    return -1;
862}
863
864/**
865***
866**/
867
868void
869tr_peerIoSetTorrentHash( tr_peerIo *     io,
870                         const uint8_t * hash )
871{
872    assert( tr_isPeerIo( io ) );
873
874    tr_cryptoSetTorrentHash( io->crypto, hash );
875}
876
877const uint8_t*
878tr_peerIoGetTorrentHash( tr_peerIo * io )
879{
880    assert( tr_isPeerIo( io ) );
881    assert( io->crypto );
882
883    return tr_cryptoGetTorrentHash( io->crypto );
884}
885
886int
887tr_peerIoHasTorrentHash( const tr_peerIo * io )
888{
889    assert( tr_isPeerIo( io ) );
890    assert( io->crypto );
891
892    return tr_cryptoHasTorrentHash( io->crypto );
893}
894
895/**
896***
897**/
898
899void
900tr_peerIoSetPeersId( tr_peerIo *     io,
901                     const uint8_t * peer_id )
902{
903    assert( tr_isPeerIo( io ) );
904
905    if( ( io->peerIdIsSet = peer_id != NULL ) )
906        memcpy( io->peerId, peer_id, 20 );
907    else
908        memset( io->peerId, 0, 20 );
909}
910
911/**
912***
913**/
914
915static unsigned int
916getDesiredOutputBufferSize( const tr_peerIo * io, uint64_t now )
917{
918    /* this is all kind of arbitrary, but what seems to work well is
919     * being large enough to hold the next 20 seconds' worth of input,
920     * or a few blocks, whichever is bigger.
921     * It's okay to tweak this as needed */
922    const unsigned int currentSpeed_Bps = tr_bandwidthGetPieceSpeed_Bps( &io->bandwidth, now, TR_UP );
923    const unsigned int period = 15u; /* arbitrary */
924    /* the 3 is arbitrary; the .5 is to leave room for messages */
925    static const unsigned int ceiling =  (unsigned int)( MAX_BLOCK_SIZE * 3.5 );
926    return MAX( ceiling, currentSpeed_Bps*period );
927}
928
929size_t
930tr_peerIoGetWriteBufferSpace( const tr_peerIo * io, uint64_t now )
931{
932    const size_t desiredLen = getDesiredOutputBufferSize( io, now );
933    const size_t currentLen = evbuffer_get_length( io->outbuf );
934    size_t freeSpace = 0;
935
936    if( desiredLen > currentLen )
937        freeSpace = desiredLen - currentLen;
938
939    return freeSpace;
940}
941
942/**
943***
944**/
945
946void
947tr_peerIoSetEncryption( tr_peerIo * io, uint32_t encryptionMode )
948{
949    assert( tr_isPeerIo( io ) );
950    assert( encryptionMode == PEER_ENCRYPTION_NONE
951         || encryptionMode == PEER_ENCRYPTION_RC4 );
952
953    io->encryptionMode = encryptionMode;
954}
955
956/**
957***
958**/
959
960static void
961addDatatype( tr_peerIo * io, size_t byteCount, tr_bool isPieceData )
962{
963    struct tr_datatype * d;
964
965    d = tr_new( struct tr_datatype, 1 );
966    d->isPieceData = isPieceData != 0;
967    d->length = byteCount;
968    tr_list_append( &io->outbuf_datatypes, d );
969}
970
971static struct evbuffer_iovec *
972evbuffer_peek_all( struct evbuffer * buf, size_t * setme_vecCount )
973{
974    const size_t byteCount = evbuffer_get_length( buf );
975    const int vecCount = evbuffer_peek( buf, byteCount, NULL, NULL, 0 );
976    struct evbuffer_iovec * iovec = tr_new0( struct evbuffer_iovec, vecCount );
977    const int n = evbuffer_peek( buf, byteCount, NULL, iovec, vecCount );
978    assert( n == vecCount );
979    *setme_vecCount = n;
980    return iovec;
981}
982
983static void
984maybeEncryptBuffer( tr_peerIo * io, struct evbuffer * buf )
985{
986    if( io->encryptionMode == PEER_ENCRYPTION_RC4 )
987    {
988        size_t i, n;
989        struct evbuffer_iovec * iovec = evbuffer_peek_all( buf, &n );
990
991        for( i=0; i<n; ++i )
992            tr_cryptoEncrypt( io->crypto, iovec[i].iov_len, iovec[i].iov_base, iovec[i].iov_base );
993
994        tr_free( iovec );
995    }
996}
997
998void
999tr_peerIoWriteBuf( tr_peerIo * io, struct evbuffer * buf, tr_bool isPieceData )
1000{
1001    const size_t byteCount = evbuffer_get_length( buf );
1002    maybeEncryptBuffer( io, buf );
1003    evbuffer_add_buffer( io->outbuf, buf );
1004    addDatatype( io, byteCount, isPieceData );
1005    if( io->utp_socket )
1006        UTP_Write( io->utp_socket, byteCount );
1007}
1008
1009void
1010tr_peerIoWriteBytes( tr_peerIo * io, const void * bytes, size_t byteCount, tr_bool isPieceData )
1011{
1012    struct evbuffer * buf = evbuffer_new( );
1013    evbuffer_add( buf, bytes, byteCount );
1014    tr_peerIoWriteBuf( io, buf, isPieceData );
1015    evbuffer_free( buf );
1016}
1017
1018/***
1019****
1020***/
1021
1022void
1023evbuffer_add_uint8( struct evbuffer * outbuf, uint8_t byte )
1024{
1025    evbuffer_add( outbuf, &byte, 1 );
1026}
1027
1028void
1029evbuffer_add_uint16( struct evbuffer * outbuf, uint16_t addme_hs )
1030{
1031    const uint16_t ns = htons( addme_hs );
1032    evbuffer_add( outbuf, &ns, sizeof( ns ) );
1033}
1034
1035void
1036evbuffer_add_uint32( struct evbuffer * outbuf, uint32_t addme_hl )
1037{
1038    const uint32_t nl = htonl( addme_hl );
1039    evbuffer_add( outbuf, &nl, sizeof( nl ) );
1040}
1041
1042/***
1043****
1044***/
1045
1046void
1047tr_peerIoReadBytes( tr_peerIo * io, struct evbuffer * inbuf, void * bytes, size_t byteCount )
1048{
1049    assert( tr_isPeerIo( io ) );
1050    assert( evbuffer_get_length( inbuf )  >= byteCount );
1051
1052    switch( io->encryptionMode )
1053    {
1054        case PEER_ENCRYPTION_NONE:
1055            evbuffer_remove( inbuf, bytes, byteCount );
1056            break;
1057
1058        case PEER_ENCRYPTION_RC4:
1059            evbuffer_remove( inbuf, bytes, byteCount );
1060            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
1061            break;
1062
1063        default:
1064            assert( 0 );
1065    }
1066}
1067
1068void
1069tr_peerIoReadUint16( tr_peerIo        * io,
1070                     struct evbuffer  * inbuf,
1071                     uint16_t         * setme )
1072{
1073    uint16_t tmp;
1074    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
1075    *setme = ntohs( tmp );
1076}
1077
1078void tr_peerIoReadUint32( tr_peerIo        * io,
1079                          struct evbuffer  * inbuf,
1080                          uint32_t         * setme )
1081{
1082    uint32_t tmp;
1083    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
1084    *setme = ntohl( tmp );
1085}
1086
1087void
1088tr_peerIoDrain( tr_peerIo       * io,
1089                struct evbuffer * inbuf,
1090                size_t            byteCount )
1091{
1092    void * buf = tr_sessionGetBuffer( io->session );
1093    const size_t buflen = SESSION_BUFFER_SIZE;
1094
1095    while( byteCount > 0 )
1096    {
1097        const size_t thisPass = MIN( byteCount, buflen );
1098        tr_peerIoReadBytes( io, inbuf, buf, thisPass );
1099        byteCount -= thisPass;
1100    }
1101
1102    tr_sessionReleaseBuffer( io->session );
1103}
1104
1105/***
1106****
1107***/
1108
1109static int
1110tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
1111{
1112    int res = 0;
1113
1114    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch )))
1115    {
1116        int e;
1117
1118        EVUTIL_SET_SOCKET_ERROR( 0 );
1119        res = evbuffer_read( io->inbuf, io->socket, (int)howmuch );
1120        e = EVUTIL_SOCKET_ERROR( );
1121
1122        dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(e):"") );
1123
1124        if( evbuffer_get_length( io->inbuf ) )
1125            canReadWrapper( io );
1126
1127        if( ( res <= 0 ) && ( io->gotError ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1128        {
1129            char errstr[512];
1130            short what = BEV_EVENT_READING | BEV_EVENT_ERROR;
1131            if( res == 0 )
1132                what |= BEV_EVENT_EOF;
1133            tr_net_strerror( errstr, sizeof( errstr ), e );
1134            dbgmsg( io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
1135            io->gotError( io, what, io->userData );
1136        }
1137    }
1138
1139    return res;
1140}
1141
1142static int
1143tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
1144{
1145    int n = 0;
1146
1147    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_UP, howmuch )))
1148    {
1149        int e;
1150        EVUTIL_SET_SOCKET_ERROR( 0 );
1151        n = tr_evbuffer_write( io, io->socket, howmuch );
1152        e = EVUTIL_SOCKET_ERROR( );
1153
1154        if( n > 0 )
1155            didWriteWrapper( io, n );
1156
1157        if( ( n < 0 ) && ( io->gotError ) && e && ( e != EPIPE ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1158        {
1159            char errstr[512];
1160            const short what = BEV_EVENT_WRITING | BEV_EVENT_ERROR;
1161
1162            tr_net_strerror( errstr, sizeof( errstr ), e );
1163            dbgmsg( io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", n, what, e, errstr );
1164
1165            if( io->gotError != NULL )
1166                io->gotError( io, what, io->userData );
1167        }
1168    }
1169
1170    return n;
1171}
1172
1173int
1174tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
1175{
1176    int bytesUsed = 0;
1177
1178    assert( tr_isPeerIo( io ) );
1179    assert( tr_isDirection( dir ) );
1180
1181    if( dir == TR_DOWN )
1182        bytesUsed = tr_peerIoTryRead( io, limit );
1183    else
1184        bytesUsed = tr_peerIoTryWrite( io, limit );
1185
1186    dbgmsg( io, "flushing peer-io, direction %d, limit %zu, bytesUsed %d", (int)dir, limit, bytesUsed );
1187    return bytesUsed;
1188}
1189
1190int
1191tr_peerIoFlushOutgoingProtocolMsgs( tr_peerIo * io )
1192{
1193    size_t byteCount = 0;
1194    tr_list * it;
1195
1196    /* count up how many bytes are used by non-piece-data messages
1197       at the front of our outbound queue */
1198    for( it=io->outbuf_datatypes; it!=NULL; it=it->next )
1199    {
1200        struct tr_datatype * d = it->data;
1201
1202        if( d->isPieceData )
1203            break;
1204
1205        byteCount += d->length;
1206    }
1207
1208    return tr_peerIoFlush( io, TR_UP, byteCount );
1209}
Note: See TracBrowser for help on using the repository browser.