source: trunk/libtransmission/peer-io.c @ 12365

Last change on this file since 12365 was 12365, checked in by jordan, 11 years ago

(trunk libT) more heap pruning: use composition rather than aggregation for the tr_crypto object owned by tr_peerIo.

  • Property svn:keywords set to Date Rev Author Id
File size: 34.0 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 12365 2011-04-17 05:22:50Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h>
16
17#include <event2/event.h>
18#include <event2/buffer.h>
19#include <event2/bufferevent.h>
20
21#include <libutp/utp.h>
22
23#include "transmission.h"
24#include "session.h"
25#include "bandwidth.h"
26#include "crypto.h"
27#include "net.h"
28#include "peer-common.h" /* MAX_BLOCK_SIZE */
29#include "peer-io.h"
30#include "trevent.h" /* tr_runInEventThread() */
31#include "utils.h"
32
33
34#ifdef WIN32
35 #define EAGAIN       WSAEWOULDBLOCK
36 #define EINTR        WSAEINTR
37 #define EINPROGRESS  WSAEINPROGRESS
38 #define EPIPE        WSAECONNRESET
39#endif
40
41/* The amount of read bufferring that we allow for uTP sockets. */
42
43#define UTP_READ_BUFFER_SIZE (256 * 1024)
44
45static size_t
46guessPacketOverhead( size_t d )
47{
48    /**
49     * http://sd.wareonearth.com/~phil/net/overhead/
50     *
51     * TCP over Ethernet:
52     * Assuming no header compression (e.g. not PPP)
53     * Add 20 IPv4 header or 40 IPv6 header (no options)
54     * Add 20 TCP header
55     * Add 12 bytes optional TCP timestamps
56     * Max TCP Payload data rates over ethernet are thus:
57     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
58     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
59     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
60     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
61     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
62     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
63     */
64    const double assumed_payload_data_rate = 94.0;
65
66    return (unsigned int)( d * ( 100.0 / assumed_payload_data_rate ) - d );
67}
68
69/**
70***
71**/
72
73#define dbgmsg( io, ... ) \
74    do { \
75        if( tr_deepLoggingIsActive( ) ) \
76            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
77    } while( 0 )
78
79/**
80***
81**/
82
83struct tr_datatype
84{
85    struct tr_datatype * next;
86    size_t length;
87    bool isPieceData;
88};
89
90static struct tr_datatype * datatype_pool = NULL;
91
92static const struct tr_datatype TR_DATATYPE_INIT = { NULL, 0, false };
93
94static struct tr_datatype *
95datatype_new( void )
96{
97    struct tr_datatype * ret;
98
99    if( datatype_pool == NULL )
100        ret = tr_new( struct tr_datatype, 1 );
101    else {
102        ret = datatype_pool;
103        datatype_pool = datatype_pool->next;
104    }
105
106    *ret = TR_DATATYPE_INIT;
107    return ret;
108}
109
110static void
111datatype_free( struct tr_datatype * datatype )
112{
113    datatype->next = datatype_pool;
114    datatype_pool = datatype;
115}
116
117static void
118peer_io_pull_datatype( tr_peerIo * io )
119{
120    struct tr_datatype * tmp;
121
122    if(( tmp = io->outbuf_datatypes ))
123    {
124        io->outbuf_datatypes = tmp->next;
125        datatype_free( tmp );
126    }
127}
128
129static void
130peer_io_push_datatype( tr_peerIo * io, struct tr_datatype * datatype )
131{
132    struct tr_datatype * tmp;
133
134    if(( tmp = io->outbuf_datatypes )) {
135        while( tmp->next != NULL )
136            tmp = tmp->next;
137        tmp->next = datatype;
138    } else {
139        io->outbuf_datatypes = datatype;
140    }
141}
142
143/***
144****
145***/
146
147static void
148didWriteWrapper( tr_peerIo * io, unsigned int bytes_transferred )
149{
150     while( bytes_transferred && tr_isPeerIo( io ) )
151     {
152        struct tr_datatype * next = io->outbuf_datatypes;
153
154        const unsigned int payload = MIN( next->length, bytes_transferred );
155        /* For uTP sockets, the overhead is computed in utp_on_overhead. */
156        const unsigned int overhead =
157            io->socket ? guessPacketOverhead( payload ) : 0;
158        const uint64_t now = tr_time_msec( );
159
160        tr_bandwidthUsed( &io->bandwidth, TR_UP, payload, next->isPieceData, now );
161
162        if( overhead > 0 )
163            tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, false, now );
164
165        if( io->didWrite )
166            io->didWrite( io, payload, next->isPieceData, io->userData );
167
168        if( tr_isPeerIo( io ) )
169        {
170            bytes_transferred -= payload;
171            next->length -= payload;
172            if( !next->length )
173                peer_io_pull_datatype( io );
174        }
175    }
176}
177
178static void
179canReadWrapper( tr_peerIo * io )
180{
181    bool err = 0;
182    bool done = 0;
183    tr_session * session;
184
185    dbgmsg( io, "canRead" );
186
187    tr_peerIoRef( io );
188
189    session = io->session;
190
191    /* try to consume the input buffer */
192    if( io->canRead )
193    {
194        const uint64_t now = tr_time_msec( );
195
196        tr_sessionLock( session );
197
198        while( !done && !err )
199        {
200            size_t piece = 0;
201            const size_t oldLen = evbuffer_get_length( io->inbuf );
202            const int ret = io->canRead( io, io->userData, &piece );
203            const size_t used = oldLen - evbuffer_get_length( io->inbuf );
204            const unsigned int overhead = guessPacketOverhead( used );
205
206            if( piece || (piece!=used) )
207            {
208                if( piece )
209                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, piece, true, now );
210
211                if( used != piece )
212                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, used - piece, false, now );
213            }
214
215            if( overhead > 0 )
216                tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, false, now );
217
218            switch( ret )
219            {
220                case READ_NOW:
221                    if( evbuffer_get_length( io->inbuf ) )
222                        continue;
223                    done = 1;
224                    break;
225
226                case READ_LATER:
227                    done = 1;
228                    break;
229
230                case READ_ERR:
231                    err = 1;
232                    break;
233            }
234
235            assert( tr_isPeerIo( io ) );
236        }
237
238        tr_sessionUnlock( session );
239    }
240
241    tr_peerIoUnref( io );
242}
243
244static void
245event_read_cb( int fd, short event UNUSED, void * vio )
246{
247    int res;
248    int e;
249    tr_peerIo * io = vio;
250
251    /* Limit the input buffer to 256K, so it doesn't grow too large */
252    unsigned int howmuch;
253    unsigned int curlen;
254    const tr_direction dir = TR_DOWN;
255    const unsigned int max = 256 * 1024;
256
257    assert( tr_isPeerIo( io ) );
258    assert( io->socket >= 0 );
259
260    io->pendingEvents &= ~EV_READ;
261
262    curlen = evbuffer_get_length( io->inbuf );
263    howmuch = curlen >= max ? 0 : max - curlen;
264    howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch );
265
266    dbgmsg( io, "libevent says this peer is ready to read" );
267
268    /* if we don't have any bandwidth left, stop reading */
269    if( howmuch < 1 ) {
270        tr_peerIoSetEnabled( io, dir, false );
271        return;
272    }
273
274    EVUTIL_SET_SOCKET_ERROR( 0 );
275    res = evbuffer_read( io->inbuf, fd, (int)howmuch );
276    e = EVUTIL_SOCKET_ERROR( );
277
278    if( res > 0 )
279    {
280        tr_peerIoSetEnabled( io, dir, true );
281
282        /* Invoke the user callback - must always be called last */
283        canReadWrapper( io );
284    }
285    else
286    {
287        char errstr[512];
288        short what = BEV_EVENT_READING;
289
290        if( res == 0 ) /* EOF */
291            what |= BEV_EVENT_EOF;
292        else if( res == -1 ) {
293            if( e == EAGAIN || e == EINTR ) {
294                tr_peerIoSetEnabled( io, dir, true );
295                return;
296            }
297            what |= BEV_EVENT_ERROR;
298        }
299
300        dbgmsg( io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)",
301                res, what, e, tr_net_strerror( errstr, sizeof( errstr ), e ) );
302
303        if( io->gotError != NULL )
304            io->gotError( io, what, io->userData );
305    }
306}
307
308static int
309tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
310{
311    int e;
312    int n;
313    char errstr[256];
314
315    EVUTIL_SET_SOCKET_ERROR( 0 );
316    n = evbuffer_write_atmost( io->outbuf, fd, howmuch );
317    e = EVUTIL_SOCKET_ERROR( );
318    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?tr_net_strerror(errstr,sizeof(errstr),e):"") );
319
320    return n;
321}
322
323static void
324event_write_cb( int fd, short event UNUSED, void * vio )
325{
326    int res = 0;
327    int e;
328    short what = BEV_EVENT_WRITING;
329    tr_peerIo * io = vio;
330    size_t howmuch;
331    const tr_direction dir = TR_UP;
332
333    assert( tr_isPeerIo( io ) );
334    assert( io->socket >= 0 );
335
336    io->pendingEvents &= ~EV_WRITE;
337
338    dbgmsg( io, "libevent says this peer is ready to write" );
339
340    /* Write as much as possible, since the socket is non-blocking, write() will
341     * return if it can't write any more data without blocking */
342    howmuch = tr_bandwidthClamp( &io->bandwidth, dir, evbuffer_get_length( io->outbuf ) );
343
344    /* if we don't have any bandwidth left, stop writing */
345    if( howmuch < 1 ) {
346        tr_peerIoSetEnabled( io, dir, false );
347        return;
348    }
349
350    EVUTIL_SET_SOCKET_ERROR( 0 );
351    res = tr_evbuffer_write( io, fd, howmuch );
352    e = EVUTIL_SOCKET_ERROR( );
353
354    if (res == -1) {
355        if (!e || e == EAGAIN || e == EINTR || e == EINPROGRESS)
356            goto reschedule;
357        /* error case */
358        what |= BEV_EVENT_ERROR;
359    } else if (res == 0) {
360        /* eof case */
361        what |= BEV_EVENT_EOF;
362    }
363    if (res <= 0)
364        goto error;
365
366    if( evbuffer_get_length( io->outbuf ) )
367        tr_peerIoSetEnabled( io, dir, true );
368
369    didWriteWrapper( io, res );
370    return;
371
372 reschedule:
373    if( evbuffer_get_length( io->outbuf ) )
374        tr_peerIoSetEnabled( io, dir, true );
375    return;
376
377 error:
378
379    dbgmsg( io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
380
381    if( io->gotError != NULL )
382        io->gotError( io, what, io->userData );
383}
384
385/**
386***
387**/
388
389static void
390maybeSetCongestionAlgorithm( int socket, const char * algorithm )
391{
392    if( algorithm && *algorithm )
393    {
394        const int rc = tr_netSetCongestionControl( socket, algorithm );
395
396        if( rc < 0 )
397            tr_ninf( "Net", "Can't set congestion control algorithm '%s': %s",
398                     algorithm, tr_strerror( errno ));
399    }
400}
401
402#ifdef WITH_UTP
403/* UTP callbacks */
404
405static void
406utp_on_read(void *closure, const unsigned char *buf, size_t buflen)
407{
408    int rc;
409    tr_peerIo *io = closure;
410    assert( tr_isPeerIo( io ) );
411
412    rc = evbuffer_add( io->inbuf, buf, buflen );
413    dbgmsg( io, "utp_on_read got %zu bytes", buflen );
414
415    if( rc < 0 ) {
416        tr_nerr( "UTP", "On read evbuffer_add" );
417        return;
418    }
419
420    tr_peerIoSetEnabled( io, TR_DOWN, true );
421    canReadWrapper( io );
422}
423
424static void
425utp_on_write(void *closure, unsigned char *buf, size_t buflen)
426{
427    int rc;
428    tr_peerIo *io = closure;
429    assert( tr_isPeerIo( io ) );
430
431    rc = evbuffer_remove( io->outbuf, buf, buflen );
432    dbgmsg( io, "utp_on_write sending %zu bytes... evbuffer_remove returned %d", buflen, rc );
433    assert( rc == (int)buflen ); /* if this fails, we've corrupted our bookkeeping somewhere */
434    if( rc < (long)buflen ) {
435        tr_nerr( "UTP", "Short write: %d < %ld", rc, (long)buflen);
436    }
437
438    didWriteWrapper( io, buflen );
439}
440
441static size_t
442utp_get_rb_size(void *closure)
443{
444    size_t bytes;
445    tr_peerIo *io = closure;
446    assert( tr_isPeerIo( io ) );
447
448    bytes = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, UTP_READ_BUFFER_SIZE );
449
450    dbgmsg( io, "utp_get_rb_size is saying it's ready to read %zu bytes", bytes );
451    return UTP_READ_BUFFER_SIZE - bytes;
452}
453
454static void
455utp_on_state_change(void *closure, int state)
456{
457    tr_peerIo *io = closure;
458    assert( tr_isPeerIo( io ) );
459
460    if( state == UTP_STATE_CONNECT ) {
461        dbgmsg( io, "utp_on_state_change -- changed to connected" );
462        io->utpSupported = true;
463    } else if( state == UTP_STATE_WRITABLE ) {
464        dbgmsg( io, "utp_on_state_change -- changed to writable" );
465    } else if( state == UTP_STATE_EOF ) {
466        if( io->gotError )
467            io->gotError( io, BEV_EVENT_EOF, io->userData );
468    } else if( state == UTP_STATE_DESTROYING ) {
469        tr_nerr( "UTP", "Impossible state UTP_STATE_DESTROYING" );
470        return;
471    } else {
472        tr_nerr( "UTP", "Unknown state %d", state );
473    }
474}
475
476static void
477utp_on_error(void *closure, int errcode)
478{
479    tr_peerIo *io = closure;
480    assert( tr_isPeerIo( io ) );
481
482    dbgmsg( io, "utp_on_error -- errcode is %d", errcode );
483
484    if( io->gotError ) {
485        errno = errcode;
486        io->gotError( io, BEV_EVENT_ERROR, io->userData );
487    }
488}
489
490static void
491utp_on_overhead(void *closure, bool send, size_t count, int type UNUSED)
492{
493    tr_peerIo *io = closure;
494    assert( tr_isPeerIo( io ) );
495
496    dbgmsg( io, "utp_on_overhead -- count is %zu", count );
497
498    tr_bandwidthUsed( &io->bandwidth, send ? TR_UP : TR_DOWN,
499                      count, false, tr_time_msec() );
500}
501
502static struct UTPFunctionTable utp_function_table = {
503    .on_read = utp_on_read,
504    .on_write = utp_on_write,
505    .get_rb_size = utp_get_rb_size,
506    .on_state = utp_on_state_change,
507    .on_error = utp_on_error,
508    .on_overhead = utp_on_overhead
509};
510
511
512/* Dummy UTP callbacks. */
513/* We switch a UTP socket to use these after the associated peerIo has been
514   destroyed -- see io_dtor. */
515
516static void
517dummy_read( void * closure UNUSED, const unsigned char *buf UNUSED, size_t buflen UNUSED )
518{
519    /* This cannot happen, as far as I'm aware. */
520    tr_nerr( "UTP", "On_read called on closed socket" );
521
522}
523
524static void
525dummy_write(void * closure UNUSED, unsigned char *buf, size_t buflen)
526{
527    /* This can very well happen if we've shut down a peer connection that
528       had unflushed buffers.  Complain and send zeroes. */
529    tr_ndbg( "UTP", "On_write called on closed socket" );
530    memset( buf, 0, buflen );
531}
532
533static size_t
534dummy_get_rb_size( void * closure UNUSED )
535{
536    return 0;
537}
538
539static void
540dummy_on_state_change(void * closure UNUSED, int state UNUSED )
541{
542    return;
543}
544
545static void
546dummy_on_error( void * closure UNUSED, int errcode UNUSED )
547{
548    return;
549}
550
551static void
552dummy_on_overhead( void *closure UNUSED, bool send UNUSED, size_t count UNUSED, int type UNUSED )
553{
554    return;
555}
556
557static struct UTPFunctionTable dummy_utp_function_table = {
558    .on_read = dummy_read,
559    .on_write = dummy_write,
560    .get_rb_size = dummy_get_rb_size,
561    .on_state = dummy_on_state_change,
562    .on_error = dummy_on_error,
563    .on_overhead = dummy_on_overhead
564};
565
566#endif /* #ifdef WITH_UTP */
567
568static tr_peerIo*
569tr_peerIoNew( tr_session       * session,
570              tr_bandwidth     * parent,
571              const tr_address * addr,
572              tr_port            port,
573              const uint8_t    * torrentHash,
574              bool               isIncoming,
575              bool               isSeed,
576              int                socket,
577              struct UTPSocket * utp_socket)
578{
579    tr_peerIo * io;
580
581    assert( session != NULL );
582    assert( session->events != NULL );
583    assert( tr_isBool( isIncoming ) );
584    assert( tr_isBool( isSeed ) );
585    assert( tr_amInEventThread( session ) );
586    assert( (socket < 0) == (utp_socket != NULL) );
587#ifndef WITH_UTP
588    assert( socket >= 0 );
589#endif
590
591    if( socket >= 0 ) {
592        tr_netSetTOS( socket, session->peerSocketTOS );
593        maybeSetCongestionAlgorithm( socket, session->peer_congestion_algorithm );
594    }
595
596    io = tr_new0( tr_peerIo, 1 );
597    io->magicNumber = PEER_IO_MAGIC_NUMBER;
598    io->refCount = 1;
599    tr_cryptoConstruct( &io->crypto, torrentHash, isIncoming );
600    io->session = session;
601    io->addr = *addr;
602    io->isSeed = isSeed;
603    io->port = port;
604    io->socket = socket;
605    io->utp_socket = utp_socket;
606    io->isIncoming = isIncoming != 0;
607    io->timeCreated = tr_time( );
608    io->inbuf = evbuffer_new( );
609    io->outbuf = evbuffer_new( );
610    tr_bandwidthConstruct( &io->bandwidth, session, parent );
611    tr_bandwidthSetPeer( &io->bandwidth, io );
612    dbgmsg( io, "bandwidth is %p; its parent is %p", &io->bandwidth, parent );
613    dbgmsg( io, "socket is %d, utp_socket is %p", socket, utp_socket );
614
615    if( io->socket >= 0 ) {
616        io->event_read = event_new( session->event_base,
617                                    io->socket, EV_READ, event_read_cb, io );
618        io->event_write = event_new( session->event_base,
619                                     io->socket, EV_WRITE, event_write_cb, io );
620    }
621#ifdef WITH_UTP
622    else {
623        UTP_SetSockopt( utp_socket, SO_RCVBUF, UTP_READ_BUFFER_SIZE );
624        dbgmsg( io, "%s", "calling UTP_SetCallbacks &utp_function_table" );
625        UTP_SetCallbacks( utp_socket,
626                          &utp_function_table,
627                          io );
628        if( !isIncoming ) {
629            dbgmsg( io, "%s", "calling UTP_Connect" );
630            UTP_Connect( utp_socket );
631        }
632    }
633#endif
634
635    return io;
636}
637
638tr_peerIo*
639tr_peerIoNewIncoming( tr_session        * session,
640                      tr_bandwidth      * parent,
641                      const tr_address  * addr,
642                      tr_port             port,
643                      int                 fd,
644                      struct UTPSocket  * utp_socket )
645{
646    assert( session );
647    assert( tr_address_is_valid( addr ) );
648
649    return tr_peerIoNew( session, parent, addr, port, NULL, true, false,
650                         fd, utp_socket );
651}
652
653tr_peerIo*
654tr_peerIoNewOutgoing( tr_session        * session,
655                      tr_bandwidth      * parent,
656                      const tr_address  * addr,
657                      tr_port             port,
658                      const uint8_t     * torrentHash,
659                      bool                isSeed,
660                      bool                utp )
661{
662    int fd = -1;
663    struct UTPSocket * utp_socket = NULL;
664
665    assert( session );
666    assert( tr_address_is_valid( addr ) );
667    assert( torrentHash );
668
669    if( utp )
670        utp_socket = tr_netOpenPeerUTPSocket( session, addr, port, isSeed );
671
672    if( !utp_socket ) {
673        fd = tr_netOpenPeerSocket( session, addr, port, isSeed );
674        dbgmsg( NULL, "tr_netOpenPeerSocket returned fd %d", fd );
675    }
676
677    if( fd < 0 && utp_socket == NULL )
678        return NULL;
679
680    return tr_peerIoNew( session, parent, addr, port,
681                         torrentHash, false, isSeed, fd, utp_socket );
682}
683
684/***
685****
686***/
687
688static void
689event_enable( tr_peerIo * io, short event )
690{
691    assert( tr_amInEventThread( io->session ) );
692    assert( io->session != NULL );
693    assert( io->session->events != NULL );
694
695    if( io->socket < 0 )
696        return;
697
698    assert( io->session->events != NULL );
699    assert( event_initialized( io->event_read ) );
700    assert( event_initialized( io->event_write ) );
701
702    if( ( event & EV_READ ) && ! ( io->pendingEvents & EV_READ ) )
703    {
704        dbgmsg( io, "enabling libevent ready-to-read polling" );
705        event_add( io->event_read, NULL );
706        io->pendingEvents |= EV_READ;
707    }
708
709    if( ( event & EV_WRITE ) && ! ( io->pendingEvents & EV_WRITE ) )
710    {
711        dbgmsg( io, "enabling libevent ready-to-write polling" );
712        event_add( io->event_write, NULL );
713        io->pendingEvents |= EV_WRITE;
714    }
715}
716
717static void
718event_disable( struct tr_peerIo * io, short event )
719{
720    assert( tr_amInEventThread( io->session ) );
721    assert( io->session != NULL );
722
723    if( io->socket < 0 )
724        return;
725
726    assert( io->session->events != NULL );
727    assert( event_initialized( io->event_read ) );
728    assert( event_initialized( io->event_write ) );
729
730    if( ( event & EV_READ ) && ( io->pendingEvents & EV_READ ) )
731    {
732        dbgmsg( io, "disabling libevent ready-to-read polling" );
733        event_del( io->event_read );
734        io->pendingEvents &= ~EV_READ;
735    }
736
737    if( ( event & EV_WRITE ) && ( io->pendingEvents & EV_WRITE ) )
738    {
739        dbgmsg( io, "disabling libevent ready-to-write polling" );
740        event_del( io->event_write );
741        io->pendingEvents &= ~EV_WRITE;
742    }
743}
744
745void
746tr_peerIoSetEnabled( tr_peerIo    * io,
747                     tr_direction   dir,
748                     bool           isEnabled )
749{
750    const short event = dir == TR_UP ? EV_WRITE : EV_READ;
751
752    assert( tr_isPeerIo( io ) );
753    assert( tr_isDirection( dir ) );
754    assert( tr_amInEventThread( io->session ) );
755    assert( io->session->events != NULL );
756
757    if( isEnabled )
758        event_enable( io, event );
759    else
760        event_disable( io, event );
761}
762
763/***
764****
765***/
766static void
767io_close_socket( tr_peerIo * io )
768{
769    if( io->socket >= 0 ) {
770        tr_netClose( io->session, io->socket );
771        io->socket = -1;
772    }
773
774    if( io->event_read != NULL) {
775        event_free( io->event_read );
776        io->event_read = NULL;
777    }
778
779    if( io->event_write != NULL) {
780        event_free( io->event_write );
781        io->event_write = NULL;
782    }
783
784#ifdef WITH_UTP
785    if( io->utp_socket ) {
786        UTP_SetCallbacks( io->utp_socket,
787                          &dummy_utp_function_table,
788                          NULL );
789        UTP_Close( io->utp_socket );
790
791        io->utp_socket = NULL;
792    }
793#endif
794}
795
796static void
797io_dtor( void * vio )
798{
799    tr_peerIo * io = vio;
800
801    assert( tr_isPeerIo( io ) );
802    assert( tr_amInEventThread( io->session ) );
803    assert( io->session->events != NULL );
804
805    dbgmsg( io, "in tr_peerIo destructor" );
806    event_disable( io, EV_READ | EV_WRITE );
807    tr_bandwidthDestruct( &io->bandwidth );
808    evbuffer_free( io->outbuf );
809    evbuffer_free( io->inbuf );
810    io_close_socket( io );
811    tr_cryptoDestruct( &io->crypto );
812
813    while( io->outbuf_datatypes != NULL )
814        peer_io_pull_datatype( io );
815
816    memset( io, ~0, sizeof( tr_peerIo ) );
817    tr_free( io );
818}
819
820static void
821tr_peerIoFree( tr_peerIo * io )
822{
823    if( io )
824    {
825        dbgmsg( io, "in tr_peerIoFree" );
826        io->canRead = NULL;
827        io->didWrite = NULL;
828        io->gotError = NULL;
829        tr_runInEventThread( io->session, io_dtor, io );
830    }
831}
832
833void
834tr_peerIoRefImpl( const char * file, int line, tr_peerIo * io )
835{
836    assert( tr_isPeerIo( io ) );
837
838    dbgmsg( io, "%s:%d is incrementing the IO's refcount from %d to %d",
839                file, line, io->refCount, io->refCount+1 );
840
841    ++io->refCount;
842}
843
844void
845tr_peerIoUnrefImpl( const char * file, int line, tr_peerIo * io )
846{
847    assert( tr_isPeerIo( io ) );
848
849    dbgmsg( io, "%s:%d is decrementing the IO's refcount from %d to %d",
850                file, line, io->refCount, io->refCount-1 );
851
852    if( !--io->refCount )
853        tr_peerIoFree( io );
854}
855
856const tr_address*
857tr_peerIoGetAddress( const tr_peerIo * io, tr_port   * port )
858{
859    assert( tr_isPeerIo( io ) );
860
861    if( port )
862        *port = io->port;
863
864    return &io->addr;
865}
866
867const char*
868tr_peerIoAddrStr( const tr_address * addr, tr_port port )
869{
870    static char buf[512];
871    tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_address_to_string( addr ), ntohs( port ) );
872    return buf;
873}
874
875const char* tr_peerIoGetAddrStr( const tr_peerIo * io )
876{
877    return tr_isPeerIo( io ) ? tr_peerIoAddrStr( &io->addr, io->port ) : "error";
878}
879
880void
881tr_peerIoSetIOFuncs( tr_peerIo        * io,
882                     tr_can_read_cb     readcb,
883                     tr_did_write_cb    writecb,
884                     tr_net_error_cb    errcb,
885                     void             * userData )
886{
887    io->canRead = readcb;
888    io->didWrite = writecb;
889    io->gotError = errcb;
890    io->userData = userData;
891}
892
893void
894tr_peerIoClear( tr_peerIo * io )
895{
896    tr_peerIoSetIOFuncs( io, NULL, NULL, NULL, NULL );
897    tr_peerIoSetEnabled( io, TR_UP, false );
898    tr_peerIoSetEnabled( io, TR_DOWN, false );
899}
900
901int
902tr_peerIoReconnect( tr_peerIo * io )
903{
904    short int pendingEvents;
905    tr_session * session;
906
907    assert( tr_isPeerIo( io ) );
908    assert( !tr_peerIoIsIncoming( io ) );
909
910    session = tr_peerIoGetSession( io );
911
912    pendingEvents = io->pendingEvents;
913    event_disable( io, EV_READ | EV_WRITE );
914
915    io_close_socket( io );
916
917    io->socket = tr_netOpenPeerSocket( session, &io->addr, io->port, io->isSeed );
918    io->event_read = event_new( session->event_base, io->socket, EV_READ, event_read_cb, io );
919    io->event_write = event_new( session->event_base, io->socket, EV_WRITE, event_write_cb, io );
920
921    if( io->socket >= 0 )
922    {
923        event_enable( io, pendingEvents );
924        tr_netSetTOS( io->socket, session->peerSocketTOS );
925        maybeSetCongestionAlgorithm( io->socket, session->peer_congestion_algorithm );
926        return 0;
927    }
928
929    return -1;
930}
931
932/**
933***
934**/
935
936void
937tr_peerIoSetTorrentHash( tr_peerIo *     io,
938                         const uint8_t * hash )
939{
940    assert( tr_isPeerIo( io ) );
941
942    tr_cryptoSetTorrentHash( &io->crypto, hash );
943}
944
945const uint8_t*
946tr_peerIoGetTorrentHash( tr_peerIo * io )
947{
948    assert( tr_isPeerIo( io ) );
949
950    return tr_cryptoGetTorrentHash( &io->crypto );
951}
952
953int
954tr_peerIoHasTorrentHash( const tr_peerIo * io )
955{
956    assert( tr_isPeerIo( io ) );
957
958    return tr_cryptoHasTorrentHash( &io->crypto );
959}
960
961/**
962***
963**/
964
965void
966tr_peerIoSetPeersId( tr_peerIo * io, const uint8_t * peer_id )
967{
968    assert( tr_isPeerIo( io ) );
969
970    if( ( io->peerIdIsSet = peer_id != NULL ) )
971        memcpy( io->peerId, peer_id, 20 );
972    else
973        memset( io->peerId, 0, 20 );
974}
975
976/**
977***
978**/
979
980static unsigned int
981getDesiredOutputBufferSize( const tr_peerIo * io, uint64_t now )
982{
983    /* this is all kind of arbitrary, but what seems to work well is
984     * being large enough to hold the next 20 seconds' worth of input,
985     * or a few blocks, whichever is bigger.
986     * It's okay to tweak this as needed */
987    const unsigned int currentSpeed_Bps = tr_bandwidthGetPieceSpeed_Bps( &io->bandwidth, now, TR_UP );
988    const unsigned int period = 15u; /* arbitrary */
989    /* the 3 is arbitrary; the .5 is to leave room for messages */
990    static const unsigned int ceiling =  (unsigned int)( MAX_BLOCK_SIZE * 3.5 );
991    return MAX( ceiling, currentSpeed_Bps*period );
992}
993
994size_t
995tr_peerIoGetWriteBufferSpace( const tr_peerIo * io, uint64_t now )
996{
997    const size_t desiredLen = getDesiredOutputBufferSize( io, now );
998    const size_t currentLen = evbuffer_get_length( io->outbuf );
999    size_t freeSpace = 0;
1000
1001    if( desiredLen > currentLen )
1002        freeSpace = desiredLen - currentLen;
1003
1004    return freeSpace;
1005}
1006
1007/**
1008***
1009**/
1010
1011void
1012tr_peerIoSetEncryption( tr_peerIo * io, tr_encryption_type encryption_type )
1013{
1014    assert( tr_isPeerIo( io ) );
1015    assert( encryption_type == PEER_ENCRYPTION_NONE
1016         || encryption_type == PEER_ENCRYPTION_RC4 );
1017
1018    io->encryption_type = encryption_type;
1019}
1020
1021/**
1022***
1023**/
1024
1025static void
1026addDatatype( tr_peerIo * io, size_t byteCount, bool isPieceData )
1027{
1028    struct tr_datatype * d;
1029    d = datatype_new( );
1030    d->isPieceData = isPieceData != 0;
1031    d->length = byteCount;
1032    peer_io_push_datatype( io, d );
1033}
1034
1035static void
1036maybeEncryptBuffer( tr_peerIo * io, struct evbuffer * buf )
1037{
1038    if( io->encryption_type == PEER_ENCRYPTION_RC4 )
1039    {
1040        struct evbuffer_ptr pos;
1041        struct evbuffer_iovec iovec;
1042        evbuffer_ptr_set( buf, &pos, 0, EVBUFFER_PTR_SET );
1043        do {
1044            evbuffer_peek( buf, -1, &pos, &iovec, 1 );
1045            tr_cryptoEncrypt( &io->crypto, iovec.iov_len, iovec.iov_base, iovec.iov_base );
1046        } while( !evbuffer_ptr_set( buf, &pos, iovec.iov_len, EVBUFFER_PTR_ADD ) );
1047    }
1048}
1049
1050void
1051tr_peerIoWriteBuf( tr_peerIo * io, struct evbuffer * buf, bool isPieceData )
1052{
1053    const size_t byteCount = evbuffer_get_length( buf );
1054    maybeEncryptBuffer( io, buf );
1055    evbuffer_add_buffer( io->outbuf, buf );
1056    addDatatype( io, byteCount, isPieceData );
1057}
1058
1059void
1060tr_peerIoWriteBytes( tr_peerIo * io, const void * bytes, size_t byteCount, bool isPieceData )
1061{
1062    struct evbuffer * buf = evbuffer_new( );
1063    evbuffer_add( buf, bytes, byteCount );
1064    tr_peerIoWriteBuf( io, buf, isPieceData );
1065    evbuffer_free( buf );
1066}
1067
1068/***
1069****
1070***/
1071
1072void
1073evbuffer_add_uint8( struct evbuffer * outbuf, uint8_t byte )
1074{
1075    evbuffer_add( outbuf, &byte, 1 );
1076}
1077
1078void
1079evbuffer_add_uint16( struct evbuffer * outbuf, uint16_t addme_hs )
1080{
1081    const uint16_t ns = htons( addme_hs );
1082    evbuffer_add( outbuf, &ns, sizeof( ns ) );
1083}
1084
1085void
1086evbuffer_add_uint32( struct evbuffer * outbuf, uint32_t addme_hl )
1087{
1088    const uint32_t nl = htonl( addme_hl );
1089    evbuffer_add( outbuf, &nl, sizeof( nl ) );
1090}
1091
1092void
1093evbuffer_add_uint64( struct evbuffer * outbuf, uint64_t addme_hll )
1094{
1095    const uint64_t nll = tr_htonll( addme_hll );
1096    evbuffer_add( outbuf, &nll, sizeof( nll ) );
1097}
1098
1099/***
1100****
1101***/
1102
1103void
1104tr_peerIoReadBytesToBuf( tr_peerIo * io, struct evbuffer * inbuf, struct evbuffer * outbuf, size_t byteCount )
1105{
1106    struct evbuffer * tmp;
1107    const size_t old_length = evbuffer_get_length( outbuf );
1108
1109    assert( tr_isPeerIo( io ) );
1110    assert( evbuffer_get_length( inbuf ) >= byteCount );
1111
1112    /* append it to outbuf */
1113    tmp = evbuffer_new( );
1114    evbuffer_remove_buffer( inbuf, tmp, byteCount );
1115    evbuffer_add_buffer( outbuf, tmp );
1116    evbuffer_free( tmp );
1117
1118    /* decrypt if needed */
1119    if( io->encryption_type == PEER_ENCRYPTION_RC4 ) {
1120        struct evbuffer_ptr pos;
1121        struct evbuffer_iovec iovec;
1122        evbuffer_ptr_set( outbuf, &pos, old_length, EVBUFFER_PTR_SET );
1123        do {
1124            evbuffer_peek( outbuf, byteCount, &pos, &iovec, 1 );
1125            tr_cryptoDecrypt( &io->crypto, iovec.iov_len, iovec.iov_base, iovec.iov_base );
1126            byteCount -= iovec.iov_len;
1127        } while( !evbuffer_ptr_set( outbuf, &pos, iovec.iov_len, EVBUFFER_PTR_ADD ) );
1128    }
1129}
1130
1131void
1132tr_peerIoReadBytes( tr_peerIo * io, struct evbuffer * inbuf, void * bytes, size_t byteCount )
1133{
1134    assert( tr_isPeerIo( io ) );
1135    assert( evbuffer_get_length( inbuf )  >= byteCount );
1136
1137    switch( io->encryption_type )
1138    {
1139        case PEER_ENCRYPTION_NONE:
1140            evbuffer_remove( inbuf, bytes, byteCount );
1141            break;
1142
1143        case PEER_ENCRYPTION_RC4:
1144            evbuffer_remove( inbuf, bytes, byteCount );
1145            tr_cryptoDecrypt( &io->crypto, byteCount, bytes, bytes );
1146            break;
1147
1148        default:
1149            assert( 0 );
1150    }
1151}
1152
1153void
1154tr_peerIoReadUint16( tr_peerIo        * io,
1155                     struct evbuffer  * inbuf,
1156                     uint16_t         * setme )
1157{
1158    uint16_t tmp;
1159    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
1160    *setme = ntohs( tmp );
1161}
1162
1163void tr_peerIoReadUint32( tr_peerIo        * io,
1164                          struct evbuffer  * inbuf,
1165                          uint32_t         * setme )
1166{
1167    uint32_t tmp;
1168    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
1169    *setme = ntohl( tmp );
1170}
1171
1172void
1173tr_peerIoDrain( tr_peerIo       * io,
1174                struct evbuffer * inbuf,
1175                size_t            byteCount )
1176{
1177    char buf[4096];
1178    const size_t buflen = sizeof( buf );
1179
1180    while( byteCount > 0 )
1181    {
1182        const size_t thisPass = MIN( byteCount, buflen );
1183        tr_peerIoReadBytes( io, inbuf, buf, thisPass );
1184        byteCount -= thisPass;
1185    }
1186}
1187
1188/***
1189****
1190***/
1191
1192static int
1193tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
1194{
1195    int res = 0;
1196
1197    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch )))
1198    {
1199        if( io->utp_socket != NULL ) /* utp peer connection */
1200        {
1201            /* UTP_RBDrained notifies libutp that your read buffer is emtpy.
1202             * It opens up the congestion window by sending an ACK (soonish)
1203             * if one was not going to be sent. */
1204            if( evbuffer_get_length( io->inbuf ) == 0 )
1205                UTP_RBDrained( io->utp_socket );
1206        }
1207        else /* tcp peer connection */
1208        {
1209            int e;
1210
1211            EVUTIL_SET_SOCKET_ERROR( 0 );
1212            res = evbuffer_read( io->inbuf, io->socket, (int)howmuch );
1213            e = EVUTIL_SOCKET_ERROR( );
1214
1215            dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(e):"") );
1216
1217            if( evbuffer_get_length( io->inbuf ) )
1218                canReadWrapper( io );
1219
1220            if( ( res <= 0 ) && ( io->gotError ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1221            {
1222                char errstr[512];
1223                short what = BEV_EVENT_READING | BEV_EVENT_ERROR;
1224                if( res == 0 )
1225                    what |= BEV_EVENT_EOF;
1226                dbgmsg( io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)",
1227                        res, what, e, tr_net_strerror( errstr, sizeof( errstr ), e ) );
1228                io->gotError( io, what, io->userData );
1229            }
1230        }
1231    }
1232
1233    return res;
1234}
1235
1236static int
1237tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
1238{
1239    int n = 0;
1240    const size_t old_len = evbuffer_get_length( io->outbuf );
1241    dbgmsg( io, "in tr_peerIoTryWrite %zu", howmuch );
1242
1243    if( howmuch > old_len )
1244        howmuch = old_len;
1245
1246    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_UP, howmuch )))
1247    {
1248        if( io->utp_socket != NULL ) /* utp peer connection */
1249        {
1250            const size_t old_len = evbuffer_get_length( io->outbuf );
1251            UTP_Write( io->utp_socket, howmuch );
1252            n = old_len - evbuffer_get_length( io->outbuf );
1253        }
1254        else
1255        {
1256            int e;
1257
1258            EVUTIL_SET_SOCKET_ERROR( 0 );
1259            n = tr_evbuffer_write( io, io->socket, howmuch );
1260            e = EVUTIL_SOCKET_ERROR( );
1261
1262            if( n > 0 )
1263                didWriteWrapper( io, n );
1264
1265            if( ( n < 0 ) && ( io->gotError ) && e && ( e != EPIPE ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1266            {
1267                char errstr[512];
1268                const short what = BEV_EVENT_WRITING | BEV_EVENT_ERROR;
1269
1270                dbgmsg( io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)",
1271                        n, what, e, tr_net_strerror( errstr, sizeof( errstr ), e ) );
1272
1273                if( io->gotError != NULL )
1274                    io->gotError( io, what, io->userData );
1275            }
1276        }
1277    }
1278
1279    return n;
1280}
1281
1282int
1283tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
1284{
1285    int bytesUsed = 0;
1286
1287    assert( tr_isPeerIo( io ) );
1288    assert( tr_isDirection( dir ) );
1289
1290    if( dir == TR_DOWN )
1291        bytesUsed = tr_peerIoTryRead( io, limit );
1292    else
1293        bytesUsed = tr_peerIoTryWrite( io, limit );
1294
1295    dbgmsg( io, "flushing peer-io, direction %d, limit %zu, bytesUsed %d", (int)dir, limit, bytesUsed );
1296    return bytesUsed;
1297}
1298
1299int
1300tr_peerIoFlushOutgoingProtocolMsgs( tr_peerIo * io )
1301{
1302    size_t byteCount = 0;
1303    const struct tr_datatype * it;
1304
1305    /* count up how many bytes are used by non-piece-data messages
1306       at the front of our outbound queue */
1307    for( it=io->outbuf_datatypes; it!=NULL; it=it->next )
1308        if( it->isPieceData )
1309            break;
1310        else
1311            byteCount += it->length;
1312
1313    return tr_peerIoFlush( io, TR_UP, byteCount );
1314}
Note: See TracBrowser for help on using the repository browser.