source: trunk/libtransmission/peer-io.c @ 12954

Last change on this file since 12954 was 12954, checked in by jordan, 9 years ago

(trunk) #4490 "Transmission 2.40b1 fails to build: undefined references" -- fixed.

  • Property svn:keywords set to Date Rev Author Id
File size: 34.3 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 12954 2011-10-08 23:53:27Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h>
16
17#include <event2/event.h>
18#include <event2/buffer.h>
19#include <event2/bufferevent.h>
20
21#include "transmission.h"
22#include "session.h"
23#include "bandwidth.h"
24#include "crypto.h"
25#include "net.h"
26#include "peer-common.h" /* MAX_BLOCK_SIZE */
27#include "peer-io.h"
28#include "trevent.h" /* tr_runInEventThread() */
29#include "tr-utp.h"
30#include "utils.h"
31
32
33#ifdef WIN32
34 #define EAGAIN       WSAEWOULDBLOCK
35 #define EINTR        WSAEINTR
36 #define EINPROGRESS  WSAEINPROGRESS
37 #define EPIPE        WSAECONNRESET
38#endif
39
40/* The amount of read bufferring that we allow for uTP sockets. */
41
42#define UTP_READ_BUFFER_SIZE (256 * 1024)
43
44static size_t
45guessPacketOverhead( size_t d )
46{
47    /**
48     * http://sd.wareonearth.com/~phil/net/overhead/
49     *
50     * TCP over Ethernet:
51     * Assuming no header compression (e.g. not PPP)
52     * Add 20 IPv4 header or 40 IPv6 header (no options)
53     * Add 20 TCP header
54     * Add 12 bytes optional TCP timestamps
55     * Max TCP Payload data rates over ethernet are thus:
56     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
57     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
58     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
59     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
60     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
61     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
62     */
63    const double assumed_payload_data_rate = 94.0;
64
65    return (unsigned int)( d * ( 100.0 / assumed_payload_data_rate ) - d );
66}
67
68/**
69***
70**/
71
72#define dbgmsg( io, ... ) \
73    do { \
74        if( tr_deepLoggingIsActive( ) ) \
75            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
76    } while( 0 )
77
78/**
79***
80**/
81
82struct tr_datatype
83{
84    struct tr_datatype * next;
85    size_t length;
86    bool isPieceData;
87};
88
89static struct tr_datatype * datatype_pool = NULL;
90
91static const struct tr_datatype TR_DATATYPE_INIT = { NULL, 0, false };
92
93static struct tr_datatype *
94datatype_new( void )
95{
96    struct tr_datatype * ret;
97
98    if( datatype_pool == NULL )
99        ret = tr_new( struct tr_datatype, 1 );
100    else {
101        ret = datatype_pool;
102        datatype_pool = datatype_pool->next;
103    }
104
105    *ret = TR_DATATYPE_INIT;
106    return ret;
107}
108
109static void
110datatype_free( struct tr_datatype * datatype )
111{
112    datatype->next = datatype_pool;
113    datatype_pool = datatype;
114}
115
116static void
117peer_io_pull_datatype( tr_peerIo * io )
118{
119    struct tr_datatype * tmp;
120
121    if(( tmp = io->outbuf_datatypes ))
122    {
123        io->outbuf_datatypes = tmp->next;
124        datatype_free( tmp );
125    }
126}
127
128static void
129peer_io_push_datatype( tr_peerIo * io, struct tr_datatype * datatype )
130{
131    struct tr_datatype * tmp;
132
133    if(( tmp = io->outbuf_datatypes )) {
134        while( tmp->next != NULL )
135            tmp = tmp->next;
136        tmp->next = datatype;
137    } else {
138        io->outbuf_datatypes = datatype;
139    }
140}
141
142/***
143****
144***/
145
146static void
147didWriteWrapper( tr_peerIo * io, unsigned int bytes_transferred )
148{
149     while( bytes_transferred && tr_isPeerIo( io ) )
150     {
151        struct tr_datatype * next = io->outbuf_datatypes;
152
153        const unsigned int payload = MIN( next->length, bytes_transferred );
154        /* For uTP sockets, the overhead is computed in utp_on_overhead. */
155        const unsigned int overhead =
156            io->socket ? guessPacketOverhead( payload ) : 0;
157        const uint64_t now = tr_time_msec( );
158
159        tr_bandwidthUsed( &io->bandwidth, TR_UP, payload, next->isPieceData, now );
160
161        if( overhead > 0 )
162            tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, false, now );
163
164        if( io->didWrite )
165            io->didWrite( io, payload, next->isPieceData, io->userData );
166
167        if( tr_isPeerIo( io ) )
168        {
169            bytes_transferred -= payload;
170            next->length -= payload;
171            if( !next->length )
172                peer_io_pull_datatype( io );
173        }
174    }
175}
176
177static void
178canReadWrapper( tr_peerIo * io )
179{
180    bool err = 0;
181    bool done = 0;
182    tr_session * session;
183
184    dbgmsg( io, "canRead" );
185
186    tr_peerIoRef( io );
187
188    session = io->session;
189
190    /* try to consume the input buffer */
191    if( io->canRead )
192    {
193        const uint64_t now = tr_time_msec( );
194
195        tr_sessionLock( session );
196
197        while( !done && !err )
198        {
199            size_t piece = 0;
200            const size_t oldLen = evbuffer_get_length( io->inbuf );
201            const int ret = io->canRead( io, io->userData, &piece );
202            const size_t used = oldLen - evbuffer_get_length( io->inbuf );
203            const unsigned int overhead = guessPacketOverhead( used );
204
205            if( piece || (piece!=used) )
206            {
207                if( piece )
208                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, piece, true, now );
209
210                if( used != piece )
211                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, used - piece, false, now );
212            }
213
214            if( overhead > 0 )
215                tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, false, now );
216
217            switch( ret )
218            {
219                case READ_NOW:
220                    if( evbuffer_get_length( io->inbuf ) )
221                        continue;
222                    done = 1;
223                    break;
224
225                case READ_LATER:
226                    done = 1;
227                    break;
228
229                case READ_ERR:
230                    err = 1;
231                    break;
232            }
233
234            assert( tr_isPeerIo( io ) );
235        }
236
237        tr_sessionUnlock( session );
238    }
239
240    tr_peerIoUnref( io );
241}
242
243static void
244event_read_cb( int fd, short event UNUSED, void * vio )
245{
246    int res;
247    int e;
248    tr_peerIo * io = vio;
249
250    /* Limit the input buffer to 256K, so it doesn't grow too large */
251    unsigned int howmuch;
252    unsigned int curlen;
253    const tr_direction dir = TR_DOWN;
254    const unsigned int max = 256 * 1024;
255
256    assert( tr_isPeerIo( io ) );
257    assert( io->socket >= 0 );
258
259    io->pendingEvents &= ~EV_READ;
260
261    curlen = evbuffer_get_length( io->inbuf );
262    howmuch = curlen >= max ? 0 : max - curlen;
263    howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch );
264
265    dbgmsg( io, "libevent says this peer is ready to read" );
266
267    /* if we don't have any bandwidth left, stop reading */
268    if( howmuch < 1 ) {
269        tr_peerIoSetEnabled( io, dir, false );
270        return;
271    }
272
273    EVUTIL_SET_SOCKET_ERROR( 0 );
274    res = evbuffer_read( io->inbuf, fd, (int)howmuch );
275    e = EVUTIL_SOCKET_ERROR( );
276
277    if( res > 0 )
278    {
279        tr_peerIoSetEnabled( io, dir, true );
280
281        /* Invoke the user callback - must always be called last */
282        canReadWrapper( io );
283    }
284    else
285    {
286        char errstr[512];
287        short what = BEV_EVENT_READING;
288
289        if( res == 0 ) /* EOF */
290            what |= BEV_EVENT_EOF;
291        else if( res == -1 ) {
292            if( e == EAGAIN || e == EINTR ) {
293                tr_peerIoSetEnabled( io, dir, true );
294                return;
295            }
296            what |= BEV_EVENT_ERROR;
297        }
298
299        dbgmsg( io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)",
300                res, what, e, tr_net_strerror( errstr, sizeof( errstr ), e ) );
301
302        if( io->gotError != NULL )
303            io->gotError( io, what, io->userData );
304    }
305}
306
307static int
308tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
309{
310    int e;
311    int n;
312    char errstr[256];
313
314    EVUTIL_SET_SOCKET_ERROR( 0 );
315    n = evbuffer_write_atmost( io->outbuf, fd, howmuch );
316    e = EVUTIL_SOCKET_ERROR( );
317    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?tr_net_strerror(errstr,sizeof(errstr),e):"") );
318
319    return n;
320}
321
322static void
323event_write_cb( int fd, short event UNUSED, void * vio )
324{
325    int res = 0;
326    int e;
327    short what = BEV_EVENT_WRITING;
328    tr_peerIo * io = vio;
329    size_t howmuch;
330    const tr_direction dir = TR_UP;
331    char errstr[1024];
332
333    assert( tr_isPeerIo( io ) );
334    assert( io->socket >= 0 );
335
336    io->pendingEvents &= ~EV_WRITE;
337
338    dbgmsg( io, "libevent says this peer is ready to write" );
339
340    /* Write as much as possible, since the socket is non-blocking, write() will
341     * return if it can't write any more data without blocking */
342    howmuch = tr_bandwidthClamp( &io->bandwidth, dir, evbuffer_get_length( io->outbuf ) );
343
344    /* if we don't have any bandwidth left, stop writing */
345    if( howmuch < 1 ) {
346        tr_peerIoSetEnabled( io, dir, false );
347        return;
348    }
349
350    EVUTIL_SET_SOCKET_ERROR( 0 );
351    res = tr_evbuffer_write( io, fd, howmuch );
352    e = EVUTIL_SOCKET_ERROR( );
353
354    if (res == -1) {
355        if (!e || e == EAGAIN || e == EINTR || e == EINPROGRESS)
356            goto reschedule;
357        /* error case */
358        what |= BEV_EVENT_ERROR;
359    } else if (res == 0) {
360        /* eof case */
361        what |= BEV_EVENT_EOF;
362    }
363    if (res <= 0)
364        goto error;
365
366    if( evbuffer_get_length( io->outbuf ) )
367        tr_peerIoSetEnabled( io, dir, true );
368
369    didWriteWrapper( io, res );
370    return;
371
372 reschedule:
373    if( evbuffer_get_length( io->outbuf ) )
374        tr_peerIoSetEnabled( io, dir, true );
375    return;
376
377 error:
378
379    tr_net_strerror( errstr, sizeof( errstr ), e );
380    dbgmsg( io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
381
382    if( io->gotError != NULL )
383        io->gotError( io, what, io->userData );
384}
385
386/**
387***
388**/
389
390static void
391maybeSetCongestionAlgorithm( int socket, const char * algorithm )
392{
393    if( algorithm && *algorithm )
394    {
395        const int rc = tr_netSetCongestionControl( socket, algorithm );
396
397        if( rc < 0 )
398            tr_ninf( "Net", "Can't set congestion control algorithm '%s': %s",
399                     algorithm, tr_strerror( errno ));
400    }
401}
402
403#ifdef WITH_UTP
404/* UTP callbacks */
405
406static void
407utp_on_read(void *closure, const unsigned char *buf, size_t buflen)
408{
409    int rc;
410    tr_peerIo *io = closure;
411    assert( tr_isPeerIo( io ) );
412
413    rc = evbuffer_add( io->inbuf, buf, buflen );
414    dbgmsg( io, "utp_on_read got %zu bytes", buflen );
415
416    if( rc < 0 ) {
417        tr_nerr( "UTP", "On read evbuffer_add" );
418        return;
419    }
420
421    tr_peerIoSetEnabled( io, TR_DOWN, true );
422    canReadWrapper( io );
423}
424
425static void
426utp_on_write(void *closure, unsigned char *buf, size_t buflen)
427{
428    int rc;
429    tr_peerIo *io = closure;
430    assert( tr_isPeerIo( io ) );
431
432    rc = evbuffer_remove( io->outbuf, buf, buflen );
433    dbgmsg( io, "utp_on_write sending %zu bytes... evbuffer_remove returned %d", buflen, rc );
434    assert( rc == (int)buflen ); /* if this fails, we've corrupted our bookkeeping somewhere */
435    if( rc < (long)buflen ) {
436        tr_nerr( "UTP", "Short write: %d < %ld", rc, (long)buflen);
437    }
438
439    didWriteWrapper( io, buflen );
440}
441
442static size_t
443utp_get_rb_size(void *closure)
444{
445    size_t bytes;
446    tr_peerIo *io = closure;
447    assert( tr_isPeerIo( io ) );
448
449    bytes = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, UTP_READ_BUFFER_SIZE );
450
451    dbgmsg( io, "utp_get_rb_size is saying it's ready to read %zu bytes", bytes );
452    return UTP_READ_BUFFER_SIZE - bytes;
453}
454
455static void
456utp_on_state_change(void *closure, int state)
457{
458    tr_peerIo *io = closure;
459    assert( tr_isPeerIo( io ) );
460
461    if( state == UTP_STATE_CONNECT ) {
462        dbgmsg( io, "utp_on_state_change -- changed to connected" );
463        io->utpSupported = true;
464    } else if( state == UTP_STATE_WRITABLE ) {
465        dbgmsg( io, "utp_on_state_change -- changed to writable" );
466    } else if( state == UTP_STATE_EOF ) {
467        if( io->gotError )
468            io->gotError( io, BEV_EVENT_EOF, io->userData );
469    } else if( state == UTP_STATE_DESTROYING ) {
470        tr_nerr( "UTP", "Impossible state UTP_STATE_DESTROYING" );
471        return;
472    } else {
473        tr_nerr( "UTP", "Unknown state %d", state );
474    }
475}
476
477static void
478utp_on_error(void *closure, int errcode)
479{
480    tr_peerIo *io = closure;
481    assert( tr_isPeerIo( io ) );
482
483    dbgmsg( io, "utp_on_error -- errcode is %d", errcode );
484
485    if( io->gotError ) {
486        errno = errcode;
487        io->gotError( io, BEV_EVENT_ERROR, io->userData );
488    }
489}
490
491static void
492utp_on_overhead(void *closure, bool send, size_t count, int type UNUSED)
493{
494    tr_peerIo *io = closure;
495    assert( tr_isPeerIo( io ) );
496
497    dbgmsg( io, "utp_on_overhead -- count is %zu", count );
498
499    tr_bandwidthUsed( &io->bandwidth, send ? TR_UP : TR_DOWN,
500                      count, false, tr_time_msec() );
501}
502
503static struct UTPFunctionTable utp_function_table = {
504    .on_read = utp_on_read,
505    .on_write = utp_on_write,
506    .get_rb_size = utp_get_rb_size,
507    .on_state = utp_on_state_change,
508    .on_error = utp_on_error,
509    .on_overhead = utp_on_overhead
510};
511
512
513/* Dummy UTP callbacks. */
514/* We switch a UTP socket to use these after the associated peerIo has been
515   destroyed -- see io_dtor. */
516
517static void
518dummy_read( void * closure UNUSED, const unsigned char *buf UNUSED, size_t buflen UNUSED )
519{
520    /* This cannot happen, as far as I'm aware. */
521    tr_nerr( "UTP", "On_read called on closed socket" );
522
523}
524
525static void
526dummy_write(void * closure UNUSED, unsigned char *buf, size_t buflen)
527{
528    /* This can very well happen if we've shut down a peer connection that
529       had unflushed buffers.  Complain and send zeroes. */
530    tr_ndbg( "UTP", "On_write called on closed socket" );
531    memset( buf, 0, buflen );
532}
533
534static size_t
535dummy_get_rb_size( void * closure UNUSED )
536{
537    return 0;
538}
539
540static void
541dummy_on_state_change(void * closure UNUSED, int state UNUSED )
542{
543    return;
544}
545
546static void
547dummy_on_error( void * closure UNUSED, int errcode UNUSED )
548{
549    return;
550}
551
552static void
553dummy_on_overhead( void *closure UNUSED, bool send UNUSED, size_t count UNUSED, int type UNUSED )
554{
555    return;
556}
557
558static struct UTPFunctionTable dummy_utp_function_table = {
559    .on_read = dummy_read,
560    .on_write = dummy_write,
561    .get_rb_size = dummy_get_rb_size,
562    .on_state = dummy_on_state_change,
563    .on_error = dummy_on_error,
564    .on_overhead = dummy_on_overhead
565};
566
567#endif /* #ifdef WITH_UTP */
568
569static tr_peerIo*
570tr_peerIoNew( tr_session       * session,
571              tr_bandwidth     * parent,
572              const tr_address * addr,
573              tr_port            port,
574              const uint8_t    * torrentHash,
575              bool               isIncoming,
576              bool               isSeed,
577              int                socket,
578              struct UTPSocket * utp_socket)
579{
580    tr_peerIo * io;
581
582    assert( session != NULL );
583    assert( session->events != NULL );
584    assert( tr_isBool( isIncoming ) );
585    assert( tr_isBool( isSeed ) );
586    assert( tr_amInEventThread( session ) );
587    assert( (socket < 0) == (utp_socket != NULL) );
588#ifndef WITH_UTP
589    assert( socket >= 0 );
590#endif
591
592    if( socket >= 0 ) {
593        tr_netSetTOS( socket, session->peerSocketTOS );
594        maybeSetCongestionAlgorithm( socket, session->peer_congestion_algorithm );
595    }
596
597    io = tr_new0( tr_peerIo, 1 );
598    io->magicNumber = PEER_IO_MAGIC_NUMBER;
599    io->refCount = 1;
600    tr_cryptoConstruct( &io->crypto, torrentHash, isIncoming );
601    io->session = session;
602    io->addr = *addr;
603    io->isSeed = isSeed;
604    io->port = port;
605    io->socket = socket;
606    io->utp_socket = utp_socket;
607    io->isIncoming = isIncoming != 0;
608    io->timeCreated = tr_time( );
609    io->inbuf = evbuffer_new( );
610    io->outbuf = evbuffer_new( );
611    tr_bandwidthConstruct( &io->bandwidth, session, parent );
612    tr_bandwidthSetPeer( &io->bandwidth, io );
613    dbgmsg( io, "bandwidth is %p; its parent is %p", &io->bandwidth, parent );
614    dbgmsg( io, "socket is %d, utp_socket is %p", socket, utp_socket );
615
616    if( io->socket >= 0 ) {
617        io->event_read = event_new( session->event_base,
618                                    io->socket, EV_READ, event_read_cb, io );
619        io->event_write = event_new( session->event_base,
620                                     io->socket, EV_WRITE, event_write_cb, io );
621    }
622#ifdef WITH_UTP
623    else {
624        UTP_SetSockopt( utp_socket, SO_RCVBUF, UTP_READ_BUFFER_SIZE );
625        dbgmsg( io, "%s", "calling UTP_SetCallbacks &utp_function_table" );
626        UTP_SetCallbacks( utp_socket,
627                          &utp_function_table,
628                          io );
629        if( !isIncoming ) {
630            dbgmsg( io, "%s", "calling UTP_Connect" );
631            UTP_Connect( utp_socket );
632        }
633    }
634#endif
635
636    return io;
637}
638
639tr_peerIo*
640tr_peerIoNewIncoming( tr_session        * session,
641                      tr_bandwidth      * parent,
642                      const tr_address  * addr,
643                      tr_port             port,
644                      int                 fd,
645                      struct UTPSocket  * utp_socket )
646{
647    assert( session );
648    assert( tr_address_is_valid( addr ) );
649
650    return tr_peerIoNew( session, parent, addr, port, NULL, true, false,
651                         fd, utp_socket );
652}
653
654tr_peerIo*
655tr_peerIoNewOutgoing( tr_session        * session,
656                      tr_bandwidth      * parent,
657                      const tr_address  * addr,
658                      tr_port             port,
659                      const uint8_t     * torrentHash,
660                      bool                isSeed,
661                      bool                utp )
662{
663    int fd = -1;
664    struct UTPSocket * utp_socket = NULL;
665
666    assert( session );
667    assert( tr_address_is_valid( addr ) );
668    assert( torrentHash );
669
670    if( utp )
671        utp_socket = tr_netOpenPeerUTPSocket( session, addr, port, isSeed );
672
673    if( !utp_socket ) {
674        fd = tr_netOpenPeerSocket( session, addr, port, isSeed );
675        dbgmsg( NULL, "tr_netOpenPeerSocket returned fd %d", fd );
676    }
677
678    if( fd < 0 && utp_socket == NULL )
679        return NULL;
680
681    return tr_peerIoNew( session, parent, addr, port,
682                         torrentHash, false, isSeed, fd, utp_socket );
683}
684
685/***
686****
687***/
688
689static void
690event_enable( tr_peerIo * io, short event )
691{
692    assert( tr_amInEventThread( io->session ) );
693    assert( io->session != NULL );
694    assert( io->session->events != NULL );
695
696    if( io->socket < 0 )
697        return;
698
699    assert( io->session->events != NULL );
700    assert( event_initialized( io->event_read ) );
701    assert( event_initialized( io->event_write ) );
702
703    if( ( event & EV_READ ) && ! ( io->pendingEvents & EV_READ ) )
704    {
705        dbgmsg( io, "enabling libevent ready-to-read polling" );
706        event_add( io->event_read, NULL );
707        io->pendingEvents |= EV_READ;
708    }
709
710    if( ( event & EV_WRITE ) && ! ( io->pendingEvents & EV_WRITE ) )
711    {
712        dbgmsg( io, "enabling libevent ready-to-write polling" );
713        event_add( io->event_write, NULL );
714        io->pendingEvents |= EV_WRITE;
715    }
716}
717
718static void
719event_disable( struct tr_peerIo * io, short event )
720{
721    assert( tr_amInEventThread( io->session ) );
722    assert( io->session != NULL );
723
724    if( io->socket < 0 )
725        return;
726
727    assert( io->session->events != NULL );
728    assert( event_initialized( io->event_read ) );
729    assert( event_initialized( io->event_write ) );
730
731    if( ( event & EV_READ ) && ( io->pendingEvents & EV_READ ) )
732    {
733        dbgmsg( io, "disabling libevent ready-to-read polling" );
734        event_del( io->event_read );
735        io->pendingEvents &= ~EV_READ;
736    }
737
738    if( ( event & EV_WRITE ) && ( io->pendingEvents & EV_WRITE ) )
739    {
740        dbgmsg( io, "disabling libevent ready-to-write polling" );
741        event_del( io->event_write );
742        io->pendingEvents &= ~EV_WRITE;
743    }
744}
745
746void
747tr_peerIoSetEnabled( tr_peerIo    * io,
748                     tr_direction   dir,
749                     bool           isEnabled )
750{
751    const short event = dir == TR_UP ? EV_WRITE : EV_READ;
752
753    assert( tr_isPeerIo( io ) );
754    assert( tr_isDirection( dir ) );
755    assert( tr_amInEventThread( io->session ) );
756    assert( io->session->events != NULL );
757
758    if( isEnabled )
759        event_enable( io, event );
760    else
761        event_disable( io, event );
762}
763
764/***
765****
766***/
767static void
768io_close_socket( tr_peerIo * io )
769{
770    if( io->socket >= 0 ) {
771        tr_netClose( io->session, io->socket );
772        io->socket = -1;
773    }
774
775    if( io->event_read != NULL) {
776        event_free( io->event_read );
777        io->event_read = NULL;
778    }
779
780    if( io->event_write != NULL) {
781        event_free( io->event_write );
782        io->event_write = NULL;
783    }
784
785#ifdef WITH_UTP
786    if( io->utp_socket ) {
787        UTP_SetCallbacks( io->utp_socket,
788                          &dummy_utp_function_table,
789                          NULL );
790        UTP_Close( io->utp_socket );
791
792        io->utp_socket = NULL;
793    }
794#endif
795}
796
797static void
798io_dtor( void * vio )
799{
800    tr_peerIo * io = vio;
801
802    assert( tr_isPeerIo( io ) );
803    assert( tr_amInEventThread( io->session ) );
804    assert( io->session->events != NULL );
805
806    dbgmsg( io, "in tr_peerIo destructor" );
807    event_disable( io, EV_READ | EV_WRITE );
808    tr_bandwidthDestruct( &io->bandwidth );
809    evbuffer_free( io->outbuf );
810    evbuffer_free( io->inbuf );
811    io_close_socket( io );
812    tr_cryptoDestruct( &io->crypto );
813
814    while( io->outbuf_datatypes != NULL )
815        peer_io_pull_datatype( io );
816
817    memset( io, ~0, sizeof( tr_peerIo ) );
818    tr_free( io );
819}
820
821static void
822tr_peerIoFree( tr_peerIo * io )
823{
824    if( io )
825    {
826        dbgmsg( io, "in tr_peerIoFree" );
827        io->canRead = NULL;
828        io->didWrite = NULL;
829        io->gotError = NULL;
830        tr_runInEventThread( io->session, io_dtor, io );
831    }
832}
833
834void
835tr_peerIoRefImpl( const char * file, int line, tr_peerIo * io )
836{
837    assert( tr_isPeerIo( io ) );
838
839    dbgmsg( io, "%s:%d is incrementing the IO's refcount from %d to %d",
840                file, line, io->refCount, io->refCount+1 );
841
842    ++io->refCount;
843}
844
845void
846tr_peerIoUnrefImpl( const char * file, int line, tr_peerIo * io )
847{
848    assert( tr_isPeerIo( io ) );
849
850    dbgmsg( io, "%s:%d is decrementing the IO's refcount from %d to %d",
851                file, line, io->refCount, io->refCount-1 );
852
853    if( !--io->refCount )
854        tr_peerIoFree( io );
855}
856
857const tr_address*
858tr_peerIoGetAddress( const tr_peerIo * io, tr_port   * port )
859{
860    assert( tr_isPeerIo( io ) );
861
862    if( port )
863        *port = io->port;
864
865    return &io->addr;
866}
867
868const char*
869tr_peerIoAddrStr( const tr_address * addr, tr_port port )
870{
871    static char buf[512];
872    tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_address_to_string( addr ), ntohs( port ) );
873    return buf;
874}
875
876const char* tr_peerIoGetAddrStr( const tr_peerIo * io )
877{
878    return tr_isPeerIo( io ) ? tr_peerIoAddrStr( &io->addr, io->port ) : "error";
879}
880
881void
882tr_peerIoSetIOFuncs( tr_peerIo        * io,
883                     tr_can_read_cb     readcb,
884                     tr_did_write_cb    writecb,
885                     tr_net_error_cb    errcb,
886                     void             * userData )
887{
888    io->canRead = readcb;
889    io->didWrite = writecb;
890    io->gotError = errcb;
891    io->userData = userData;
892}
893
894void
895tr_peerIoClear( tr_peerIo * io )
896{
897    tr_peerIoSetIOFuncs( io, NULL, NULL, NULL, NULL );
898    tr_peerIoSetEnabled( io, TR_UP, false );
899    tr_peerIoSetEnabled( io, TR_DOWN, false );
900}
901
902int
903tr_peerIoReconnect( tr_peerIo * io )
904{
905    short int pendingEvents;
906    tr_session * session;
907
908    assert( tr_isPeerIo( io ) );
909    assert( !tr_peerIoIsIncoming( io ) );
910
911    session = tr_peerIoGetSession( io );
912
913    pendingEvents = io->pendingEvents;
914    event_disable( io, EV_READ | EV_WRITE );
915
916    io_close_socket( io );
917
918    io->socket = tr_netOpenPeerSocket( session, &io->addr, io->port, io->isSeed );
919    io->event_read = event_new( session->event_base, io->socket, EV_READ, event_read_cb, io );
920    io->event_write = event_new( session->event_base, io->socket, EV_WRITE, event_write_cb, io );
921
922    if( io->socket >= 0 )
923    {
924        event_enable( io, pendingEvents );
925        tr_netSetTOS( io->socket, session->peerSocketTOS );
926        maybeSetCongestionAlgorithm( io->socket, session->peer_congestion_algorithm );
927        return 0;
928    }
929
930    return -1;
931}
932
933/**
934***
935**/
936
937void
938tr_peerIoSetTorrentHash( tr_peerIo *     io,
939                         const uint8_t * hash )
940{
941    assert( tr_isPeerIo( io ) );
942
943    tr_cryptoSetTorrentHash( &io->crypto, hash );
944}
945
946const uint8_t*
947tr_peerIoGetTorrentHash( tr_peerIo * io )
948{
949    assert( tr_isPeerIo( io ) );
950
951    return tr_cryptoGetTorrentHash( &io->crypto );
952}
953
954int
955tr_peerIoHasTorrentHash( const tr_peerIo * io )
956{
957    assert( tr_isPeerIo( io ) );
958
959    return tr_cryptoHasTorrentHash( &io->crypto );
960}
961
962/**
963***
964**/
965
966void
967tr_peerIoSetPeersId( tr_peerIo * io, const uint8_t * peer_id )
968{
969    assert( tr_isPeerIo( io ) );
970
971    if( ( io->peerIdIsSet = peer_id != NULL ) )
972        memcpy( io->peerId, peer_id, 20 );
973    else
974        memset( io->peerId, 0, 20 );
975}
976
977/**
978***
979**/
980
981static unsigned int
982getDesiredOutputBufferSize( const tr_peerIo * io, uint64_t now )
983{
984    /* this is all kind of arbitrary, but what seems to work well is
985     * being large enough to hold the next 20 seconds' worth of input,
986     * or a few blocks, whichever is bigger.
987     * It's okay to tweak this as needed */
988    const unsigned int currentSpeed_Bps = tr_bandwidthGetPieceSpeed_Bps( &io->bandwidth, now, TR_UP );
989    const unsigned int period = 15u; /* arbitrary */
990    /* the 3 is arbitrary; the .5 is to leave room for messages */
991    static const unsigned int ceiling =  (unsigned int)( MAX_BLOCK_SIZE * 3.5 );
992    return MAX( ceiling, currentSpeed_Bps*period );
993}
994
995size_t
996tr_peerIoGetWriteBufferSpace( const tr_peerIo * io, uint64_t now )
997{
998    const size_t desiredLen = getDesiredOutputBufferSize( io, now );
999    const size_t currentLen = evbuffer_get_length( io->outbuf );
1000    size_t freeSpace = 0;
1001
1002    if( desiredLen > currentLen )
1003        freeSpace = desiredLen - currentLen;
1004
1005    return freeSpace;
1006}
1007
1008/**
1009***
1010**/
1011
1012void
1013tr_peerIoSetEncryption( tr_peerIo * io, tr_encryption_type encryption_type )
1014{
1015    assert( tr_isPeerIo( io ) );
1016    assert( encryption_type == PEER_ENCRYPTION_NONE
1017         || encryption_type == PEER_ENCRYPTION_RC4 );
1018
1019    io->encryption_type = encryption_type;
1020}
1021
1022/**
1023***
1024**/
1025
1026static void
1027addDatatype( tr_peerIo * io, size_t byteCount, bool isPieceData )
1028{
1029    struct tr_datatype * d;
1030    d = datatype_new( );
1031    d->isPieceData = isPieceData != 0;
1032    d->length = byteCount;
1033    peer_io_push_datatype( io, d );
1034}
1035
1036static void
1037maybeEncryptBuffer( tr_peerIo * io, struct evbuffer * buf )
1038{
1039    if( io->encryption_type == PEER_ENCRYPTION_RC4 )
1040    {
1041        struct evbuffer_ptr pos;
1042        struct evbuffer_iovec iovec;
1043        evbuffer_ptr_set( buf, &pos, 0, EVBUFFER_PTR_SET );
1044        do {
1045            evbuffer_peek( buf, -1, &pos, &iovec, 1 );
1046            tr_cryptoEncrypt( &io->crypto, iovec.iov_len, iovec.iov_base, iovec.iov_base );
1047        } while( !evbuffer_ptr_set( buf, &pos, iovec.iov_len, EVBUFFER_PTR_ADD ) );
1048    }
1049}
1050
1051void
1052tr_peerIoWriteBuf( tr_peerIo * io, struct evbuffer * buf, bool isPieceData )
1053{
1054    const size_t byteCount = evbuffer_get_length( buf );
1055    maybeEncryptBuffer( io, buf );
1056    evbuffer_add_buffer( io->outbuf, buf );
1057    addDatatype( io, byteCount, isPieceData );
1058}
1059
1060void
1061tr_peerIoWriteBytes( tr_peerIo * io, const void * bytes, size_t byteCount, bool isPieceData )
1062{
1063    struct evbuffer_iovec iovec;
1064    evbuffer_reserve_space( io->outbuf, byteCount, &iovec, 1 );
1065
1066    iovec.iov_len = byteCount;
1067    if( io->encryption_type == PEER_ENCRYPTION_RC4 )
1068        tr_cryptoEncrypt( &io->crypto, iovec.iov_len, bytes, iovec.iov_base );
1069    else
1070        memcpy( iovec.iov_base, bytes, iovec.iov_len );
1071    evbuffer_commit_space( io->outbuf, &iovec, 1 );
1072
1073    addDatatype( io, byteCount, isPieceData );
1074}
1075
1076/***
1077****
1078***/
1079
1080void
1081evbuffer_add_uint8( struct evbuffer * outbuf, uint8_t byte )
1082{
1083    evbuffer_add( outbuf, &byte, 1 );
1084}
1085
1086void
1087evbuffer_add_uint16( struct evbuffer * outbuf, uint16_t addme_hs )
1088{
1089    const uint16_t ns = htons( addme_hs );
1090    evbuffer_add( outbuf, &ns, sizeof( ns ) );
1091}
1092
1093void
1094evbuffer_add_uint32( struct evbuffer * outbuf, uint32_t addme_hl )
1095{
1096    const uint32_t nl = htonl( addme_hl );
1097    evbuffer_add( outbuf, &nl, sizeof( nl ) );
1098}
1099
1100void
1101evbuffer_add_uint64( struct evbuffer * outbuf, uint64_t addme_hll )
1102{
1103    const uint64_t nll = tr_htonll( addme_hll );
1104    evbuffer_add( outbuf, &nll, sizeof( nll ) );
1105}
1106
1107/***
1108****
1109***/
1110
1111void
1112tr_peerIoReadBytesToBuf( tr_peerIo * io, struct evbuffer * inbuf, struct evbuffer * outbuf, size_t byteCount )
1113{
1114    struct evbuffer * tmp;
1115    const size_t old_length = evbuffer_get_length( outbuf );
1116
1117    assert( tr_isPeerIo( io ) );
1118    assert( evbuffer_get_length( inbuf ) >= byteCount );
1119
1120    /* append it to outbuf */
1121    tmp = evbuffer_new( );
1122    evbuffer_remove_buffer( inbuf, tmp, byteCount );
1123    evbuffer_add_buffer( outbuf, tmp );
1124    evbuffer_free( tmp );
1125
1126    /* decrypt if needed */
1127    if( io->encryption_type == PEER_ENCRYPTION_RC4 ) {
1128        struct evbuffer_ptr pos;
1129        struct evbuffer_iovec iovec;
1130        evbuffer_ptr_set( outbuf, &pos, old_length, EVBUFFER_PTR_SET );
1131        do {
1132            evbuffer_peek( outbuf, byteCount, &pos, &iovec, 1 );
1133            tr_cryptoDecrypt( &io->crypto, iovec.iov_len, iovec.iov_base, iovec.iov_base );
1134            byteCount -= iovec.iov_len;
1135        } while( !evbuffer_ptr_set( outbuf, &pos, iovec.iov_len, EVBUFFER_PTR_ADD ) );
1136    }
1137}
1138
1139void
1140tr_peerIoReadBytes( tr_peerIo * io, struct evbuffer * inbuf, void * bytes, size_t byteCount )
1141{
1142    assert( tr_isPeerIo( io ) );
1143    assert( evbuffer_get_length( inbuf )  >= byteCount );
1144
1145    switch( io->encryption_type )
1146    {
1147        case PEER_ENCRYPTION_NONE:
1148            evbuffer_remove( inbuf, bytes, byteCount );
1149            break;
1150
1151        case PEER_ENCRYPTION_RC4:
1152            evbuffer_remove( inbuf, bytes, byteCount );
1153            tr_cryptoDecrypt( &io->crypto, byteCount, bytes, bytes );
1154            break;
1155
1156        default:
1157            assert( 0 );
1158    }
1159}
1160
1161void
1162tr_peerIoReadUint16( tr_peerIo        * io,
1163                     struct evbuffer  * inbuf,
1164                     uint16_t         * setme )
1165{
1166    uint16_t tmp;
1167    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
1168    *setme = ntohs( tmp );
1169}
1170
1171void tr_peerIoReadUint32( tr_peerIo        * io,
1172                          struct evbuffer  * inbuf,
1173                          uint32_t         * setme )
1174{
1175    uint32_t tmp;
1176    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
1177    *setme = ntohl( tmp );
1178}
1179
1180void
1181tr_peerIoDrain( tr_peerIo       * io,
1182                struct evbuffer * inbuf,
1183                size_t            byteCount )
1184{
1185    char buf[4096];
1186    const size_t buflen = sizeof( buf );
1187
1188    while( byteCount > 0 )
1189    {
1190        const size_t thisPass = MIN( byteCount, buflen );
1191        tr_peerIoReadBytes( io, inbuf, buf, thisPass );
1192        byteCount -= thisPass;
1193    }
1194}
1195
1196/***
1197****
1198***/
1199
1200static int
1201tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
1202{
1203    int res = 0;
1204
1205    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch )))
1206    {
1207        if( io->utp_socket != NULL ) /* utp peer connection */
1208        {
1209            /* UTP_RBDrained notifies libutp that your read buffer is emtpy.
1210             * It opens up the congestion window by sending an ACK (soonish)
1211             * if one was not going to be sent. */
1212            if( evbuffer_get_length( io->inbuf ) == 0 )
1213                UTP_RBDrained( io->utp_socket );
1214        }
1215        else /* tcp peer connection */
1216        {
1217            int e;
1218
1219            EVUTIL_SET_SOCKET_ERROR( 0 );
1220            res = evbuffer_read( io->inbuf, io->socket, (int)howmuch );
1221            e = EVUTIL_SOCKET_ERROR( );
1222
1223            dbgmsg( io, "read %d from peer (%s)", res, (res==-1?tr_strerror(e):"") );
1224
1225            if( evbuffer_get_length( io->inbuf ) )
1226                canReadWrapper( io );
1227
1228            if( ( res <= 0 ) && ( io->gotError ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1229            {
1230                char errstr[512];
1231                short what = BEV_EVENT_READING | BEV_EVENT_ERROR;
1232                if( res == 0 )
1233                    what |= BEV_EVENT_EOF;
1234                dbgmsg( io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)",
1235                        res, what, e, tr_net_strerror( errstr, sizeof( errstr ), e ) );
1236                io->gotError( io, what, io->userData );
1237            }
1238        }
1239    }
1240
1241    return res;
1242}
1243
1244static int
1245tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
1246{
1247    int n = 0;
1248    const size_t old_len = evbuffer_get_length( io->outbuf );
1249    dbgmsg( io, "in tr_peerIoTryWrite %zu", howmuch );
1250
1251    if( howmuch > old_len )
1252        howmuch = old_len;
1253
1254    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_UP, howmuch )))
1255    {
1256        if( io->utp_socket != NULL ) /* utp peer connection */
1257        {
1258            const size_t old_len = evbuffer_get_length( io->outbuf );
1259            UTP_Write( io->utp_socket, howmuch );
1260            n = old_len - evbuffer_get_length( io->outbuf );
1261        }
1262        else
1263        {
1264            int e;
1265
1266            EVUTIL_SET_SOCKET_ERROR( 0 );
1267            n = tr_evbuffer_write( io, io->socket, howmuch );
1268            e = EVUTIL_SOCKET_ERROR( );
1269
1270            if( n > 0 )
1271                didWriteWrapper( io, n );
1272
1273            if( ( n < 0 ) && ( io->gotError ) && e && ( e != EPIPE ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1274            {
1275                char errstr[512];
1276                const short what = BEV_EVENT_WRITING | BEV_EVENT_ERROR;
1277
1278                dbgmsg( io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)",
1279                        n, what, e, tr_net_strerror( errstr, sizeof( errstr ), e ) );
1280
1281                if( io->gotError != NULL )
1282                    io->gotError( io, what, io->userData );
1283            }
1284        }
1285    }
1286
1287    return n;
1288}
1289
1290int
1291tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
1292{
1293    int bytesUsed = 0;
1294
1295    assert( tr_isPeerIo( io ) );
1296    assert( tr_isDirection( dir ) );
1297
1298    if( dir == TR_DOWN )
1299        bytesUsed = tr_peerIoTryRead( io, limit );
1300    else
1301        bytesUsed = tr_peerIoTryWrite( io, limit );
1302
1303    dbgmsg( io, "flushing peer-io, direction %d, limit %zu, bytesUsed %d", (int)dir, limit, bytesUsed );
1304    return bytesUsed;
1305}
1306
1307int
1308tr_peerIoFlushOutgoingProtocolMsgs( tr_peerIo * io )
1309{
1310    size_t byteCount = 0;
1311    const struct tr_datatype * it;
1312
1313    /* count up how many bytes are used by non-piece-data messages
1314       at the front of our outbound queue */
1315    for( it=io->outbuf_datatypes; it!=NULL; it=it->next )
1316        if( it->isPieceData )
1317            break;
1318        else
1319            byteCount += it->length;
1320
1321    return tr_peerIoFlush( io, TR_UP, byteCount );
1322}
Note: See TracBrowser for help on using the repository browser.