source: trunk/libtransmission/peer-io.c @ 12223

Last change on this file since 12223 was 12223, checked in by jordan, 11 years ago

(trunk) copyediting: remove some unneeded #includes, and annotate some needed ones

  • Property svn:keywords set to Date Rev Author Id
File size: 32.5 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 12223 2011-03-24 21:49:42Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <string.h>
17
18#ifdef WIN32
19 #include <winsock2.h>
20#else
21 #include <arpa/inet.h> /* inet_ntoa */
22#endif
23
24#include <event2/event.h>
25#include <event2/bufferevent.h>
26#include <libutp/utp.h>
27
28#include "transmission.h"
29#include "session.h"
30#include "bandwidth.h"
31#include "crypto.h"
32#include "list.h"
33#include "net.h"
34#include "peer-common.h" /* MAX_BLOCK_SIZE */
35#include "peer-io.h"
36#include "trevent.h" /* tr_runInEventThread() */
37#include "utils.h"
38
39
40#ifdef WIN32
41 #define EAGAIN       WSAEWOULDBLOCK
42 #define EINTR        WSAEINTR
43 #define EINPROGRESS  WSAEINPROGRESS
44 #define EPIPE        WSAECONNRESET
45#endif
46
47/* The amount of read bufferring that we allow for uTP sockets. */
48
49#define UTP_READ_BUFFER_SIZE (256 * 1024)
50
51static size_t
52guessPacketOverhead( size_t d )
53{
54    /**
55     * http://sd.wareonearth.com/~phil/net/overhead/
56     *
57     * TCP over Ethernet:
58     * Assuming no header compression (e.g. not PPP)
59     * Add 20 IPv4 header or 40 IPv6 header (no options)
60     * Add 20 TCP header
61     * Add 12 bytes optional TCP timestamps
62     * Max TCP Payload data rates over ethernet are thus:
63     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
64     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
65     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
66     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
67     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
68     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
69     */
70    const double assumed_payload_data_rate = 94.0;
71
72    return (unsigned int)( d * ( 100.0 / assumed_payload_data_rate ) - d );
73}
74
75/**
76***
77**/
78
79#define dbgmsg( io, ... ) \
80    do { \
81        if( tr_deepLoggingIsActive( ) ) \
82            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
83    } while( 0 )
84
85struct tr_datatype
86{
87    bool    isPieceData;
88    size_t  length;
89};
90
91
92/***
93****
94***/
95
96static void
97didWriteWrapper( tr_peerIo * io, unsigned int bytes_transferred )
98{
99     while( bytes_transferred && tr_isPeerIo( io ) )
100     {
101        struct tr_datatype * next = io->outbuf_datatypes->data;
102
103        const unsigned int payload = MIN( next->length, bytes_transferred );
104        /* For uTP sockets, the overhead is computed in utp_on_overhead. */
105        const unsigned int overhead =
106            io->socket ? guessPacketOverhead( payload ) : 0;
107        const uint64_t now = tr_time_msec( );
108
109        tr_bandwidthUsed( &io->bandwidth, TR_UP, payload, next->isPieceData, now );
110
111        if( overhead > 0 )
112            tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, false, now );
113
114        if( io->didWrite )
115            io->didWrite( io, payload, next->isPieceData, io->userData );
116
117        if( tr_isPeerIo( io ) )
118        {
119            bytes_transferred -= payload;
120            next->length -= payload;
121            if( !next->length ) {
122                tr_list_pop_front( &io->outbuf_datatypes );
123                tr_free( next );
124            }
125        }
126    }
127}
128
129static void
130canReadWrapper( tr_peerIo * io )
131{
132    bool err = 0;
133    bool done = 0;
134    tr_session * session;
135
136    dbgmsg( io, "canRead" );
137
138    tr_peerIoRef( io );
139
140    session = io->session;
141
142    /* try to consume the input buffer */
143    if( io->canRead )
144    {
145        const uint64_t now = tr_time_msec( );
146
147        tr_sessionLock( session );
148
149        while( !done && !err )
150        {
151            size_t piece = 0;
152            const size_t oldLen = evbuffer_get_length( io->inbuf );
153            const int ret = io->canRead( io, io->userData, &piece );
154            const size_t used = oldLen - evbuffer_get_length( io->inbuf );
155            const unsigned int overhead = guessPacketOverhead( used );
156
157            if( piece || (piece!=used) )
158            {
159                if( piece )
160                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, piece, true, now );
161
162                if( used != piece )
163                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, used - piece, false, now );
164            }
165
166            if( overhead > 0 )
167                tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, false, now );
168
169            switch( ret )
170            {
171                case READ_NOW:
172                    if( evbuffer_get_length( io->inbuf ) )
173                        continue;
174                    done = 1;
175                    break;
176
177                case READ_LATER:
178                    done = 1;
179                    break;
180
181                case READ_ERR:
182                    err = 1;
183                    break;
184            }
185
186            assert( tr_isPeerIo( io ) );
187        }
188
189        tr_sessionUnlock( session );
190    }
191
192    tr_peerIoUnref( io );
193}
194
195static void
196event_read_cb( int fd, short event UNUSED, void * vio )
197{
198    int res;
199    int e;
200    tr_peerIo * io = vio;
201
202    /* Limit the input buffer to 256K, so it doesn't grow too large */
203    unsigned int howmuch;
204    unsigned int curlen;
205    const tr_direction dir = TR_DOWN;
206    const unsigned int max = 256 * 1024;
207
208    assert( tr_isPeerIo( io ) );
209    assert( io->socket >= 0 );
210
211    io->pendingEvents &= ~EV_READ;
212
213    curlen = evbuffer_get_length( io->inbuf );
214    howmuch = curlen >= max ? 0 : max - curlen;
215    howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch );
216
217    dbgmsg( io, "libevent says this peer is ready to read" );
218
219    /* if we don't have any bandwidth left, stop reading */
220    if( howmuch < 1 ) {
221        tr_peerIoSetEnabled( io, dir, false );
222        return;
223    }
224
225    EVUTIL_SET_SOCKET_ERROR( 0 );
226    res = evbuffer_read( io->inbuf, fd, (int)howmuch );
227    e = EVUTIL_SOCKET_ERROR( );
228
229    if( res > 0 )
230    {
231        tr_peerIoSetEnabled( io, dir, true );
232
233        /* Invoke the user callback - must always be called last */
234        canReadWrapper( io );
235    }
236    else
237    {
238        char errstr[512];
239        short what = BEV_EVENT_READING;
240
241        if( res == 0 ) /* EOF */
242            what |= BEV_EVENT_EOF;
243        else if( res == -1 ) {
244            if( e == EAGAIN || e == EINTR ) {
245                tr_peerIoSetEnabled( io, dir, true );
246                return;
247            }
248            what |= BEV_EVENT_ERROR;
249        }
250
251        tr_net_strerror( errstr, sizeof( errstr ), e );
252        dbgmsg( io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
253
254        if( io->gotError != NULL )
255            io->gotError( io, what, io->userData );
256    }
257}
258
259static int
260tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
261{
262    int e;
263    int n;
264    char errstr[256];
265
266    EVUTIL_SET_SOCKET_ERROR( 0 );
267    n = evbuffer_write_atmost( io->outbuf, fd, howmuch );
268    e = EVUTIL_SOCKET_ERROR( );
269    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?tr_net_strerror(errstr,sizeof(errstr),e):"") );
270
271    return n;
272}
273
274static void
275event_write_cb( int fd, short event UNUSED, void * vio )
276{
277    int res = 0;
278    int e;
279    short what = BEV_EVENT_WRITING;
280    tr_peerIo * io = vio;
281    size_t howmuch;
282    const tr_direction dir = TR_UP;
283
284    assert( tr_isPeerIo( io ) );
285    assert( io->socket >= 0 );
286
287    io->pendingEvents &= ~EV_WRITE;
288
289    dbgmsg( io, "libevent says this peer is ready to write" );
290
291    /* Write as much as possible, since the socket is non-blocking, write() will
292     * return if it can't write any more data without blocking */
293    howmuch = tr_bandwidthClamp( &io->bandwidth, dir, evbuffer_get_length( io->outbuf ) );
294
295    /* if we don't have any bandwidth left, stop writing */
296    if( howmuch < 1 ) {
297        tr_peerIoSetEnabled( io, dir, false );
298        return;
299    }
300
301    EVUTIL_SET_SOCKET_ERROR( 0 );
302    res = tr_evbuffer_write( io, fd, howmuch );
303    e = EVUTIL_SOCKET_ERROR( );
304
305    if (res == -1) {
306        if (!e || e == EAGAIN || e == EINTR || e == EINPROGRESS)
307            goto reschedule;
308        /* error case */
309        what |= BEV_EVENT_ERROR;
310    } else if (res == 0) {
311        /* eof case */
312        what |= BEV_EVENT_EOF;
313    }
314    if (res <= 0)
315        goto error;
316
317    if( evbuffer_get_length( io->outbuf ) )
318        tr_peerIoSetEnabled( io, dir, true );
319
320    didWriteWrapper( io, res );
321    return;
322
323 reschedule:
324    if( evbuffer_get_length( io->outbuf ) )
325        tr_peerIoSetEnabled( io, dir, true );
326    return;
327
328 error:
329
330    dbgmsg( io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
331
332    if( io->gotError != NULL )
333        io->gotError( io, what, io->userData );
334}
335
336/**
337***
338**/
339
340static void
341maybeSetCongestionAlgorithm( int socket, const char * algorithm )
342{
343    if( algorithm && *algorithm )
344    {
345        const int rc = tr_netSetCongestionControl( socket, algorithm );
346
347        if( rc < 0 )
348            tr_ninf( "Net", "Can't set congestion control algorithm '%s': %s",
349                     algorithm, tr_strerror( errno ));
350    }
351}
352
353#ifdef WITH_UTP
354/* UTP callbacks */
355
356static void
357utp_on_read(void *closure, const unsigned char *buf, size_t buflen)
358{
359    int rc;
360    tr_peerIo *io = closure;
361    assert( tr_isPeerIo( io ) );
362
363    rc = evbuffer_add( io->inbuf, buf, buflen );
364    dbgmsg( io, "utp_on_read got %zu bytes", buflen );
365
366    if( rc < 0 ) {
367        tr_nerr( "UTP", "On read evbuffer_add" );
368        return;
369    }
370
371    tr_peerIoSetEnabled( io, TR_DOWN, true );
372    canReadWrapper( io );
373}
374
375static void
376utp_on_write(void *closure, unsigned char *buf, size_t buflen)
377{
378    int rc;
379    tr_peerIo *io = closure;
380    assert( tr_isPeerIo( io ) );
381
382    rc = evbuffer_remove( io->outbuf, buf, buflen );
383    dbgmsg( io, "utp_on_write sending %zu bytes... evbuffer_remove returned %d", buflen, rc );
384    assert( rc == (int)buflen ); /* if this fails, we've corrupted our bookkeeping somewhere */
385    if( rc < (long)buflen ) {
386        tr_nerr( "UTP", "Short write: %d < %ld", rc, (long)buflen);
387    }
388
389    didWriteWrapper( io, buflen );
390}
391
392static size_t
393utp_get_rb_size(void *closure)
394{
395    size_t bytes;
396    tr_peerIo *io = closure;
397    assert( tr_isPeerIo( io ) );
398
399    bytes = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, UTP_READ_BUFFER_SIZE );
400
401    dbgmsg( io, "utp_get_rb_size is saying it's ready to read %zu bytes", bytes );
402    return UTP_READ_BUFFER_SIZE - bytes;
403}
404
405static void
406utp_on_state_change(void *closure, int state)
407{
408    tr_peerIo *io = closure;
409    assert( tr_isPeerIo( io ) );
410
411    if( state == UTP_STATE_CONNECT ) {
412        dbgmsg( io, "utp_on_state_change -- changed to connected" );
413        io->utpSupported = true;
414    } else if( state == UTP_STATE_WRITABLE ) {
415        dbgmsg( io, "utp_on_state_change -- changed to writable" );
416    } else if( state == UTP_STATE_EOF ) {
417        if( io->gotError )
418            io->gotError( io, BEV_EVENT_EOF, io->userData );
419    } else if( state == UTP_STATE_DESTROYING ) {
420        tr_nerr( "UTP", "Impossible state UTP_STATE_DESTROYING" );
421        return;
422    } else {
423        tr_nerr( "UTP", "Unknown state %d", state );
424    }
425}
426
427static void
428utp_on_error(void *closure, int errcode)
429{
430    tr_peerIo *io = closure;
431    assert( tr_isPeerIo( io ) );
432
433    dbgmsg( io, "utp_on_error -- errcode is %d", errcode );
434
435    if( io->gotError ) {
436        errno = errcode;
437        io->gotError( io, BEV_EVENT_ERROR, io->userData );
438    }
439}
440
441static void
442utp_on_overhead(void *closure, bool send, size_t count, int type UNUSED)
443{
444    tr_peerIo *io = closure;
445    assert( tr_isPeerIo( io ) );
446
447    dbgmsg( io, "utp_on_overhead -- count is %zu", count );
448
449    tr_bandwidthUsed( &io->bandwidth, send ? TR_UP : TR_DOWN,
450                      count, false, tr_time_msec() );
451}
452
453static struct UTPFunctionTable utp_function_table = {
454    .on_read = utp_on_read,
455    .on_write = utp_on_write,
456    .get_rb_size = utp_get_rb_size,
457    .on_state = utp_on_state_change,
458    .on_error = utp_on_error,
459    .on_overhead = utp_on_overhead
460};
461
462
463/* Dummy UTP callbacks. */
464/* We switch a UTP socket to use these after the associated peerIo has been
465   destroyed -- see io_dtor. */
466
467static void
468dummy_read( void * closure UNUSED, const unsigned char *buf UNUSED, size_t buflen UNUSED )
469{
470    /* This cannot happen, as far as I'm aware. */
471    tr_nerr( "UTP", "On_read called on closed socket" );
472
473}
474
475static void
476dummy_write(void * closure UNUSED, unsigned char *buf, size_t buflen)
477{
478    /* This can very well happen if we've shut down a peer connection that
479       had unflushed buffers.  Complain and send zeroes. */
480    tr_ndbg( "UTP", "On_write called on closed socket" );
481    memset( buf, 0, buflen );
482}
483
484static size_t
485dummy_get_rb_size( void * closure UNUSED )
486{
487    return 0;
488}
489
490static void
491dummy_on_state_change(void * closure UNUSED, int state UNUSED )
492{
493    return;
494}
495
496static void
497dummy_on_error( void * closure UNUSED, int errcode UNUSED )
498{
499    return;
500}
501
502static void
503dummy_on_overhead( void *closure UNUSED, bool send UNUSED, size_t count UNUSED, int type UNUSED )
504{
505    return;
506}
507
508static struct UTPFunctionTable dummy_utp_function_table = {
509    .on_read = dummy_read,
510    .on_write = dummy_write,
511    .get_rb_size = dummy_get_rb_size,
512    .on_state = dummy_on_state_change,
513    .on_error = dummy_on_error,
514    .on_overhead = dummy_on_overhead
515};
516
517#endif /* #ifdef WITH_UTP */
518
519static tr_peerIo*
520tr_peerIoNew( tr_session       * session,
521              tr_bandwidth     * parent,
522              const tr_address * addr,
523              tr_port            port,
524              const uint8_t    * torrentHash,
525              bool               isIncoming,
526              bool               isSeed,
527              int                socket,
528              struct UTPSocket * utp_socket)
529{
530    tr_peerIo * io;
531
532    assert( session != NULL );
533    assert( session->events != NULL );
534    assert( tr_isBool( isIncoming ) );
535    assert( tr_isBool( isSeed ) );
536    assert( tr_amInEventThread( session ) );
537    assert( (socket < 0) == (utp_socket != NULL) );
538#ifndef WITH_UTP
539    assert( socket >= 0 );
540#endif
541
542    if( socket >= 0 ) {
543        tr_netSetTOS( socket, session->peerSocketTOS );
544        maybeSetCongestionAlgorithm( socket, session->peer_congestion_algorithm );
545    }
546
547    io = tr_new0( tr_peerIo, 1 );
548    io->magicNumber = PEER_IO_MAGIC_NUMBER;
549    io->refCount = 1;
550    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
551    io->session = session;
552    io->addr = *addr;
553    io->isSeed = isSeed;
554    io->port = port;
555    io->socket = socket;
556    io->utp_socket = utp_socket;
557    io->isIncoming = isIncoming != 0;
558    io->timeCreated = tr_time( );
559    io->inbuf = evbuffer_new( );
560    io->outbuf = evbuffer_new( );
561    tr_bandwidthConstruct( &io->bandwidth, session, parent );
562    tr_bandwidthSetPeer( &io->bandwidth, io );
563    dbgmsg( io, "bandwidth is %p; its parent is %p", &io->bandwidth, parent );
564    dbgmsg( io, "socket is %d, utp_socket is %p", socket, utp_socket );
565
566    if( io->socket >= 0 ) {
567        io->event_read = event_new( session->event_base,
568                                    io->socket, EV_READ, event_read_cb, io );
569        io->event_write = event_new( session->event_base,
570                                     io->socket, EV_WRITE, event_write_cb, io );
571    }
572#ifdef WITH_UTP
573    else {
574        UTP_SetSockopt( utp_socket, SO_RCVBUF, UTP_READ_BUFFER_SIZE );
575        dbgmsg( io, "%s", "calling UTP_SetCallbacks &utp_function_table" );
576        UTP_SetCallbacks( utp_socket,
577                          &utp_function_table,
578                          io );
579        if( !isIncoming ) {
580            dbgmsg( io, "%s", "calling UTP_Connect" );
581            UTP_Connect( utp_socket );
582        }
583    }
584#endif
585
586    return io;
587}
588
589tr_peerIo*
590tr_peerIoNewIncoming( tr_session        * session,
591                      tr_bandwidth      * parent,
592                      const tr_address  * addr,
593                      tr_port             port,
594                      int                 fd,
595                      struct UTPSocket  * utp_socket )
596{
597    assert( session );
598    assert( tr_isAddress( addr ) );
599
600    return tr_peerIoNew( session, parent, addr, port, NULL, true, false,
601                         fd, utp_socket );
602}
603
604tr_peerIo*
605tr_peerIoNewOutgoing( tr_session        * session,
606                      tr_bandwidth      * parent,
607                      const tr_address  * addr,
608                      tr_port             port,
609                      const uint8_t     * torrentHash,
610                      bool                isSeed,
611                      bool                utp )
612{
613    int fd = -1;
614    struct UTPSocket *utp_socket = NULL;
615
616    assert( session );
617    assert( tr_isAddress( addr ) );
618    assert( torrentHash );
619
620    if( utp )
621        utp_socket = tr_netOpenPeerUTPSocket( session, addr, port, isSeed );
622
623    if( !utp_socket ) {
624        fd = tr_netOpenPeerSocket( session, addr, port, isSeed );
625        dbgmsg( NULL, "tr_netOpenPeerSocket returned fd %d", fd );
626    }
627
628    if( fd < 0 && utp_socket == NULL )
629        return NULL;
630
631    return tr_peerIoNew( session, parent, addr, port,
632                         torrentHash, false, isSeed, fd, utp_socket );
633}
634
635/***
636****
637***/
638
639static void
640event_enable( tr_peerIo * io, short event )
641{
642    assert( tr_amInEventThread( io->session ) );
643    assert( io->session != NULL );
644    assert( io->session->events != NULL );
645
646    if( io->socket < 0 )
647        return;
648
649    assert( io->session->events != NULL );
650    assert( event_initialized( io->event_read ) );
651    assert( event_initialized( io->event_write ) );
652
653    if( ( event & EV_READ ) && ! ( io->pendingEvents & EV_READ ) )
654    {
655        dbgmsg( io, "enabling libevent ready-to-read polling" );
656        event_add( io->event_read, NULL );
657        io->pendingEvents |= EV_READ;
658    }
659
660    if( ( event & EV_WRITE ) && ! ( io->pendingEvents & EV_WRITE ) )
661    {
662        dbgmsg( io, "enabling libevent ready-to-write polling" );
663        event_add( io->event_write, NULL );
664        io->pendingEvents |= EV_WRITE;
665    }
666}
667
668static void
669event_disable( struct tr_peerIo * io, short event )
670{
671    assert( tr_amInEventThread( io->session ) );
672    assert( io->session != NULL );
673
674    if( io->socket < 0 )
675        return;
676
677    assert( io->session->events != NULL );
678    assert( event_initialized( io->event_read ) );
679    assert( event_initialized( io->event_write ) );
680
681    if( ( event & EV_READ ) && ( io->pendingEvents & EV_READ ) )
682    {
683        dbgmsg( io, "disabling libevent ready-to-read polling" );
684        event_del( io->event_read );
685        io->pendingEvents &= ~EV_READ;
686    }
687
688    if( ( event & EV_WRITE ) && ( io->pendingEvents & EV_WRITE ) )
689    {
690        dbgmsg( io, "disabling libevent ready-to-write polling" );
691        event_del( io->event_write );
692        io->pendingEvents &= ~EV_WRITE;
693    }
694}
695
696void
697tr_peerIoSetEnabled( tr_peerIo    * io,
698                     tr_direction   dir,
699                     bool           isEnabled )
700{
701    const short event = dir == TR_UP ? EV_WRITE : EV_READ;
702
703    assert( tr_isPeerIo( io ) );
704    assert( tr_isDirection( dir ) );
705    assert( tr_amInEventThread( io->session ) );
706    assert( io->session->events != NULL );
707
708    if( isEnabled )
709        event_enable( io, event );
710    else
711        event_disable( io, event );
712}
713
714/***
715****
716***/
717static void
718io_close_socket( tr_peerIo * io )
719{
720    if( io->socket >= 0 ) {
721        tr_netClose( io->session, io->socket );
722        io->socket = -1;
723        event_free( io->event_read );
724        event_free( io->event_write );
725    }
726
727#ifdef WITH_UTP
728    if( io->utp_socket ) {
729        UTP_SetCallbacks( io->utp_socket,
730                          &dummy_utp_function_table,
731                          NULL );
732        UTP_Close( io->utp_socket );
733
734        io->utp_socket = NULL;
735    }
736#endif
737}
738
739static void
740io_dtor( void * vio )
741{
742    tr_peerIo * io = vio;
743
744    assert( tr_isPeerIo( io ) );
745    assert( tr_amInEventThread( io->session ) );
746    assert( io->session->events != NULL );
747
748    dbgmsg( io, "in tr_peerIo destructor" );
749    event_disable( io, EV_READ | EV_WRITE );
750    tr_bandwidthDestruct( &io->bandwidth );
751    evbuffer_free( io->outbuf );
752    evbuffer_free( io->inbuf );
753    io_close_socket( io );
754    tr_cryptoFree( io->crypto );
755    tr_list_free( &io->outbuf_datatypes, tr_free );
756
757    memset( io, ~0, sizeof( tr_peerIo ) );
758    tr_free( io );
759}
760
761static void
762tr_peerIoFree( tr_peerIo * io )
763{
764    if( io )
765    {
766        dbgmsg( io, "in tr_peerIoFree" );
767        io->canRead = NULL;
768        io->didWrite = NULL;
769        io->gotError = NULL;
770        tr_runInEventThread( io->session, io_dtor, io );
771    }
772}
773
774void
775tr_peerIoRefImpl( const char * file, int line, tr_peerIo * io )
776{
777    assert( tr_isPeerIo( io ) );
778
779    dbgmsg( io, "%s:%d is incrementing the IO's refcount from %d to %d",
780                file, line, io->refCount, io->refCount+1 );
781
782    ++io->refCount;
783}
784
785void
786tr_peerIoUnrefImpl( const char * file, int line, tr_peerIo * io )
787{
788    assert( tr_isPeerIo( io ) );
789
790    dbgmsg( io, "%s:%d is decrementing the IO's refcount from %d to %d",
791                file, line, io->refCount, io->refCount-1 );
792
793    if( !--io->refCount )
794        tr_peerIoFree( io );
795}
796
797const tr_address*
798tr_peerIoGetAddress( const tr_peerIo * io, tr_port   * port )
799{
800    assert( tr_isPeerIo( io ) );
801
802    if( port )
803        *port = io->port;
804
805    return &io->addr;
806}
807
808const char*
809tr_peerIoAddrStr( const tr_address * addr, tr_port port )
810{
811    static char buf[512];
812
813    if( addr->type == TR_AF_INET )
814        tr_snprintf( buf, sizeof( buf ), "%s:%u", tr_ntop_non_ts( addr ), ntohs( port ) );
815    else
816        tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_ntop_non_ts( addr ), ntohs( port ) );
817    return buf;
818}
819
820const char* tr_peerIoGetAddrStr( const tr_peerIo * io )
821{
822    return tr_isPeerIo( io ) ? tr_peerIoAddrStr( &io->addr, io->port ) : "error";
823}
824
825void
826tr_peerIoSetIOFuncs( tr_peerIo        * io,
827                     tr_can_read_cb     readcb,
828                     tr_did_write_cb    writecb,
829                     tr_net_error_cb    errcb,
830                     void             * userData )
831{
832    io->canRead = readcb;
833    io->didWrite = writecb;
834    io->gotError = errcb;
835    io->userData = userData;
836}
837
838void
839tr_peerIoClear( tr_peerIo * io )
840{
841    tr_peerIoSetIOFuncs( io, NULL, NULL, NULL, NULL );
842    tr_peerIoSetEnabled( io, TR_UP, false );
843    tr_peerIoSetEnabled( io, TR_DOWN, false );
844}
845
846int
847tr_peerIoReconnect( tr_peerIo * io )
848{
849    short int pendingEvents;
850    tr_session * session;
851
852    assert( tr_isPeerIo( io ) );
853    assert( !tr_peerIoIsIncoming( io ) );
854
855    session = tr_peerIoGetSession( io );
856
857    pendingEvents = io->pendingEvents;
858    event_disable( io, EV_READ | EV_WRITE );
859
860    io_close_socket( io );
861
862    io->socket = tr_netOpenPeerSocket( session, &io->addr, io->port, io->isSeed );
863    io->event_read = event_new( session->event_base, io->socket, EV_READ, event_read_cb, io );
864    io->event_write = event_new( session->event_base, io->socket, EV_WRITE, event_write_cb, io );
865
866    if( io->socket >= 0 )
867    {
868        event_enable( io, pendingEvents );
869        tr_netSetTOS( io->socket, session->peerSocketTOS );
870        maybeSetCongestionAlgorithm( io->socket, session->peer_congestion_algorithm );
871        return 0;
872    }
873
874    return -1;
875}
876
877/**
878***
879**/
880
881void
882tr_peerIoSetTorrentHash( tr_peerIo *     io,
883                         const uint8_t * hash )
884{
885    assert( tr_isPeerIo( io ) );
886
887    tr_cryptoSetTorrentHash( io->crypto, hash );
888}
889
890const uint8_t*
891tr_peerIoGetTorrentHash( tr_peerIo * io )
892{
893    assert( tr_isPeerIo( io ) );
894    assert( io->crypto );
895
896    return tr_cryptoGetTorrentHash( io->crypto );
897}
898
899int
900tr_peerIoHasTorrentHash( const tr_peerIo * io )
901{
902    assert( tr_isPeerIo( io ) );
903    assert( io->crypto );
904
905    return tr_cryptoHasTorrentHash( io->crypto );
906}
907
908/**
909***
910**/
911
912void
913tr_peerIoSetPeersId( tr_peerIo *     io,
914                     const uint8_t * peer_id )
915{
916    assert( tr_isPeerIo( io ) );
917
918    if( ( io->peerIdIsSet = peer_id != NULL ) )
919        memcpy( io->peerId, peer_id, 20 );
920    else
921        memset( io->peerId, 0, 20 );
922}
923
924/**
925***
926**/
927
928static unsigned int
929getDesiredOutputBufferSize( const tr_peerIo * io, uint64_t now )
930{
931    /* this is all kind of arbitrary, but what seems to work well is
932     * being large enough to hold the next 20 seconds' worth of input,
933     * or a few blocks, whichever is bigger.
934     * It's okay to tweak this as needed */
935    const unsigned int currentSpeed_Bps = tr_bandwidthGetPieceSpeed_Bps( &io->bandwidth, now, TR_UP );
936    const unsigned int period = 15u; /* arbitrary */
937    /* the 3 is arbitrary; the .5 is to leave room for messages */
938    static const unsigned int ceiling =  (unsigned int)( MAX_BLOCK_SIZE * 3.5 );
939    return MAX( ceiling, currentSpeed_Bps*period );
940}
941
942size_t
943tr_peerIoGetWriteBufferSpace( const tr_peerIo * io, uint64_t now )
944{
945    const size_t desiredLen = getDesiredOutputBufferSize( io, now );
946    const size_t currentLen = evbuffer_get_length( io->outbuf );
947    size_t freeSpace = 0;
948
949    if( desiredLen > currentLen )
950        freeSpace = desiredLen - currentLen;
951
952    return freeSpace;
953}
954
955/**
956***
957**/
958
959void
960tr_peerIoSetEncryption( tr_peerIo * io, uint32_t encryptionMode )
961{
962    assert( tr_isPeerIo( io ) );
963    assert( encryptionMode == PEER_ENCRYPTION_NONE
964         || encryptionMode == PEER_ENCRYPTION_RC4 );
965
966    io->encryptionMode = encryptionMode;
967}
968
969/**
970***
971**/
972
973static void
974addDatatype( tr_peerIo * io, size_t byteCount, bool isPieceData )
975{
976    struct tr_datatype * d;
977
978    d = tr_new( struct tr_datatype, 1 );
979    d->isPieceData = isPieceData != 0;
980    d->length = byteCount;
981    tr_list_append( &io->outbuf_datatypes, d );
982}
983
984static struct evbuffer_iovec *
985evbuffer_peek_all( struct evbuffer * buf, size_t * setme_vecCount )
986{
987    const size_t byteCount = evbuffer_get_length( buf );
988    const int vecCount = evbuffer_peek( buf, byteCount, NULL, NULL, 0 );
989    struct evbuffer_iovec * iovec = tr_new0( struct evbuffer_iovec, vecCount );
990    const int n = evbuffer_peek( buf, byteCount, NULL, iovec, vecCount );
991    assert( n == vecCount );
992    *setme_vecCount = n;
993    return iovec;
994}
995
996static void
997maybeEncryptBuffer( tr_peerIo * io, struct evbuffer * buf )
998{
999    if( io->encryptionMode == PEER_ENCRYPTION_RC4 )
1000    {
1001        size_t i, n;
1002        struct evbuffer_iovec * iovec = evbuffer_peek_all( buf, &n );
1003
1004        for( i=0; i<n; ++i )
1005            tr_cryptoEncrypt( io->crypto, iovec[i].iov_len, iovec[i].iov_base, iovec[i].iov_base );
1006
1007        tr_free( iovec );
1008    }
1009}
1010
1011void
1012tr_peerIoWriteBuf( tr_peerIo * io, struct evbuffer * buf, bool isPieceData )
1013{
1014    const size_t byteCount = evbuffer_get_length( buf );
1015    maybeEncryptBuffer( io, buf );
1016    evbuffer_add_buffer( io->outbuf, buf );
1017    addDatatype( io, byteCount, isPieceData );
1018}
1019
1020void
1021tr_peerIoWriteBytes( tr_peerIo * io, const void * bytes, size_t byteCount, bool isPieceData )
1022{
1023    struct evbuffer * buf = evbuffer_new( );
1024    evbuffer_add( buf, bytes, byteCount );
1025    tr_peerIoWriteBuf( io, buf, isPieceData );
1026    evbuffer_free( buf );
1027}
1028
1029/***
1030****
1031***/
1032
1033void
1034evbuffer_add_uint8( struct evbuffer * outbuf, uint8_t byte )
1035{
1036    evbuffer_add( outbuf, &byte, 1 );
1037}
1038
1039void
1040evbuffer_add_uint16( struct evbuffer * outbuf, uint16_t addme_hs )
1041{
1042    const uint16_t ns = htons( addme_hs );
1043    evbuffer_add( outbuf, &ns, sizeof( ns ) );
1044}
1045
1046void
1047evbuffer_add_uint32( struct evbuffer * outbuf, uint32_t addme_hl )
1048{
1049    const uint32_t nl = htonl( addme_hl );
1050    evbuffer_add( outbuf, &nl, sizeof( nl ) );
1051}
1052
1053void
1054evbuffer_add_uint64( struct evbuffer * outbuf, uint64_t addme_hll )
1055{
1056    const uint64_t nll = tr_htonll( addme_hll );
1057    evbuffer_add( outbuf, &nll, sizeof( nll ) );
1058}
1059
1060/***
1061****
1062***/
1063
1064void
1065tr_peerIoReadBytes( tr_peerIo * io, struct evbuffer * inbuf, void * bytes, size_t byteCount )
1066{
1067    assert( tr_isPeerIo( io ) );
1068    assert( evbuffer_get_length( inbuf )  >= byteCount );
1069
1070    switch( io->encryptionMode )
1071    {
1072        case PEER_ENCRYPTION_NONE:
1073            evbuffer_remove( inbuf, bytes, byteCount );
1074            break;
1075
1076        case PEER_ENCRYPTION_RC4:
1077            evbuffer_remove( inbuf, bytes, byteCount );
1078            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
1079            break;
1080
1081        default:
1082            assert( 0 );
1083    }
1084}
1085
1086void
1087tr_peerIoReadUint16( tr_peerIo        * io,
1088                     struct evbuffer  * inbuf,
1089                     uint16_t         * setme )
1090{
1091    uint16_t tmp;
1092    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
1093    *setme = ntohs( tmp );
1094}
1095
1096void tr_peerIoReadUint32( tr_peerIo        * io,
1097                          struct evbuffer  * inbuf,
1098                          uint32_t         * setme )
1099{
1100    uint32_t tmp;
1101    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
1102    *setme = ntohl( tmp );
1103}
1104
1105void
1106tr_peerIoDrain( tr_peerIo       * io,
1107                struct evbuffer * inbuf,
1108                size_t            byteCount )
1109{
1110    void * buf = tr_sessionGetBuffer( io->session );
1111    const size_t buflen = SESSION_BUFFER_SIZE;
1112
1113    while( byteCount > 0 )
1114    {
1115        const size_t thisPass = MIN( byteCount, buflen );
1116        tr_peerIoReadBytes( io, inbuf, buf, thisPass );
1117        byteCount -= thisPass;
1118    }
1119
1120    tr_sessionReleaseBuffer( io->session );
1121}
1122
1123/***
1124****
1125***/
1126
1127static int
1128tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
1129{
1130    int res = 0;
1131
1132    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch )))
1133    {
1134        if( io->utp_socket != NULL ) /* utp peer connection */
1135        {
1136            /* UTP_RBDrained notifies libutp that your read buffer is emtpy.
1137             * It opens up the congestion window by sending an ACK (soonish)
1138             * if one was not going to be sent. */
1139            if( evbuffer_get_length( io->inbuf ) == 0 )
1140                UTP_RBDrained( io->utp_socket );
1141        }
1142        else /* tcp peer connection */
1143        {
1144            int e;
1145
1146            EVUTIL_SET_SOCKET_ERROR( 0 );
1147            res = evbuffer_read( io->inbuf, io->socket, (int)howmuch );
1148            e = EVUTIL_SOCKET_ERROR( );
1149
1150            dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(e):"") );
1151
1152            if( evbuffer_get_length( io->inbuf ) )
1153                canReadWrapper( io );
1154
1155            if( ( res <= 0 ) && ( io->gotError ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1156            {
1157                char errstr[512];
1158                short what = BEV_EVENT_READING | BEV_EVENT_ERROR;
1159                if( res == 0 )
1160                    what |= BEV_EVENT_EOF;
1161                tr_net_strerror( errstr, sizeof( errstr ), e );
1162                dbgmsg( io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
1163                io->gotError( io, what, io->userData );
1164            }
1165        }
1166    }
1167
1168    return res;
1169}
1170
1171static int
1172tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
1173{
1174    int n = 0;
1175    const size_t old_len = evbuffer_get_length( io->outbuf );
1176    dbgmsg( io, "in tr_peerIoTryWrite %zu", howmuch );
1177
1178    if( howmuch > old_len )
1179        howmuch = old_len;
1180
1181    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_UP, howmuch )))
1182    {
1183        if( io->utp_socket != NULL ) /* utp peer connection */
1184        {
1185            const size_t old_len = evbuffer_get_length( io->outbuf );
1186            UTP_Write( io->utp_socket, howmuch );
1187            n = old_len - evbuffer_get_length( io->outbuf );
1188        }
1189        else
1190        {
1191            int e;
1192
1193            EVUTIL_SET_SOCKET_ERROR( 0 );
1194            n = tr_evbuffer_write( io, io->socket, howmuch );
1195            e = EVUTIL_SOCKET_ERROR( );
1196
1197            if( n > 0 )
1198                didWriteWrapper( io, n );
1199
1200            if( ( n < 0 ) && ( io->gotError ) && e && ( e != EPIPE ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1201            {
1202                char errstr[512];
1203                const short what = BEV_EVENT_WRITING | BEV_EVENT_ERROR;
1204
1205                tr_net_strerror( errstr, sizeof( errstr ), e );
1206                dbgmsg( io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", n, what, e, errstr );
1207
1208                if( io->gotError != NULL )
1209                    io->gotError( io, what, io->userData );
1210            }
1211        }
1212    }
1213
1214    return n;
1215}
1216
1217int
1218tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
1219{
1220    int bytesUsed = 0;
1221
1222    assert( tr_isPeerIo( io ) );
1223    assert( tr_isDirection( dir ) );
1224
1225    if( dir == TR_DOWN )
1226        bytesUsed = tr_peerIoTryRead( io, limit );
1227    else
1228        bytesUsed = tr_peerIoTryWrite( io, limit );
1229
1230    dbgmsg( io, "flushing peer-io, direction %d, limit %zu, bytesUsed %d", (int)dir, limit, bytesUsed );
1231    return bytesUsed;
1232}
1233
1234int
1235tr_peerIoFlushOutgoingProtocolMsgs( tr_peerIo * io )
1236{
1237    size_t byteCount = 0;
1238    tr_list * it;
1239
1240    /* count up how many bytes are used by non-piece-data messages
1241       at the front of our outbound queue */
1242    for( it=io->outbuf_datatypes; it!=NULL; it=it->next )
1243    {
1244        struct tr_datatype * d = it->data;
1245
1246        if( d->isPieceData )
1247            break;
1248
1249        byteCount += d->length;
1250    }
1251
1252    return tr_peerIoFlush( io, TR_UP, byteCount );
1253}
Note: See TracBrowser for help on using the repository browser.