source: trunk/libtransmission/peer-io.c @ 13329

Last change on this file since 13329 was 13329, checked in by jordan, 9 years ago

(trunk libT) fix the Linux build wrt compiling with the new snapshot of libutp checked into r13317

Previously we made sure to include stdbool.h (via transmission.h) before utp.h, since the latter used 'bool' without defining it. The new snapshot defines it unconditionally in non-C++ code, so now we need to include it first.

  • Property svn:keywords set to Date Rev Author Id
File size: 34.3 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 13329 2012-05-30 17:47:29Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h>
16
17#include <event2/event.h>
18#include <event2/buffer.h>
19#include <event2/bufferevent.h>
20
21#include <libutp/utp.h>
22
23#include "transmission.h"
24#include "session.h"
25#include "bandwidth.h"
26#include "crypto.h"
27#include "net.h"
28#include "peer-common.h" /* MAX_BLOCK_SIZE */
29#include "peer-io.h"
30#include "trevent.h" /* tr_runInEventThread() */
31#include "tr-utp.h"
32#include "utils.h"
33
34
35#ifdef WIN32
36 #define EAGAIN       WSAEWOULDBLOCK
37 #define EINTR        WSAEINTR
38 #define EINPROGRESS  WSAEINPROGRESS
39 #define EPIPE        WSAECONNRESET
40#endif
41
42/* The amount of read bufferring that we allow for uTP sockets. */
43
44#define UTP_READ_BUFFER_SIZE (256 * 1024)
45
46static size_t
47guessPacketOverhead( size_t d )
48{
49    /**
50     * http://sd.wareonearth.com/~phil/net/overhead/
51     *
52     * TCP over Ethernet:
53     * Assuming no header compression (e.g. not PPP)
54     * Add 20 IPv4 header or 40 IPv6 header (no options)
55     * Add 20 TCP header
56     * Add 12 bytes optional TCP timestamps
57     * Max TCP Payload data rates over ethernet are thus:
58     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
59     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
60     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
61     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
62     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
63     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
64     */
65    const double assumed_payload_data_rate = 94.0;
66
67    return (unsigned int)( d * ( 100.0 / assumed_payload_data_rate ) - d );
68}
69
70/**
71***
72**/
73
74#define dbgmsg( io, ... ) \
75    do { \
76        if( tr_deepLoggingIsActive( ) ) \
77            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
78    } while( 0 )
79
80/**
81***
82**/
83
84struct tr_datatype
85{
86    struct tr_datatype * next;
87    size_t length;
88    bool isPieceData;
89};
90
91static struct tr_datatype * datatype_pool = NULL;
92
93static const struct tr_datatype TR_DATATYPE_INIT = { NULL, 0, false };
94
95static struct tr_datatype *
96datatype_new( void )
97{
98    struct tr_datatype * ret;
99
100    if( datatype_pool == NULL )
101        ret = tr_new( struct tr_datatype, 1 );
102    else {
103        ret = datatype_pool;
104        datatype_pool = datatype_pool->next;
105    }
106
107    *ret = TR_DATATYPE_INIT;
108    return ret;
109}
110
111static void
112datatype_free( struct tr_datatype * datatype )
113{
114    datatype->next = datatype_pool;
115    datatype_pool = datatype;
116}
117
118static void
119peer_io_pull_datatype( tr_peerIo * io )
120{
121    struct tr_datatype * tmp;
122
123    if(( tmp = io->outbuf_datatypes ))
124    {
125        io->outbuf_datatypes = tmp->next;
126        datatype_free( tmp );
127    }
128}
129
130static void
131peer_io_push_datatype( tr_peerIo * io, struct tr_datatype * datatype )
132{
133    struct tr_datatype * tmp;
134
135    if(( tmp = io->outbuf_datatypes )) {
136        while( tmp->next != NULL )
137            tmp = tmp->next;
138        tmp->next = datatype;
139    } else {
140        io->outbuf_datatypes = datatype;
141    }
142}
143
144/***
145****
146***/
147
148static void
149didWriteWrapper( tr_peerIo * io, unsigned int bytes_transferred )
150{
151     while( bytes_transferred && tr_isPeerIo( io ) )
152     {
153        struct tr_datatype * next = io->outbuf_datatypes;
154
155        const unsigned int payload = MIN( next->length, bytes_transferred );
156        /* For uTP sockets, the overhead is computed in utp_on_overhead. */
157        const unsigned int overhead =
158            io->socket ? guessPacketOverhead( payload ) : 0;
159        const uint64_t now = tr_time_msec( );
160
161        tr_bandwidthUsed( &io->bandwidth, TR_UP, payload, next->isPieceData, now );
162
163        if( overhead > 0 )
164            tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, false, now );
165
166        if( io->didWrite )
167            io->didWrite( io, payload, next->isPieceData, io->userData );
168
169        if( tr_isPeerIo( io ) )
170        {
171            bytes_transferred -= payload;
172            next->length -= payload;
173            if( !next->length )
174                peer_io_pull_datatype( io );
175        }
176    }
177}
178
179static void
180canReadWrapper( tr_peerIo * io )
181{
182    bool err = 0;
183    bool done = 0;
184    tr_session * session;
185
186    dbgmsg( io, "canRead" );
187
188    tr_peerIoRef( io );
189
190    session = io->session;
191
192    /* try to consume the input buffer */
193    if( io->canRead )
194    {
195        const uint64_t now = tr_time_msec( );
196
197        tr_sessionLock( session );
198
199        while( !done && !err )
200        {
201            size_t piece = 0;
202            const size_t oldLen = evbuffer_get_length( io->inbuf );
203            const int ret = io->canRead( io, io->userData, &piece );
204            const size_t used = oldLen - evbuffer_get_length( io->inbuf );
205            const unsigned int overhead = guessPacketOverhead( used );
206
207            if( piece || (piece!=used) )
208            {
209                if( piece )
210                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, piece, true, now );
211
212                if( used != piece )
213                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, used - piece, false, now );
214            }
215
216            if( overhead > 0 )
217                tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, false, now );
218
219            switch( ret )
220            {
221                case READ_NOW:
222                    if( evbuffer_get_length( io->inbuf ) )
223                        continue;
224                    done = 1;
225                    break;
226
227                case READ_LATER:
228                    done = 1;
229                    break;
230
231                case READ_ERR:
232                    err = 1;
233                    break;
234            }
235
236            assert( tr_isPeerIo( io ) );
237        }
238
239        tr_sessionUnlock( session );
240    }
241
242    tr_peerIoUnref( io );
243}
244
245static void
246event_read_cb( int fd, short event UNUSED, void * vio )
247{
248    int res;
249    int e;
250    tr_peerIo * io = vio;
251
252    /* Limit the input buffer to 256K, so it doesn't grow too large */
253    unsigned int howmuch;
254    unsigned int curlen;
255    const tr_direction dir = TR_DOWN;
256    const unsigned int max = 256 * 1024;
257
258    assert( tr_isPeerIo( io ) );
259    assert( io->socket >= 0 );
260
261    io->pendingEvents &= ~EV_READ;
262
263    curlen = evbuffer_get_length( io->inbuf );
264    howmuch = curlen >= max ? 0 : max - curlen;
265    howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch );
266
267    dbgmsg( io, "libevent says this peer is ready to read" );
268
269    /* if we don't have any bandwidth left, stop reading */
270    if( howmuch < 1 ) {
271        tr_peerIoSetEnabled( io, dir, false );
272        return;
273    }
274
275    EVUTIL_SET_SOCKET_ERROR( 0 );
276    res = evbuffer_read( io->inbuf, fd, (int)howmuch );
277    e = EVUTIL_SOCKET_ERROR( );
278
279    if( res > 0 )
280    {
281        tr_peerIoSetEnabled( io, dir, true );
282
283        /* Invoke the user callback - must always be called last */
284        canReadWrapper( io );
285    }
286    else
287    {
288        char errstr[512];
289        short what = BEV_EVENT_READING;
290
291        if( res == 0 ) /* EOF */
292            what |= BEV_EVENT_EOF;
293        else if( res == -1 ) {
294            if( e == EAGAIN || e == EINTR ) {
295                tr_peerIoSetEnabled( io, dir, true );
296                return;
297            }
298            what |= BEV_EVENT_ERROR;
299        }
300
301        dbgmsg( io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)",
302                res, what, e, tr_net_strerror( errstr, sizeof( errstr ), e ) );
303
304        if( io->gotError != NULL )
305            io->gotError( io, what, io->userData );
306    }
307}
308
309static int
310tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
311{
312    int e;
313    int n;
314    char errstr[256];
315
316    EVUTIL_SET_SOCKET_ERROR( 0 );
317    n = evbuffer_write_atmost( io->outbuf, fd, howmuch );
318    e = EVUTIL_SOCKET_ERROR( );
319    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?tr_net_strerror(errstr,sizeof(errstr),e):"") );
320
321    return n;
322}
323
324static void
325event_write_cb( int fd, short event UNUSED, void * vio )
326{
327    int res = 0;
328    int e;
329    short what = BEV_EVENT_WRITING;
330    tr_peerIo * io = vio;
331    size_t howmuch;
332    const tr_direction dir = TR_UP;
333    char errstr[1024];
334
335    assert( tr_isPeerIo( io ) );
336    assert( io->socket >= 0 );
337
338    io->pendingEvents &= ~EV_WRITE;
339
340    dbgmsg( io, "libevent says this peer is ready to write" );
341
342    /* Write as much as possible, since the socket is non-blocking, write() will
343     * return if it can't write any more data without blocking */
344    howmuch = tr_bandwidthClamp( &io->bandwidth, dir, evbuffer_get_length( io->outbuf ) );
345
346    /* if we don't have any bandwidth left, stop writing */
347    if( howmuch < 1 ) {
348        tr_peerIoSetEnabled( io, dir, false );
349        return;
350    }
351
352    EVUTIL_SET_SOCKET_ERROR( 0 );
353    res = tr_evbuffer_write( io, fd, howmuch );
354    e = EVUTIL_SOCKET_ERROR( );
355
356    if (res == -1) {
357        if (!e || e == EAGAIN || e == EINTR || e == EINPROGRESS)
358            goto reschedule;
359        /* error case */
360        what |= BEV_EVENT_ERROR;
361    } else if (res == 0) {
362        /* eof case */
363        what |= BEV_EVENT_EOF;
364    }
365    if (res <= 0)
366        goto error;
367
368    if( evbuffer_get_length( io->outbuf ) )
369        tr_peerIoSetEnabled( io, dir, true );
370
371    didWriteWrapper( io, res );
372    return;
373
374 reschedule:
375    if( evbuffer_get_length( io->outbuf ) )
376        tr_peerIoSetEnabled( io, dir, true );
377    return;
378
379 error:
380
381    tr_net_strerror( errstr, sizeof( errstr ), e );
382    dbgmsg( io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
383
384    if( io->gotError != NULL )
385        io->gotError( io, what, io->userData );
386}
387
388/**
389***
390**/
391
392static void
393maybeSetCongestionAlgorithm( int socket, const char * algorithm )
394{
395    if( algorithm && *algorithm )
396    {
397        const int rc = tr_netSetCongestionControl( socket, algorithm );
398
399        if( rc < 0 )
400            tr_ninf( "Net", "Can't set congestion control algorithm '%s': %s",
401                     algorithm, tr_strerror( errno ));
402    }
403}
404
405#ifdef WITH_UTP
406/* UTP callbacks */
407
408static void
409utp_on_read(void *closure, const unsigned char *buf, size_t buflen)
410{
411    int rc;
412    tr_peerIo *io = closure;
413    assert( tr_isPeerIo( io ) );
414
415    rc = evbuffer_add( io->inbuf, buf, buflen );
416    dbgmsg( io, "utp_on_read got %zu bytes", buflen );
417
418    if( rc < 0 ) {
419        tr_nerr( "UTP", "On read evbuffer_add" );
420        return;
421    }
422
423    tr_peerIoSetEnabled( io, TR_DOWN, true );
424    canReadWrapper( io );
425}
426
427static void
428utp_on_write(void *closure, unsigned char *buf, size_t buflen)
429{
430    int rc;
431    tr_peerIo *io = closure;
432    assert( tr_isPeerIo( io ) );
433
434    rc = evbuffer_remove( io->outbuf, buf, buflen );
435    dbgmsg( io, "utp_on_write sending %zu bytes... evbuffer_remove returned %d", buflen, rc );
436    assert( rc == (int)buflen ); /* if this fails, we've corrupted our bookkeeping somewhere */
437    if( rc < (long)buflen ) {
438        tr_nerr( "UTP", "Short write: %d < %ld", rc, (long)buflen);
439    }
440
441    didWriteWrapper( io, buflen );
442}
443
444static size_t
445utp_get_rb_size(void *closure)
446{
447    size_t bytes;
448    tr_peerIo *io = closure;
449    assert( tr_isPeerIo( io ) );
450
451    bytes = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, UTP_READ_BUFFER_SIZE );
452
453    dbgmsg( io, "utp_get_rb_size is saying it's ready to read %zu bytes", bytes );
454    return UTP_READ_BUFFER_SIZE - bytes;
455}
456
457static void
458utp_on_state_change(void *closure, int state)
459{
460    tr_peerIo *io = closure;
461    assert( tr_isPeerIo( io ) );
462
463    if( state == UTP_STATE_CONNECT ) {
464        dbgmsg( io, "utp_on_state_change -- changed to connected" );
465        io->utpSupported = true;
466    } else if( state == UTP_STATE_WRITABLE ) {
467        dbgmsg( io, "utp_on_state_change -- changed to writable" );
468    } else if( state == UTP_STATE_EOF ) {
469        if( io->gotError )
470            io->gotError( io, BEV_EVENT_EOF, io->userData );
471    } else if( state == UTP_STATE_DESTROYING ) {
472        tr_nerr( "UTP", "Impossible state UTP_STATE_DESTROYING" );
473        return;
474    } else {
475        tr_nerr( "UTP", "Unknown state %d", state );
476    }
477}
478
479static void
480utp_on_error(void *closure, int errcode)
481{
482    tr_peerIo *io = closure;
483    assert( tr_isPeerIo( io ) );
484
485    dbgmsg( io, "utp_on_error -- errcode is %d", errcode );
486
487    if( io->gotError ) {
488        errno = errcode;
489        io->gotError( io, BEV_EVENT_ERROR, io->userData );
490    }
491}
492
493static void
494utp_on_overhead(void *closure, bool send, size_t count, int type UNUSED)
495{
496    tr_peerIo *io = closure;
497    assert( tr_isPeerIo( io ) );
498
499    dbgmsg( io, "utp_on_overhead -- count is %zu", count );
500
501    tr_bandwidthUsed( &io->bandwidth, send ? TR_UP : TR_DOWN,
502                      count, false, tr_time_msec() );
503}
504
505static struct UTPFunctionTable utp_function_table = {
506    .on_read = utp_on_read,
507    .on_write = utp_on_write,
508    .get_rb_size = utp_get_rb_size,
509    .on_state = utp_on_state_change,
510    .on_error = utp_on_error,
511    .on_overhead = utp_on_overhead
512};
513
514
515/* Dummy UTP callbacks. */
516/* We switch a UTP socket to use these after the associated peerIo has been
517   destroyed -- see io_dtor. */
518
519static void
520dummy_read( void * closure UNUSED, const unsigned char *buf UNUSED, size_t buflen UNUSED )
521{
522    /* This cannot happen, as far as I'm aware. */
523    tr_nerr( "UTP", "On_read called on closed socket" );
524
525}
526
527static void
528dummy_write(void * closure UNUSED, unsigned char *buf, size_t buflen)
529{
530    /* This can very well happen if we've shut down a peer connection that
531       had unflushed buffers.  Complain and send zeroes. */
532    tr_ndbg( "UTP", "On_write called on closed socket" );
533    memset( buf, 0, buflen );
534}
535
536static size_t
537dummy_get_rb_size( void * closure UNUSED )
538{
539    return 0;
540}
541
542static void
543dummy_on_state_change(void * closure UNUSED, int state UNUSED )
544{
545    return;
546}
547
548static void
549dummy_on_error( void * closure UNUSED, int errcode UNUSED )
550{
551    return;
552}
553
554static void
555dummy_on_overhead( void *closure UNUSED, bool send UNUSED, size_t count UNUSED, int type UNUSED )
556{
557    return;
558}
559
560static struct UTPFunctionTable dummy_utp_function_table = {
561    .on_read = dummy_read,
562    .on_write = dummy_write,
563    .get_rb_size = dummy_get_rb_size,
564    .on_state = dummy_on_state_change,
565    .on_error = dummy_on_error,
566    .on_overhead = dummy_on_overhead
567};
568
569#endif /* #ifdef WITH_UTP */
570
571static tr_peerIo*
572tr_peerIoNew( tr_session       * session,
573              tr_bandwidth     * parent,
574              const tr_address * addr,
575              tr_port            port,
576              const uint8_t    * torrentHash,
577              bool               isIncoming,
578              bool               isSeed,
579              int                socket,
580              struct UTPSocket * utp_socket)
581{
582    tr_peerIo * io;
583
584    assert( session != NULL );
585    assert( session->events != NULL );
586    assert( tr_isBool( isIncoming ) );
587    assert( tr_isBool( isSeed ) );
588    assert( tr_amInEventThread( session ) );
589    assert( (socket < 0) == (utp_socket != NULL) );
590#ifndef WITH_UTP
591    assert( socket >= 0 );
592#endif
593
594    if( socket >= 0 ) {
595        tr_netSetTOS( socket, session->peerSocketTOS );
596        maybeSetCongestionAlgorithm( socket, session->peer_congestion_algorithm );
597    }
598
599    io = tr_new0( tr_peerIo, 1 );
600    io->magicNumber = PEER_IO_MAGIC_NUMBER;
601    io->refCount = 1;
602    tr_cryptoConstruct( &io->crypto, torrentHash, isIncoming );
603    io->session = session;
604    io->addr = *addr;
605    io->isSeed = isSeed;
606    io->port = port;
607    io->socket = socket;
608    io->utp_socket = utp_socket;
609    io->isIncoming = isIncoming != 0;
610    io->timeCreated = tr_time( );
611    io->inbuf = evbuffer_new( );
612    io->outbuf = evbuffer_new( );
613    tr_bandwidthConstruct( &io->bandwidth, session, parent );
614    tr_bandwidthSetPeer( &io->bandwidth, io );
615    dbgmsg( io, "bandwidth is %p; its parent is %p", &io->bandwidth, parent );
616    dbgmsg( io, "socket is %d, utp_socket is %p", socket, utp_socket );
617
618    if( io->socket >= 0 ) {
619        io->event_read = event_new( session->event_base,
620                                    io->socket, EV_READ, event_read_cb, io );
621        io->event_write = event_new( session->event_base,
622                                     io->socket, EV_WRITE, event_write_cb, io );
623    }
624#ifdef WITH_UTP
625    else {
626        UTP_SetSockopt( utp_socket, SO_RCVBUF, UTP_READ_BUFFER_SIZE );
627        dbgmsg( io, "%s", "calling UTP_SetCallbacks &utp_function_table" );
628        UTP_SetCallbacks( utp_socket,
629                          &utp_function_table,
630                          io );
631        if( !isIncoming ) {
632            dbgmsg( io, "%s", "calling UTP_Connect" );
633            UTP_Connect( utp_socket );
634        }
635    }
636#endif
637
638    return io;
639}
640
641tr_peerIo*
642tr_peerIoNewIncoming( tr_session        * session,
643                      tr_bandwidth      * parent,
644                      const tr_address  * addr,
645                      tr_port             port,
646                      int                 fd,
647                      struct UTPSocket  * utp_socket )
648{
649    assert( session );
650    assert( tr_address_is_valid( addr ) );
651
652    return tr_peerIoNew( session, parent, addr, port, NULL, true, false,
653                         fd, utp_socket );
654}
655
656tr_peerIo*
657tr_peerIoNewOutgoing( tr_session        * session,
658                      tr_bandwidth      * parent,
659                      const tr_address  * addr,
660                      tr_port             port,
661                      const uint8_t     * torrentHash,
662                      bool                isSeed,
663                      bool                utp )
664{
665    int fd = -1;
666    struct UTPSocket * utp_socket = NULL;
667
668    assert( session );
669    assert( tr_address_is_valid( addr ) );
670    assert( torrentHash );
671
672    if( utp )
673        utp_socket = tr_netOpenPeerUTPSocket( session, addr, port, isSeed );
674
675    if( !utp_socket ) {
676        fd = tr_netOpenPeerSocket( session, addr, port, isSeed );
677        dbgmsg( NULL, "tr_netOpenPeerSocket returned fd %d", fd );
678    }
679
680    if( fd < 0 && utp_socket == NULL )
681        return NULL;
682
683    return tr_peerIoNew( session, parent, addr, port,
684                         torrentHash, false, isSeed, fd, utp_socket );
685}
686
687/***
688****
689***/
690
691static void
692event_enable( tr_peerIo * io, short event )
693{
694    assert( tr_amInEventThread( io->session ) );
695    assert( io->session != NULL );
696    assert( io->session->events != NULL );
697
698    if( io->socket < 0 )
699        return;
700
701    assert( io->session->events != NULL );
702    assert( event_initialized( io->event_read ) );
703    assert( event_initialized( io->event_write ) );
704
705    if( ( event & EV_READ ) && ! ( io->pendingEvents & EV_READ ) )
706    {
707        dbgmsg( io, "enabling libevent ready-to-read polling" );
708        event_add( io->event_read, NULL );
709        io->pendingEvents |= EV_READ;
710    }
711
712    if( ( event & EV_WRITE ) && ! ( io->pendingEvents & EV_WRITE ) )
713    {
714        dbgmsg( io, "enabling libevent ready-to-write polling" );
715        event_add( io->event_write, NULL );
716        io->pendingEvents |= EV_WRITE;
717    }
718}
719
720static void
721event_disable( struct tr_peerIo * io, short event )
722{
723    assert( tr_amInEventThread( io->session ) );
724    assert( io->session != NULL );
725
726    if( io->socket < 0 )
727        return;
728
729    assert( io->session->events != NULL );
730    assert( event_initialized( io->event_read ) );
731    assert( event_initialized( io->event_write ) );
732
733    if( ( event & EV_READ ) && ( io->pendingEvents & EV_READ ) )
734    {
735        dbgmsg( io, "disabling libevent ready-to-read polling" );
736        event_del( io->event_read );
737        io->pendingEvents &= ~EV_READ;
738    }
739
740    if( ( event & EV_WRITE ) && ( io->pendingEvents & EV_WRITE ) )
741    {
742        dbgmsg( io, "disabling libevent ready-to-write polling" );
743        event_del( io->event_write );
744        io->pendingEvents &= ~EV_WRITE;
745    }
746}
747
748void
749tr_peerIoSetEnabled( tr_peerIo    * io,
750                     tr_direction   dir,
751                     bool           isEnabled )
752{
753    const short event = dir == TR_UP ? EV_WRITE : EV_READ;
754
755    assert( tr_isPeerIo( io ) );
756    assert( tr_isDirection( dir ) );
757    assert( tr_amInEventThread( io->session ) );
758    assert( io->session->events != NULL );
759
760    if( isEnabled )
761        event_enable( io, event );
762    else
763        event_disable( io, event );
764}
765
766/***
767****
768***/
769static void
770io_close_socket( tr_peerIo * io )
771{
772    if( io->socket >= 0 ) {
773        tr_netClose( io->session, io->socket );
774        io->socket = -1;
775    }
776
777    if( io->event_read != NULL) {
778        event_free( io->event_read );
779        io->event_read = NULL;
780    }
781
782    if( io->event_write != NULL) {
783        event_free( io->event_write );
784        io->event_write = NULL;
785    }
786
787#ifdef WITH_UTP
788    if( io->utp_socket ) {
789        UTP_SetCallbacks( io->utp_socket,
790                          &dummy_utp_function_table,
791                          NULL );
792        UTP_Close( io->utp_socket );
793
794        io->utp_socket = NULL;
795    }
796#endif
797}
798
799static void
800io_dtor( void * vio )
801{
802    tr_peerIo * io = vio;
803
804    assert( tr_isPeerIo( io ) );
805    assert( tr_amInEventThread( io->session ) );
806    assert( io->session->events != NULL );
807
808    dbgmsg( io, "in tr_peerIo destructor" );
809    event_disable( io, EV_READ | EV_WRITE );
810    tr_bandwidthDestruct( &io->bandwidth );
811    evbuffer_free( io->outbuf );
812    evbuffer_free( io->inbuf );
813    io_close_socket( io );
814    tr_cryptoDestruct( &io->crypto );
815
816    while( io->outbuf_datatypes != NULL )
817        peer_io_pull_datatype( io );
818
819    memset( io, ~0, sizeof( tr_peerIo ) );
820    tr_free( io );
821}
822
823static void
824tr_peerIoFree( tr_peerIo * io )
825{
826    if( io )
827    {
828        dbgmsg( io, "in tr_peerIoFree" );
829        io->canRead = NULL;
830        io->didWrite = NULL;
831        io->gotError = NULL;
832        tr_runInEventThread( io->session, io_dtor, io );
833    }
834}
835
836void
837tr_peerIoRefImpl( const char * file, int line, tr_peerIo * io )
838{
839    assert( tr_isPeerIo( io ) );
840
841    dbgmsg( io, "%s:%d is incrementing the IO's refcount from %d to %d",
842                file, line, io->refCount, io->refCount+1 );
843
844    ++io->refCount;
845}
846
847void
848tr_peerIoUnrefImpl( const char * file, int line, tr_peerIo * io )
849{
850    assert( tr_isPeerIo( io ) );
851
852    dbgmsg( io, "%s:%d is decrementing the IO's refcount from %d to %d",
853                file, line, io->refCount, io->refCount-1 );
854
855    if( !--io->refCount )
856        tr_peerIoFree( io );
857}
858
859const tr_address*
860tr_peerIoGetAddress( const tr_peerIo * io, tr_port   * port )
861{
862    assert( tr_isPeerIo( io ) );
863
864    if( port )
865        *port = io->port;
866
867    return &io->addr;
868}
869
870const char*
871tr_peerIoAddrStr( const tr_address * addr, tr_port port )
872{
873    static char buf[512];
874    tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_address_to_string( addr ), ntohs( port ) );
875    return buf;
876}
877
878const char* tr_peerIoGetAddrStr( const tr_peerIo * io )
879{
880    return tr_isPeerIo( io ) ? tr_peerIoAddrStr( &io->addr, io->port ) : "error";
881}
882
883void
884tr_peerIoSetIOFuncs( tr_peerIo        * io,
885                     tr_can_read_cb     readcb,
886                     tr_did_write_cb    writecb,
887                     tr_net_error_cb    errcb,
888                     void             * userData )
889{
890    io->canRead = readcb;
891    io->didWrite = writecb;
892    io->gotError = errcb;
893    io->userData = userData;
894}
895
896void
897tr_peerIoClear( tr_peerIo * io )
898{
899    tr_peerIoSetIOFuncs( io, NULL, NULL, NULL, NULL );
900    tr_peerIoSetEnabled( io, TR_UP, false );
901    tr_peerIoSetEnabled( io, TR_DOWN, false );
902}
903
904int
905tr_peerIoReconnect( tr_peerIo * io )
906{
907    short int pendingEvents;
908    tr_session * session;
909
910    assert( tr_isPeerIo( io ) );
911    assert( !tr_peerIoIsIncoming( io ) );
912
913    session = tr_peerIoGetSession( io );
914
915    pendingEvents = io->pendingEvents;
916    event_disable( io, EV_READ | EV_WRITE );
917
918    io_close_socket( io );
919
920    io->socket = tr_netOpenPeerSocket( session, &io->addr, io->port, io->isSeed );
921    io->event_read = event_new( session->event_base, io->socket, EV_READ, event_read_cb, io );
922    io->event_write = event_new( session->event_base, io->socket, EV_WRITE, event_write_cb, io );
923
924    if( io->socket >= 0 )
925    {
926        event_enable( io, pendingEvents );
927        tr_netSetTOS( io->socket, session->peerSocketTOS );
928        maybeSetCongestionAlgorithm( io->socket, session->peer_congestion_algorithm );
929        return 0;
930    }
931
932    return -1;
933}
934
935/**
936***
937**/
938
939void
940tr_peerIoSetTorrentHash( tr_peerIo *     io,
941                         const uint8_t * hash )
942{
943    assert( tr_isPeerIo( io ) );
944
945    tr_cryptoSetTorrentHash( &io->crypto, hash );
946}
947
948const uint8_t*
949tr_peerIoGetTorrentHash( tr_peerIo * io )
950{
951    assert( tr_isPeerIo( io ) );
952
953    return tr_cryptoGetTorrentHash( &io->crypto );
954}
955
956int
957tr_peerIoHasTorrentHash( const tr_peerIo * io )
958{
959    assert( tr_isPeerIo( io ) );
960
961    return tr_cryptoHasTorrentHash( &io->crypto );
962}
963
964/**
965***
966**/
967
968void
969tr_peerIoSetPeersId( tr_peerIo * io, const uint8_t * peer_id )
970{
971    assert( tr_isPeerIo( io ) );
972
973    if( ( io->peerIdIsSet = peer_id != NULL ) )
974        memcpy( io->peerId, peer_id, 20 );
975    else
976        memset( io->peerId, 0, 20 );
977}
978
979/**
980***
981**/
982
983static unsigned int
984getDesiredOutputBufferSize( const tr_peerIo * io, uint64_t now )
985{
986    /* this is all kind of arbitrary, but what seems to work well is
987     * being large enough to hold the next 20 seconds' worth of input,
988     * or a few blocks, whichever is bigger.
989     * It's okay to tweak this as needed */
990    const unsigned int currentSpeed_Bps = tr_bandwidthGetPieceSpeed_Bps( &io->bandwidth, now, TR_UP );
991    const unsigned int period = 15u; /* arbitrary */
992    /* the 3 is arbitrary; the .5 is to leave room for messages */
993    static const unsigned int ceiling =  (unsigned int)( MAX_BLOCK_SIZE * 3.5 );
994    return MAX( ceiling, currentSpeed_Bps*period );
995}
996
997size_t
998tr_peerIoGetWriteBufferSpace( const tr_peerIo * io, uint64_t now )
999{
1000    const size_t desiredLen = getDesiredOutputBufferSize( io, now );
1001    const size_t currentLen = evbuffer_get_length( io->outbuf );
1002    size_t freeSpace = 0;
1003
1004    if( desiredLen > currentLen )
1005        freeSpace = desiredLen - currentLen;
1006
1007    return freeSpace;
1008}
1009
1010/**
1011***
1012**/
1013
1014void
1015tr_peerIoSetEncryption( tr_peerIo * io, tr_encryption_type encryption_type )
1016{
1017    assert( tr_isPeerIo( io ) );
1018    assert( encryption_type == PEER_ENCRYPTION_NONE
1019         || encryption_type == PEER_ENCRYPTION_RC4 );
1020
1021    io->encryption_type = encryption_type;
1022}
1023
1024/**
1025***
1026**/
1027
1028static void
1029addDatatype( tr_peerIo * io, size_t byteCount, bool isPieceData )
1030{
1031    struct tr_datatype * d;
1032    d = datatype_new( );
1033    d->isPieceData = isPieceData != 0;
1034    d->length = byteCount;
1035    peer_io_push_datatype( io, d );
1036}
1037
1038static void
1039maybeEncryptBuffer( tr_peerIo * io, struct evbuffer * buf )
1040{
1041    if( io->encryption_type == PEER_ENCRYPTION_RC4 )
1042    {
1043        struct evbuffer_ptr pos;
1044        struct evbuffer_iovec iovec;
1045        evbuffer_ptr_set( buf, &pos, 0, EVBUFFER_PTR_SET );
1046        do {
1047            evbuffer_peek( buf, -1, &pos, &iovec, 1 );
1048            tr_cryptoEncrypt( &io->crypto, iovec.iov_len, iovec.iov_base, iovec.iov_base );
1049        } while( !evbuffer_ptr_set( buf, &pos, iovec.iov_len, EVBUFFER_PTR_ADD ) );
1050    }
1051}
1052
1053void
1054tr_peerIoWriteBuf( tr_peerIo * io, struct evbuffer * buf, bool isPieceData )
1055{
1056    const size_t byteCount = evbuffer_get_length( buf );
1057    maybeEncryptBuffer( io, buf );
1058    evbuffer_add_buffer( io->outbuf, buf );
1059    addDatatype( io, byteCount, isPieceData );
1060}
1061
1062void
1063tr_peerIoWriteBytes( tr_peerIo * io, const void * bytes, size_t byteCount, bool isPieceData )
1064{
1065    struct evbuffer_iovec iovec;
1066    evbuffer_reserve_space( io->outbuf, byteCount, &iovec, 1 );
1067
1068    iovec.iov_len = byteCount;
1069    if( io->encryption_type == PEER_ENCRYPTION_RC4 )
1070        tr_cryptoEncrypt( &io->crypto, iovec.iov_len, bytes, iovec.iov_base );
1071    else
1072        memcpy( iovec.iov_base, bytes, iovec.iov_len );
1073    evbuffer_commit_space( io->outbuf, &iovec, 1 );
1074
1075    addDatatype( io, byteCount, isPieceData );
1076}
1077
1078/***
1079****
1080***/
1081
1082void
1083evbuffer_add_uint8( struct evbuffer * outbuf, uint8_t byte )
1084{
1085    evbuffer_add( outbuf, &byte, 1 );
1086}
1087
1088void
1089evbuffer_add_uint16( struct evbuffer * outbuf, uint16_t addme_hs )
1090{
1091    const uint16_t ns = htons( addme_hs );
1092    evbuffer_add( outbuf, &ns, sizeof( ns ) );
1093}
1094
1095void
1096evbuffer_add_uint32( struct evbuffer * outbuf, uint32_t addme_hl )
1097{
1098    const uint32_t nl = htonl( addme_hl );
1099    evbuffer_add( outbuf, &nl, sizeof( nl ) );
1100}
1101
1102void
1103evbuffer_add_uint64( struct evbuffer * outbuf, uint64_t addme_hll )
1104{
1105    const uint64_t nll = tr_htonll( addme_hll );
1106    evbuffer_add( outbuf, &nll, sizeof( nll ) );
1107}
1108
1109/***
1110****
1111***/
1112
1113void
1114tr_peerIoReadBytesToBuf( tr_peerIo * io, struct evbuffer * inbuf, struct evbuffer * outbuf, size_t byteCount )
1115{
1116    struct evbuffer * tmp;
1117    const size_t old_length = evbuffer_get_length( outbuf );
1118
1119    assert( tr_isPeerIo( io ) );
1120    assert( evbuffer_get_length( inbuf ) >= byteCount );
1121
1122    /* append it to outbuf */
1123    tmp = evbuffer_new( );
1124    evbuffer_remove_buffer( inbuf, tmp, byteCount );
1125    evbuffer_add_buffer( outbuf, tmp );
1126    evbuffer_free( tmp );
1127
1128    /* decrypt if needed */
1129    if( io->encryption_type == PEER_ENCRYPTION_RC4 ) {
1130        struct evbuffer_ptr pos;
1131        struct evbuffer_iovec iovec;
1132        evbuffer_ptr_set( outbuf, &pos, old_length, EVBUFFER_PTR_SET );
1133        do {
1134            evbuffer_peek( outbuf, byteCount, &pos, &iovec, 1 );
1135            tr_cryptoDecrypt( &io->crypto, iovec.iov_len, iovec.iov_base, iovec.iov_base );
1136            byteCount -= iovec.iov_len;
1137        } while( !evbuffer_ptr_set( outbuf, &pos, iovec.iov_len, EVBUFFER_PTR_ADD ) );
1138    }
1139}
1140
1141void
1142tr_peerIoReadBytes( tr_peerIo * io, struct evbuffer * inbuf, void * bytes, size_t byteCount )
1143{
1144    assert( tr_isPeerIo( io ) );
1145    assert( evbuffer_get_length( inbuf )  >= byteCount );
1146
1147    switch( io->encryption_type )
1148    {
1149        case PEER_ENCRYPTION_NONE:
1150            evbuffer_remove( inbuf, bytes, byteCount );
1151            break;
1152
1153        case PEER_ENCRYPTION_RC4:
1154            evbuffer_remove( inbuf, bytes, byteCount );
1155            tr_cryptoDecrypt( &io->crypto, byteCount, bytes, bytes );
1156            break;
1157
1158        default:
1159            assert( 0 );
1160    }
1161}
1162
1163void
1164tr_peerIoReadUint16( tr_peerIo        * io,
1165                     struct evbuffer  * inbuf,
1166                     uint16_t         * setme )
1167{
1168    uint16_t tmp;
1169    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
1170    *setme = ntohs( tmp );
1171}
1172
1173void tr_peerIoReadUint32( tr_peerIo        * io,
1174                          struct evbuffer  * inbuf,
1175                          uint32_t         * setme )
1176{
1177    uint32_t tmp;
1178    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
1179    *setme = ntohl( tmp );
1180}
1181
1182void
1183tr_peerIoDrain( tr_peerIo       * io,
1184                struct evbuffer * inbuf,
1185                size_t            byteCount )
1186{
1187    char buf[4096];
1188    const size_t buflen = sizeof( buf );
1189
1190    while( byteCount > 0 )
1191    {
1192        const size_t thisPass = MIN( byteCount, buflen );
1193        tr_peerIoReadBytes( io, inbuf, buf, thisPass );
1194        byteCount -= thisPass;
1195    }
1196}
1197
1198/***
1199****
1200***/
1201
1202static int
1203tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
1204{
1205    int res = 0;
1206
1207    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch )))
1208    {
1209        if( io->utp_socket != NULL ) /* utp peer connection */
1210        {
1211            /* UTP_RBDrained notifies libutp that your read buffer is emtpy.
1212             * It opens up the congestion window by sending an ACK (soonish)
1213             * if one was not going to be sent. */
1214            if( evbuffer_get_length( io->inbuf ) == 0 )
1215                UTP_RBDrained( io->utp_socket );
1216        }
1217        else /* tcp peer connection */
1218        {
1219            int e;
1220
1221            EVUTIL_SET_SOCKET_ERROR( 0 );
1222            res = evbuffer_read( io->inbuf, io->socket, (int)howmuch );
1223            e = EVUTIL_SOCKET_ERROR( );
1224
1225            dbgmsg( io, "read %d from peer (%s)", res, (res==-1?tr_strerror(e):"") );
1226
1227            if( evbuffer_get_length( io->inbuf ) )
1228                canReadWrapper( io );
1229
1230            if( ( res <= 0 ) && ( io->gotError ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1231            {
1232                char errstr[512];
1233                short what = BEV_EVENT_READING | BEV_EVENT_ERROR;
1234                if( res == 0 )
1235                    what |= BEV_EVENT_EOF;
1236                dbgmsg( io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)",
1237                        res, what, e, tr_net_strerror( errstr, sizeof( errstr ), e ) );
1238                io->gotError( io, what, io->userData );
1239            }
1240        }
1241    }
1242
1243    return res;
1244}
1245
1246static int
1247tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
1248{
1249    int n = 0;
1250    const size_t old_len = evbuffer_get_length( io->outbuf );
1251    dbgmsg( io, "in tr_peerIoTryWrite %zu", howmuch );
1252
1253    if( howmuch > old_len )
1254        howmuch = old_len;
1255
1256    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_UP, howmuch )))
1257    {
1258        if( io->utp_socket != NULL ) /* utp peer connection */
1259        {
1260            const size_t old_len = evbuffer_get_length( io->outbuf );
1261            UTP_Write( io->utp_socket, howmuch );
1262            n = old_len - evbuffer_get_length( io->outbuf );
1263        }
1264        else
1265        {
1266            int e;
1267
1268            EVUTIL_SET_SOCKET_ERROR( 0 );
1269            n = tr_evbuffer_write( io, io->socket, howmuch );
1270            e = EVUTIL_SOCKET_ERROR( );
1271
1272            if( n > 0 )
1273                didWriteWrapper( io, n );
1274
1275            if( ( n < 0 ) && ( io->gotError ) && e && ( e != EPIPE ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1276            {
1277                char errstr[512];
1278                const short what = BEV_EVENT_WRITING | BEV_EVENT_ERROR;
1279
1280                dbgmsg( io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)",
1281                        n, what, e, tr_net_strerror( errstr, sizeof( errstr ), e ) );
1282
1283                if( io->gotError != NULL )
1284                    io->gotError( io, what, io->userData );
1285            }
1286        }
1287    }
1288
1289    return n;
1290}
1291
1292int
1293tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
1294{
1295    int bytesUsed = 0;
1296
1297    assert( tr_isPeerIo( io ) );
1298    assert( tr_isDirection( dir ) );
1299
1300    if( dir == TR_DOWN )
1301        bytesUsed = tr_peerIoTryRead( io, limit );
1302    else
1303        bytesUsed = tr_peerIoTryWrite( io, limit );
1304
1305    dbgmsg( io, "flushing peer-io, direction %d, limit %zu, bytesUsed %d", (int)dir, limit, bytesUsed );
1306    return bytesUsed;
1307}
1308
1309int
1310tr_peerIoFlushOutgoingProtocolMsgs( tr_peerIo * io )
1311{
1312    size_t byteCount = 0;
1313    const struct tr_datatype * it;
1314
1315    /* count up how many bytes are used by non-piece-data messages
1316       at the front of our outbound queue */
1317    for( it=io->outbuf_datatypes; it!=NULL; it=it->next )
1318        if( it->isPieceData )
1319            break;
1320        else
1321            byteCount += it->length;
1322
1323    return tr_peerIoFlush( io, TR_UP, byteCount );
1324}
Note: See TracBrowser for help on using the repository browser.