source: trunk/libtransmission/peer-io.c @ 12248

Last change on this file since 12248 was 12248, checked in by jordan, 11 years ago

(trunk libT) break the mac build and introduce new crashes.

This is partially to address #4145 "Downloads stuck at 100%" by refactoring the bitset, bitfield, and tr_completion; however, the ripple effect is larger than usual so things may get worse in the short term before getting better.

livings124: to fix the mac build, remove bitset.[ch] from xcode

  • Property svn:keywords set to Date Rev Author Id
File size: 32.6 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 12248 2011-03-28 16:31:05Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h>
16
17#include <event2/event.h>
18#include <event2/buffer.h>
19#include <event2/bufferevent.h>
20
21#include <libutp/utp.h>
22
23#include "transmission.h"
24#include "session.h"
25#include "bandwidth.h"
26#include "crypto.h"
27#include "list.h"
28#include "net.h"
29#include "peer-common.h" /* MAX_BLOCK_SIZE */
30#include "peer-io.h"
31#include "trevent.h" /* tr_runInEventThread() */
32#include "utils.h"
33
34
35#ifdef WIN32
36 #define EAGAIN       WSAEWOULDBLOCK
37 #define EINTR        WSAEINTR
38 #define EINPROGRESS  WSAEINPROGRESS
39 #define EPIPE        WSAECONNRESET
40#endif
41
42/* The amount of read bufferring that we allow for uTP sockets. */
43
44#define UTP_READ_BUFFER_SIZE (256 * 1024)
45
46static size_t
47guessPacketOverhead( size_t d )
48{
49    /**
50     * http://sd.wareonearth.com/~phil/net/overhead/
51     *
52     * TCP over Ethernet:
53     * Assuming no header compression (e.g. not PPP)
54     * Add 20 IPv4 header or 40 IPv6 header (no options)
55     * Add 20 TCP header
56     * Add 12 bytes optional TCP timestamps
57     * Max TCP Payload data rates over ethernet are thus:
58     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
59     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
60     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
61     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
62     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
63     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
64     */
65    const double assumed_payload_data_rate = 94.0;
66
67    return (unsigned int)( d * ( 100.0 / assumed_payload_data_rate ) - d );
68}
69
70/**
71***
72**/
73
74#define dbgmsg( io, ... ) \
75    do { \
76        if( tr_deepLoggingIsActive( ) ) \
77            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
78    } while( 0 )
79
80struct tr_datatype
81{
82    bool    isPieceData;
83    size_t  length;
84};
85
86
87/***
88****
89***/
90
91static void
92didWriteWrapper( tr_peerIo * io, unsigned int bytes_transferred )
93{
94     while( bytes_transferred && tr_isPeerIo( io ) )
95     {
96        struct tr_datatype * next = io->outbuf_datatypes->data;
97
98        const unsigned int payload = MIN( next->length, bytes_transferred );
99        /* For uTP sockets, the overhead is computed in utp_on_overhead. */
100        const unsigned int overhead =
101            io->socket ? guessPacketOverhead( payload ) : 0;
102        const uint64_t now = tr_time_msec( );
103
104        tr_bandwidthUsed( &io->bandwidth, TR_UP, payload, next->isPieceData, now );
105
106        if( overhead > 0 )
107            tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, false, now );
108
109        if( io->didWrite )
110            io->didWrite( io, payload, next->isPieceData, io->userData );
111
112        if( tr_isPeerIo( io ) )
113        {
114            bytes_transferred -= payload;
115            next->length -= payload;
116            if( !next->length ) {
117                tr_list_pop_front( &io->outbuf_datatypes );
118                tr_free( next );
119            }
120        }
121    }
122}
123
124static void
125canReadWrapper( tr_peerIo * io )
126{
127    bool err = 0;
128    bool done = 0;
129    tr_session * session;
130
131    dbgmsg( io, "canRead" );
132
133fprintf( stderr, "[%p] in canRead; refcount is %d (%s:%d)\n", io, io->refCount, __FILE__, __LINE__ );
134
135    tr_peerIoRef( io );
136
137    session = io->session;
138
139    /* try to consume the input buffer */
140    if( io->canRead )
141    {
142        const uint64_t now = tr_time_msec( );
143
144        tr_sessionLock( session );
145
146        while( !done && !err )
147        {
148            size_t piece = 0;
149            const size_t oldLen = evbuffer_get_length( io->inbuf );
150            const int ret = io->canRead( io, io->userData, &piece );
151            const size_t used = oldLen - evbuffer_get_length( io->inbuf );
152            const unsigned int overhead = guessPacketOverhead( used );
153
154            if( piece || (piece!=used) )
155            {
156                if( piece )
157                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, piece, true, now );
158
159                if( used != piece )
160                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, used - piece, false, now );
161            }
162
163            if( overhead > 0 )
164                tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, false, now );
165
166            switch( ret )
167            {
168                case READ_NOW:
169                    if( evbuffer_get_length( io->inbuf ) )
170                        continue;
171                    done = 1;
172                    break;
173
174                case READ_LATER:
175                    done = 1;
176                    break;
177
178                case READ_ERR:
179                    err = 1;
180                    break;
181            }
182
183            assert( tr_isPeerIo( io ) );
184        }
185
186        tr_sessionUnlock( session );
187    }
188
189    tr_peerIoUnref( io );
190}
191
192static void
193event_read_cb( int fd, short event UNUSED, void * vio )
194{
195    int res;
196    int e;
197    tr_peerIo * io = vio;
198
199    /* Limit the input buffer to 256K, so it doesn't grow too large */
200    unsigned int howmuch;
201    unsigned int curlen;
202    const tr_direction dir = TR_DOWN;
203    const unsigned int max = 256 * 1024;
204
205    assert( tr_isPeerIo( io ) );
206    assert( io->socket >= 0 );
207
208    io->pendingEvents &= ~EV_READ;
209
210    curlen = evbuffer_get_length( io->inbuf );
211    howmuch = curlen >= max ? 0 : max - curlen;
212    howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch );
213
214    dbgmsg( io, "libevent says this peer is ready to read" );
215
216    /* if we don't have any bandwidth left, stop reading */
217    if( howmuch < 1 ) {
218        tr_peerIoSetEnabled( io, dir, false );
219        return;
220    }
221
222    EVUTIL_SET_SOCKET_ERROR( 0 );
223    res = evbuffer_read( io->inbuf, fd, (int)howmuch );
224    e = EVUTIL_SOCKET_ERROR( );
225
226    if( res > 0 )
227    {
228        tr_peerIoSetEnabled( io, dir, true );
229
230        /* Invoke the user callback - must always be called last */
231        canReadWrapper( io );
232    }
233    else
234    {
235        char errstr[512];
236        short what = BEV_EVENT_READING;
237
238        if( res == 0 ) /* EOF */
239            what |= BEV_EVENT_EOF;
240        else if( res == -1 ) {
241            if( e == EAGAIN || e == EINTR ) {
242                tr_peerIoSetEnabled( io, dir, true );
243                return;
244            }
245            what |= BEV_EVENT_ERROR;
246        }
247
248        tr_net_strerror( errstr, sizeof( errstr ), e );
249        dbgmsg( io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
250
251        if( io->gotError != NULL )
252            io->gotError( io, what, io->userData );
253    }
254}
255
256static int
257tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
258{
259    int e;
260    int n;
261    char errstr[256];
262
263    EVUTIL_SET_SOCKET_ERROR( 0 );
264    n = evbuffer_write_atmost( io->outbuf, fd, howmuch );
265    e = EVUTIL_SOCKET_ERROR( );
266    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?tr_net_strerror(errstr,sizeof(errstr),e):"") );
267
268    return n;
269}
270
271static void
272event_write_cb( int fd, short event UNUSED, void * vio )
273{
274    int res = 0;
275    int e;
276    short what = BEV_EVENT_WRITING;
277    tr_peerIo * io = vio;
278    size_t howmuch;
279    const tr_direction dir = TR_UP;
280
281    assert( tr_isPeerIo( io ) );
282    assert( io->socket >= 0 );
283
284    io->pendingEvents &= ~EV_WRITE;
285
286    dbgmsg( io, "libevent says this peer is ready to write" );
287
288    /* Write as much as possible, since the socket is non-blocking, write() will
289     * return if it can't write any more data without blocking */
290    howmuch = tr_bandwidthClamp( &io->bandwidth, dir, evbuffer_get_length( io->outbuf ) );
291
292    /* if we don't have any bandwidth left, stop writing */
293    if( howmuch < 1 ) {
294        tr_peerIoSetEnabled( io, dir, false );
295        return;
296    }
297
298    EVUTIL_SET_SOCKET_ERROR( 0 );
299    res = tr_evbuffer_write( io, fd, howmuch );
300    e = EVUTIL_SOCKET_ERROR( );
301
302    if (res == -1) {
303        if (!e || e == EAGAIN || e == EINTR || e == EINPROGRESS)
304            goto reschedule;
305        /* error case */
306        what |= BEV_EVENT_ERROR;
307    } else if (res == 0) {
308        /* eof case */
309        what |= BEV_EVENT_EOF;
310    }
311    if (res <= 0)
312        goto error;
313
314    if( evbuffer_get_length( io->outbuf ) )
315        tr_peerIoSetEnabled( io, dir, true );
316
317    didWriteWrapper( io, res );
318    return;
319
320 reschedule:
321    if( evbuffer_get_length( io->outbuf ) )
322        tr_peerIoSetEnabled( io, dir, true );
323    return;
324
325 error:
326
327    dbgmsg( io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
328
329    if( io->gotError != NULL )
330        io->gotError( io, what, io->userData );
331}
332
333/**
334***
335**/
336
337static void
338maybeSetCongestionAlgorithm( int socket, const char * algorithm )
339{
340    if( algorithm && *algorithm )
341    {
342        const int rc = tr_netSetCongestionControl( socket, algorithm );
343
344        if( rc < 0 )
345            tr_ninf( "Net", "Can't set congestion control algorithm '%s': %s",
346                     algorithm, tr_strerror( errno ));
347    }
348}
349
350#ifdef WITH_UTP
351/* UTP callbacks */
352
353static void
354utp_on_read(void *closure, const unsigned char *buf, size_t buflen)
355{
356    int rc;
357    tr_peerIo *io = closure;
358    assert( tr_isPeerIo( io ) );
359
360    rc = evbuffer_add( io->inbuf, buf, buflen );
361    dbgmsg( io, "utp_on_read got %zu bytes", buflen );
362
363    if( rc < 0 ) {
364        tr_nerr( "UTP", "On read evbuffer_add" );
365        return;
366    }
367
368    tr_peerIoSetEnabled( io, TR_DOWN, true );
369    canReadWrapper( io );
370}
371
372static void
373utp_on_write(void *closure, unsigned char *buf, size_t buflen)
374{
375    int rc;
376    tr_peerIo *io = closure;
377    assert( tr_isPeerIo( io ) );
378
379    rc = evbuffer_remove( io->outbuf, buf, buflen );
380    dbgmsg( io, "utp_on_write sending %zu bytes... evbuffer_remove returned %d", buflen, rc );
381    assert( rc == (int)buflen ); /* if this fails, we've corrupted our bookkeeping somewhere */
382    if( rc < (long)buflen ) {
383        tr_nerr( "UTP", "Short write: %d < %ld", rc, (long)buflen);
384    }
385
386    didWriteWrapper( io, buflen );
387}
388
389static size_t
390utp_get_rb_size(void *closure)
391{
392    size_t bytes;
393    tr_peerIo *io = closure;
394    assert( tr_isPeerIo( io ) );
395
396    bytes = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, UTP_READ_BUFFER_SIZE );
397
398    dbgmsg( io, "utp_get_rb_size is saying it's ready to read %zu bytes", bytes );
399    return UTP_READ_BUFFER_SIZE - bytes;
400}
401
402static void
403utp_on_state_change(void *closure, int state)
404{
405    tr_peerIo *io = closure;
406    assert( tr_isPeerIo( io ) );
407
408    if( state == UTP_STATE_CONNECT ) {
409        dbgmsg( io, "utp_on_state_change -- changed to connected" );
410        io->utpSupported = true;
411    } else if( state == UTP_STATE_WRITABLE ) {
412        dbgmsg( io, "utp_on_state_change -- changed to writable" );
413    } else if( state == UTP_STATE_EOF ) {
414        if( io->gotError )
415            io->gotError( io, BEV_EVENT_EOF, io->userData );
416    } else if( state == UTP_STATE_DESTROYING ) {
417        tr_nerr( "UTP", "Impossible state UTP_STATE_DESTROYING" );
418        return;
419    } else {
420        tr_nerr( "UTP", "Unknown state %d", state );
421    }
422}
423
424static void
425utp_on_error(void *closure, int errcode)
426{
427    tr_peerIo *io = closure;
428    assert( tr_isPeerIo( io ) );
429
430    dbgmsg( io, "utp_on_error -- errcode is %d", errcode );
431
432    if( io->gotError ) {
433        errno = errcode;
434        io->gotError( io, BEV_EVENT_ERROR, io->userData );
435    }
436}
437
438static void
439utp_on_overhead(void *closure, bool send, size_t count, int type UNUSED)
440{
441    tr_peerIo *io = closure;
442    assert( tr_isPeerIo( io ) );
443
444    dbgmsg( io, "utp_on_overhead -- count is %zu", count );
445
446    tr_bandwidthUsed( &io->bandwidth, send ? TR_UP : TR_DOWN,
447                      count, false, tr_time_msec() );
448}
449
450static struct UTPFunctionTable utp_function_table = {
451    .on_read = utp_on_read,
452    .on_write = utp_on_write,
453    .get_rb_size = utp_get_rb_size,
454    .on_state = utp_on_state_change,
455    .on_error = utp_on_error,
456    .on_overhead = utp_on_overhead
457};
458
459
460/* Dummy UTP callbacks. */
461/* We switch a UTP socket to use these after the associated peerIo has been
462   destroyed -- see io_dtor. */
463
464static void
465dummy_read( void * closure UNUSED, const unsigned char *buf UNUSED, size_t buflen UNUSED )
466{
467    /* This cannot happen, as far as I'm aware. */
468    tr_nerr( "UTP", "On_read called on closed socket" );
469
470}
471
472static void
473dummy_write(void * closure UNUSED, unsigned char *buf, size_t buflen)
474{
475    /* This can very well happen if we've shut down a peer connection that
476       had unflushed buffers.  Complain and send zeroes. */
477    tr_ndbg( "UTP", "On_write called on closed socket" );
478    memset( buf, 0, buflen );
479}
480
481static size_t
482dummy_get_rb_size( void * closure UNUSED )
483{
484    return 0;
485}
486
487static void
488dummy_on_state_change(void * closure UNUSED, int state UNUSED )
489{
490    return;
491}
492
493static void
494dummy_on_error( void * closure UNUSED, int errcode UNUSED )
495{
496    return;
497}
498
499static void
500dummy_on_overhead( void *closure UNUSED, bool send UNUSED, size_t count UNUSED, int type UNUSED )
501{
502    return;
503}
504
505static struct UTPFunctionTable dummy_utp_function_table = {
506    .on_read = dummy_read,
507    .on_write = dummy_write,
508    .get_rb_size = dummy_get_rb_size,
509    .on_state = dummy_on_state_change,
510    .on_error = dummy_on_error,
511    .on_overhead = dummy_on_overhead
512};
513
514#endif /* #ifdef WITH_UTP */
515
516static tr_peerIo*
517tr_peerIoNew( tr_session       * session,
518              tr_bandwidth     * parent,
519              const tr_address * addr,
520              tr_port            port,
521              const uint8_t    * torrentHash,
522              bool               isIncoming,
523              bool               isSeed,
524              int                socket,
525              struct UTPSocket * utp_socket)
526{
527    tr_peerIo * io;
528
529    assert( session != NULL );
530    assert( session->events != NULL );
531    assert( tr_isBool( isIncoming ) );
532    assert( tr_isBool( isSeed ) );
533    assert( tr_amInEventThread( session ) );
534    assert( (socket < 0) == (utp_socket != NULL) );
535#ifndef WITH_UTP
536    assert( socket >= 0 );
537#endif
538
539    if( socket >= 0 ) {
540        tr_netSetTOS( socket, session->peerSocketTOS );
541        maybeSetCongestionAlgorithm( socket, session->peer_congestion_algorithm );
542    }
543
544    io = tr_new0( tr_peerIo, 1 );
545    io->magicNumber = PEER_IO_MAGIC_NUMBER;
546    io->refCount = 1;
547    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
548    io->session = session;
549    io->addr = *addr;
550    io->isSeed = isSeed;
551    io->port = port;
552    io->socket = socket;
553    io->utp_socket = utp_socket;
554    io->isIncoming = isIncoming != 0;
555    io->timeCreated = tr_time( );
556    io->inbuf = evbuffer_new( );
557    io->outbuf = evbuffer_new( );
558    tr_bandwidthConstruct( &io->bandwidth, session, parent );
559    tr_bandwidthSetPeer( &io->bandwidth, io );
560    dbgmsg( io, "bandwidth is %p; its parent is %p", &io->bandwidth, parent );
561    dbgmsg( io, "socket is %d, utp_socket is %p", socket, utp_socket );
562
563    if( io->socket >= 0 ) {
564        io->event_read = event_new( session->event_base,
565                                    io->socket, EV_READ, event_read_cb, io );
566        io->event_write = event_new( session->event_base,
567                                     io->socket, EV_WRITE, event_write_cb, io );
568    }
569#ifdef WITH_UTP
570    else {
571        UTP_SetSockopt( utp_socket, SO_RCVBUF, UTP_READ_BUFFER_SIZE );
572        dbgmsg( io, "%s", "calling UTP_SetCallbacks &utp_function_table" );
573fprintf( stderr, "[%p] setting utp callbacks (%s:%d)\n", io, __FILE__, __LINE__ );
574        UTP_SetCallbacks( utp_socket,
575                          &utp_function_table,
576                          io );
577        if( !isIncoming ) {
578            dbgmsg( io, "%s", "calling UTP_Connect" );
579            UTP_Connect( utp_socket );
580        }
581    }
582#endif
583
584    return io;
585}
586
587tr_peerIo*
588tr_peerIoNewIncoming( tr_session        * session,
589                      tr_bandwidth      * parent,
590                      const tr_address  * addr,
591                      tr_port             port,
592                      int                 fd,
593                      struct UTPSocket  * utp_socket )
594{
595    assert( session );
596    assert( tr_address_is_valid( addr ) );
597
598    return tr_peerIoNew( session, parent, addr, port, NULL, true, false,
599                         fd, utp_socket );
600}
601
602tr_peerIo*
603tr_peerIoNewOutgoing( tr_session        * session,
604                      tr_bandwidth      * parent,
605                      const tr_address  * addr,
606                      tr_port             port,
607                      const uint8_t     * torrentHash,
608                      bool                isSeed,
609                      bool                utp )
610{
611    int fd = -1;
612    struct UTPSocket * utp_socket = NULL;
613
614    assert( session );
615    assert( tr_address_is_valid( addr ) );
616    assert( torrentHash );
617
618    if( utp )
619        utp_socket = tr_netOpenPeerUTPSocket( session, addr, port, isSeed );
620
621    if( !utp_socket ) {
622        fd = tr_netOpenPeerSocket( session, addr, port, isSeed );
623        dbgmsg( NULL, "tr_netOpenPeerSocket returned fd %d", fd );
624    }
625
626    if( fd < 0 && utp_socket == NULL )
627        return NULL;
628
629    return tr_peerIoNew( session, parent, addr, port,
630                         torrentHash, false, isSeed, fd, utp_socket );
631}
632
633/***
634****
635***/
636
637static void
638event_enable( tr_peerIo * io, short event )
639{
640    assert( tr_amInEventThread( io->session ) );
641    assert( io->session != NULL );
642    assert( io->session->events != NULL );
643
644    if( io->socket < 0 )
645        return;
646
647    assert( io->session->events != NULL );
648    assert( event_initialized( io->event_read ) );
649    assert( event_initialized( io->event_write ) );
650
651    if( ( event & EV_READ ) && ! ( io->pendingEvents & EV_READ ) )
652    {
653        dbgmsg( io, "enabling libevent ready-to-read polling" );
654        event_add( io->event_read, NULL );
655        io->pendingEvents |= EV_READ;
656    }
657
658    if( ( event & EV_WRITE ) && ! ( io->pendingEvents & EV_WRITE ) )
659    {
660        dbgmsg( io, "enabling libevent ready-to-write polling" );
661        event_add( io->event_write, NULL );
662        io->pendingEvents |= EV_WRITE;
663    }
664}
665
666static void
667event_disable( struct tr_peerIo * io, short event )
668{
669    assert( tr_amInEventThread( io->session ) );
670    assert( io->session != NULL );
671
672    if( io->socket < 0 )
673        return;
674
675    assert( io->session->events != NULL );
676    assert( event_initialized( io->event_read ) );
677    assert( event_initialized( io->event_write ) );
678
679    if( ( event & EV_READ ) && ( io->pendingEvents & EV_READ ) )
680    {
681        dbgmsg( io, "disabling libevent ready-to-read polling" );
682        event_del( io->event_read );
683        io->pendingEvents &= ~EV_READ;
684    }
685
686    if( ( event & EV_WRITE ) && ( io->pendingEvents & EV_WRITE ) )
687    {
688        dbgmsg( io, "disabling libevent ready-to-write polling" );
689        event_del( io->event_write );
690        io->pendingEvents &= ~EV_WRITE;
691    }
692}
693
694void
695tr_peerIoSetEnabled( tr_peerIo    * io,
696                     tr_direction   dir,
697                     bool           isEnabled )
698{
699    const short event = dir == TR_UP ? EV_WRITE : EV_READ;
700
701    assert( tr_isPeerIo( io ) );
702    assert( tr_isDirection( dir ) );
703    assert( tr_amInEventThread( io->session ) );
704    assert( io->session->events != NULL );
705
706    if( isEnabled )
707        event_enable( io, event );
708    else
709        event_disable( io, event );
710}
711
712/***
713****
714***/
715static void
716io_close_socket( tr_peerIo * io )
717{
718    if( io->socket >= 0 ) {
719        tr_netClose( io->session, io->socket );
720        io->socket = -1;
721        event_free( io->event_read );
722        event_free( io->event_write );
723    }
724
725#ifdef WITH_UTP
726    if( io->utp_socket ) {
727fprintf( stderr, "[%p] clearing utp callbacks (%s:%d)\n", io, __FILE__, __LINE__ );
728        UTP_SetCallbacks( io->utp_socket,
729                          &dummy_utp_function_table,
730                          NULL );
731        UTP_Close( io->utp_socket );
732
733        io->utp_socket = NULL;
734    }
735#endif
736}
737
738static void
739io_dtor( void * vio )
740{
741    tr_peerIo * io = vio;
742
743    assert( tr_isPeerIo( io ) );
744    assert( tr_amInEventThread( io->session ) );
745    assert( io->session->events != NULL );
746
747    dbgmsg( io, "in tr_peerIo destructor" );
748    event_disable( io, EV_READ | EV_WRITE );
749    tr_bandwidthDestruct( &io->bandwidth );
750    evbuffer_free( io->outbuf );
751    evbuffer_free( io->inbuf );
752    io_close_socket( io );
753    tr_cryptoFree( io->crypto );
754    tr_list_free( &io->outbuf_datatypes, tr_free );
755
756    memset( io, ~0, sizeof( tr_peerIo ) );
757    tr_free( io );
758}
759
760static void
761tr_peerIoFree( tr_peerIo * io )
762{
763    if( io )
764    {
765        dbgmsg( io, "in tr_peerIoFree" );
766        io->canRead = NULL;
767        io->didWrite = NULL;
768        io->gotError = NULL;
769        tr_runInEventThread( io->session, io_dtor, io );
770    }
771}
772
773void
774tr_peerIoRefImpl( const char * file, int line, tr_peerIo * io )
775{
776    assert( tr_isPeerIo( io ) );
777
778    dbgmsg( io, "%s:%d is incrementing the IO's refcount from %d to %d",
779                file, line, io->refCount, io->refCount+1 );
780
781    ++io->refCount;
782}
783
784void
785tr_peerIoUnrefImpl( const char * file, int line, tr_peerIo * io )
786{
787    assert( tr_isPeerIo( io ) );
788
789    dbgmsg( io, "%s:%d is decrementing the IO's refcount from %d to %d",
790                file, line, io->refCount, io->refCount-1 );
791
792    if( !--io->refCount )
793        tr_peerIoFree( io );
794}
795
796const tr_address*
797tr_peerIoGetAddress( const tr_peerIo * io, tr_port   * port )
798{
799    assert( tr_isPeerIo( io ) );
800
801    if( port )
802        *port = io->port;
803
804    return &io->addr;
805}
806
807const char*
808tr_peerIoAddrStr( const tr_address * addr, tr_port port )
809{
810    static char buf[512];
811    tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_address_to_string( addr ), ntohs( port ) );
812    return buf;
813}
814
815const char* tr_peerIoGetAddrStr( const tr_peerIo * io )
816{
817    return tr_isPeerIo( io ) ? tr_peerIoAddrStr( &io->addr, io->port ) : "error";
818}
819
820void
821tr_peerIoSetIOFuncs( tr_peerIo        * io,
822                     tr_can_read_cb     readcb,
823                     tr_did_write_cb    writecb,
824                     tr_net_error_cb    errcb,
825                     void             * userData )
826{
827    io->canRead = readcb;
828    io->didWrite = writecb;
829    io->gotError = errcb;
830    io->userData = userData;
831}
832
833void
834tr_peerIoClear( tr_peerIo * io )
835{
836    tr_peerIoSetIOFuncs( io, NULL, NULL, NULL, NULL );
837    tr_peerIoSetEnabled( io, TR_UP, false );
838    tr_peerIoSetEnabled( io, TR_DOWN, false );
839}
840
841int
842tr_peerIoReconnect( tr_peerIo * io )
843{
844    short int pendingEvents;
845    tr_session * session;
846
847    assert( tr_isPeerIo( io ) );
848    assert( !tr_peerIoIsIncoming( io ) );
849
850    session = tr_peerIoGetSession( io );
851
852    pendingEvents = io->pendingEvents;
853    event_disable( io, EV_READ | EV_WRITE );
854
855    io_close_socket( io );
856
857    io->socket = tr_netOpenPeerSocket( session, &io->addr, io->port, io->isSeed );
858    io->event_read = event_new( session->event_base, io->socket, EV_READ, event_read_cb, io );
859    io->event_write = event_new( session->event_base, io->socket, EV_WRITE, event_write_cb, io );
860
861    if( io->socket >= 0 )
862    {
863        event_enable( io, pendingEvents );
864        tr_netSetTOS( io->socket, session->peerSocketTOS );
865        maybeSetCongestionAlgorithm( io->socket, session->peer_congestion_algorithm );
866        return 0;
867    }
868
869    return -1;
870}
871
872/**
873***
874**/
875
876void
877tr_peerIoSetTorrentHash( tr_peerIo *     io,
878                         const uint8_t * hash )
879{
880    assert( tr_isPeerIo( io ) );
881
882    tr_cryptoSetTorrentHash( io->crypto, hash );
883}
884
885const uint8_t*
886tr_peerIoGetTorrentHash( tr_peerIo * io )
887{
888    assert( tr_isPeerIo( io ) );
889    assert( io->crypto );
890
891    return tr_cryptoGetTorrentHash( io->crypto );
892}
893
894int
895tr_peerIoHasTorrentHash( const tr_peerIo * io )
896{
897    assert( tr_isPeerIo( io ) );
898    assert( io->crypto );
899
900    return tr_cryptoHasTorrentHash( io->crypto );
901}
902
903/**
904***
905**/
906
907void
908tr_peerIoSetPeersId( tr_peerIo *     io,
909                     const uint8_t * peer_id )
910{
911    assert( tr_isPeerIo( io ) );
912
913    if( ( io->peerIdIsSet = peer_id != NULL ) )
914        memcpy( io->peerId, peer_id, 20 );
915    else
916        memset( io->peerId, 0, 20 );
917}
918
919/**
920***
921**/
922
923static unsigned int
924getDesiredOutputBufferSize( const tr_peerIo * io, uint64_t now )
925{
926    /* this is all kind of arbitrary, but what seems to work well is
927     * being large enough to hold the next 20 seconds' worth of input,
928     * or a few blocks, whichever is bigger.
929     * It's okay to tweak this as needed */
930    const unsigned int currentSpeed_Bps = tr_bandwidthGetPieceSpeed_Bps( &io->bandwidth, now, TR_UP );
931    const unsigned int period = 15u; /* arbitrary */
932    /* the 3 is arbitrary; the .5 is to leave room for messages */
933    static const unsigned int ceiling =  (unsigned int)( MAX_BLOCK_SIZE * 3.5 );
934    return MAX( ceiling, currentSpeed_Bps*period );
935}
936
937size_t
938tr_peerIoGetWriteBufferSpace( const tr_peerIo * io, uint64_t now )
939{
940    const size_t desiredLen = getDesiredOutputBufferSize( io, now );
941    const size_t currentLen = evbuffer_get_length( io->outbuf );
942    size_t freeSpace = 0;
943
944    if( desiredLen > currentLen )
945        freeSpace = desiredLen - currentLen;
946
947    return freeSpace;
948}
949
950/**
951***
952**/
953
954void
955tr_peerIoSetEncryption( tr_peerIo * io, uint32_t encryptionMode )
956{
957    assert( tr_isPeerIo( io ) );
958    assert( encryptionMode == PEER_ENCRYPTION_NONE
959         || encryptionMode == PEER_ENCRYPTION_RC4 );
960
961    io->encryptionMode = encryptionMode;
962}
963
964/**
965***
966**/
967
968static void
969addDatatype( tr_peerIo * io, size_t byteCount, bool isPieceData )
970{
971    struct tr_datatype * d;
972
973    d = tr_new( struct tr_datatype, 1 );
974    d->isPieceData = isPieceData != 0;
975    d->length = byteCount;
976    tr_list_append( &io->outbuf_datatypes, d );
977}
978
979static struct evbuffer_iovec *
980evbuffer_peek_all( struct evbuffer * buf, size_t * setme_vecCount )
981{
982    const size_t byteCount = evbuffer_get_length( buf );
983    const int vecCount = evbuffer_peek( buf, byteCount, NULL, NULL, 0 );
984    struct evbuffer_iovec * iovec = tr_new0( struct evbuffer_iovec, vecCount );
985    const int n = evbuffer_peek( buf, byteCount, NULL, iovec, vecCount );
986    assert( n == vecCount );
987    *setme_vecCount = n;
988    return iovec;
989}
990
991static void
992maybeEncryptBuffer( tr_peerIo * io, struct evbuffer * buf )
993{
994    if( io->encryptionMode == PEER_ENCRYPTION_RC4 )
995    {
996        size_t i, n;
997        struct evbuffer_iovec * iovec = evbuffer_peek_all( buf, &n );
998
999        for( i=0; i<n; ++i )
1000            tr_cryptoEncrypt( io->crypto, iovec[i].iov_len, iovec[i].iov_base, iovec[i].iov_base );
1001
1002        tr_free( iovec );
1003    }
1004}
1005
1006void
1007tr_peerIoWriteBuf( tr_peerIo * io, struct evbuffer * buf, bool isPieceData )
1008{
1009    const size_t byteCount = evbuffer_get_length( buf );
1010    maybeEncryptBuffer( io, buf );
1011    evbuffer_add_buffer( io->outbuf, buf );
1012    addDatatype( io, byteCount, isPieceData );
1013}
1014
1015void
1016tr_peerIoWriteBytes( tr_peerIo * io, const void * bytes, size_t byteCount, bool isPieceData )
1017{
1018    struct evbuffer * buf = evbuffer_new( );
1019    evbuffer_add( buf, bytes, byteCount );
1020    tr_peerIoWriteBuf( io, buf, isPieceData );
1021    evbuffer_free( buf );
1022}
1023
1024/***
1025****
1026***/
1027
1028void
1029evbuffer_add_uint8( struct evbuffer * outbuf, uint8_t byte )
1030{
1031    evbuffer_add( outbuf, &byte, 1 );
1032}
1033
1034void
1035evbuffer_add_uint16( struct evbuffer * outbuf, uint16_t addme_hs )
1036{
1037    const uint16_t ns = htons( addme_hs );
1038    evbuffer_add( outbuf, &ns, sizeof( ns ) );
1039}
1040
1041void
1042evbuffer_add_uint32( struct evbuffer * outbuf, uint32_t addme_hl )
1043{
1044    const uint32_t nl = htonl( addme_hl );
1045    evbuffer_add( outbuf, &nl, sizeof( nl ) );
1046}
1047
1048void
1049evbuffer_add_uint64( struct evbuffer * outbuf, uint64_t addme_hll )
1050{
1051    const uint64_t nll = tr_htonll( addme_hll );
1052    evbuffer_add( outbuf, &nll, sizeof( nll ) );
1053}
1054
1055/***
1056****
1057***/
1058
1059void
1060tr_peerIoReadBytes( tr_peerIo * io, struct evbuffer * inbuf, void * bytes, size_t byteCount )
1061{
1062    assert( tr_isPeerIo( io ) );
1063    assert( evbuffer_get_length( inbuf )  >= byteCount );
1064
1065    switch( io->encryptionMode )
1066    {
1067        case PEER_ENCRYPTION_NONE:
1068            evbuffer_remove( inbuf, bytes, byteCount );
1069            break;
1070
1071        case PEER_ENCRYPTION_RC4:
1072            evbuffer_remove( inbuf, bytes, byteCount );
1073            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
1074            break;
1075
1076        default:
1077            assert( 0 );
1078    }
1079}
1080
1081void
1082tr_peerIoReadUint16( tr_peerIo        * io,
1083                     struct evbuffer  * inbuf,
1084                     uint16_t         * setme )
1085{
1086    uint16_t tmp;
1087    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
1088    *setme = ntohs( tmp );
1089}
1090
1091void tr_peerIoReadUint32( tr_peerIo        * io,
1092                          struct evbuffer  * inbuf,
1093                          uint32_t         * setme )
1094{
1095    uint32_t tmp;
1096    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
1097    *setme = ntohl( tmp );
1098}
1099
1100void
1101tr_peerIoDrain( tr_peerIo       * io,
1102                struct evbuffer * inbuf,
1103                size_t            byteCount )
1104{
1105    void * buf = tr_sessionGetBuffer( io->session );
1106    const size_t buflen = SESSION_BUFFER_SIZE;
1107
1108    while( byteCount > 0 )
1109    {
1110        const size_t thisPass = MIN( byteCount, buflen );
1111        tr_peerIoReadBytes( io, inbuf, buf, thisPass );
1112        byteCount -= thisPass;
1113    }
1114
1115    tr_sessionReleaseBuffer( io->session );
1116}
1117
1118/***
1119****
1120***/
1121
1122static int
1123tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
1124{
1125    int res = 0;
1126
1127    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch )))
1128    {
1129        if( io->utp_socket != NULL ) /* utp peer connection */
1130        {
1131            /* UTP_RBDrained notifies libutp that your read buffer is emtpy.
1132             * It opens up the congestion window by sending an ACK (soonish)
1133             * if one was not going to be sent. */
1134            if( evbuffer_get_length( io->inbuf ) == 0 )
1135                UTP_RBDrained( io->utp_socket );
1136        }
1137        else /* tcp peer connection */
1138        {
1139            int e;
1140
1141            EVUTIL_SET_SOCKET_ERROR( 0 );
1142            res = evbuffer_read( io->inbuf, io->socket, (int)howmuch );
1143            e = EVUTIL_SOCKET_ERROR( );
1144
1145            dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(e):"") );
1146
1147            if( evbuffer_get_length( io->inbuf ) )
1148                canReadWrapper( io );
1149
1150            if( ( res <= 0 ) && ( io->gotError ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1151            {
1152                char errstr[512];
1153                short what = BEV_EVENT_READING | BEV_EVENT_ERROR;
1154                if( res == 0 )
1155                    what |= BEV_EVENT_EOF;
1156                tr_net_strerror( errstr, sizeof( errstr ), e );
1157                dbgmsg( io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
1158                io->gotError( io, what, io->userData );
1159            }
1160        }
1161    }
1162
1163    return res;
1164}
1165
1166static int
1167tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
1168{
1169    int n = 0;
1170    const size_t old_len = evbuffer_get_length( io->outbuf );
1171    dbgmsg( io, "in tr_peerIoTryWrite %zu", howmuch );
1172
1173    if( howmuch > old_len )
1174        howmuch = old_len;
1175
1176    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_UP, howmuch )))
1177    {
1178        if( io->utp_socket != NULL ) /* utp peer connection */
1179        {
1180            const size_t old_len = evbuffer_get_length( io->outbuf );
1181            UTP_Write( io->utp_socket, howmuch );
1182            n = old_len - evbuffer_get_length( io->outbuf );
1183        }
1184        else
1185        {
1186            int e;
1187
1188            EVUTIL_SET_SOCKET_ERROR( 0 );
1189            n = tr_evbuffer_write( io, io->socket, howmuch );
1190            e = EVUTIL_SOCKET_ERROR( );
1191
1192            if( n > 0 )
1193                didWriteWrapper( io, n );
1194
1195            if( ( n < 0 ) && ( io->gotError ) && e && ( e != EPIPE ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1196            {
1197                char errstr[512];
1198                const short what = BEV_EVENT_WRITING | BEV_EVENT_ERROR;
1199
1200                tr_net_strerror( errstr, sizeof( errstr ), e );
1201                dbgmsg( io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", n, what, e, errstr );
1202
1203                if( io->gotError != NULL )
1204                    io->gotError( io, what, io->userData );
1205            }
1206        }
1207    }
1208
1209    return n;
1210}
1211
1212int
1213tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
1214{
1215    int bytesUsed = 0;
1216
1217    assert( tr_isPeerIo( io ) );
1218    assert( tr_isDirection( dir ) );
1219
1220    if( dir == TR_DOWN )
1221        bytesUsed = tr_peerIoTryRead( io, limit );
1222    else
1223        bytesUsed = tr_peerIoTryWrite( io, limit );
1224
1225    dbgmsg( io, "flushing peer-io, direction %d, limit %zu, bytesUsed %d", (int)dir, limit, bytesUsed );
1226    return bytesUsed;
1227}
1228
1229int
1230tr_peerIoFlushOutgoingProtocolMsgs( tr_peerIo * io )
1231{
1232    size_t byteCount = 0;
1233    tr_list * it;
1234
1235    /* count up how many bytes are used by non-piece-data messages
1236       at the front of our outbound queue */
1237    for( it=io->outbuf_datatypes; it!=NULL; it=it->next )
1238    {
1239        struct tr_datatype * d = it->data;
1240
1241        if( d->isPieceData )
1242            break;
1243
1244        byteCount += d->length;
1245    }
1246
1247    return tr_peerIoFlush( io, TR_UP, byteCount );
1248}
Note: See TracBrowser for help on using the repository browser.