source: trunk/libtransmission/peer-io.c @ 12059

Last change on this file since 12059 was 12059, checked in by jordan, 11 years ago

(trunk libT) #4047 "transfer speed shown as 0" -- revert r11783 (#3950) because it caused speed misreporting.

  • Property svn:keywords set to Date Rev Author Id
File size: 32.9 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 12059 2011-03-01 15:23:13Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <string.h>
17#include <stdio.h>
18#include <unistd.h>
19
20#ifdef WIN32
21 #include <winsock2.h>
22#else
23 #include <arpa/inet.h> /* inet_ntoa */
24#endif
25
26#include <event2/event.h>
27#include <event2/bufferevent.h>
28#include <libutp/utp.h>
29
30#include "transmission.h"
31#include "session.h"
32#include "bandwidth.h"
33#include "crypto.h"
34#include "list.h"
35#include "net.h"
36#include "peer-common.h" /* MAX_BLOCK_SIZE */
37#include "peer-io.h"
38#include "trevent.h" /* tr_runInEventThread() */
39#include "utils.h"
40
41
42#define MAGIC_NUMBER 206745
43
44#ifdef WIN32
45 #define EAGAIN       WSAEWOULDBLOCK
46 #define EINTR        WSAEINTR
47 #define EINPROGRESS  WSAEINPROGRESS
48 #define EPIPE        WSAECONNRESET
49#endif
50
51/* The amount of read bufferring that we allow for uTP sockets. */
52
53#define UTP_READ_BUFFER_SIZE (256 * 1024)
54
55static size_t
56guessPacketOverhead( size_t d )
57{
58    /**
59     * http://sd.wareonearth.com/~phil/net/overhead/
60     *
61     * TCP over Ethernet:
62     * Assuming no header compression (e.g. not PPP)
63     * Add 20 IPv4 header or 40 IPv6 header (no options)
64     * Add 20 TCP header
65     * Add 12 bytes optional TCP timestamps
66     * Max TCP Payload data rates over ethernet are thus:
67     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
68     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
69     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
70     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
71     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
72     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
73     */
74    const double assumed_payload_data_rate = 94.0;
75
76    return (unsigned int)( d * ( 100.0 / assumed_payload_data_rate ) - d );
77}
78
79/**
80***
81**/
82
83#define dbgmsg( io, ... ) \
84    do { \
85        if( tr_deepLoggingIsActive( ) ) \
86            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
87    } while( 0 )
88
89struct tr_datatype
90{
91    tr_bool  isPieceData;
92    size_t   length;
93};
94
95
96/***
97****
98***/
99
100static void
101didWriteWrapper( tr_peerIo * io, unsigned int bytes_transferred )
102{
103     while( bytes_transferred && tr_isPeerIo( io ) )
104     {
105        struct tr_datatype * next = io->outbuf_datatypes->data;
106
107        const unsigned int payload = MIN( next->length, bytes_transferred );
108        /* For uTP sockets, the overhead is computed in utp_on_overhead. */
109        const unsigned int overhead =
110            io->socket ? guessPacketOverhead( payload ) : 0;
111        const uint64_t now = tr_time_msec( );
112
113        tr_bandwidthUsed( &io->bandwidth, TR_UP, payload, next->isPieceData, now );
114
115        if( overhead > 0 )
116            tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, FALSE, now );
117
118        if( io->didWrite )
119            io->didWrite( io, payload, next->isPieceData, io->userData );
120
121        if( tr_isPeerIo( io ) )
122        {
123            bytes_transferred -= payload;
124            next->length -= payload;
125            if( !next->length ) {
126                tr_list_pop_front( &io->outbuf_datatypes );
127                tr_free( next );
128            }
129        }
130    }
131}
132
133static void
134canReadWrapper( tr_peerIo * io )
135{
136    tr_bool err = 0;
137    tr_bool done = 0;
138    tr_session * session;
139
140    dbgmsg( io, "canRead" );
141
142    assert( tr_isPeerIo( io ) );
143    assert( tr_isSession( io->session ) );
144    tr_peerIoRef( io );
145
146    session = io->session;
147
148    /* try to consume the input buffer */
149    if( io->canRead )
150    {
151        const uint64_t now = tr_time_msec( );
152
153        tr_sessionLock( session );
154
155        while( !done && !err )
156        {
157            size_t piece = 0;
158            const size_t oldLen = evbuffer_get_length( io->inbuf );
159            const int ret = io->canRead( io, io->userData, &piece );
160            const size_t used = oldLen - evbuffer_get_length( io->inbuf );
161            const unsigned int overhead = guessPacketOverhead( used );
162
163            assert( tr_isPeerIo( io ) );
164
165            if( piece || (piece!=used) )
166            {
167                if( piece )
168                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, piece, TRUE, now );
169
170                if( used != piece )
171                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, used - piece, FALSE, now );
172            }
173
174            if( overhead > 0 )
175                tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, FALSE, now );
176
177            switch( ret )
178            {
179                case READ_NOW:
180                    if( evbuffer_get_length( io->inbuf ) )
181                        continue;
182                    done = 1;
183                    break;
184
185                case READ_LATER:
186                    done = 1;
187                    break;
188
189                case READ_ERR:
190                    err = 1;
191                    break;
192            }
193
194            assert( tr_isPeerIo( io ) );
195        }
196
197        tr_sessionUnlock( session );
198    }
199
200    assert( tr_isPeerIo( io ) );
201    tr_peerIoUnref( io );
202}
203
204tr_bool
205tr_isPeerIo( const tr_peerIo * io )
206{
207    return ( io != NULL )
208        && ( io->magicNumber == MAGIC_NUMBER )
209        && ( io->refCount >= 0 )
210        && ( tr_isBandwidth( &io->bandwidth ) )
211        && ( tr_isAddress( &io->addr ) );
212}
213
214static void
215event_read_cb( int fd, short event UNUSED, void * vio )
216{
217    int res;
218    int e;
219    tr_peerIo * io = vio;
220
221    /* Limit the input buffer to 256K, so it doesn't grow too large */
222    unsigned int howmuch;
223    unsigned int curlen;
224    const tr_direction dir = TR_DOWN;
225    const unsigned int max = 256 * 1024;
226
227    assert( tr_isPeerIo( io ) );
228    assert( io->socket >= 0 );
229
230    io->pendingEvents &= ~EV_READ;
231
232    curlen = evbuffer_get_length( io->inbuf );
233    howmuch = curlen >= max ? 0 : max - curlen;
234    howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch );
235
236    dbgmsg( io, "libevent says this peer is ready to read" );
237
238    /* if we don't have any bandwidth left, stop reading */
239    if( howmuch < 1 ) {
240        tr_peerIoSetEnabled( io, dir, FALSE );
241        return;
242    }
243
244    EVUTIL_SET_SOCKET_ERROR( 0 );
245    res = evbuffer_read( io->inbuf, fd, (int)howmuch );
246    e = EVUTIL_SOCKET_ERROR( );
247
248    if( res > 0 )
249    {
250        tr_peerIoSetEnabled( io, dir, TRUE );
251
252        /* Invoke the user callback - must always be called last */
253        canReadWrapper( io );
254    }
255    else
256    {
257        char errstr[512];
258        short what = BEV_EVENT_READING;
259
260        if( res == 0 ) /* EOF */
261            what |= BEV_EVENT_EOF;
262        else if( res == -1 ) {
263            if( e == EAGAIN || e == EINTR ) {
264                tr_peerIoSetEnabled( io, dir, TRUE );
265                return;
266            }
267            what |= BEV_EVENT_ERROR;
268        }
269
270        tr_net_strerror( errstr, sizeof( errstr ), e );
271        dbgmsg( io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
272
273        if( io->gotError != NULL )
274            io->gotError( io, what, io->userData );
275    }
276}
277
278static int
279tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
280{
281    int e;
282    int n;
283    char errstr[256];
284
285    EVUTIL_SET_SOCKET_ERROR( 0 );
286    n = evbuffer_write_atmost( io->outbuf, fd, howmuch );
287    e = EVUTIL_SOCKET_ERROR( );
288    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?tr_net_strerror(errstr,sizeof(errstr),e):"") );
289
290    return n;
291}
292
293static void
294event_write_cb( int fd, short event UNUSED, void * vio )
295{
296    int res = 0;
297    int e;
298    short what = BEV_EVENT_WRITING;
299    tr_peerIo * io = vio;
300    size_t howmuch;
301    const tr_direction dir = TR_UP;
302
303    assert( tr_isPeerIo( io ) );
304    assert( io->socket >= 0 );
305
306    io->pendingEvents &= ~EV_WRITE;
307
308    dbgmsg( io, "libevent says this peer is ready to write" );
309
310    /* Write as much as possible, since the socket is non-blocking, write() will
311     * return if it can't write any more data without blocking */
312    howmuch = tr_bandwidthClamp( &io->bandwidth, dir, evbuffer_get_length( io->outbuf ) );
313
314    /* if we don't have any bandwidth left, stop writing */
315    if( howmuch < 1 ) {
316        tr_peerIoSetEnabled( io, dir, FALSE );
317        return;
318    }
319
320    EVUTIL_SET_SOCKET_ERROR( 0 );
321    res = tr_evbuffer_write( io, fd, howmuch );
322    e = EVUTIL_SOCKET_ERROR( );
323
324    if (res == -1) {
325        if (!e || e == EAGAIN || e == EINTR || e == EINPROGRESS)
326            goto reschedule;
327        /* error case */
328        what |= BEV_EVENT_ERROR;
329    } else if (res == 0) {
330        /* eof case */
331        what |= BEV_EVENT_EOF;
332    }
333    if (res <= 0)
334        goto error;
335
336    if( evbuffer_get_length( io->outbuf ) )
337        tr_peerIoSetEnabled( io, dir, TRUE );
338
339    didWriteWrapper( io, res );
340    return;
341
342 reschedule:
343    if( evbuffer_get_length( io->outbuf ) )
344        tr_peerIoSetEnabled( io, dir, TRUE );
345    return;
346
347 error:
348
349    dbgmsg( io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
350
351    if( io->gotError != NULL )
352        io->gotError( io, what, io->userData );
353}
354
355/**
356***
357**/
358
359static void
360maybeSetCongestionAlgorithm( int socket, const char * algorithm )
361{
362    if( algorithm && *algorithm )
363    {
364        const int rc = tr_netSetCongestionControl( socket, algorithm );
365
366        if( rc < 0 )
367            tr_ninf( "Net", "Can't set congestion control algorithm '%s': %s",
368                     algorithm, tr_strerror( errno ));
369    }
370}
371
372#ifdef WITH_UTP
373/* UTP callbacks */
374
375static void
376utp_on_read(void *closure, const unsigned char *buf, size_t buflen)
377{
378    int rc;
379    tr_peerIo *io = (tr_peerIo *)closure;
380    assert( tr_isPeerIo( io ) );
381
382    rc = evbuffer_add( io->inbuf, buf, buflen );
383    dbgmsg( io, "utp_on_read got %zu bytes", buflen );
384
385    if( rc < 0 ) {
386        tr_nerr( "UTP", "On read evbuffer_add" );
387        return;
388    }
389
390    tr_peerIoSetEnabled( io, TR_DOWN, TRUE );
391    canReadWrapper( io );
392}
393
394static void
395utp_on_write(void *closure, unsigned char *buf, size_t buflen)
396{
397    tr_peerIo *io = (tr_peerIo *)closure;
398    int rc;
399    assert( tr_isPeerIo( io ) );
400
401    rc = evbuffer_remove( io->outbuf, buf, buflen );
402    dbgmsg( io, "utp_on_write sending %zu bytes... evbuffer_remove returned %d", buflen, rc );
403    assert( rc == (int)buflen ); /* if this fails, we've corrupted our bookkeeping somewhere */
404    if( rc < (long)buflen ) {
405        tr_nerr( "UTP", "Short write: %d < %ld", rc, (long)buflen);
406    }
407
408    didWriteWrapper( io, buflen );
409}
410
411static size_t
412utp_get_rb_size(void *closure)
413{
414    tr_peerIo *io = (tr_peerIo *)closure;
415    size_t bytes;
416    assert( tr_isPeerIo( io ) );
417
418    bytes = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, UTP_READ_BUFFER_SIZE );
419
420    dbgmsg( io, "utp_get_rb_size is saying it's ready to read %zu bytes", bytes );
421    return UTP_READ_BUFFER_SIZE - bytes;
422}
423
424static void
425utp_on_state_change(void *closure, int state)
426{
427    tr_peerIo *io = (tr_peerIo *)closure;
428    assert( tr_isPeerIo( io ) );
429
430    if( state == UTP_STATE_CONNECT ) {
431        dbgmsg( io, "utp_on_state_change -- changed to connected" );
432        io->utpSupported = TRUE;
433    } else if( state == UTP_STATE_WRITABLE ) {
434        dbgmsg( io, "utp_on_state_change -- changed to writable" );
435    } else if( state == UTP_STATE_EOF ) {
436        if( io->gotError )
437            io->gotError( io, BEV_EVENT_EOF, io->userData );
438    } else if( state == UTP_STATE_DESTROYING ) {
439        tr_nerr( "UTP", "Impossible state UTP_STATE_DESTROYING" );
440        return;
441    } else {
442        tr_nerr( "UTP", "Unknown state %d", state );
443    }
444}
445
446static void
447utp_on_error(void *closure, int errcode)
448{
449    tr_peerIo *io = (tr_peerIo *)closure;
450    assert( tr_isPeerIo( io ) );
451
452    dbgmsg( io, "utp_on_error -- errcode is %d", errcode );
453
454    if( io->gotError ) {
455        errno = errcode;
456        io->gotError( io, BEV_EVENT_ERROR, io->userData );
457    }
458}
459
460static void
461utp_on_overhead(void *closure, bool send, size_t count, int type UNUSED)
462{
463    tr_peerIo *io = (tr_peerIo *)closure;
464    assert( tr_isPeerIo( io ) );
465
466    dbgmsg( io, "utp_on_overhead -- count is %zu", count );
467
468    tr_bandwidthUsed( &io->bandwidth, send ? TR_UP : TR_DOWN,
469                      count, FALSE, tr_time_msec() );
470}
471
472static struct UTPFunctionTable utp_function_table = {
473    .on_read = utp_on_read,
474    .on_write = utp_on_write,
475    .get_rb_size = utp_get_rb_size,
476    .on_state = utp_on_state_change,
477    .on_error = utp_on_error,
478    .on_overhead = utp_on_overhead
479};
480
481
482/* Dummy UTP callbacks. */
483/* We switch a UTP socket to use these after the associated peerIo has been
484   destroyed -- see io_dtor. */
485
486static void
487dummy_read( void * closure UNUSED, const unsigned char *buf UNUSED, size_t buflen UNUSED )
488{
489    /* This cannot happen, as far as I'm aware. */
490    tr_nerr( "UTP", "On_read called on closed socket" );
491
492}
493
494static void
495dummy_write(void * closure UNUSED, unsigned char *buf, size_t buflen)
496{
497    /* This can very well happen if we've shut down a peer connection that
498       had unflushed buffers.  Complain and send zeroes. */
499    tr_ndbg( "UTP", "On_write called on closed socket" );
500    memset( buf, 0, buflen );
501}
502
503static size_t
504dummy_get_rb_size( void * closure UNUSED )
505{
506    return 0;
507}
508
509static void
510dummy_on_state_change(void * closure UNUSED, int state UNUSED )
511{
512    return;
513}
514
515static void
516dummy_on_error( void * closure UNUSED, int errcode UNUSED )
517{
518    return;
519}
520
521static void
522dummy_on_overhead( void *closure UNUSED, bool send UNUSED, size_t count UNUSED, int type UNUSED )
523{
524    return;
525}
526
527static struct UTPFunctionTable dummy_utp_function_table = {
528    .on_read = dummy_read,
529    .on_write = dummy_write,
530    .get_rb_size = dummy_get_rb_size,
531    .on_state = dummy_on_state_change,
532    .on_error = dummy_on_error,
533    .on_overhead = dummy_on_overhead
534};
535
536#endif /* #ifdef WITH_UTP */
537
538static tr_peerIo*
539tr_peerIoNew( tr_session       * session,
540              tr_bandwidth     * parent,
541              const tr_address * addr,
542              tr_port            port,
543              const uint8_t    * torrentHash,
544              tr_bool            isIncoming,
545              tr_bool            isSeed,
546              int                socket,
547              struct UTPSocket * utp_socket)
548{
549    tr_peerIo * io;
550
551    assert( session != NULL );
552    assert( session->events != NULL );
553    assert( tr_isBool( isIncoming ) );
554    assert( tr_isBool( isSeed ) );
555    assert( tr_amInEventThread( session ) );
556    assert( (socket < 0) == (utp_socket != NULL) );
557#ifndef WITH_UTP
558    assert( socket >= 0 );
559#endif
560
561    if( socket >= 0 ) {
562        tr_netSetTOS( socket, session->peerSocketTOS );
563        maybeSetCongestionAlgorithm( socket, session->peer_congestion_algorithm );
564    }
565
566    io = tr_new0( tr_peerIo, 1 );
567    io->magicNumber = MAGIC_NUMBER;
568    io->refCount = 1;
569    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
570    io->session = session;
571    io->addr = *addr;
572    io->isSeed = isSeed;
573    io->port = port;
574    io->socket = socket;
575    io->utp_socket = utp_socket;
576    io->isIncoming = isIncoming != 0;
577    io->timeCreated = tr_time( );
578    io->inbuf = evbuffer_new( );
579    io->outbuf = evbuffer_new( );
580    tr_bandwidthConstruct( &io->bandwidth, session, parent );
581    tr_bandwidthSetPeer( &io->bandwidth, io );
582    dbgmsg( io, "bandwidth is %p; its parent is %p", &io->bandwidth, parent );
583    dbgmsg( io, "socket is %d, utp_socket is %p", socket, utp_socket );
584
585    if( io->socket >= 0 ) {
586        io->event_read = event_new( session->event_base,
587                                    io->socket, EV_READ, event_read_cb, io );
588        io->event_write = event_new( session->event_base,
589                                     io->socket, EV_WRITE, event_write_cb, io );
590    }
591#ifdef WITH_UTP
592    else {
593        UTP_SetSockopt( utp_socket, SO_RCVBUF, UTP_READ_BUFFER_SIZE );
594        dbgmsg( io, "%s", "calling UTP_SetCallbacks &utp_function_table" );
595        UTP_SetCallbacks( utp_socket,
596                          &utp_function_table,
597                          io );
598        if( !isIncoming ) {
599            dbgmsg( io, "%s", "calling UTP_Connect" );
600            UTP_Connect( utp_socket );
601        }
602    }
603#endif
604
605    return io;
606}
607
608tr_peerIo*
609tr_peerIoNewIncoming( tr_session        * session,
610                      tr_bandwidth      * parent,
611                      const tr_address  * addr,
612                      tr_port             port,
613                      int                 fd,
614                      struct UTPSocket  * utp_socket )
615{
616    assert( session );
617    assert( tr_isAddress( addr ) );
618
619    return tr_peerIoNew( session, parent, addr, port, NULL, TRUE, FALSE, 
620                         fd, utp_socket );
621}
622
623tr_peerIo*
624tr_peerIoNewOutgoing( tr_session        * session,
625                      tr_bandwidth      * parent,
626                      const tr_address  * addr,
627                      tr_port             port,
628                      const uint8_t     * torrentHash,
629                      tr_bool             isSeed,
630                      tr_bool             utp )
631{
632    int fd = -1;
633    struct UTPSocket *utp_socket = NULL;
634
635    assert( session );
636    assert( tr_isAddress( addr ) );
637    assert( torrentHash );
638
639    if( utp )
640        utp_socket = tr_netOpenPeerUTPSocket( session, addr, port, isSeed );
641
642    if( !utp_socket ) {
643        fd = tr_netOpenPeerSocket( session, addr, port, isSeed );
644        dbgmsg( NULL, "tr_netOpenPeerSocket returned fd %d", fd );
645    }
646
647    if( fd < 0 && utp_socket == NULL )
648        return NULL;
649
650    return tr_peerIoNew( session, parent, addr, port,
651                         torrentHash, FALSE, isSeed, fd, utp_socket );
652}
653
654/***
655****
656***/
657
658static void
659event_enable( tr_peerIo * io, short event )
660{
661    assert( tr_amInEventThread( io->session ) );
662    assert( io->session != NULL );
663    assert( io->session->events != NULL );
664
665    if( io->socket < 0 )
666        return;
667
668    assert( io->session->events != NULL );
669    assert( event_initialized( io->event_read ) );
670    assert( event_initialized( io->event_write ) );
671
672    if( ( event & EV_READ ) && ! ( io->pendingEvents & EV_READ ) )
673    {
674        dbgmsg( io, "enabling libevent ready-to-read polling" );
675        event_add( io->event_read, NULL );
676        io->pendingEvents |= EV_READ;
677    }
678
679    if( ( event & EV_WRITE ) && ! ( io->pendingEvents & EV_WRITE ) )
680    {
681        dbgmsg( io, "enabling libevent ready-to-write polling" );
682        event_add( io->event_write, NULL );
683        io->pendingEvents |= EV_WRITE;
684    }
685}
686
687static void
688event_disable( struct tr_peerIo * io, short event )
689{
690    assert( tr_amInEventThread( io->session ) );
691    assert( io->session != NULL );
692
693    if( io->socket < 0 )
694        return;
695
696    assert( io->session->events != NULL );
697    assert( event_initialized( io->event_read ) );
698    assert( event_initialized( io->event_write ) );
699
700    if( ( event & EV_READ ) && ( io->pendingEvents & EV_READ ) )
701    {
702        dbgmsg( io, "disabling libevent ready-to-read polling" );
703        event_del( io->event_read );
704        io->pendingEvents &= ~EV_READ;
705    }
706
707    if( ( event & EV_WRITE ) && ( io->pendingEvents & EV_WRITE ) )
708    {
709        dbgmsg( io, "disabling libevent ready-to-write polling" );
710        event_del( io->event_write );
711        io->pendingEvents &= ~EV_WRITE;
712    }
713}
714
715void
716tr_peerIoSetEnabled( tr_peerIo    * io,
717                     tr_direction   dir,
718                     tr_bool        isEnabled )
719{
720    const short event = dir == TR_UP ? EV_WRITE : EV_READ;
721
722    assert( tr_isPeerIo( io ) );
723    assert( tr_isDirection( dir ) );
724    assert( tr_amInEventThread( io->session ) );
725    assert( io->session->events != NULL );
726
727    if( isEnabled )
728        event_enable( io, event );
729    else
730        event_disable( io, event );
731}
732
733/***
734****
735***/
736static void
737io_close_socket( tr_peerIo * io )
738{
739    if( io->socket >= 0 ) {
740        tr_netClose( io->session, io->socket );
741        io->socket = -1;
742        event_free( io->event_read );
743        event_free( io->event_write );
744    }
745
746#ifdef WITH_UTP
747    if( io->utp_socket ) {
748        UTP_SetCallbacks( io->utp_socket,
749                          &dummy_utp_function_table,
750                          NULL );
751        UTP_Close( io->utp_socket );
752
753        io->utp_socket = NULL;
754    }
755#endif
756}
757
758static void
759io_dtor( void * vio )
760{
761    tr_peerIo * io = vio;
762
763    assert( tr_isPeerIo( io ) );
764    assert( tr_amInEventThread( io->session ) );
765    assert( io->session->events != NULL );
766
767    dbgmsg( io, "in tr_peerIo destructor" );
768    event_disable( io, EV_READ | EV_WRITE );
769    tr_bandwidthDestruct( &io->bandwidth );
770    evbuffer_free( io->outbuf );
771    evbuffer_free( io->inbuf );
772    io_close_socket( io );
773    tr_cryptoFree( io->crypto );
774    tr_list_free( &io->outbuf_datatypes, tr_free );
775
776    memset( io, ~0, sizeof( tr_peerIo ) );
777    tr_free( io );
778}
779
780static void
781tr_peerIoFree( tr_peerIo * io )
782{
783    if( io )
784    {
785        dbgmsg( io, "in tr_peerIoFree" );
786        io->canRead = NULL;
787        io->didWrite = NULL;
788        io->gotError = NULL;
789        tr_runInEventThread( io->session, io_dtor, io );
790    }
791}
792
793void
794tr_peerIoRefImpl( const char * file, int line, tr_peerIo * io )
795{
796    assert( tr_isPeerIo( io ) );
797
798    dbgmsg( io, "%s:%d is incrementing the IO's refcount from %d to %d",
799                file, line, io->refCount, io->refCount+1 );
800
801    ++io->refCount;
802}
803
804void
805tr_peerIoUnrefImpl( const char * file, int line, tr_peerIo * io )
806{
807    assert( tr_isPeerIo( io ) );
808
809    dbgmsg( io, "%s:%d is decrementing the IO's refcount from %d to %d",
810                file, line, io->refCount, io->refCount-1 );
811
812    if( !--io->refCount )
813        tr_peerIoFree( io );
814}
815
816const tr_address*
817tr_peerIoGetAddress( const tr_peerIo * io, tr_port   * port )
818{
819    assert( tr_isPeerIo( io ) );
820
821    if( port )
822        *port = io->port;
823
824    return &io->addr;
825}
826
827const char*
828tr_peerIoAddrStr( const tr_address * addr, tr_port port )
829{
830    static char buf[512];
831
832    if( addr->type == TR_AF_INET )
833        tr_snprintf( buf, sizeof( buf ), "%s:%u", tr_ntop_non_ts( addr ), ntohs( port ) );
834    else
835        tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_ntop_non_ts( addr ), ntohs( port ) );
836    return buf;
837}
838
839const char* tr_peerIoGetAddrStr( const tr_peerIo * io )
840{
841    return tr_isPeerIo( io ) ? tr_peerIoAddrStr( &io->addr, io->port ) : "error";
842}
843
844void
845tr_peerIoSetIOFuncs( tr_peerIo        * io,
846                     tr_can_read_cb     readcb,
847                     tr_did_write_cb    writecb,
848                     tr_net_error_cb    errcb,
849                     void             * userData )
850{
851    io->canRead = readcb;
852    io->didWrite = writecb;
853    io->gotError = errcb;
854    io->userData = userData;
855}
856
857void
858tr_peerIoClear( tr_peerIo * io )
859{
860    tr_peerIoSetIOFuncs( io, NULL, NULL, NULL, NULL );
861    tr_peerIoSetEnabled( io, TR_UP, FALSE );
862    tr_peerIoSetEnabled( io, TR_DOWN, FALSE );
863}
864
865int
866tr_peerIoReconnect( tr_peerIo * io )
867{
868    short int pendingEvents;
869    tr_session * session;
870
871    assert( tr_isPeerIo( io ) );
872    assert( !tr_peerIoIsIncoming( io ) );
873
874    session = tr_peerIoGetSession( io );
875
876    pendingEvents = io->pendingEvents;
877    event_disable( io, EV_READ | EV_WRITE );
878
879    io_close_socket( io );
880
881    io->socket = tr_netOpenPeerSocket( session, &io->addr, io->port, io->isSeed );
882    io->event_read = event_new( session->event_base, io->socket, EV_READ, event_read_cb, io );
883    io->event_write = event_new( session->event_base, io->socket, EV_WRITE, event_write_cb, io );
884
885    if( io->socket >= 0 )
886    {
887        event_enable( io, pendingEvents );
888        tr_netSetTOS( io->socket, session->peerSocketTOS );
889        maybeSetCongestionAlgorithm( io->socket, session->peer_congestion_algorithm );
890        return 0;
891    }
892
893    return -1;
894}
895
896/**
897***
898**/
899
900void
901tr_peerIoSetTorrentHash( tr_peerIo *     io,
902                         const uint8_t * hash )
903{
904    assert( tr_isPeerIo( io ) );
905
906    tr_cryptoSetTorrentHash( io->crypto, hash );
907}
908
909const uint8_t*
910tr_peerIoGetTorrentHash( tr_peerIo * io )
911{
912    assert( tr_isPeerIo( io ) );
913    assert( io->crypto );
914
915    return tr_cryptoGetTorrentHash( io->crypto );
916}
917
918int
919tr_peerIoHasTorrentHash( const tr_peerIo * io )
920{
921    assert( tr_isPeerIo( io ) );
922    assert( io->crypto );
923
924    return tr_cryptoHasTorrentHash( io->crypto );
925}
926
927/**
928***
929**/
930
931void
932tr_peerIoSetPeersId( tr_peerIo *     io,
933                     const uint8_t * peer_id )
934{
935    assert( tr_isPeerIo( io ) );
936
937    if( ( io->peerIdIsSet = peer_id != NULL ) )
938        memcpy( io->peerId, peer_id, 20 );
939    else
940        memset( io->peerId, 0, 20 );
941}
942
943/**
944***
945**/
946
947static unsigned int
948getDesiredOutputBufferSize( const tr_peerIo * io, uint64_t now )
949{
950    /* this is all kind of arbitrary, but what seems to work well is
951     * being large enough to hold the next 20 seconds' worth of input,
952     * or a few blocks, whichever is bigger.
953     * It's okay to tweak this as needed */
954    const unsigned int currentSpeed_Bps = tr_bandwidthGetPieceSpeed_Bps( &io->bandwidth, now, TR_UP );
955    const unsigned int period = 15u; /* arbitrary */
956    /* the 3 is arbitrary; the .5 is to leave room for messages */
957    static const unsigned int ceiling =  (unsigned int)( MAX_BLOCK_SIZE * 3.5 );
958    return MAX( ceiling, currentSpeed_Bps*period );
959}
960
961size_t
962tr_peerIoGetWriteBufferSpace( const tr_peerIo * io, uint64_t now )
963{
964    const size_t desiredLen = getDesiredOutputBufferSize( io, now );
965    const size_t currentLen = evbuffer_get_length( io->outbuf );
966    size_t freeSpace = 0;
967
968    if( desiredLen > currentLen )
969        freeSpace = desiredLen - currentLen;
970
971    return freeSpace;
972}
973
974/**
975***
976**/
977
978void
979tr_peerIoSetEncryption( tr_peerIo * io, uint32_t encryptionMode )
980{
981    assert( tr_isPeerIo( io ) );
982    assert( encryptionMode == PEER_ENCRYPTION_NONE
983         || encryptionMode == PEER_ENCRYPTION_RC4 );
984
985    io->encryptionMode = encryptionMode;
986}
987
988/**
989***
990**/
991
992static void
993addDatatype( tr_peerIo * io, size_t byteCount, tr_bool isPieceData )
994{
995    struct tr_datatype * d;
996
997    d = tr_new( struct tr_datatype, 1 );
998    d->isPieceData = isPieceData != 0;
999    d->length = byteCount;
1000    tr_list_append( &io->outbuf_datatypes, d );
1001}
1002
1003static struct evbuffer_iovec *
1004evbuffer_peek_all( struct evbuffer * buf, size_t * setme_vecCount )
1005{
1006    const size_t byteCount = evbuffer_get_length( buf );
1007    const int vecCount = evbuffer_peek( buf, byteCount, NULL, NULL, 0 );
1008    struct evbuffer_iovec * iovec = tr_new0( struct evbuffer_iovec, vecCount );
1009    const int n = evbuffer_peek( buf, byteCount, NULL, iovec, vecCount );
1010    assert( n == vecCount );
1011    *setme_vecCount = n;
1012    return iovec;
1013}
1014
1015static void
1016maybeEncryptBuffer( tr_peerIo * io, struct evbuffer * buf )
1017{
1018    if( io->encryptionMode == PEER_ENCRYPTION_RC4 )
1019    {
1020        size_t i, n;
1021        struct evbuffer_iovec * iovec = evbuffer_peek_all( buf, &n );
1022
1023        for( i=0; i<n; ++i )
1024            tr_cryptoEncrypt( io->crypto, iovec[i].iov_len, iovec[i].iov_base, iovec[i].iov_base );
1025
1026        tr_free( iovec );
1027    }
1028}
1029
1030void
1031tr_peerIoWriteBuf( tr_peerIo * io, struct evbuffer * buf, tr_bool isPieceData )
1032{
1033    const size_t byteCount = evbuffer_get_length( buf );
1034    maybeEncryptBuffer( io, buf );
1035    evbuffer_add_buffer( io->outbuf, buf );
1036    addDatatype( io, byteCount, isPieceData );
1037}
1038
1039void
1040tr_peerIoWriteBytes( tr_peerIo * io, const void * bytes, size_t byteCount, tr_bool isPieceData )
1041{
1042    struct evbuffer * buf = evbuffer_new( );
1043    evbuffer_add( buf, bytes, byteCount );
1044    tr_peerIoWriteBuf( io, buf, isPieceData );
1045    evbuffer_free( buf );
1046}
1047
1048/***
1049****
1050***/
1051
1052void
1053evbuffer_add_uint8( struct evbuffer * outbuf, uint8_t byte )
1054{
1055    evbuffer_add( outbuf, &byte, 1 );
1056}
1057
1058void
1059evbuffer_add_uint16( struct evbuffer * outbuf, uint16_t addme_hs )
1060{
1061    const uint16_t ns = htons( addme_hs );
1062    evbuffer_add( outbuf, &ns, sizeof( ns ) );
1063}
1064
1065void
1066evbuffer_add_uint32( struct evbuffer * outbuf, uint32_t addme_hl )
1067{
1068    const uint32_t nl = htonl( addme_hl );
1069    evbuffer_add( outbuf, &nl, sizeof( nl ) );
1070}
1071
1072/***
1073****
1074***/
1075
1076void
1077tr_peerIoReadBytes( tr_peerIo * io, struct evbuffer * inbuf, void * bytes, size_t byteCount )
1078{
1079    assert( tr_isPeerIo( io ) );
1080    assert( evbuffer_get_length( inbuf )  >= byteCount );
1081
1082    switch( io->encryptionMode )
1083    {
1084        case PEER_ENCRYPTION_NONE:
1085            evbuffer_remove( inbuf, bytes, byteCount );
1086            break;
1087
1088        case PEER_ENCRYPTION_RC4:
1089            evbuffer_remove( inbuf, bytes, byteCount );
1090            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
1091            break;
1092
1093        default:
1094            assert( 0 );
1095    }
1096}
1097
1098void
1099tr_peerIoReadUint16( tr_peerIo        * io,
1100                     struct evbuffer  * inbuf,
1101                     uint16_t         * setme )
1102{
1103    uint16_t tmp;
1104    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
1105    *setme = ntohs( tmp );
1106}
1107
1108void tr_peerIoReadUint32( tr_peerIo        * io,
1109                          struct evbuffer  * inbuf,
1110                          uint32_t         * setme )
1111{
1112    uint32_t tmp;
1113    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
1114    *setme = ntohl( tmp );
1115}
1116
1117void
1118tr_peerIoDrain( tr_peerIo       * io,
1119                struct evbuffer * inbuf,
1120                size_t            byteCount )
1121{
1122    void * buf = tr_sessionGetBuffer( io->session );
1123    const size_t buflen = SESSION_BUFFER_SIZE;
1124
1125    while( byteCount > 0 )
1126    {
1127        const size_t thisPass = MIN( byteCount, buflen );
1128        tr_peerIoReadBytes( io, inbuf, buf, thisPass );
1129        byteCount -= thisPass;
1130    }
1131
1132    tr_sessionReleaseBuffer( io->session );
1133}
1134
1135/***
1136****
1137***/
1138
1139static int
1140tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
1141{
1142    int res = 0;
1143
1144    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch )))
1145    {
1146        if( io->utp_socket != NULL ) /* utp peer connection */
1147        {
1148            /* UTP_RBDrained notifies libutp that your read buffer is emtpy.
1149             * It opens up the congestion window by sending an ACK (soonish)
1150             * if one was not going to be sent. */
1151            if( evbuffer_get_length( io->inbuf ) == 0 )
1152                UTP_RBDrained( io->utp_socket );
1153        }
1154        else /* tcp peer connection */
1155        {
1156            int e;
1157
1158            EVUTIL_SET_SOCKET_ERROR( 0 );
1159            res = evbuffer_read( io->inbuf, io->socket, (int)howmuch );
1160            e = EVUTIL_SOCKET_ERROR( );
1161
1162            dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(e):"") );
1163
1164            if( evbuffer_get_length( io->inbuf ) )
1165                canReadWrapper( io );
1166
1167            if( ( res <= 0 ) && ( io->gotError ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1168            {
1169                char errstr[512];
1170                short what = BEV_EVENT_READING | BEV_EVENT_ERROR;
1171                if( res == 0 )
1172                    what |= BEV_EVENT_EOF;
1173                tr_net_strerror( errstr, sizeof( errstr ), e );
1174                dbgmsg( io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
1175                io->gotError( io, what, io->userData );
1176            }
1177        }
1178    }
1179
1180    return res;
1181}
1182
1183static int
1184tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
1185{
1186    int n = 0;
1187    const size_t old_len = evbuffer_get_length( io->outbuf );
1188    dbgmsg( io, "in tr_peerIoTryWrite %zu", howmuch );
1189
1190    if( howmuch > old_len )
1191        howmuch = old_len;
1192
1193    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_UP, howmuch )))
1194    {
1195        if( io->utp_socket != NULL ) /* utp peer connection */
1196        {
1197            const size_t old_len = evbuffer_get_length( io->outbuf );
1198            UTP_Write( io->utp_socket, howmuch );
1199            n = old_len - evbuffer_get_length( io->outbuf );
1200        }
1201        else
1202        {
1203            int e;
1204
1205            EVUTIL_SET_SOCKET_ERROR( 0 );
1206            n = tr_evbuffer_write( io, io->socket, howmuch );
1207            e = EVUTIL_SOCKET_ERROR( );
1208
1209            if( n > 0 )
1210                didWriteWrapper( io, n );
1211
1212            if( ( n < 0 ) && ( io->gotError ) && e && ( e != EPIPE ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1213            {
1214                char errstr[512];
1215                const short what = BEV_EVENT_WRITING | BEV_EVENT_ERROR;
1216
1217                tr_net_strerror( errstr, sizeof( errstr ), e );
1218                dbgmsg( io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", n, what, e, errstr );
1219
1220                if( io->gotError != NULL )
1221                    io->gotError( io, what, io->userData );
1222            }
1223        }
1224    }
1225
1226    return n;
1227}
1228
1229int
1230tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
1231{
1232    int bytesUsed = 0;
1233
1234    assert( tr_isPeerIo( io ) );
1235    assert( tr_isDirection( dir ) );
1236
1237    if( dir == TR_DOWN )
1238        bytesUsed = tr_peerIoTryRead( io, limit );
1239    else
1240        bytesUsed = tr_peerIoTryWrite( io, limit );
1241
1242    dbgmsg( io, "flushing peer-io, direction %d, limit %zu, bytesUsed %d", (int)dir, limit, bytesUsed );
1243    return bytesUsed;
1244}
1245
1246int
1247tr_peerIoFlushOutgoingProtocolMsgs( tr_peerIo * io )
1248{
1249    size_t byteCount = 0;
1250    tr_list * it;
1251
1252    /* count up how many bytes are used by non-piece-data messages
1253       at the front of our outbound queue */
1254    for( it=io->outbuf_datatypes; it!=NULL; it=it->next )
1255    {
1256        struct tr_datatype * d = it->data;
1257
1258        if( d->isPieceData )
1259            break;
1260
1261        byteCount += d->length;
1262    }
1263
1264    return tr_peerIoFlush( io, TR_UP, byteCount );
1265}
Note: See TracBrowser for help on using the repository browser.