source: trunk/libtransmission/peer-io.c @ 12249

Last change on this file since 12249 was 12249, checked in by jordan, 11 years ago

(trunk libT) whoops, remove 4 debugging fprintf()'s from the previous commit

  • Property svn:keywords set to Date Rev Author Id
File size: 32.3 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 12249 2011-03-28 16:33:40Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h>
16
17#include <event2/event.h>
18#include <event2/buffer.h>
19#include <event2/bufferevent.h>
20
21#include <libutp/utp.h>
22
23#include "transmission.h"
24#include "session.h"
25#include "bandwidth.h"
26#include "crypto.h"
27#include "list.h"
28#include "net.h"
29#include "peer-common.h" /* MAX_BLOCK_SIZE */
30#include "peer-io.h"
31#include "trevent.h" /* tr_runInEventThread() */
32#include "utils.h"
33
34
35#ifdef WIN32
36 #define EAGAIN       WSAEWOULDBLOCK
37 #define EINTR        WSAEINTR
38 #define EINPROGRESS  WSAEINPROGRESS
39 #define EPIPE        WSAECONNRESET
40#endif
41
42/* The amount of read bufferring that we allow for uTP sockets. */
43
44#define UTP_READ_BUFFER_SIZE (256 * 1024)
45
46static size_t
47guessPacketOverhead( size_t d )
48{
49    /**
50     * http://sd.wareonearth.com/~phil/net/overhead/
51     *
52     * TCP over Ethernet:
53     * Assuming no header compression (e.g. not PPP)
54     * Add 20 IPv4 header or 40 IPv6 header (no options)
55     * Add 20 TCP header
56     * Add 12 bytes optional TCP timestamps
57     * Max TCP Payload data rates over ethernet are thus:
58     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
59     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
60     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
61     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
62     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
63     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
64     */
65    const double assumed_payload_data_rate = 94.0;
66
67    return (unsigned int)( d * ( 100.0 / assumed_payload_data_rate ) - d );
68}
69
70/**
71***
72**/
73
74#define dbgmsg( io, ... ) \
75    do { \
76        if( tr_deepLoggingIsActive( ) ) \
77            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
78    } while( 0 )
79
80struct tr_datatype
81{
82    bool    isPieceData;
83    size_t  length;
84};
85
86
87/***
88****
89***/
90
91static void
92didWriteWrapper( tr_peerIo * io, unsigned int bytes_transferred )
93{
94     while( bytes_transferred && tr_isPeerIo( io ) )
95     {
96        struct tr_datatype * next = io->outbuf_datatypes->data;
97
98        const unsigned int payload = MIN( next->length, bytes_transferred );
99        /* For uTP sockets, the overhead is computed in utp_on_overhead. */
100        const unsigned int overhead =
101            io->socket ? guessPacketOverhead( payload ) : 0;
102        const uint64_t now = tr_time_msec( );
103
104        tr_bandwidthUsed( &io->bandwidth, TR_UP, payload, next->isPieceData, now );
105
106        if( overhead > 0 )
107            tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, false, now );
108
109        if( io->didWrite )
110            io->didWrite( io, payload, next->isPieceData, io->userData );
111
112        if( tr_isPeerIo( io ) )
113        {
114            bytes_transferred -= payload;
115            next->length -= payload;
116            if( !next->length ) {
117                tr_list_pop_front( &io->outbuf_datatypes );
118                tr_free( next );
119            }
120        }
121    }
122}
123
124static void
125canReadWrapper( tr_peerIo * io )
126{
127    bool err = 0;
128    bool done = 0;
129    tr_session * session;
130
131    dbgmsg( io, "canRead" );
132
133    tr_peerIoRef( io );
134
135    session = io->session;
136
137    /* try to consume the input buffer */
138    if( io->canRead )
139    {
140        const uint64_t now = tr_time_msec( );
141
142        tr_sessionLock( session );
143
144        while( !done && !err )
145        {
146            size_t piece = 0;
147            const size_t oldLen = evbuffer_get_length( io->inbuf );
148            const int ret = io->canRead( io, io->userData, &piece );
149            const size_t used = oldLen - evbuffer_get_length( io->inbuf );
150            const unsigned int overhead = guessPacketOverhead( used );
151
152            if( piece || (piece!=used) )
153            {
154                if( piece )
155                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, piece, true, now );
156
157                if( used != piece )
158                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, used - piece, false, now );
159            }
160
161            if( overhead > 0 )
162                tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, false, now );
163
164            switch( ret )
165            {
166                case READ_NOW:
167                    if( evbuffer_get_length( io->inbuf ) )
168                        continue;
169                    done = 1;
170                    break;
171
172                case READ_LATER:
173                    done = 1;
174                    break;
175
176                case READ_ERR:
177                    err = 1;
178                    break;
179            }
180
181            assert( tr_isPeerIo( io ) );
182        }
183
184        tr_sessionUnlock( session );
185    }
186
187    tr_peerIoUnref( io );
188}
189
190static void
191event_read_cb( int fd, short event UNUSED, void * vio )
192{
193    int res;
194    int e;
195    tr_peerIo * io = vio;
196
197    /* Limit the input buffer to 256K, so it doesn't grow too large */
198    unsigned int howmuch;
199    unsigned int curlen;
200    const tr_direction dir = TR_DOWN;
201    const unsigned int max = 256 * 1024;
202
203    assert( tr_isPeerIo( io ) );
204    assert( io->socket >= 0 );
205
206    io->pendingEvents &= ~EV_READ;
207
208    curlen = evbuffer_get_length( io->inbuf );
209    howmuch = curlen >= max ? 0 : max - curlen;
210    howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch );
211
212    dbgmsg( io, "libevent says this peer is ready to read" );
213
214    /* if we don't have any bandwidth left, stop reading */
215    if( howmuch < 1 ) {
216        tr_peerIoSetEnabled( io, dir, false );
217        return;
218    }
219
220    EVUTIL_SET_SOCKET_ERROR( 0 );
221    res = evbuffer_read( io->inbuf, fd, (int)howmuch );
222    e = EVUTIL_SOCKET_ERROR( );
223
224    if( res > 0 )
225    {
226        tr_peerIoSetEnabled( io, dir, true );
227
228        /* Invoke the user callback - must always be called last */
229        canReadWrapper( io );
230    }
231    else
232    {
233        char errstr[512];
234        short what = BEV_EVENT_READING;
235
236        if( res == 0 ) /* EOF */
237            what |= BEV_EVENT_EOF;
238        else if( res == -1 ) {
239            if( e == EAGAIN || e == EINTR ) {
240                tr_peerIoSetEnabled( io, dir, true );
241                return;
242            }
243            what |= BEV_EVENT_ERROR;
244        }
245
246        tr_net_strerror( errstr, sizeof( errstr ), e );
247        dbgmsg( io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
248
249        if( io->gotError != NULL )
250            io->gotError( io, what, io->userData );
251    }
252}
253
254static int
255tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
256{
257    int e;
258    int n;
259    char errstr[256];
260
261    EVUTIL_SET_SOCKET_ERROR( 0 );
262    n = evbuffer_write_atmost( io->outbuf, fd, howmuch );
263    e = EVUTIL_SOCKET_ERROR( );
264    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?tr_net_strerror(errstr,sizeof(errstr),e):"") );
265
266    return n;
267}
268
269static void
270event_write_cb( int fd, short event UNUSED, void * vio )
271{
272    int res = 0;
273    int e;
274    short what = BEV_EVENT_WRITING;
275    tr_peerIo * io = vio;
276    size_t howmuch;
277    const tr_direction dir = TR_UP;
278
279    assert( tr_isPeerIo( io ) );
280    assert( io->socket >= 0 );
281
282    io->pendingEvents &= ~EV_WRITE;
283
284    dbgmsg( io, "libevent says this peer is ready to write" );
285
286    /* Write as much as possible, since the socket is non-blocking, write() will
287     * return if it can't write any more data without blocking */
288    howmuch = tr_bandwidthClamp( &io->bandwidth, dir, evbuffer_get_length( io->outbuf ) );
289
290    /* if we don't have any bandwidth left, stop writing */
291    if( howmuch < 1 ) {
292        tr_peerIoSetEnabled( io, dir, false );
293        return;
294    }
295
296    EVUTIL_SET_SOCKET_ERROR( 0 );
297    res = tr_evbuffer_write( io, fd, howmuch );
298    e = EVUTIL_SOCKET_ERROR( );
299
300    if (res == -1) {
301        if (!e || e == EAGAIN || e == EINTR || e == EINPROGRESS)
302            goto reschedule;
303        /* error case */
304        what |= BEV_EVENT_ERROR;
305    } else if (res == 0) {
306        /* eof case */
307        what |= BEV_EVENT_EOF;
308    }
309    if (res <= 0)
310        goto error;
311
312    if( evbuffer_get_length( io->outbuf ) )
313        tr_peerIoSetEnabled( io, dir, true );
314
315    didWriteWrapper( io, res );
316    return;
317
318 reschedule:
319    if( evbuffer_get_length( io->outbuf ) )
320        tr_peerIoSetEnabled( io, dir, true );
321    return;
322
323 error:
324
325    dbgmsg( io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
326
327    if( io->gotError != NULL )
328        io->gotError( io, what, io->userData );
329}
330
331/**
332***
333**/
334
335static void
336maybeSetCongestionAlgorithm( int socket, const char * algorithm )
337{
338    if( algorithm && *algorithm )
339    {
340        const int rc = tr_netSetCongestionControl( socket, algorithm );
341
342        if( rc < 0 )
343            tr_ninf( "Net", "Can't set congestion control algorithm '%s': %s",
344                     algorithm, tr_strerror( errno ));
345    }
346}
347
348#ifdef WITH_UTP
349/* UTP callbacks */
350
351static void
352utp_on_read(void *closure, const unsigned char *buf, size_t buflen)
353{
354    int rc;
355    tr_peerIo *io = closure;
356    assert( tr_isPeerIo( io ) );
357
358    rc = evbuffer_add( io->inbuf, buf, buflen );
359    dbgmsg( io, "utp_on_read got %zu bytes", buflen );
360
361    if( rc < 0 ) {
362        tr_nerr( "UTP", "On read evbuffer_add" );
363        return;
364    }
365
366    tr_peerIoSetEnabled( io, TR_DOWN, true );
367    canReadWrapper( io );
368}
369
370static void
371utp_on_write(void *closure, unsigned char *buf, size_t buflen)
372{
373    int rc;
374    tr_peerIo *io = closure;
375    assert( tr_isPeerIo( io ) );
376
377    rc = evbuffer_remove( io->outbuf, buf, buflen );
378    dbgmsg( io, "utp_on_write sending %zu bytes... evbuffer_remove returned %d", buflen, rc );
379    assert( rc == (int)buflen ); /* if this fails, we've corrupted our bookkeeping somewhere */
380    if( rc < (long)buflen ) {
381        tr_nerr( "UTP", "Short write: %d < %ld", rc, (long)buflen);
382    }
383
384    didWriteWrapper( io, buflen );
385}
386
387static size_t
388utp_get_rb_size(void *closure)
389{
390    size_t bytes;
391    tr_peerIo *io = closure;
392    assert( tr_isPeerIo( io ) );
393
394    bytes = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, UTP_READ_BUFFER_SIZE );
395
396    dbgmsg( io, "utp_get_rb_size is saying it's ready to read %zu bytes", bytes );
397    return UTP_READ_BUFFER_SIZE - bytes;
398}
399
400static void
401utp_on_state_change(void *closure, int state)
402{
403    tr_peerIo *io = closure;
404    assert( tr_isPeerIo( io ) );
405
406    if( state == UTP_STATE_CONNECT ) {
407        dbgmsg( io, "utp_on_state_change -- changed to connected" );
408        io->utpSupported = true;
409    } else if( state == UTP_STATE_WRITABLE ) {
410        dbgmsg( io, "utp_on_state_change -- changed to writable" );
411    } else if( state == UTP_STATE_EOF ) {
412        if( io->gotError )
413            io->gotError( io, BEV_EVENT_EOF, io->userData );
414    } else if( state == UTP_STATE_DESTROYING ) {
415        tr_nerr( "UTP", "Impossible state UTP_STATE_DESTROYING" );
416        return;
417    } else {
418        tr_nerr( "UTP", "Unknown state %d", state );
419    }
420}
421
422static void
423utp_on_error(void *closure, int errcode)
424{
425    tr_peerIo *io = closure;
426    assert( tr_isPeerIo( io ) );
427
428    dbgmsg( io, "utp_on_error -- errcode is %d", errcode );
429
430    if( io->gotError ) {
431        errno = errcode;
432        io->gotError( io, BEV_EVENT_ERROR, io->userData );
433    }
434}
435
436static void
437utp_on_overhead(void *closure, bool send, size_t count, int type UNUSED)
438{
439    tr_peerIo *io = closure;
440    assert( tr_isPeerIo( io ) );
441
442    dbgmsg( io, "utp_on_overhead -- count is %zu", count );
443
444    tr_bandwidthUsed( &io->bandwidth, send ? TR_UP : TR_DOWN,
445                      count, false, tr_time_msec() );
446}
447
448static struct UTPFunctionTable utp_function_table = {
449    .on_read = utp_on_read,
450    .on_write = utp_on_write,
451    .get_rb_size = utp_get_rb_size,
452    .on_state = utp_on_state_change,
453    .on_error = utp_on_error,
454    .on_overhead = utp_on_overhead
455};
456
457
458/* Dummy UTP callbacks. */
459/* We switch a UTP socket to use these after the associated peerIo has been
460   destroyed -- see io_dtor. */
461
462static void
463dummy_read( void * closure UNUSED, const unsigned char *buf UNUSED, size_t buflen UNUSED )
464{
465    /* This cannot happen, as far as I'm aware. */
466    tr_nerr( "UTP", "On_read called on closed socket" );
467
468}
469
470static void
471dummy_write(void * closure UNUSED, unsigned char *buf, size_t buflen)
472{
473    /* This can very well happen if we've shut down a peer connection that
474       had unflushed buffers.  Complain and send zeroes. */
475    tr_ndbg( "UTP", "On_write called on closed socket" );
476    memset( buf, 0, buflen );
477}
478
479static size_t
480dummy_get_rb_size( void * closure UNUSED )
481{
482    return 0;
483}
484
485static void
486dummy_on_state_change(void * closure UNUSED, int state UNUSED )
487{
488    return;
489}
490
491static void
492dummy_on_error( void * closure UNUSED, int errcode UNUSED )
493{
494    return;
495}
496
497static void
498dummy_on_overhead( void *closure UNUSED, bool send UNUSED, size_t count UNUSED, int type UNUSED )
499{
500    return;
501}
502
503static struct UTPFunctionTable dummy_utp_function_table = {
504    .on_read = dummy_read,
505    .on_write = dummy_write,
506    .get_rb_size = dummy_get_rb_size,
507    .on_state = dummy_on_state_change,
508    .on_error = dummy_on_error,
509    .on_overhead = dummy_on_overhead
510};
511
512#endif /* #ifdef WITH_UTP */
513
514static tr_peerIo*
515tr_peerIoNew( tr_session       * session,
516              tr_bandwidth     * parent,
517              const tr_address * addr,
518              tr_port            port,
519              const uint8_t    * torrentHash,
520              bool               isIncoming,
521              bool               isSeed,
522              int                socket,
523              struct UTPSocket * utp_socket)
524{
525    tr_peerIo * io;
526
527    assert( session != NULL );
528    assert( session->events != NULL );
529    assert( tr_isBool( isIncoming ) );
530    assert( tr_isBool( isSeed ) );
531    assert( tr_amInEventThread( session ) );
532    assert( (socket < 0) == (utp_socket != NULL) );
533#ifndef WITH_UTP
534    assert( socket >= 0 );
535#endif
536
537    if( socket >= 0 ) {
538        tr_netSetTOS( socket, session->peerSocketTOS );
539        maybeSetCongestionAlgorithm( socket, session->peer_congestion_algorithm );
540    }
541
542    io = tr_new0( tr_peerIo, 1 );
543    io->magicNumber = PEER_IO_MAGIC_NUMBER;
544    io->refCount = 1;
545    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
546    io->session = session;
547    io->addr = *addr;
548    io->isSeed = isSeed;
549    io->port = port;
550    io->socket = socket;
551    io->utp_socket = utp_socket;
552    io->isIncoming = isIncoming != 0;
553    io->timeCreated = tr_time( );
554    io->inbuf = evbuffer_new( );
555    io->outbuf = evbuffer_new( );
556    tr_bandwidthConstruct( &io->bandwidth, session, parent );
557    tr_bandwidthSetPeer( &io->bandwidth, io );
558    dbgmsg( io, "bandwidth is %p; its parent is %p", &io->bandwidth, parent );
559    dbgmsg( io, "socket is %d, utp_socket is %p", socket, utp_socket );
560
561    if( io->socket >= 0 ) {
562        io->event_read = event_new( session->event_base,
563                                    io->socket, EV_READ, event_read_cb, io );
564        io->event_write = event_new( session->event_base,
565                                     io->socket, EV_WRITE, event_write_cb, io );
566    }
567#ifdef WITH_UTP
568    else {
569        UTP_SetSockopt( utp_socket, SO_RCVBUF, UTP_READ_BUFFER_SIZE );
570        dbgmsg( io, "%s", "calling UTP_SetCallbacks &utp_function_table" );
571        UTP_SetCallbacks( utp_socket,
572                          &utp_function_table,
573                          io );
574        if( !isIncoming ) {
575            dbgmsg( io, "%s", "calling UTP_Connect" );
576            UTP_Connect( utp_socket );
577        }
578    }
579#endif
580
581    return io;
582}
583
584tr_peerIo*
585tr_peerIoNewIncoming( tr_session        * session,
586                      tr_bandwidth      * parent,
587                      const tr_address  * addr,
588                      tr_port             port,
589                      int                 fd,
590                      struct UTPSocket  * utp_socket )
591{
592    assert( session );
593    assert( tr_address_is_valid( addr ) );
594
595    return tr_peerIoNew( session, parent, addr, port, NULL, true, false,
596                         fd, utp_socket );
597}
598
599tr_peerIo*
600tr_peerIoNewOutgoing( tr_session        * session,
601                      tr_bandwidth      * parent,
602                      const tr_address  * addr,
603                      tr_port             port,
604                      const uint8_t     * torrentHash,
605                      bool                isSeed,
606                      bool                utp )
607{
608    int fd = -1;
609    struct UTPSocket * utp_socket = NULL;
610
611    assert( session );
612    assert( tr_address_is_valid( addr ) );
613    assert( torrentHash );
614
615    if( utp )
616        utp_socket = tr_netOpenPeerUTPSocket( session, addr, port, isSeed );
617
618    if( !utp_socket ) {
619        fd = tr_netOpenPeerSocket( session, addr, port, isSeed );
620        dbgmsg( NULL, "tr_netOpenPeerSocket returned fd %d", fd );
621    }
622
623    if( fd < 0 && utp_socket == NULL )
624        return NULL;
625
626    return tr_peerIoNew( session, parent, addr, port,
627                         torrentHash, false, isSeed, fd, utp_socket );
628}
629
630/***
631****
632***/
633
634static void
635event_enable( tr_peerIo * io, short event )
636{
637    assert( tr_amInEventThread( io->session ) );
638    assert( io->session != NULL );
639    assert( io->session->events != NULL );
640
641    if( io->socket < 0 )
642        return;
643
644    assert( io->session->events != NULL );
645    assert( event_initialized( io->event_read ) );
646    assert( event_initialized( io->event_write ) );
647
648    if( ( event & EV_READ ) && ! ( io->pendingEvents & EV_READ ) )
649    {
650        dbgmsg( io, "enabling libevent ready-to-read polling" );
651        event_add( io->event_read, NULL );
652        io->pendingEvents |= EV_READ;
653    }
654
655    if( ( event & EV_WRITE ) && ! ( io->pendingEvents & EV_WRITE ) )
656    {
657        dbgmsg( io, "enabling libevent ready-to-write polling" );
658        event_add( io->event_write, NULL );
659        io->pendingEvents |= EV_WRITE;
660    }
661}
662
663static void
664event_disable( struct tr_peerIo * io, short event )
665{
666    assert( tr_amInEventThread( io->session ) );
667    assert( io->session != NULL );
668
669    if( io->socket < 0 )
670        return;
671
672    assert( io->session->events != NULL );
673    assert( event_initialized( io->event_read ) );
674    assert( event_initialized( io->event_write ) );
675
676    if( ( event & EV_READ ) && ( io->pendingEvents & EV_READ ) )
677    {
678        dbgmsg( io, "disabling libevent ready-to-read polling" );
679        event_del( io->event_read );
680        io->pendingEvents &= ~EV_READ;
681    }
682
683    if( ( event & EV_WRITE ) && ( io->pendingEvents & EV_WRITE ) )
684    {
685        dbgmsg( io, "disabling libevent ready-to-write polling" );
686        event_del( io->event_write );
687        io->pendingEvents &= ~EV_WRITE;
688    }
689}
690
691void
692tr_peerIoSetEnabled( tr_peerIo    * io,
693                     tr_direction   dir,
694                     bool           isEnabled )
695{
696    const short event = dir == TR_UP ? EV_WRITE : EV_READ;
697
698    assert( tr_isPeerIo( io ) );
699    assert( tr_isDirection( dir ) );
700    assert( tr_amInEventThread( io->session ) );
701    assert( io->session->events != NULL );
702
703    if( isEnabled )
704        event_enable( io, event );
705    else
706        event_disable( io, event );
707}
708
709/***
710****
711***/
712static void
713io_close_socket( tr_peerIo * io )
714{
715    if( io->socket >= 0 ) {
716        tr_netClose( io->session, io->socket );
717        io->socket = -1;
718        event_free( io->event_read );
719        event_free( io->event_write );
720    }
721
722#ifdef WITH_UTP
723    if( io->utp_socket ) {
724        UTP_SetCallbacks( io->utp_socket,
725                          &dummy_utp_function_table,
726                          NULL );
727        UTP_Close( io->utp_socket );
728
729        io->utp_socket = NULL;
730    }
731#endif
732}
733
734static void
735io_dtor( void * vio )
736{
737    tr_peerIo * io = vio;
738
739    assert( tr_isPeerIo( io ) );
740    assert( tr_amInEventThread( io->session ) );
741    assert( io->session->events != NULL );
742
743    dbgmsg( io, "in tr_peerIo destructor" );
744    event_disable( io, EV_READ | EV_WRITE );
745    tr_bandwidthDestruct( &io->bandwidth );
746    evbuffer_free( io->outbuf );
747    evbuffer_free( io->inbuf );
748    io_close_socket( io );
749    tr_cryptoFree( io->crypto );
750    tr_list_free( &io->outbuf_datatypes, tr_free );
751
752    memset( io, ~0, sizeof( tr_peerIo ) );
753    tr_free( io );
754}
755
756static void
757tr_peerIoFree( tr_peerIo * io )
758{
759    if( io )
760    {
761        dbgmsg( io, "in tr_peerIoFree" );
762        io->canRead = NULL;
763        io->didWrite = NULL;
764        io->gotError = NULL;
765        tr_runInEventThread( io->session, io_dtor, io );
766    }
767}
768
769void
770tr_peerIoRefImpl( const char * file, int line, tr_peerIo * io )
771{
772    assert( tr_isPeerIo( io ) );
773
774    dbgmsg( io, "%s:%d is incrementing the IO's refcount from %d to %d",
775                file, line, io->refCount, io->refCount+1 );
776
777    ++io->refCount;
778}
779
780void
781tr_peerIoUnrefImpl( const char * file, int line, tr_peerIo * io )
782{
783    assert( tr_isPeerIo( io ) );
784
785    dbgmsg( io, "%s:%d is decrementing the IO's refcount from %d to %d",
786                file, line, io->refCount, io->refCount-1 );
787
788    if( !--io->refCount )
789        tr_peerIoFree( io );
790}
791
792const tr_address*
793tr_peerIoGetAddress( const tr_peerIo * io, tr_port   * port )
794{
795    assert( tr_isPeerIo( io ) );
796
797    if( port )
798        *port = io->port;
799
800    return &io->addr;
801}
802
803const char*
804tr_peerIoAddrStr( const tr_address * addr, tr_port port )
805{
806    static char buf[512];
807    tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_address_to_string( addr ), ntohs( port ) );
808    return buf;
809}
810
811const char* tr_peerIoGetAddrStr( const tr_peerIo * io )
812{
813    return tr_isPeerIo( io ) ? tr_peerIoAddrStr( &io->addr, io->port ) : "error";
814}
815
816void
817tr_peerIoSetIOFuncs( tr_peerIo        * io,
818                     tr_can_read_cb     readcb,
819                     tr_did_write_cb    writecb,
820                     tr_net_error_cb    errcb,
821                     void             * userData )
822{
823    io->canRead = readcb;
824    io->didWrite = writecb;
825    io->gotError = errcb;
826    io->userData = userData;
827}
828
829void
830tr_peerIoClear( tr_peerIo * io )
831{
832    tr_peerIoSetIOFuncs( io, NULL, NULL, NULL, NULL );
833    tr_peerIoSetEnabled( io, TR_UP, false );
834    tr_peerIoSetEnabled( io, TR_DOWN, false );
835}
836
837int
838tr_peerIoReconnect( tr_peerIo * io )
839{
840    short int pendingEvents;
841    tr_session * session;
842
843    assert( tr_isPeerIo( io ) );
844    assert( !tr_peerIoIsIncoming( io ) );
845
846    session = tr_peerIoGetSession( io );
847
848    pendingEvents = io->pendingEvents;
849    event_disable( io, EV_READ | EV_WRITE );
850
851    io_close_socket( io );
852
853    io->socket = tr_netOpenPeerSocket( session, &io->addr, io->port, io->isSeed );
854    io->event_read = event_new( session->event_base, io->socket, EV_READ, event_read_cb, io );
855    io->event_write = event_new( session->event_base, io->socket, EV_WRITE, event_write_cb, io );
856
857    if( io->socket >= 0 )
858    {
859        event_enable( io, pendingEvents );
860        tr_netSetTOS( io->socket, session->peerSocketTOS );
861        maybeSetCongestionAlgorithm( io->socket, session->peer_congestion_algorithm );
862        return 0;
863    }
864
865    return -1;
866}
867
868/**
869***
870**/
871
872void
873tr_peerIoSetTorrentHash( tr_peerIo *     io,
874                         const uint8_t * hash )
875{
876    assert( tr_isPeerIo( io ) );
877
878    tr_cryptoSetTorrentHash( io->crypto, hash );
879}
880
881const uint8_t*
882tr_peerIoGetTorrentHash( tr_peerIo * io )
883{
884    assert( tr_isPeerIo( io ) );
885    assert( io->crypto );
886
887    return tr_cryptoGetTorrentHash( io->crypto );
888}
889
890int
891tr_peerIoHasTorrentHash( const tr_peerIo * io )
892{
893    assert( tr_isPeerIo( io ) );
894    assert( io->crypto );
895
896    return tr_cryptoHasTorrentHash( io->crypto );
897}
898
899/**
900***
901**/
902
903void
904tr_peerIoSetPeersId( tr_peerIo *     io,
905                     const uint8_t * peer_id )
906{
907    assert( tr_isPeerIo( io ) );
908
909    if( ( io->peerIdIsSet = peer_id != NULL ) )
910        memcpy( io->peerId, peer_id, 20 );
911    else
912        memset( io->peerId, 0, 20 );
913}
914
915/**
916***
917**/
918
919static unsigned int
920getDesiredOutputBufferSize( const tr_peerIo * io, uint64_t now )
921{
922    /* this is all kind of arbitrary, but what seems to work well is
923     * being large enough to hold the next 20 seconds' worth of input,
924     * or a few blocks, whichever is bigger.
925     * It's okay to tweak this as needed */
926    const unsigned int currentSpeed_Bps = tr_bandwidthGetPieceSpeed_Bps( &io->bandwidth, now, TR_UP );
927    const unsigned int period = 15u; /* arbitrary */
928    /* the 3 is arbitrary; the .5 is to leave room for messages */
929    static const unsigned int ceiling =  (unsigned int)( MAX_BLOCK_SIZE * 3.5 );
930    return MAX( ceiling, currentSpeed_Bps*period );
931}
932
933size_t
934tr_peerIoGetWriteBufferSpace( const tr_peerIo * io, uint64_t now )
935{
936    const size_t desiredLen = getDesiredOutputBufferSize( io, now );
937    const size_t currentLen = evbuffer_get_length( io->outbuf );
938    size_t freeSpace = 0;
939
940    if( desiredLen > currentLen )
941        freeSpace = desiredLen - currentLen;
942
943    return freeSpace;
944}
945
946/**
947***
948**/
949
950void
951tr_peerIoSetEncryption( tr_peerIo * io, uint32_t encryptionMode )
952{
953    assert( tr_isPeerIo( io ) );
954    assert( encryptionMode == PEER_ENCRYPTION_NONE
955         || encryptionMode == PEER_ENCRYPTION_RC4 );
956
957    io->encryptionMode = encryptionMode;
958}
959
960/**
961***
962**/
963
964static void
965addDatatype( tr_peerIo * io, size_t byteCount, bool isPieceData )
966{
967    struct tr_datatype * d;
968
969    d = tr_new( struct tr_datatype, 1 );
970    d->isPieceData = isPieceData != 0;
971    d->length = byteCount;
972    tr_list_append( &io->outbuf_datatypes, d );
973}
974
975static struct evbuffer_iovec *
976evbuffer_peek_all( struct evbuffer * buf, size_t * setme_vecCount )
977{
978    const size_t byteCount = evbuffer_get_length( buf );
979    const int vecCount = evbuffer_peek( buf, byteCount, NULL, NULL, 0 );
980    struct evbuffer_iovec * iovec = tr_new0( struct evbuffer_iovec, vecCount );
981    const int n = evbuffer_peek( buf, byteCount, NULL, iovec, vecCount );
982    assert( n == vecCount );
983    *setme_vecCount = n;
984    return iovec;
985}
986
987static void
988maybeEncryptBuffer( tr_peerIo * io, struct evbuffer * buf )
989{
990    if( io->encryptionMode == PEER_ENCRYPTION_RC4 )
991    {
992        size_t i, n;
993        struct evbuffer_iovec * iovec = evbuffer_peek_all( buf, &n );
994
995        for( i=0; i<n; ++i )
996            tr_cryptoEncrypt( io->crypto, iovec[i].iov_len, iovec[i].iov_base, iovec[i].iov_base );
997
998        tr_free( iovec );
999    }
1000}
1001
1002void
1003tr_peerIoWriteBuf( tr_peerIo * io, struct evbuffer * buf, bool isPieceData )
1004{
1005    const size_t byteCount = evbuffer_get_length( buf );
1006    maybeEncryptBuffer( io, buf );
1007    evbuffer_add_buffer( io->outbuf, buf );
1008    addDatatype( io, byteCount, isPieceData );
1009}
1010
1011void
1012tr_peerIoWriteBytes( tr_peerIo * io, const void * bytes, size_t byteCount, bool isPieceData )
1013{
1014    struct evbuffer * buf = evbuffer_new( );
1015    evbuffer_add( buf, bytes, byteCount );
1016    tr_peerIoWriteBuf( io, buf, isPieceData );
1017    evbuffer_free( buf );
1018}
1019
1020/***
1021****
1022***/
1023
1024void
1025evbuffer_add_uint8( struct evbuffer * outbuf, uint8_t byte )
1026{
1027    evbuffer_add( outbuf, &byte, 1 );
1028}
1029
1030void
1031evbuffer_add_uint16( struct evbuffer * outbuf, uint16_t addme_hs )
1032{
1033    const uint16_t ns = htons( addme_hs );
1034    evbuffer_add( outbuf, &ns, sizeof( ns ) );
1035}
1036
1037void
1038evbuffer_add_uint32( struct evbuffer * outbuf, uint32_t addme_hl )
1039{
1040    const uint32_t nl = htonl( addme_hl );
1041    evbuffer_add( outbuf, &nl, sizeof( nl ) );
1042}
1043
1044void
1045evbuffer_add_uint64( struct evbuffer * outbuf, uint64_t addme_hll )
1046{
1047    const uint64_t nll = tr_htonll( addme_hll );
1048    evbuffer_add( outbuf, &nll, sizeof( nll ) );
1049}
1050
1051/***
1052****
1053***/
1054
1055void
1056tr_peerIoReadBytes( tr_peerIo * io, struct evbuffer * inbuf, void * bytes, size_t byteCount )
1057{
1058    assert( tr_isPeerIo( io ) );
1059    assert( evbuffer_get_length( inbuf )  >= byteCount );
1060
1061    switch( io->encryptionMode )
1062    {
1063        case PEER_ENCRYPTION_NONE:
1064            evbuffer_remove( inbuf, bytes, byteCount );
1065            break;
1066
1067        case PEER_ENCRYPTION_RC4:
1068            evbuffer_remove( inbuf, bytes, byteCount );
1069            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
1070            break;
1071
1072        default:
1073            assert( 0 );
1074    }
1075}
1076
1077void
1078tr_peerIoReadUint16( tr_peerIo        * io,
1079                     struct evbuffer  * inbuf,
1080                     uint16_t         * setme )
1081{
1082    uint16_t tmp;
1083    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
1084    *setme = ntohs( tmp );
1085}
1086
1087void tr_peerIoReadUint32( tr_peerIo        * io,
1088                          struct evbuffer  * inbuf,
1089                          uint32_t         * setme )
1090{
1091    uint32_t tmp;
1092    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
1093    *setme = ntohl( tmp );
1094}
1095
1096void
1097tr_peerIoDrain( tr_peerIo       * io,
1098                struct evbuffer * inbuf,
1099                size_t            byteCount )
1100{
1101    void * buf = tr_sessionGetBuffer( io->session );
1102    const size_t buflen = SESSION_BUFFER_SIZE;
1103
1104    while( byteCount > 0 )
1105    {
1106        const size_t thisPass = MIN( byteCount, buflen );
1107        tr_peerIoReadBytes( io, inbuf, buf, thisPass );
1108        byteCount -= thisPass;
1109    }
1110
1111    tr_sessionReleaseBuffer( io->session );
1112}
1113
1114/***
1115****
1116***/
1117
1118static int
1119tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
1120{
1121    int res = 0;
1122
1123    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch )))
1124    {
1125        if( io->utp_socket != NULL ) /* utp peer connection */
1126        {
1127            /* UTP_RBDrained notifies libutp that your read buffer is emtpy.
1128             * It opens up the congestion window by sending an ACK (soonish)
1129             * if one was not going to be sent. */
1130            if( evbuffer_get_length( io->inbuf ) == 0 )
1131                UTP_RBDrained( io->utp_socket );
1132        }
1133        else /* tcp peer connection */
1134        {
1135            int e;
1136
1137            EVUTIL_SET_SOCKET_ERROR( 0 );
1138            res = evbuffer_read( io->inbuf, io->socket, (int)howmuch );
1139            e = EVUTIL_SOCKET_ERROR( );
1140
1141            dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(e):"") );
1142
1143            if( evbuffer_get_length( io->inbuf ) )
1144                canReadWrapper( io );
1145
1146            if( ( res <= 0 ) && ( io->gotError ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1147            {
1148                char errstr[512];
1149                short what = BEV_EVENT_READING | BEV_EVENT_ERROR;
1150                if( res == 0 )
1151                    what |= BEV_EVENT_EOF;
1152                tr_net_strerror( errstr, sizeof( errstr ), e );
1153                dbgmsg( io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
1154                io->gotError( io, what, io->userData );
1155            }
1156        }
1157    }
1158
1159    return res;
1160}
1161
1162static int
1163tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
1164{
1165    int n = 0;
1166    const size_t old_len = evbuffer_get_length( io->outbuf );
1167    dbgmsg( io, "in tr_peerIoTryWrite %zu", howmuch );
1168
1169    if( howmuch > old_len )
1170        howmuch = old_len;
1171
1172    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_UP, howmuch )))
1173    {
1174        if( io->utp_socket != NULL ) /* utp peer connection */
1175        {
1176            const size_t old_len = evbuffer_get_length( io->outbuf );
1177            UTP_Write( io->utp_socket, howmuch );
1178            n = old_len - evbuffer_get_length( io->outbuf );
1179        }
1180        else
1181        {
1182            int e;
1183
1184            EVUTIL_SET_SOCKET_ERROR( 0 );
1185            n = tr_evbuffer_write( io, io->socket, howmuch );
1186            e = EVUTIL_SOCKET_ERROR( );
1187
1188            if( n > 0 )
1189                didWriteWrapper( io, n );
1190
1191            if( ( n < 0 ) && ( io->gotError ) && e && ( e != EPIPE ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1192            {
1193                char errstr[512];
1194                const short what = BEV_EVENT_WRITING | BEV_EVENT_ERROR;
1195
1196                tr_net_strerror( errstr, sizeof( errstr ), e );
1197                dbgmsg( io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", n, what, e, errstr );
1198
1199                if( io->gotError != NULL )
1200                    io->gotError( io, what, io->userData );
1201            }
1202        }
1203    }
1204
1205    return n;
1206}
1207
1208int
1209tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
1210{
1211    int bytesUsed = 0;
1212
1213    assert( tr_isPeerIo( io ) );
1214    assert( tr_isDirection( dir ) );
1215
1216    if( dir == TR_DOWN )
1217        bytesUsed = tr_peerIoTryRead( io, limit );
1218    else
1219        bytesUsed = tr_peerIoTryWrite( io, limit );
1220
1221    dbgmsg( io, "flushing peer-io, direction %d, limit %zu, bytesUsed %d", (int)dir, limit, bytesUsed );
1222    return bytesUsed;
1223}
1224
1225int
1226tr_peerIoFlushOutgoingProtocolMsgs( tr_peerIo * io )
1227{
1228    size_t byteCount = 0;
1229    tr_list * it;
1230
1231    /* count up how many bytes are used by non-piece-data messages
1232       at the front of our outbound queue */
1233    for( it=io->outbuf_datatypes; it!=NULL; it=it->next )
1234    {
1235        struct tr_datatype * d = it->data;
1236
1237        if( d->isPieceData )
1238            break;
1239
1240        byteCount += d->length;
1241    }
1242
1243    return tr_peerIoFlush( io, TR_UP, byteCount );
1244}
Note: See TracBrowser for help on using the repository browser.