source: trunk/libtransmission/peer-io.c @ 12343

Last change on this file since 12343 was 12343, checked in by jordan, 11 years ago

(trunk libT) use a better data struct for the tr_datatype list in peer-io

  • Property svn:keywords set to Date Rev Author Id
File size: 34.0 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 12343 2011-04-10 05:21:51Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h>
16
17#include <event2/event.h>
18#include <event2/buffer.h>
19#include <event2/bufferevent.h>
20
21#include <libutp/utp.h>
22
23#include "transmission.h"
24#include "session.h"
25#include "bandwidth.h"
26#include "crypto.h"
27#include "net.h"
28#include "peer-common.h" /* MAX_BLOCK_SIZE */
29#include "peer-io.h"
30#include "trevent.h" /* tr_runInEventThread() */
31#include "utils.h"
32
33
34#ifdef WIN32
35 #define EAGAIN       WSAEWOULDBLOCK
36 #define EINTR        WSAEINTR
37 #define EINPROGRESS  WSAEINPROGRESS
38 #define EPIPE        WSAECONNRESET
39#endif
40
41/* The amount of read bufferring that we allow for uTP sockets. */
42
43#define UTP_READ_BUFFER_SIZE (256 * 1024)
44
45static size_t
46guessPacketOverhead( size_t d )
47{
48    /**
49     * http://sd.wareonearth.com/~phil/net/overhead/
50     *
51     * TCP over Ethernet:
52     * Assuming no header compression (e.g. not PPP)
53     * Add 20 IPv4 header or 40 IPv6 header (no options)
54     * Add 20 TCP header
55     * Add 12 bytes optional TCP timestamps
56     * Max TCP Payload data rates over ethernet are thus:
57     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
58     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
59     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
60     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
61     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
62     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
63     */
64    const double assumed_payload_data_rate = 94.0;
65
66    return (unsigned int)( d * ( 100.0 / assumed_payload_data_rate ) - d );
67}
68
69/**
70***
71**/
72
73#define dbgmsg( io, ... ) \
74    do { \
75        if( tr_deepLoggingIsActive( ) ) \
76            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
77    } while( 0 )
78
79/**
80***
81**/
82
83struct tr_datatype
84{
85    struct tr_datatype * next;
86    size_t length;
87    bool isPieceData;
88};
89
90static struct tr_datatype * datatype_pool = NULL;
91
92static const struct tr_datatype TR_DATATYPE_INIT = { NULL, 0, false };
93
94static struct tr_datatype *
95datatype_new( void )
96{
97    struct tr_datatype * ret;
98
99    if( datatype_pool == NULL )
100        ret = tr_new( struct tr_datatype, 1 );
101    else {
102        ret = datatype_pool;
103        datatype_pool = datatype_pool->next;
104    }
105
106    *ret = TR_DATATYPE_INIT;
107    return ret;
108}
109
110static void
111datatype_free( struct tr_datatype * datatype )
112{
113    datatype->next = datatype_pool;
114    datatype_pool = datatype;
115}
116
117static void
118peer_io_pull_datatype( tr_peerIo * io )
119{
120    struct tr_datatype * tmp;
121
122    if(( tmp = io->outbuf_datatypes ))
123    {
124        io->outbuf_datatypes = tmp->next;
125        datatype_free( tmp );
126    }
127}
128
129static void
130peer_io_push_datatype( tr_peerIo * io, struct tr_datatype * datatype )
131{
132    struct tr_datatype * tmp;
133
134    if(( tmp = io->outbuf_datatypes )) {
135        while( tmp->next != NULL )
136            tmp = tmp->next;
137        tmp->next = datatype;
138    } else {
139        io->outbuf_datatypes = datatype;
140    }
141}
142
143/***
144****
145***/
146
147static void
148didWriteWrapper( tr_peerIo * io, unsigned int bytes_transferred )
149{
150     while( bytes_transferred && tr_isPeerIo( io ) )
151     {
152        struct tr_datatype * next = io->outbuf_datatypes;
153
154        const unsigned int payload = MIN( next->length, bytes_transferred );
155        /* For uTP sockets, the overhead is computed in utp_on_overhead. */
156        const unsigned int overhead =
157            io->socket ? guessPacketOverhead( payload ) : 0;
158        const uint64_t now = tr_time_msec( );
159
160        tr_bandwidthUsed( &io->bandwidth, TR_UP, payload, next->isPieceData, now );
161
162        if( overhead > 0 )
163            tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, false, now );
164
165        if( io->didWrite )
166            io->didWrite( io, payload, next->isPieceData, io->userData );
167
168        if( tr_isPeerIo( io ) )
169        {
170            bytes_transferred -= payload;
171            next->length -= payload;
172            if( !next->length )
173                peer_io_pull_datatype( io );
174        }
175    }
176}
177
178static void
179canReadWrapper( tr_peerIo * io )
180{
181    bool err = 0;
182    bool done = 0;
183    tr_session * session;
184
185    dbgmsg( io, "canRead" );
186
187    tr_peerIoRef( io );
188
189    session = io->session;
190
191    /* try to consume the input buffer */
192    if( io->canRead )
193    {
194        const uint64_t now = tr_time_msec( );
195
196        tr_sessionLock( session );
197
198        while( !done && !err )
199        {
200            size_t piece = 0;
201            const size_t oldLen = evbuffer_get_length( io->inbuf );
202            const int ret = io->canRead( io, io->userData, &piece );
203            const size_t used = oldLen - evbuffer_get_length( io->inbuf );
204            const unsigned int overhead = guessPacketOverhead( used );
205
206            if( piece || (piece!=used) )
207            {
208                if( piece )
209                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, piece, true, now );
210
211                if( used != piece )
212                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, used - piece, false, now );
213            }
214
215            if( overhead > 0 )
216                tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, false, now );
217
218            switch( ret )
219            {
220                case READ_NOW:
221                    if( evbuffer_get_length( io->inbuf ) )
222                        continue;
223                    done = 1;
224                    break;
225
226                case READ_LATER:
227                    done = 1;
228                    break;
229
230                case READ_ERR:
231                    err = 1;
232                    break;
233            }
234
235            assert( tr_isPeerIo( io ) );
236        }
237
238        tr_sessionUnlock( session );
239    }
240
241    tr_peerIoUnref( io );
242}
243
244static void
245event_read_cb( int fd, short event UNUSED, void * vio )
246{
247    int res;
248    int e;
249    tr_peerIo * io = vio;
250
251    /* Limit the input buffer to 256K, so it doesn't grow too large */
252    unsigned int howmuch;
253    unsigned int curlen;
254    const tr_direction dir = TR_DOWN;
255    const unsigned int max = 256 * 1024;
256
257    assert( tr_isPeerIo( io ) );
258    assert( io->socket >= 0 );
259
260    io->pendingEvents &= ~EV_READ;
261
262    curlen = evbuffer_get_length( io->inbuf );
263    howmuch = curlen >= max ? 0 : max - curlen;
264    howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch );
265
266    dbgmsg( io, "libevent says this peer is ready to read" );
267
268    /* if we don't have any bandwidth left, stop reading */
269    if( howmuch < 1 ) {
270        tr_peerIoSetEnabled( io, dir, false );
271        return;
272    }
273
274    EVUTIL_SET_SOCKET_ERROR( 0 );
275    res = evbuffer_read( io->inbuf, fd, (int)howmuch );
276    e = EVUTIL_SOCKET_ERROR( );
277
278    if( res > 0 )
279    {
280        tr_peerIoSetEnabled( io, dir, true );
281
282        /* Invoke the user callback - must always be called last */
283        canReadWrapper( io );
284    }
285    else
286    {
287        char errstr[512];
288        short what = BEV_EVENT_READING;
289
290        if( res == 0 ) /* EOF */
291            what |= BEV_EVENT_EOF;
292        else if( res == -1 ) {
293            if( e == EAGAIN || e == EINTR ) {
294                tr_peerIoSetEnabled( io, dir, true );
295                return;
296            }
297            what |= BEV_EVENT_ERROR;
298        }
299
300        dbgmsg( io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)",
301                res, what, e, tr_net_strerror( errstr, sizeof( errstr ), e ) );
302
303        if( io->gotError != NULL )
304            io->gotError( io, what, io->userData );
305    }
306}
307
308static int
309tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
310{
311    int e;
312    int n;
313    char errstr[256];
314
315    EVUTIL_SET_SOCKET_ERROR( 0 );
316    n = evbuffer_write_atmost( io->outbuf, fd, howmuch );
317    e = EVUTIL_SOCKET_ERROR( );
318    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?tr_net_strerror(errstr,sizeof(errstr),e):"") );
319
320    return n;
321}
322
323static void
324event_write_cb( int fd, short event UNUSED, void * vio )
325{
326    int res = 0;
327    int e;
328    short what = BEV_EVENT_WRITING;
329    tr_peerIo * io = vio;
330    size_t howmuch;
331    const tr_direction dir = TR_UP;
332
333    assert( tr_isPeerIo( io ) );
334    assert( io->socket >= 0 );
335
336    io->pendingEvents &= ~EV_WRITE;
337
338    dbgmsg( io, "libevent says this peer is ready to write" );
339
340    /* Write as much as possible, since the socket is non-blocking, write() will
341     * return if it can't write any more data without blocking */
342    howmuch = tr_bandwidthClamp( &io->bandwidth, dir, evbuffer_get_length( io->outbuf ) );
343
344    /* if we don't have any bandwidth left, stop writing */
345    if( howmuch < 1 ) {
346        tr_peerIoSetEnabled( io, dir, false );
347        return;
348    }
349
350    EVUTIL_SET_SOCKET_ERROR( 0 );
351    res = tr_evbuffer_write( io, fd, howmuch );
352    e = EVUTIL_SOCKET_ERROR( );
353
354    if (res == -1) {
355        if (!e || e == EAGAIN || e == EINTR || e == EINPROGRESS)
356            goto reschedule;
357        /* error case */
358        what |= BEV_EVENT_ERROR;
359    } else if (res == 0) {
360        /* eof case */
361        what |= BEV_EVENT_EOF;
362    }
363    if (res <= 0)
364        goto error;
365
366    if( evbuffer_get_length( io->outbuf ) )
367        tr_peerIoSetEnabled( io, dir, true );
368
369    didWriteWrapper( io, res );
370    return;
371
372 reschedule:
373    if( evbuffer_get_length( io->outbuf ) )
374        tr_peerIoSetEnabled( io, dir, true );
375    return;
376
377 error:
378
379    dbgmsg( io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
380
381    if( io->gotError != NULL )
382        io->gotError( io, what, io->userData );
383}
384
385/**
386***
387**/
388
389static void
390maybeSetCongestionAlgorithm( int socket, const char * algorithm )
391{
392    if( algorithm && *algorithm )
393    {
394        const int rc = tr_netSetCongestionControl( socket, algorithm );
395
396        if( rc < 0 )
397            tr_ninf( "Net", "Can't set congestion control algorithm '%s': %s",
398                     algorithm, tr_strerror( errno ));
399    }
400}
401
402#ifdef WITH_UTP
403/* UTP callbacks */
404
405static void
406utp_on_read(void *closure, const unsigned char *buf, size_t buflen)
407{
408    int rc;
409    tr_peerIo *io = closure;
410    assert( tr_isPeerIo( io ) );
411
412    rc = evbuffer_add( io->inbuf, buf, buflen );
413    dbgmsg( io, "utp_on_read got %zu bytes", buflen );
414
415    if( rc < 0 ) {
416        tr_nerr( "UTP", "On read evbuffer_add" );
417        return;
418    }
419
420    tr_peerIoSetEnabled( io, TR_DOWN, true );
421    canReadWrapper( io );
422}
423
424static void
425utp_on_write(void *closure, unsigned char *buf, size_t buflen)
426{
427    int rc;
428    tr_peerIo *io = closure;
429    assert( tr_isPeerIo( io ) );
430
431    rc = evbuffer_remove( io->outbuf, buf, buflen );
432    dbgmsg( io, "utp_on_write sending %zu bytes... evbuffer_remove returned %d", buflen, rc );
433    assert( rc == (int)buflen ); /* if this fails, we've corrupted our bookkeeping somewhere */
434    if( rc < (long)buflen ) {
435        tr_nerr( "UTP", "Short write: %d < %ld", rc, (long)buflen);
436    }
437
438    didWriteWrapper( io, buflen );
439}
440
441static size_t
442utp_get_rb_size(void *closure)
443{
444    size_t bytes;
445    tr_peerIo *io = closure;
446    assert( tr_isPeerIo( io ) );
447
448    bytes = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, UTP_READ_BUFFER_SIZE );
449
450    dbgmsg( io, "utp_get_rb_size is saying it's ready to read %zu bytes", bytes );
451    return UTP_READ_BUFFER_SIZE - bytes;
452}
453
454static void
455utp_on_state_change(void *closure, int state)
456{
457    tr_peerIo *io = closure;
458    assert( tr_isPeerIo( io ) );
459
460    if( state == UTP_STATE_CONNECT ) {
461        dbgmsg( io, "utp_on_state_change -- changed to connected" );
462        io->utpSupported = true;
463    } else if( state == UTP_STATE_WRITABLE ) {
464        dbgmsg( io, "utp_on_state_change -- changed to writable" );
465    } else if( state == UTP_STATE_EOF ) {
466        if( io->gotError )
467            io->gotError( io, BEV_EVENT_EOF, io->userData );
468    } else if( state == UTP_STATE_DESTROYING ) {
469        tr_nerr( "UTP", "Impossible state UTP_STATE_DESTROYING" );
470        return;
471    } else {
472        tr_nerr( "UTP", "Unknown state %d", state );
473    }
474}
475
476static void
477utp_on_error(void *closure, int errcode)
478{
479    tr_peerIo *io = closure;
480    assert( tr_isPeerIo( io ) );
481
482    dbgmsg( io, "utp_on_error -- errcode is %d", errcode );
483
484    if( io->gotError ) {
485        errno = errcode;
486        io->gotError( io, BEV_EVENT_ERROR, io->userData );
487    }
488}
489
490static void
491utp_on_overhead(void *closure, bool send, size_t count, int type UNUSED)
492{
493    tr_peerIo *io = closure;
494    assert( tr_isPeerIo( io ) );
495
496    dbgmsg( io, "utp_on_overhead -- count is %zu", count );
497
498    tr_bandwidthUsed( &io->bandwidth, send ? TR_UP : TR_DOWN,
499                      count, false, tr_time_msec() );
500}
501
502static struct UTPFunctionTable utp_function_table = {
503    .on_read = utp_on_read,
504    .on_write = utp_on_write,
505    .get_rb_size = utp_get_rb_size,
506    .on_state = utp_on_state_change,
507    .on_error = utp_on_error,
508    .on_overhead = utp_on_overhead
509};
510
511
512/* Dummy UTP callbacks. */
513/* We switch a UTP socket to use these after the associated peerIo has been
514   destroyed -- see io_dtor. */
515
516static void
517dummy_read( void * closure UNUSED, const unsigned char *buf UNUSED, size_t buflen UNUSED )
518{
519    /* This cannot happen, as far as I'm aware. */
520    tr_nerr( "UTP", "On_read called on closed socket" );
521
522}
523
524static void
525dummy_write(void * closure UNUSED, unsigned char *buf, size_t buflen)
526{
527    /* This can very well happen if we've shut down a peer connection that
528       had unflushed buffers.  Complain and send zeroes. */
529    tr_ndbg( "UTP", "On_write called on closed socket" );
530    memset( buf, 0, buflen );
531}
532
533static size_t
534dummy_get_rb_size( void * closure UNUSED )
535{
536    return 0;
537}
538
539static void
540dummy_on_state_change(void * closure UNUSED, int state UNUSED )
541{
542    return;
543}
544
545static void
546dummy_on_error( void * closure UNUSED, int errcode UNUSED )
547{
548    return;
549}
550
551static void
552dummy_on_overhead( void *closure UNUSED, bool send UNUSED, size_t count UNUSED, int type UNUSED )
553{
554    return;
555}
556
557static struct UTPFunctionTable dummy_utp_function_table = {
558    .on_read = dummy_read,
559    .on_write = dummy_write,
560    .get_rb_size = dummy_get_rb_size,
561    .on_state = dummy_on_state_change,
562    .on_error = dummy_on_error,
563    .on_overhead = dummy_on_overhead
564};
565
566#endif /* #ifdef WITH_UTP */
567
568static tr_peerIo*
569tr_peerIoNew( tr_session       * session,
570              tr_bandwidth     * parent,
571              const tr_address * addr,
572              tr_port            port,
573              const uint8_t    * torrentHash,
574              bool               isIncoming,
575              bool               isSeed,
576              int                socket,
577              struct UTPSocket * utp_socket)
578{
579    tr_peerIo * io;
580
581    assert( session != NULL );
582    assert( session->events != NULL );
583    assert( tr_isBool( isIncoming ) );
584    assert( tr_isBool( isSeed ) );
585    assert( tr_amInEventThread( session ) );
586    assert( (socket < 0) == (utp_socket != NULL) );
587#ifndef WITH_UTP
588    assert( socket >= 0 );
589#endif
590
591    if( socket >= 0 ) {
592        tr_netSetTOS( socket, session->peerSocketTOS );
593        maybeSetCongestionAlgorithm( socket, session->peer_congestion_algorithm );
594    }
595
596    io = tr_new0( tr_peerIo, 1 );
597    io->magicNumber = PEER_IO_MAGIC_NUMBER;
598    io->refCount = 1;
599    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
600    io->session = session;
601    io->addr = *addr;
602    io->isSeed = isSeed;
603    io->port = port;
604    io->socket = socket;
605    io->utp_socket = utp_socket;
606    io->isIncoming = isIncoming != 0;
607    io->timeCreated = tr_time( );
608    io->inbuf = evbuffer_new( );
609    io->outbuf = evbuffer_new( );
610    tr_bandwidthConstruct( &io->bandwidth, session, parent );
611    tr_bandwidthSetPeer( &io->bandwidth, io );
612    dbgmsg( io, "bandwidth is %p; its parent is %p", &io->bandwidth, parent );
613    dbgmsg( io, "socket is %d, utp_socket is %p", socket, utp_socket );
614
615    if( io->socket >= 0 ) {
616        io->event_read = event_new( session->event_base,
617                                    io->socket, EV_READ, event_read_cb, io );
618        io->event_write = event_new( session->event_base,
619                                     io->socket, EV_WRITE, event_write_cb, io );
620    }
621#ifdef WITH_UTP
622    else {
623        UTP_SetSockopt( utp_socket, SO_RCVBUF, UTP_READ_BUFFER_SIZE );
624        dbgmsg( io, "%s", "calling UTP_SetCallbacks &utp_function_table" );
625        UTP_SetCallbacks( utp_socket,
626                          &utp_function_table,
627                          io );
628        if( !isIncoming ) {
629            dbgmsg( io, "%s", "calling UTP_Connect" );
630            UTP_Connect( utp_socket );
631        }
632    }
633#endif
634
635    return io;
636}
637
638tr_peerIo*
639tr_peerIoNewIncoming( tr_session        * session,
640                      tr_bandwidth      * parent,
641                      const tr_address  * addr,
642                      tr_port             port,
643                      int                 fd,
644                      struct UTPSocket  * utp_socket )
645{
646    assert( session );
647    assert( tr_address_is_valid( addr ) );
648
649    return tr_peerIoNew( session, parent, addr, port, NULL, true, false,
650                         fd, utp_socket );
651}
652
653tr_peerIo*
654tr_peerIoNewOutgoing( tr_session        * session,
655                      tr_bandwidth      * parent,
656                      const tr_address  * addr,
657                      tr_port             port,
658                      const uint8_t     * torrentHash,
659                      bool                isSeed,
660                      bool                utp )
661{
662    int fd = -1;
663    struct UTPSocket * utp_socket = NULL;
664
665    assert( session );
666    assert( tr_address_is_valid( addr ) );
667    assert( torrentHash );
668
669    if( utp )
670        utp_socket = tr_netOpenPeerUTPSocket( session, addr, port, isSeed );
671
672    if( !utp_socket ) {
673        fd = tr_netOpenPeerSocket( session, addr, port, isSeed );
674        dbgmsg( NULL, "tr_netOpenPeerSocket returned fd %d", fd );
675    }
676
677    if( fd < 0 && utp_socket == NULL )
678        return NULL;
679
680    return tr_peerIoNew( session, parent, addr, port,
681                         torrentHash, false, isSeed, fd, utp_socket );
682}
683
684/***
685****
686***/
687
688static void
689event_enable( tr_peerIo * io, short event )
690{
691    assert( tr_amInEventThread( io->session ) );
692    assert( io->session != NULL );
693    assert( io->session->events != NULL );
694
695    if( io->socket < 0 )
696        return;
697
698    assert( io->session->events != NULL );
699    assert( event_initialized( io->event_read ) );
700    assert( event_initialized( io->event_write ) );
701
702    if( ( event & EV_READ ) && ! ( io->pendingEvents & EV_READ ) )
703    {
704        dbgmsg( io, "enabling libevent ready-to-read polling" );
705        event_add( io->event_read, NULL );
706        io->pendingEvents |= EV_READ;
707    }
708
709    if( ( event & EV_WRITE ) && ! ( io->pendingEvents & EV_WRITE ) )
710    {
711        dbgmsg( io, "enabling libevent ready-to-write polling" );
712        event_add( io->event_write, NULL );
713        io->pendingEvents |= EV_WRITE;
714    }
715}
716
717static void
718event_disable( struct tr_peerIo * io, short event )
719{
720    assert( tr_amInEventThread( io->session ) );
721    assert( io->session != NULL );
722
723    if( io->socket < 0 )
724        return;
725
726    assert( io->session->events != NULL );
727    assert( event_initialized( io->event_read ) );
728    assert( event_initialized( io->event_write ) );
729
730    if( ( event & EV_READ ) && ( io->pendingEvents & EV_READ ) )
731    {
732        dbgmsg( io, "disabling libevent ready-to-read polling" );
733        event_del( io->event_read );
734        io->pendingEvents &= ~EV_READ;
735    }
736
737    if( ( event & EV_WRITE ) && ( io->pendingEvents & EV_WRITE ) )
738    {
739        dbgmsg( io, "disabling libevent ready-to-write polling" );
740        event_del( io->event_write );
741        io->pendingEvents &= ~EV_WRITE;
742    }
743}
744
745void
746tr_peerIoSetEnabled( tr_peerIo    * io,
747                     tr_direction   dir,
748                     bool           isEnabled )
749{
750    const short event = dir == TR_UP ? EV_WRITE : EV_READ;
751
752    assert( tr_isPeerIo( io ) );
753    assert( tr_isDirection( dir ) );
754    assert( tr_amInEventThread( io->session ) );
755    assert( io->session->events != NULL );
756
757    if( isEnabled )
758        event_enable( io, event );
759    else
760        event_disable( io, event );
761}
762
763/***
764****
765***/
766static void
767io_close_socket( tr_peerIo * io )
768{
769    if( io->socket >= 0 ) {
770        tr_netClose( io->session, io->socket );
771        io->socket = -1;
772    }
773
774    if( io->event_read != NULL) {
775        event_free( io->event_read );
776        io->event_read = NULL;
777    }
778
779    if( io->event_write != NULL) {
780        event_free( io->event_write );
781        io->event_write = NULL;
782    }
783
784#ifdef WITH_UTP
785    if( io->utp_socket ) {
786        UTP_SetCallbacks( io->utp_socket,
787                          &dummy_utp_function_table,
788                          NULL );
789        UTP_Close( io->utp_socket );
790
791        io->utp_socket = NULL;
792    }
793#endif
794}
795
796static void
797io_dtor( void * vio )
798{
799    tr_peerIo * io = vio;
800
801    assert( tr_isPeerIo( io ) );
802    assert( tr_amInEventThread( io->session ) );
803    assert( io->session->events != NULL );
804
805    dbgmsg( io, "in tr_peerIo destructor" );
806    event_disable( io, EV_READ | EV_WRITE );
807    tr_bandwidthDestruct( &io->bandwidth );
808    evbuffer_free( io->outbuf );
809    evbuffer_free( io->inbuf );
810    io_close_socket( io );
811    tr_cryptoFree( io->crypto );
812
813    while( io->outbuf_datatypes != NULL )
814        peer_io_pull_datatype( io );
815
816    memset( io, ~0, sizeof( tr_peerIo ) );
817    tr_free( io );
818}
819
820static void
821tr_peerIoFree( tr_peerIo * io )
822{
823    if( io )
824    {
825        dbgmsg( io, "in tr_peerIoFree" );
826        io->canRead = NULL;
827        io->didWrite = NULL;
828        io->gotError = NULL;
829        tr_runInEventThread( io->session, io_dtor, io );
830    }
831}
832
833void
834tr_peerIoRefImpl( const char * file, int line, tr_peerIo * io )
835{
836    assert( tr_isPeerIo( io ) );
837
838    dbgmsg( io, "%s:%d is incrementing the IO's refcount from %d to %d",
839                file, line, io->refCount, io->refCount+1 );
840
841    ++io->refCount;
842}
843
844void
845tr_peerIoUnrefImpl( const char * file, int line, tr_peerIo * io )
846{
847    assert( tr_isPeerIo( io ) );
848
849    dbgmsg( io, "%s:%d is decrementing the IO's refcount from %d to %d",
850                file, line, io->refCount, io->refCount-1 );
851
852    if( !--io->refCount )
853        tr_peerIoFree( io );
854}
855
856const tr_address*
857tr_peerIoGetAddress( const tr_peerIo * io, tr_port   * port )
858{
859    assert( tr_isPeerIo( io ) );
860
861    if( port )
862        *port = io->port;
863
864    return &io->addr;
865}
866
867const char*
868tr_peerIoAddrStr( const tr_address * addr, tr_port port )
869{
870    static char buf[512];
871    tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_address_to_string( addr ), ntohs( port ) );
872    return buf;
873}
874
875const char* tr_peerIoGetAddrStr( const tr_peerIo * io )
876{
877    return tr_isPeerIo( io ) ? tr_peerIoAddrStr( &io->addr, io->port ) : "error";
878}
879
880void
881tr_peerIoSetIOFuncs( tr_peerIo        * io,
882                     tr_can_read_cb     readcb,
883                     tr_did_write_cb    writecb,
884                     tr_net_error_cb    errcb,
885                     void             * userData )
886{
887    io->canRead = readcb;
888    io->didWrite = writecb;
889    io->gotError = errcb;
890    io->userData = userData;
891}
892
893void
894tr_peerIoClear( tr_peerIo * io )
895{
896    tr_peerIoSetIOFuncs( io, NULL, NULL, NULL, NULL );
897    tr_peerIoSetEnabled( io, TR_UP, false );
898    tr_peerIoSetEnabled( io, TR_DOWN, false );
899}
900
901int
902tr_peerIoReconnect( tr_peerIo * io )
903{
904    short int pendingEvents;
905    tr_session * session;
906
907    assert( tr_isPeerIo( io ) );
908    assert( !tr_peerIoIsIncoming( io ) );
909
910    session = tr_peerIoGetSession( io );
911
912    pendingEvents = io->pendingEvents;
913    event_disable( io, EV_READ | EV_WRITE );
914
915    io_close_socket( io );
916
917    io->socket = tr_netOpenPeerSocket( session, &io->addr, io->port, io->isSeed );
918    io->event_read = event_new( session->event_base, io->socket, EV_READ, event_read_cb, io );
919    io->event_write = event_new( session->event_base, io->socket, EV_WRITE, event_write_cb, io );
920
921    if( io->socket >= 0 )
922    {
923        event_enable( io, pendingEvents );
924        tr_netSetTOS( io->socket, session->peerSocketTOS );
925        maybeSetCongestionAlgorithm( io->socket, session->peer_congestion_algorithm );
926        return 0;
927    }
928
929    return -1;
930}
931
932/**
933***
934**/
935
936void
937tr_peerIoSetTorrentHash( tr_peerIo *     io,
938                         const uint8_t * hash )
939{
940    assert( tr_isPeerIo( io ) );
941
942    tr_cryptoSetTorrentHash( io->crypto, hash );
943}
944
945const uint8_t*
946tr_peerIoGetTorrentHash( tr_peerIo * io )
947{
948    assert( tr_isPeerIo( io ) );
949    assert( io->crypto );
950
951    return tr_cryptoGetTorrentHash( io->crypto );
952}
953
954int
955tr_peerIoHasTorrentHash( const tr_peerIo * io )
956{
957    assert( tr_isPeerIo( io ) );
958    assert( io->crypto );
959
960    return tr_cryptoHasTorrentHash( io->crypto );
961}
962
963/**
964***
965**/
966
967void
968tr_peerIoSetPeersId( tr_peerIo *     io,
969                     const uint8_t * peer_id )
970{
971    assert( tr_isPeerIo( io ) );
972
973    if( ( io->peerIdIsSet = peer_id != NULL ) )
974        memcpy( io->peerId, peer_id, 20 );
975    else
976        memset( io->peerId, 0, 20 );
977}
978
979/**
980***
981**/
982
983static unsigned int
984getDesiredOutputBufferSize( const tr_peerIo * io, uint64_t now )
985{
986    /* this is all kind of arbitrary, but what seems to work well is
987     * being large enough to hold the next 20 seconds' worth of input,
988     * or a few blocks, whichever is bigger.
989     * It's okay to tweak this as needed */
990    const unsigned int currentSpeed_Bps = tr_bandwidthGetPieceSpeed_Bps( &io->bandwidth, now, TR_UP );
991    const unsigned int period = 15u; /* arbitrary */
992    /* the 3 is arbitrary; the .5 is to leave room for messages */
993    static const unsigned int ceiling =  (unsigned int)( MAX_BLOCK_SIZE * 3.5 );
994    return MAX( ceiling, currentSpeed_Bps*period );
995}
996
997size_t
998tr_peerIoGetWriteBufferSpace( const tr_peerIo * io, uint64_t now )
999{
1000    const size_t desiredLen = getDesiredOutputBufferSize( io, now );
1001    const size_t currentLen = evbuffer_get_length( io->outbuf );
1002    size_t freeSpace = 0;
1003
1004    if( desiredLen > currentLen )
1005        freeSpace = desiredLen - currentLen;
1006
1007    return freeSpace;
1008}
1009
1010/**
1011***
1012**/
1013
1014void
1015tr_peerIoSetEncryption( tr_peerIo * io, tr_encryption_type encryption_type )
1016{
1017    assert( tr_isPeerIo( io ) );
1018    assert( encryption_type == PEER_ENCRYPTION_NONE
1019         || encryption_type == PEER_ENCRYPTION_RC4 );
1020
1021    io->encryption_type = encryption_type;
1022}
1023
1024/**
1025***
1026**/
1027
1028static void
1029addDatatype( tr_peerIo * io, size_t byteCount, bool isPieceData )
1030{
1031    struct tr_datatype * d;
1032    d = datatype_new( );
1033    d->isPieceData = isPieceData != 0;
1034    d->length = byteCount;
1035    peer_io_push_datatype( io, d );
1036}
1037
1038static void
1039maybeEncryptBuffer( tr_peerIo * io, struct evbuffer * buf )
1040{
1041    if( io->encryption_type == PEER_ENCRYPTION_RC4 )
1042    {
1043        struct evbuffer_ptr pos;
1044        struct evbuffer_iovec iovec;
1045        evbuffer_ptr_set( buf, &pos, 0, EVBUFFER_PTR_SET );
1046        do {
1047            evbuffer_peek( buf, -1, &pos, &iovec, 1 );
1048            tr_cryptoEncrypt( io->crypto, iovec.iov_len, iovec.iov_base, iovec.iov_base );
1049        } while( !evbuffer_ptr_set( buf, &pos, iovec.iov_len, EVBUFFER_PTR_ADD ) );
1050    }
1051}
1052
1053void
1054tr_peerIoWriteBuf( tr_peerIo * io, struct evbuffer * buf, bool isPieceData )
1055{
1056    const size_t byteCount = evbuffer_get_length( buf );
1057    maybeEncryptBuffer( io, buf );
1058    evbuffer_add_buffer( io->outbuf, buf );
1059    addDatatype( io, byteCount, isPieceData );
1060}
1061
1062void
1063tr_peerIoWriteBytes( tr_peerIo * io, const void * bytes, size_t byteCount, bool isPieceData )
1064{
1065    struct evbuffer * buf = evbuffer_new( );
1066    evbuffer_add( buf, bytes, byteCount );
1067    tr_peerIoWriteBuf( io, buf, isPieceData );
1068    evbuffer_free( buf );
1069}
1070
1071/***
1072****
1073***/
1074
1075void
1076evbuffer_add_uint8( struct evbuffer * outbuf, uint8_t byte )
1077{
1078    evbuffer_add( outbuf, &byte, 1 );
1079}
1080
1081void
1082evbuffer_add_uint16( struct evbuffer * outbuf, uint16_t addme_hs )
1083{
1084    const uint16_t ns = htons( addme_hs );
1085    evbuffer_add( outbuf, &ns, sizeof( ns ) );
1086}
1087
1088void
1089evbuffer_add_uint32( struct evbuffer * outbuf, uint32_t addme_hl )
1090{
1091    const uint32_t nl = htonl( addme_hl );
1092    evbuffer_add( outbuf, &nl, sizeof( nl ) );
1093}
1094
1095void
1096evbuffer_add_uint64( struct evbuffer * outbuf, uint64_t addme_hll )
1097{
1098    const uint64_t nll = tr_htonll( addme_hll );
1099    evbuffer_add( outbuf, &nll, sizeof( nll ) );
1100}
1101
1102/***
1103****
1104***/
1105
1106void
1107tr_peerIoReadBytesToBuf( tr_peerIo * io, struct evbuffer * inbuf, struct evbuffer * outbuf, size_t byteCount )
1108{
1109    struct evbuffer * tmp;
1110    const size_t old_length = evbuffer_get_length( outbuf );
1111
1112    assert( tr_isPeerIo( io ) );
1113    assert( evbuffer_get_length( inbuf ) >= byteCount );
1114
1115    /* append it to outbuf */
1116    tmp = evbuffer_new( );
1117    evbuffer_remove_buffer( inbuf, tmp, byteCount );
1118    evbuffer_add_buffer( outbuf, tmp );
1119    evbuffer_free( tmp );
1120
1121    /* decrypt if needed */
1122    if( io->encryption_type == PEER_ENCRYPTION_RC4 ) {
1123        struct evbuffer_ptr pos;
1124        struct evbuffer_iovec iovec;
1125        evbuffer_ptr_set( outbuf, &pos, old_length, EVBUFFER_PTR_SET );
1126        do {
1127            evbuffer_peek( outbuf, byteCount, &pos, &iovec, 1 );
1128            tr_cryptoDecrypt( io->crypto, iovec.iov_len, iovec.iov_base, iovec.iov_base );
1129            byteCount -= iovec.iov_len;
1130        } while( !evbuffer_ptr_set( outbuf, &pos, iovec.iov_len, EVBUFFER_PTR_ADD ) );
1131    }
1132}
1133
1134void
1135tr_peerIoReadBytes( tr_peerIo * io, struct evbuffer * inbuf, void * bytes, size_t byteCount )
1136{
1137    assert( tr_isPeerIo( io ) );
1138    assert( evbuffer_get_length( inbuf )  >= byteCount );
1139
1140    switch( io->encryption_type )
1141    {
1142        case PEER_ENCRYPTION_NONE:
1143            evbuffer_remove( inbuf, bytes, byteCount );
1144            break;
1145
1146        case PEER_ENCRYPTION_RC4:
1147            evbuffer_remove( inbuf, bytes, byteCount );
1148            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
1149            break;
1150
1151        default:
1152            assert( 0 );
1153    }
1154}
1155
1156void
1157tr_peerIoReadUint16( tr_peerIo        * io,
1158                     struct evbuffer  * inbuf,
1159                     uint16_t         * setme )
1160{
1161    uint16_t tmp;
1162    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
1163    *setme = ntohs( tmp );
1164}
1165
1166void tr_peerIoReadUint32( tr_peerIo        * io,
1167                          struct evbuffer  * inbuf,
1168                          uint32_t         * setme )
1169{
1170    uint32_t tmp;
1171    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
1172    *setme = ntohl( tmp );
1173}
1174
1175void
1176tr_peerIoDrain( tr_peerIo       * io,
1177                struct evbuffer * inbuf,
1178                size_t            byteCount )
1179{
1180    char buf[4096];
1181    const size_t buflen = sizeof( buf );
1182
1183    while( byteCount > 0 )
1184    {
1185        const size_t thisPass = MIN( byteCount, buflen );
1186        tr_peerIoReadBytes( io, inbuf, buf, thisPass );
1187        byteCount -= thisPass;
1188    }
1189}
1190
1191/***
1192****
1193***/
1194
1195static int
1196tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
1197{
1198    int res = 0;
1199
1200    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch )))
1201    {
1202        if( io->utp_socket != NULL ) /* utp peer connection */
1203        {
1204            /* UTP_RBDrained notifies libutp that your read buffer is emtpy.
1205             * It opens up the congestion window by sending an ACK (soonish)
1206             * if one was not going to be sent. */
1207            if( evbuffer_get_length( io->inbuf ) == 0 )
1208                UTP_RBDrained( io->utp_socket );
1209        }
1210        else /* tcp peer connection */
1211        {
1212            int e;
1213
1214            EVUTIL_SET_SOCKET_ERROR( 0 );
1215            res = evbuffer_read( io->inbuf, io->socket, (int)howmuch );
1216            e = EVUTIL_SOCKET_ERROR( );
1217
1218            dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(e):"") );
1219
1220            if( evbuffer_get_length( io->inbuf ) )
1221                canReadWrapper( io );
1222
1223            if( ( res <= 0 ) && ( io->gotError ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1224            {
1225                char errstr[512];
1226                short what = BEV_EVENT_READING | BEV_EVENT_ERROR;
1227                if( res == 0 )
1228                    what |= BEV_EVENT_EOF;
1229                dbgmsg( io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)",
1230                        res, what, e, tr_net_strerror( errstr, sizeof( errstr ), e ) );
1231                io->gotError( io, what, io->userData );
1232            }
1233        }
1234    }
1235
1236    return res;
1237}
1238
1239static int
1240tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
1241{
1242    int n = 0;
1243    const size_t old_len = evbuffer_get_length( io->outbuf );
1244    dbgmsg( io, "in tr_peerIoTryWrite %zu", howmuch );
1245
1246    if( howmuch > old_len )
1247        howmuch = old_len;
1248
1249    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_UP, howmuch )))
1250    {
1251        if( io->utp_socket != NULL ) /* utp peer connection */
1252        {
1253            const size_t old_len = evbuffer_get_length( io->outbuf );
1254            UTP_Write( io->utp_socket, howmuch );
1255            n = old_len - evbuffer_get_length( io->outbuf );
1256        }
1257        else
1258        {
1259            int e;
1260
1261            EVUTIL_SET_SOCKET_ERROR( 0 );
1262            n = tr_evbuffer_write( io, io->socket, howmuch );
1263            e = EVUTIL_SOCKET_ERROR( );
1264
1265            if( n > 0 )
1266                didWriteWrapper( io, n );
1267
1268            if( ( n < 0 ) && ( io->gotError ) && e && ( e != EPIPE ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1269            {
1270                char errstr[512];
1271                const short what = BEV_EVENT_WRITING | BEV_EVENT_ERROR;
1272
1273                dbgmsg( io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)",
1274                        n, what, e, tr_net_strerror( errstr, sizeof( errstr ), e ) );
1275
1276                if( io->gotError != NULL )
1277                    io->gotError( io, what, io->userData );
1278            }
1279        }
1280    }
1281
1282    return n;
1283}
1284
1285int
1286tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
1287{
1288    int bytesUsed = 0;
1289
1290    assert( tr_isPeerIo( io ) );
1291    assert( tr_isDirection( dir ) );
1292
1293    if( dir == TR_DOWN )
1294        bytesUsed = tr_peerIoTryRead( io, limit );
1295    else
1296        bytesUsed = tr_peerIoTryWrite( io, limit );
1297
1298    dbgmsg( io, "flushing peer-io, direction %d, limit %zu, bytesUsed %d", (int)dir, limit, bytesUsed );
1299    return bytesUsed;
1300}
1301
1302int
1303tr_peerIoFlushOutgoingProtocolMsgs( tr_peerIo * io )
1304{
1305    size_t byteCount = 0;
1306    const struct tr_datatype * it;
1307
1308    /* count up how many bytes are used by non-piece-data messages
1309       at the front of our outbound queue */
1310    for( it=io->outbuf_datatypes; it!=NULL; it=it->next )
1311        if( it->isPieceData )
1312            break;
1313        else
1314            byteCount += it->length;
1315
1316    return tr_peerIoFlush( io, TR_UP, byteCount );
1317}
Note: See TracBrowser for help on using the repository browser.