source: trunk/libtransmission/peer-io.c @ 12918

Last change on this file since 12918 was 12477, checked in by jordan, 10 years ago

(trunk libt) #4301 "strerror used instead of tr_strerror -- fix in peer-io.c too

  • Property svn:keywords set to Date Rev Author Id
File size: 34.3 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 12477 2011-05-30 15:50:50Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h>
16
17#include <event2/event.h>
18#include <event2/buffer.h>
19#include <event2/bufferevent.h>
20
21#include <libutp/utp.h>
22
23#include "transmission.h"
24#include "session.h"
25#include "bandwidth.h"
26#include "crypto.h"
27#include "net.h"
28#include "peer-common.h" /* MAX_BLOCK_SIZE */
29#include "peer-io.h"
30#include "trevent.h" /* tr_runInEventThread() */
31#include "utils.h"
32
33
34#ifdef WIN32
35 #define EAGAIN       WSAEWOULDBLOCK
36 #define EINTR        WSAEINTR
37 #define EINPROGRESS  WSAEINPROGRESS
38 #define EPIPE        WSAECONNRESET
39#endif
40
41/* The amount of read bufferring that we allow for uTP sockets. */
42
43#define UTP_READ_BUFFER_SIZE (256 * 1024)
44
45static size_t
46guessPacketOverhead( size_t d )
47{
48    /**
49     * http://sd.wareonearth.com/~phil/net/overhead/
50     *
51     * TCP over Ethernet:
52     * Assuming no header compression (e.g. not PPP)
53     * Add 20 IPv4 header or 40 IPv6 header (no options)
54     * Add 20 TCP header
55     * Add 12 bytes optional TCP timestamps
56     * Max TCP Payload data rates over ethernet are thus:
57     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
58     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
59     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
60     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
61     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
62     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
63     */
64    const double assumed_payload_data_rate = 94.0;
65
66    return (unsigned int)( d * ( 100.0 / assumed_payload_data_rate ) - d );
67}
68
69/**
70***
71**/
72
73#define dbgmsg( io, ... ) \
74    do { \
75        if( tr_deepLoggingIsActive( ) ) \
76            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
77    } while( 0 )
78
79/**
80***
81**/
82
83struct tr_datatype
84{
85    struct tr_datatype * next;
86    size_t length;
87    bool isPieceData;
88};
89
90static struct tr_datatype * datatype_pool = NULL;
91
92static const struct tr_datatype TR_DATATYPE_INIT = { NULL, 0, false };
93
94static struct tr_datatype *
95datatype_new( void )
96{
97    struct tr_datatype * ret;
98
99    if( datatype_pool == NULL )
100        ret = tr_new( struct tr_datatype, 1 );
101    else {
102        ret = datatype_pool;
103        datatype_pool = datatype_pool->next;
104    }
105
106    *ret = TR_DATATYPE_INIT;
107    return ret;
108}
109
110static void
111datatype_free( struct tr_datatype * datatype )
112{
113    datatype->next = datatype_pool;
114    datatype_pool = datatype;
115}
116
117static void
118peer_io_pull_datatype( tr_peerIo * io )
119{
120    struct tr_datatype * tmp;
121
122    if(( tmp = io->outbuf_datatypes ))
123    {
124        io->outbuf_datatypes = tmp->next;
125        datatype_free( tmp );
126    }
127}
128
129static void
130peer_io_push_datatype( tr_peerIo * io, struct tr_datatype * datatype )
131{
132    struct tr_datatype * tmp;
133
134    if(( tmp = io->outbuf_datatypes )) {
135        while( tmp->next != NULL )
136            tmp = tmp->next;
137        tmp->next = datatype;
138    } else {
139        io->outbuf_datatypes = datatype;
140    }
141}
142
143/***
144****
145***/
146
147static void
148didWriteWrapper( tr_peerIo * io, unsigned int bytes_transferred )
149{
150     while( bytes_transferred && tr_isPeerIo( io ) )
151     {
152        struct tr_datatype * next = io->outbuf_datatypes;
153
154        const unsigned int payload = MIN( next->length, bytes_transferred );
155        /* For uTP sockets, the overhead is computed in utp_on_overhead. */
156        const unsigned int overhead =
157            io->socket ? guessPacketOverhead( payload ) : 0;
158        const uint64_t now = tr_time_msec( );
159
160        tr_bandwidthUsed( &io->bandwidth, TR_UP, payload, next->isPieceData, now );
161
162        if( overhead > 0 )
163            tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, false, now );
164
165        if( io->didWrite )
166            io->didWrite( io, payload, next->isPieceData, io->userData );
167
168        if( tr_isPeerIo( io ) )
169        {
170            bytes_transferred -= payload;
171            next->length -= payload;
172            if( !next->length )
173                peer_io_pull_datatype( io );
174        }
175    }
176}
177
178static void
179canReadWrapper( tr_peerIo * io )
180{
181    bool err = 0;
182    bool done = 0;
183    tr_session * session;
184
185    dbgmsg( io, "canRead" );
186
187    tr_peerIoRef( io );
188
189    session = io->session;
190
191    /* try to consume the input buffer */
192    if( io->canRead )
193    {
194        const uint64_t now = tr_time_msec( );
195
196        tr_sessionLock( session );
197
198        while( !done && !err )
199        {
200            size_t piece = 0;
201            const size_t oldLen = evbuffer_get_length( io->inbuf );
202            const int ret = io->canRead( io, io->userData, &piece );
203            const size_t used = oldLen - evbuffer_get_length( io->inbuf );
204            const unsigned int overhead = guessPacketOverhead( used );
205
206            if( piece || (piece!=used) )
207            {
208                if( piece )
209                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, piece, true, now );
210
211                if( used != piece )
212                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, used - piece, false, now );
213            }
214
215            if( overhead > 0 )
216                tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, false, now );
217
218            switch( ret )
219            {
220                case READ_NOW:
221                    if( evbuffer_get_length( io->inbuf ) )
222                        continue;
223                    done = 1;
224                    break;
225
226                case READ_LATER:
227                    done = 1;
228                    break;
229
230                case READ_ERR:
231                    err = 1;
232                    break;
233            }
234
235            assert( tr_isPeerIo( io ) );
236        }
237
238        tr_sessionUnlock( session );
239    }
240
241    tr_peerIoUnref( io );
242}
243
244static void
245event_read_cb( int fd, short event UNUSED, void * vio )
246{
247    int res;
248    int e;
249    tr_peerIo * io = vio;
250
251    /* Limit the input buffer to 256K, so it doesn't grow too large */
252    unsigned int howmuch;
253    unsigned int curlen;
254    const tr_direction dir = TR_DOWN;
255    const unsigned int max = 256 * 1024;
256
257    assert( tr_isPeerIo( io ) );
258    assert( io->socket >= 0 );
259
260    io->pendingEvents &= ~EV_READ;
261
262    curlen = evbuffer_get_length( io->inbuf );
263    howmuch = curlen >= max ? 0 : max - curlen;
264    howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch );
265
266    dbgmsg( io, "libevent says this peer is ready to read" );
267
268    /* if we don't have any bandwidth left, stop reading */
269    if( howmuch < 1 ) {
270        tr_peerIoSetEnabled( io, dir, false );
271        return;
272    }
273
274    EVUTIL_SET_SOCKET_ERROR( 0 );
275    res = evbuffer_read( io->inbuf, fd, (int)howmuch );
276    e = EVUTIL_SOCKET_ERROR( );
277
278    if( res > 0 )
279    {
280        tr_peerIoSetEnabled( io, dir, true );
281
282        /* Invoke the user callback - must always be called last */
283        canReadWrapper( io );
284    }
285    else
286    {
287        char errstr[512];
288        short what = BEV_EVENT_READING;
289
290        if( res == 0 ) /* EOF */
291            what |= BEV_EVENT_EOF;
292        else if( res == -1 ) {
293            if( e == EAGAIN || e == EINTR ) {
294                tr_peerIoSetEnabled( io, dir, true );
295                return;
296            }
297            what |= BEV_EVENT_ERROR;
298        }
299
300        dbgmsg( io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)",
301                res, what, e, tr_net_strerror( errstr, sizeof( errstr ), e ) );
302
303        if( io->gotError != NULL )
304            io->gotError( io, what, io->userData );
305    }
306}
307
308static int
309tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
310{
311    int e;
312    int n;
313    char errstr[256];
314
315    EVUTIL_SET_SOCKET_ERROR( 0 );
316    n = evbuffer_write_atmost( io->outbuf, fd, howmuch );
317    e = EVUTIL_SOCKET_ERROR( );
318    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?tr_net_strerror(errstr,sizeof(errstr),e):"") );
319
320    return n;
321}
322
323static void
324event_write_cb( int fd, short event UNUSED, void * vio )
325{
326    int res = 0;
327    int e;
328    short what = BEV_EVENT_WRITING;
329    tr_peerIo * io = vio;
330    size_t howmuch;
331    const tr_direction dir = TR_UP;
332    char errstr[1024];
333
334    assert( tr_isPeerIo( io ) );
335    assert( io->socket >= 0 );
336
337    io->pendingEvents &= ~EV_WRITE;
338
339    dbgmsg( io, "libevent says this peer is ready to write" );
340
341    /* Write as much as possible, since the socket is non-blocking, write() will
342     * return if it can't write any more data without blocking */
343    howmuch = tr_bandwidthClamp( &io->bandwidth, dir, evbuffer_get_length( io->outbuf ) );
344
345    /* if we don't have any bandwidth left, stop writing */
346    if( howmuch < 1 ) {
347        tr_peerIoSetEnabled( io, dir, false );
348        return;
349    }
350
351    EVUTIL_SET_SOCKET_ERROR( 0 );
352    res = tr_evbuffer_write( io, fd, howmuch );
353    e = EVUTIL_SOCKET_ERROR( );
354
355    if (res == -1) {
356        if (!e || e == EAGAIN || e == EINTR || e == EINPROGRESS)
357            goto reschedule;
358        /* error case */
359        what |= BEV_EVENT_ERROR;
360    } else if (res == 0) {
361        /* eof case */
362        what |= BEV_EVENT_EOF;
363    }
364    if (res <= 0)
365        goto error;
366
367    if( evbuffer_get_length( io->outbuf ) )
368        tr_peerIoSetEnabled( io, dir, true );
369
370    didWriteWrapper( io, res );
371    return;
372
373 reschedule:
374    if( evbuffer_get_length( io->outbuf ) )
375        tr_peerIoSetEnabled( io, dir, true );
376    return;
377
378 error:
379
380    tr_net_strerror( errstr, sizeof( errstr ), e );
381    dbgmsg( io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
382
383    if( io->gotError != NULL )
384        io->gotError( io, what, io->userData );
385}
386
387/**
388***
389**/
390
391static void
392maybeSetCongestionAlgorithm( int socket, const char * algorithm )
393{
394    if( algorithm && *algorithm )
395    {
396        const int rc = tr_netSetCongestionControl( socket, algorithm );
397
398        if( rc < 0 )
399            tr_ninf( "Net", "Can't set congestion control algorithm '%s': %s",
400                     algorithm, tr_strerror( errno ));
401    }
402}
403
404#ifdef WITH_UTP
405/* UTP callbacks */
406
407static void
408utp_on_read(void *closure, const unsigned char *buf, size_t buflen)
409{
410    int rc;
411    tr_peerIo *io = closure;
412    assert( tr_isPeerIo( io ) );
413
414    rc = evbuffer_add( io->inbuf, buf, buflen );
415    dbgmsg( io, "utp_on_read got %zu bytes", buflen );
416
417    if( rc < 0 ) {
418        tr_nerr( "UTP", "On read evbuffer_add" );
419        return;
420    }
421
422    tr_peerIoSetEnabled( io, TR_DOWN, true );
423    canReadWrapper( io );
424}
425
426static void
427utp_on_write(void *closure, unsigned char *buf, size_t buflen)
428{
429    int rc;
430    tr_peerIo *io = closure;
431    assert( tr_isPeerIo( io ) );
432
433    rc = evbuffer_remove( io->outbuf, buf, buflen );
434    dbgmsg( io, "utp_on_write sending %zu bytes... evbuffer_remove returned %d", buflen, rc );
435    assert( rc == (int)buflen ); /* if this fails, we've corrupted our bookkeeping somewhere */
436    if( rc < (long)buflen ) {
437        tr_nerr( "UTP", "Short write: %d < %ld", rc, (long)buflen);
438    }
439
440    didWriteWrapper( io, buflen );
441}
442
443static size_t
444utp_get_rb_size(void *closure)
445{
446    size_t bytes;
447    tr_peerIo *io = closure;
448    assert( tr_isPeerIo( io ) );
449
450    bytes = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, UTP_READ_BUFFER_SIZE );
451
452    dbgmsg( io, "utp_get_rb_size is saying it's ready to read %zu bytes", bytes );
453    return UTP_READ_BUFFER_SIZE - bytes;
454}
455
456static void
457utp_on_state_change(void *closure, int state)
458{
459    tr_peerIo *io = closure;
460    assert( tr_isPeerIo( io ) );
461
462    if( state == UTP_STATE_CONNECT ) {
463        dbgmsg( io, "utp_on_state_change -- changed to connected" );
464        io->utpSupported = true;
465    } else if( state == UTP_STATE_WRITABLE ) {
466        dbgmsg( io, "utp_on_state_change -- changed to writable" );
467    } else if( state == UTP_STATE_EOF ) {
468        if( io->gotError )
469            io->gotError( io, BEV_EVENT_EOF, io->userData );
470    } else if( state == UTP_STATE_DESTROYING ) {
471        tr_nerr( "UTP", "Impossible state UTP_STATE_DESTROYING" );
472        return;
473    } else {
474        tr_nerr( "UTP", "Unknown state %d", state );
475    }
476}
477
478static void
479utp_on_error(void *closure, int errcode)
480{
481    tr_peerIo *io = closure;
482    assert( tr_isPeerIo( io ) );
483
484    dbgmsg( io, "utp_on_error -- errcode is %d", errcode );
485
486    if( io->gotError ) {
487        errno = errcode;
488        io->gotError( io, BEV_EVENT_ERROR, io->userData );
489    }
490}
491
492static void
493utp_on_overhead(void *closure, bool send, size_t count, int type UNUSED)
494{
495    tr_peerIo *io = closure;
496    assert( tr_isPeerIo( io ) );
497
498    dbgmsg( io, "utp_on_overhead -- count is %zu", count );
499
500    tr_bandwidthUsed( &io->bandwidth, send ? TR_UP : TR_DOWN,
501                      count, false, tr_time_msec() );
502}
503
504static struct UTPFunctionTable utp_function_table = {
505    .on_read = utp_on_read,
506    .on_write = utp_on_write,
507    .get_rb_size = utp_get_rb_size,
508    .on_state = utp_on_state_change,
509    .on_error = utp_on_error,
510    .on_overhead = utp_on_overhead
511};
512
513
514/* Dummy UTP callbacks. */
515/* We switch a UTP socket to use these after the associated peerIo has been
516   destroyed -- see io_dtor. */
517
518static void
519dummy_read( void * closure UNUSED, const unsigned char *buf UNUSED, size_t buflen UNUSED )
520{
521    /* This cannot happen, as far as I'm aware. */
522    tr_nerr( "UTP", "On_read called on closed socket" );
523
524}
525
526static void
527dummy_write(void * closure UNUSED, unsigned char *buf, size_t buflen)
528{
529    /* This can very well happen if we've shut down a peer connection that
530       had unflushed buffers.  Complain and send zeroes. */
531    tr_ndbg( "UTP", "On_write called on closed socket" );
532    memset( buf, 0, buflen );
533}
534
535static size_t
536dummy_get_rb_size( void * closure UNUSED )
537{
538    return 0;
539}
540
541static void
542dummy_on_state_change(void * closure UNUSED, int state UNUSED )
543{
544    return;
545}
546
547static void
548dummy_on_error( void * closure UNUSED, int errcode UNUSED )
549{
550    return;
551}
552
553static void
554dummy_on_overhead( void *closure UNUSED, bool send UNUSED, size_t count UNUSED, int type UNUSED )
555{
556    return;
557}
558
559static struct UTPFunctionTable dummy_utp_function_table = {
560    .on_read = dummy_read,
561    .on_write = dummy_write,
562    .get_rb_size = dummy_get_rb_size,
563    .on_state = dummy_on_state_change,
564    .on_error = dummy_on_error,
565    .on_overhead = dummy_on_overhead
566};
567
568#endif /* #ifdef WITH_UTP */
569
570static tr_peerIo*
571tr_peerIoNew( tr_session       * session,
572              tr_bandwidth     * parent,
573              const tr_address * addr,
574              tr_port            port,
575              const uint8_t    * torrentHash,
576              bool               isIncoming,
577              bool               isSeed,
578              int                socket,
579              struct UTPSocket * utp_socket)
580{
581    tr_peerIo * io;
582
583    assert( session != NULL );
584    assert( session->events != NULL );
585    assert( tr_isBool( isIncoming ) );
586    assert( tr_isBool( isSeed ) );
587    assert( tr_amInEventThread( session ) );
588    assert( (socket < 0) == (utp_socket != NULL) );
589#ifndef WITH_UTP
590    assert( socket >= 0 );
591#endif
592
593    if( socket >= 0 ) {
594        tr_netSetTOS( socket, session->peerSocketTOS );
595        maybeSetCongestionAlgorithm( socket, session->peer_congestion_algorithm );
596    }
597
598    io = tr_new0( tr_peerIo, 1 );
599    io->magicNumber = PEER_IO_MAGIC_NUMBER;
600    io->refCount = 1;
601    tr_cryptoConstruct( &io->crypto, torrentHash, isIncoming );
602    io->session = session;
603    io->addr = *addr;
604    io->isSeed = isSeed;
605    io->port = port;
606    io->socket = socket;
607    io->utp_socket = utp_socket;
608    io->isIncoming = isIncoming != 0;
609    io->timeCreated = tr_time( );
610    io->inbuf = evbuffer_new( );
611    io->outbuf = evbuffer_new( );
612    tr_bandwidthConstruct( &io->bandwidth, session, parent );
613    tr_bandwidthSetPeer( &io->bandwidth, io );
614    dbgmsg( io, "bandwidth is %p; its parent is %p", &io->bandwidth, parent );
615    dbgmsg( io, "socket is %d, utp_socket is %p", socket, utp_socket );
616
617    if( io->socket >= 0 ) {
618        io->event_read = event_new( session->event_base,
619                                    io->socket, EV_READ, event_read_cb, io );
620        io->event_write = event_new( session->event_base,
621                                     io->socket, EV_WRITE, event_write_cb, io );
622    }
623#ifdef WITH_UTP
624    else {
625        UTP_SetSockopt( utp_socket, SO_RCVBUF, UTP_READ_BUFFER_SIZE );
626        dbgmsg( io, "%s", "calling UTP_SetCallbacks &utp_function_table" );
627        UTP_SetCallbacks( utp_socket,
628                          &utp_function_table,
629                          io );
630        if( !isIncoming ) {
631            dbgmsg( io, "%s", "calling UTP_Connect" );
632            UTP_Connect( utp_socket );
633        }
634    }
635#endif
636
637    return io;
638}
639
640tr_peerIo*
641tr_peerIoNewIncoming( tr_session        * session,
642                      tr_bandwidth      * parent,
643                      const tr_address  * addr,
644                      tr_port             port,
645                      int                 fd,
646                      struct UTPSocket  * utp_socket )
647{
648    assert( session );
649    assert( tr_address_is_valid( addr ) );
650
651    return tr_peerIoNew( session, parent, addr, port, NULL, true, false,
652                         fd, utp_socket );
653}
654
655tr_peerIo*
656tr_peerIoNewOutgoing( tr_session        * session,
657                      tr_bandwidth      * parent,
658                      const tr_address  * addr,
659                      tr_port             port,
660                      const uint8_t     * torrentHash,
661                      bool                isSeed,
662                      bool                utp )
663{
664    int fd = -1;
665    struct UTPSocket * utp_socket = NULL;
666
667    assert( session );
668    assert( tr_address_is_valid( addr ) );
669    assert( torrentHash );
670
671    if( utp )
672        utp_socket = tr_netOpenPeerUTPSocket( session, addr, port, isSeed );
673
674    if( !utp_socket ) {
675        fd = tr_netOpenPeerSocket( session, addr, port, isSeed );
676        dbgmsg( NULL, "tr_netOpenPeerSocket returned fd %d", fd );
677    }
678
679    if( fd < 0 && utp_socket == NULL )
680        return NULL;
681
682    return tr_peerIoNew( session, parent, addr, port,
683                         torrentHash, false, isSeed, fd, utp_socket );
684}
685
686/***
687****
688***/
689
690static void
691event_enable( tr_peerIo * io, short event )
692{
693    assert( tr_amInEventThread( io->session ) );
694    assert( io->session != NULL );
695    assert( io->session->events != NULL );
696
697    if( io->socket < 0 )
698        return;
699
700    assert( io->session->events != NULL );
701    assert( event_initialized( io->event_read ) );
702    assert( event_initialized( io->event_write ) );
703
704    if( ( event & EV_READ ) && ! ( io->pendingEvents & EV_READ ) )
705    {
706        dbgmsg( io, "enabling libevent ready-to-read polling" );
707        event_add( io->event_read, NULL );
708        io->pendingEvents |= EV_READ;
709    }
710
711    if( ( event & EV_WRITE ) && ! ( io->pendingEvents & EV_WRITE ) )
712    {
713        dbgmsg( io, "enabling libevent ready-to-write polling" );
714        event_add( io->event_write, NULL );
715        io->pendingEvents |= EV_WRITE;
716    }
717}
718
719static void
720event_disable( struct tr_peerIo * io, short event )
721{
722    assert( tr_amInEventThread( io->session ) );
723    assert( io->session != NULL );
724
725    if( io->socket < 0 )
726        return;
727
728    assert( io->session->events != NULL );
729    assert( event_initialized( io->event_read ) );
730    assert( event_initialized( io->event_write ) );
731
732    if( ( event & EV_READ ) && ( io->pendingEvents & EV_READ ) )
733    {
734        dbgmsg( io, "disabling libevent ready-to-read polling" );
735        event_del( io->event_read );
736        io->pendingEvents &= ~EV_READ;
737    }
738
739    if( ( event & EV_WRITE ) && ( io->pendingEvents & EV_WRITE ) )
740    {
741        dbgmsg( io, "disabling libevent ready-to-write polling" );
742        event_del( io->event_write );
743        io->pendingEvents &= ~EV_WRITE;
744    }
745}
746
747void
748tr_peerIoSetEnabled( tr_peerIo    * io,
749                     tr_direction   dir,
750                     bool           isEnabled )
751{
752    const short event = dir == TR_UP ? EV_WRITE : EV_READ;
753
754    assert( tr_isPeerIo( io ) );
755    assert( tr_isDirection( dir ) );
756    assert( tr_amInEventThread( io->session ) );
757    assert( io->session->events != NULL );
758
759    if( isEnabled )
760        event_enable( io, event );
761    else
762        event_disable( io, event );
763}
764
765/***
766****
767***/
768static void
769io_close_socket( tr_peerIo * io )
770{
771    if( io->socket >= 0 ) {
772        tr_netClose( io->session, io->socket );
773        io->socket = -1;
774    }
775
776    if( io->event_read != NULL) {
777        event_free( io->event_read );
778        io->event_read = NULL;
779    }
780
781    if( io->event_write != NULL) {
782        event_free( io->event_write );
783        io->event_write = NULL;
784    }
785
786#ifdef WITH_UTP
787    if( io->utp_socket ) {
788        UTP_SetCallbacks( io->utp_socket,
789                          &dummy_utp_function_table,
790                          NULL );
791        UTP_Close( io->utp_socket );
792
793        io->utp_socket = NULL;
794    }
795#endif
796}
797
798static void
799io_dtor( void * vio )
800{
801    tr_peerIo * io = vio;
802
803    assert( tr_isPeerIo( io ) );
804    assert( tr_amInEventThread( io->session ) );
805    assert( io->session->events != NULL );
806
807    dbgmsg( io, "in tr_peerIo destructor" );
808    event_disable( io, EV_READ | EV_WRITE );
809    tr_bandwidthDestruct( &io->bandwidth );
810    evbuffer_free( io->outbuf );
811    evbuffer_free( io->inbuf );
812    io_close_socket( io );
813    tr_cryptoDestruct( &io->crypto );
814
815    while( io->outbuf_datatypes != NULL )
816        peer_io_pull_datatype( io );
817
818    memset( io, ~0, sizeof( tr_peerIo ) );
819    tr_free( io );
820}
821
822static void
823tr_peerIoFree( tr_peerIo * io )
824{
825    if( io )
826    {
827        dbgmsg( io, "in tr_peerIoFree" );
828        io->canRead = NULL;
829        io->didWrite = NULL;
830        io->gotError = NULL;
831        tr_runInEventThread( io->session, io_dtor, io );
832    }
833}
834
835void
836tr_peerIoRefImpl( const char * file, int line, tr_peerIo * io )
837{
838    assert( tr_isPeerIo( io ) );
839
840    dbgmsg( io, "%s:%d is incrementing the IO's refcount from %d to %d",
841                file, line, io->refCount, io->refCount+1 );
842
843    ++io->refCount;
844}
845
846void
847tr_peerIoUnrefImpl( const char * file, int line, tr_peerIo * io )
848{
849    assert( tr_isPeerIo( io ) );
850
851    dbgmsg( io, "%s:%d is decrementing the IO's refcount from %d to %d",
852                file, line, io->refCount, io->refCount-1 );
853
854    if( !--io->refCount )
855        tr_peerIoFree( io );
856}
857
858const tr_address*
859tr_peerIoGetAddress( const tr_peerIo * io, tr_port   * port )
860{
861    assert( tr_isPeerIo( io ) );
862
863    if( port )
864        *port = io->port;
865
866    return &io->addr;
867}
868
869const char*
870tr_peerIoAddrStr( const tr_address * addr, tr_port port )
871{
872    static char buf[512];
873    tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_address_to_string( addr ), ntohs( port ) );
874    return buf;
875}
876
877const char* tr_peerIoGetAddrStr( const tr_peerIo * io )
878{
879    return tr_isPeerIo( io ) ? tr_peerIoAddrStr( &io->addr, io->port ) : "error";
880}
881
882void
883tr_peerIoSetIOFuncs( tr_peerIo        * io,
884                     tr_can_read_cb     readcb,
885                     tr_did_write_cb    writecb,
886                     tr_net_error_cb    errcb,
887                     void             * userData )
888{
889    io->canRead = readcb;
890    io->didWrite = writecb;
891    io->gotError = errcb;
892    io->userData = userData;
893}
894
895void
896tr_peerIoClear( tr_peerIo * io )
897{
898    tr_peerIoSetIOFuncs( io, NULL, NULL, NULL, NULL );
899    tr_peerIoSetEnabled( io, TR_UP, false );
900    tr_peerIoSetEnabled( io, TR_DOWN, false );
901}
902
903int
904tr_peerIoReconnect( tr_peerIo * io )
905{
906    short int pendingEvents;
907    tr_session * session;
908
909    assert( tr_isPeerIo( io ) );
910    assert( !tr_peerIoIsIncoming( io ) );
911
912    session = tr_peerIoGetSession( io );
913
914    pendingEvents = io->pendingEvents;
915    event_disable( io, EV_READ | EV_WRITE );
916
917    io_close_socket( io );
918
919    io->socket = tr_netOpenPeerSocket( session, &io->addr, io->port, io->isSeed );
920    io->event_read = event_new( session->event_base, io->socket, EV_READ, event_read_cb, io );
921    io->event_write = event_new( session->event_base, io->socket, EV_WRITE, event_write_cb, io );
922
923    if( io->socket >= 0 )
924    {
925        event_enable( io, pendingEvents );
926        tr_netSetTOS( io->socket, session->peerSocketTOS );
927        maybeSetCongestionAlgorithm( io->socket, session->peer_congestion_algorithm );
928        return 0;
929    }
930
931    return -1;
932}
933
934/**
935***
936**/
937
938void
939tr_peerIoSetTorrentHash( tr_peerIo *     io,
940                         const uint8_t * hash )
941{
942    assert( tr_isPeerIo( io ) );
943
944    tr_cryptoSetTorrentHash( &io->crypto, hash );
945}
946
947const uint8_t*
948tr_peerIoGetTorrentHash( tr_peerIo * io )
949{
950    assert( tr_isPeerIo( io ) );
951
952    return tr_cryptoGetTorrentHash( &io->crypto );
953}
954
955int
956tr_peerIoHasTorrentHash( const tr_peerIo * io )
957{
958    assert( tr_isPeerIo( io ) );
959
960    return tr_cryptoHasTorrentHash( &io->crypto );
961}
962
963/**
964***
965**/
966
967void
968tr_peerIoSetPeersId( tr_peerIo * io, const uint8_t * peer_id )
969{
970    assert( tr_isPeerIo( io ) );
971
972    if( ( io->peerIdIsSet = peer_id != NULL ) )
973        memcpy( io->peerId, peer_id, 20 );
974    else
975        memset( io->peerId, 0, 20 );
976}
977
978/**
979***
980**/
981
982static unsigned int
983getDesiredOutputBufferSize( const tr_peerIo * io, uint64_t now )
984{
985    /* this is all kind of arbitrary, but what seems to work well is
986     * being large enough to hold the next 20 seconds' worth of input,
987     * or a few blocks, whichever is bigger.
988     * It's okay to tweak this as needed */
989    const unsigned int currentSpeed_Bps = tr_bandwidthGetPieceSpeed_Bps( &io->bandwidth, now, TR_UP );
990    const unsigned int period = 15u; /* arbitrary */
991    /* the 3 is arbitrary; the .5 is to leave room for messages */
992    static const unsigned int ceiling =  (unsigned int)( MAX_BLOCK_SIZE * 3.5 );
993    return MAX( ceiling, currentSpeed_Bps*period );
994}
995
996size_t
997tr_peerIoGetWriteBufferSpace( const tr_peerIo * io, uint64_t now )
998{
999    const size_t desiredLen = getDesiredOutputBufferSize( io, now );
1000    const size_t currentLen = evbuffer_get_length( io->outbuf );
1001    size_t freeSpace = 0;
1002
1003    if( desiredLen > currentLen )
1004        freeSpace = desiredLen - currentLen;
1005
1006    return freeSpace;
1007}
1008
1009/**
1010***
1011**/
1012
1013void
1014tr_peerIoSetEncryption( tr_peerIo * io, tr_encryption_type encryption_type )
1015{
1016    assert( tr_isPeerIo( io ) );
1017    assert( encryption_type == PEER_ENCRYPTION_NONE
1018         || encryption_type == PEER_ENCRYPTION_RC4 );
1019
1020    io->encryption_type = encryption_type;
1021}
1022
1023/**
1024***
1025**/
1026
1027static void
1028addDatatype( tr_peerIo * io, size_t byteCount, bool isPieceData )
1029{
1030    struct tr_datatype * d;
1031    d = datatype_new( );
1032    d->isPieceData = isPieceData != 0;
1033    d->length = byteCount;
1034    peer_io_push_datatype( io, d );
1035}
1036
1037static void
1038maybeEncryptBuffer( tr_peerIo * io, struct evbuffer * buf )
1039{
1040    if( io->encryption_type == PEER_ENCRYPTION_RC4 )
1041    {
1042        struct evbuffer_ptr pos;
1043        struct evbuffer_iovec iovec;
1044        evbuffer_ptr_set( buf, &pos, 0, EVBUFFER_PTR_SET );
1045        do {
1046            evbuffer_peek( buf, -1, &pos, &iovec, 1 );
1047            tr_cryptoEncrypt( &io->crypto, iovec.iov_len, iovec.iov_base, iovec.iov_base );
1048        } while( !evbuffer_ptr_set( buf, &pos, iovec.iov_len, EVBUFFER_PTR_ADD ) );
1049    }
1050}
1051
1052void
1053tr_peerIoWriteBuf( tr_peerIo * io, struct evbuffer * buf, bool isPieceData )
1054{
1055    const size_t byteCount = evbuffer_get_length( buf );
1056    maybeEncryptBuffer( io, buf );
1057    evbuffer_add_buffer( io->outbuf, buf );
1058    addDatatype( io, byteCount, isPieceData );
1059}
1060
1061void
1062tr_peerIoWriteBytes( tr_peerIo * io, const void * bytes, size_t byteCount, bool isPieceData )
1063{
1064    struct evbuffer_iovec iovec;
1065    evbuffer_reserve_space( io->outbuf, byteCount, &iovec, 1 );
1066
1067    iovec.iov_len = byteCount;
1068    if( io->encryption_type == PEER_ENCRYPTION_RC4 )
1069        tr_cryptoEncrypt( &io->crypto, iovec.iov_len, bytes, iovec.iov_base );
1070    else
1071        memcpy( iovec.iov_base, bytes, iovec.iov_len );
1072    evbuffer_commit_space( io->outbuf, &iovec, 1 );
1073
1074    addDatatype( io, byteCount, isPieceData );
1075}
1076
1077/***
1078****
1079***/
1080
1081void
1082evbuffer_add_uint8( struct evbuffer * outbuf, uint8_t byte )
1083{
1084    evbuffer_add( outbuf, &byte, 1 );
1085}
1086
1087void
1088evbuffer_add_uint16( struct evbuffer * outbuf, uint16_t addme_hs )
1089{
1090    const uint16_t ns = htons( addme_hs );
1091    evbuffer_add( outbuf, &ns, sizeof( ns ) );
1092}
1093
1094void
1095evbuffer_add_uint32( struct evbuffer * outbuf, uint32_t addme_hl )
1096{
1097    const uint32_t nl = htonl( addme_hl );
1098    evbuffer_add( outbuf, &nl, sizeof( nl ) );
1099}
1100
1101void
1102evbuffer_add_uint64( struct evbuffer * outbuf, uint64_t addme_hll )
1103{
1104    const uint64_t nll = tr_htonll( addme_hll );
1105    evbuffer_add( outbuf, &nll, sizeof( nll ) );
1106}
1107
1108/***
1109****
1110***/
1111
1112void
1113tr_peerIoReadBytesToBuf( tr_peerIo * io, struct evbuffer * inbuf, struct evbuffer * outbuf, size_t byteCount )
1114{
1115    struct evbuffer * tmp;
1116    const size_t old_length = evbuffer_get_length( outbuf );
1117
1118    assert( tr_isPeerIo( io ) );
1119    assert( evbuffer_get_length( inbuf ) >= byteCount );
1120
1121    /* append it to outbuf */
1122    tmp = evbuffer_new( );
1123    evbuffer_remove_buffer( inbuf, tmp, byteCount );
1124    evbuffer_add_buffer( outbuf, tmp );
1125    evbuffer_free( tmp );
1126
1127    /* decrypt if needed */
1128    if( io->encryption_type == PEER_ENCRYPTION_RC4 ) {
1129        struct evbuffer_ptr pos;
1130        struct evbuffer_iovec iovec;
1131        evbuffer_ptr_set( outbuf, &pos, old_length, EVBUFFER_PTR_SET );
1132        do {
1133            evbuffer_peek( outbuf, byteCount, &pos, &iovec, 1 );
1134            tr_cryptoDecrypt( &io->crypto, iovec.iov_len, iovec.iov_base, iovec.iov_base );
1135            byteCount -= iovec.iov_len;
1136        } while( !evbuffer_ptr_set( outbuf, &pos, iovec.iov_len, EVBUFFER_PTR_ADD ) );
1137    }
1138}
1139
1140void
1141tr_peerIoReadBytes( tr_peerIo * io, struct evbuffer * inbuf, void * bytes, size_t byteCount )
1142{
1143    assert( tr_isPeerIo( io ) );
1144    assert( evbuffer_get_length( inbuf )  >= byteCount );
1145
1146    switch( io->encryption_type )
1147    {
1148        case PEER_ENCRYPTION_NONE:
1149            evbuffer_remove( inbuf, bytes, byteCount );
1150            break;
1151
1152        case PEER_ENCRYPTION_RC4:
1153            evbuffer_remove( inbuf, bytes, byteCount );
1154            tr_cryptoDecrypt( &io->crypto, byteCount, bytes, bytes );
1155            break;
1156
1157        default:
1158            assert( 0 );
1159    }
1160}
1161
1162void
1163tr_peerIoReadUint16( tr_peerIo        * io,
1164                     struct evbuffer  * inbuf,
1165                     uint16_t         * setme )
1166{
1167    uint16_t tmp;
1168    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
1169    *setme = ntohs( tmp );
1170}
1171
1172void tr_peerIoReadUint32( tr_peerIo        * io,
1173                          struct evbuffer  * inbuf,
1174                          uint32_t         * setme )
1175{
1176    uint32_t tmp;
1177    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
1178    *setme = ntohl( tmp );
1179}
1180
1181void
1182tr_peerIoDrain( tr_peerIo       * io,
1183                struct evbuffer * inbuf,
1184                size_t            byteCount )
1185{
1186    char buf[4096];
1187    const size_t buflen = sizeof( buf );
1188
1189    while( byteCount > 0 )
1190    {
1191        const size_t thisPass = MIN( byteCount, buflen );
1192        tr_peerIoReadBytes( io, inbuf, buf, thisPass );
1193        byteCount -= thisPass;
1194    }
1195}
1196
1197/***
1198****
1199***/
1200
1201static int
1202tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
1203{
1204    int res = 0;
1205
1206    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch )))
1207    {
1208        if( io->utp_socket != NULL ) /* utp peer connection */
1209        {
1210            /* UTP_RBDrained notifies libutp that your read buffer is emtpy.
1211             * It opens up the congestion window by sending an ACK (soonish)
1212             * if one was not going to be sent. */
1213            if( evbuffer_get_length( io->inbuf ) == 0 )
1214                UTP_RBDrained( io->utp_socket );
1215        }
1216        else /* tcp peer connection */
1217        {
1218            int e;
1219
1220            EVUTIL_SET_SOCKET_ERROR( 0 );
1221            res = evbuffer_read( io->inbuf, io->socket, (int)howmuch );
1222            e = EVUTIL_SOCKET_ERROR( );
1223
1224            dbgmsg( io, "read %d from peer (%s)", res, (res==-1?tr_strerror(e):"") );
1225
1226            if( evbuffer_get_length( io->inbuf ) )
1227                canReadWrapper( io );
1228
1229            if( ( res <= 0 ) && ( io->gotError ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1230            {
1231                char errstr[512];
1232                short what = BEV_EVENT_READING | BEV_EVENT_ERROR;
1233                if( res == 0 )
1234                    what |= BEV_EVENT_EOF;
1235                dbgmsg( io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)",
1236                        res, what, e, tr_net_strerror( errstr, sizeof( errstr ), e ) );
1237                io->gotError( io, what, io->userData );
1238            }
1239        }
1240    }
1241
1242    return res;
1243}
1244
1245static int
1246tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
1247{
1248    int n = 0;
1249    const size_t old_len = evbuffer_get_length( io->outbuf );
1250    dbgmsg( io, "in tr_peerIoTryWrite %zu", howmuch );
1251
1252    if( howmuch > old_len )
1253        howmuch = old_len;
1254
1255    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_UP, howmuch )))
1256    {
1257        if( io->utp_socket != NULL ) /* utp peer connection */
1258        {
1259            const size_t old_len = evbuffer_get_length( io->outbuf );
1260            UTP_Write( io->utp_socket, howmuch );
1261            n = old_len - evbuffer_get_length( io->outbuf );
1262        }
1263        else
1264        {
1265            int e;
1266
1267            EVUTIL_SET_SOCKET_ERROR( 0 );
1268            n = tr_evbuffer_write( io, io->socket, howmuch );
1269            e = EVUTIL_SOCKET_ERROR( );
1270
1271            if( n > 0 )
1272                didWriteWrapper( io, n );
1273
1274            if( ( n < 0 ) && ( io->gotError ) && e && ( e != EPIPE ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1275            {
1276                char errstr[512];
1277                const short what = BEV_EVENT_WRITING | BEV_EVENT_ERROR;
1278
1279                dbgmsg( io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)",
1280                        n, what, e, tr_net_strerror( errstr, sizeof( errstr ), e ) );
1281
1282                if( io->gotError != NULL )
1283                    io->gotError( io, what, io->userData );
1284            }
1285        }
1286    }
1287
1288    return n;
1289}
1290
1291int
1292tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
1293{
1294    int bytesUsed = 0;
1295
1296    assert( tr_isPeerIo( io ) );
1297    assert( tr_isDirection( dir ) );
1298
1299    if( dir == TR_DOWN )
1300        bytesUsed = tr_peerIoTryRead( io, limit );
1301    else
1302        bytesUsed = tr_peerIoTryWrite( io, limit );
1303
1304    dbgmsg( io, "flushing peer-io, direction %d, limit %zu, bytesUsed %d", (int)dir, limit, bytesUsed );
1305    return bytesUsed;
1306}
1307
1308int
1309tr_peerIoFlushOutgoingProtocolMsgs( tr_peerIo * io )
1310{
1311    size_t byteCount = 0;
1312    const struct tr_datatype * it;
1313
1314    /* count up how many bytes are used by non-piece-data messages
1315       at the front of our outbound queue */
1316    for( it=io->outbuf_datatypes; it!=NULL; it=it->next )
1317        if( it->isPieceData )
1318            break;
1319        else
1320            byteCount += it->length;
1321
1322    return tr_peerIoFlush( io, TR_UP, byteCount );
1323}
Note: See TracBrowser for help on using the repository browser.