source: trunk/libtransmission/peer-io.c @ 12204

Last change on this file since 12204 was 12204, checked in by jordan, 11 years ago

(trunk) #4138 "use stdbool.h instead of tr_bool" -- done.

  • Property svn:keywords set to Date Rev Author Id
File size: 32.6 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 12204 2011-03-22 15:19:54Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <string.h>
17#include <stdio.h>
18#include <unistd.h>
19
20#ifdef WIN32
21 #include <winsock2.h>
22#else
23 #include <arpa/inet.h> /* inet_ntoa */
24#endif
25
26#include <event2/event.h>
27#include <event2/bufferevent.h>
28#include <libutp/utp.h>
29
30#include "transmission.h"
31#include "session.h"
32#include "bandwidth.h"
33#include "crypto.h"
34#include "list.h"
35#include "net.h"
36#include "peer-common.h" /* MAX_BLOCK_SIZE */
37#include "peer-io.h"
38#include "trevent.h" /* tr_runInEventThread() */
39#include "utils.h"
40
41
42#ifdef WIN32
43 #define EAGAIN       WSAEWOULDBLOCK
44 #define EINTR        WSAEINTR
45 #define EINPROGRESS  WSAEINPROGRESS
46 #define EPIPE        WSAECONNRESET
47#endif
48
49/* The amount of read bufferring that we allow for uTP sockets. */
50
51#define UTP_READ_BUFFER_SIZE (256 * 1024)
52
53static size_t
54guessPacketOverhead( size_t d )
55{
56    /**
57     * http://sd.wareonearth.com/~phil/net/overhead/
58     *
59     * TCP over Ethernet:
60     * Assuming no header compression (e.g. not PPP)
61     * Add 20 IPv4 header or 40 IPv6 header (no options)
62     * Add 20 TCP header
63     * Add 12 bytes optional TCP timestamps
64     * Max TCP Payload data rates over ethernet are thus:
65     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
66     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
67     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
68     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
69     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
70     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
71     */
72    const double assumed_payload_data_rate = 94.0;
73
74    return (unsigned int)( d * ( 100.0 / assumed_payload_data_rate ) - d );
75}
76
77/**
78***
79**/
80
81#define dbgmsg( io, ... ) \
82    do { \
83        if( tr_deepLoggingIsActive( ) ) \
84            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
85    } while( 0 )
86
87struct tr_datatype
88{
89    bool    isPieceData;
90    size_t  length;
91};
92
93
94/***
95****
96***/
97
98static void
99didWriteWrapper( tr_peerIo * io, unsigned int bytes_transferred )
100{
101     while( bytes_transferred && tr_isPeerIo( io ) )
102     {
103        struct tr_datatype * next = io->outbuf_datatypes->data;
104
105        const unsigned int payload = MIN( next->length, bytes_transferred );
106        /* For uTP sockets, the overhead is computed in utp_on_overhead. */
107        const unsigned int overhead =
108            io->socket ? guessPacketOverhead( payload ) : 0;
109        const uint64_t now = tr_time_msec( );
110
111        tr_bandwidthUsed( &io->bandwidth, TR_UP, payload, next->isPieceData, now );
112
113        if( overhead > 0 )
114            tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, false, now );
115
116        if( io->didWrite )
117            io->didWrite( io, payload, next->isPieceData, io->userData );
118
119        if( tr_isPeerIo( io ) )
120        {
121            bytes_transferred -= payload;
122            next->length -= payload;
123            if( !next->length ) {
124                tr_list_pop_front( &io->outbuf_datatypes );
125                tr_free( next );
126            }
127        }
128    }
129}
130
131static void
132canReadWrapper( tr_peerIo * io )
133{
134    bool err = 0;
135    bool done = 0;
136    tr_session * session;
137
138    dbgmsg( io, "canRead" );
139
140    tr_peerIoRef( io );
141
142    session = io->session;
143
144    /* try to consume the input buffer */
145    if( io->canRead )
146    {
147        const uint64_t now = tr_time_msec( );
148
149        tr_sessionLock( session );
150
151        while( !done && !err )
152        {
153            size_t piece = 0;
154            const size_t oldLen = evbuffer_get_length( io->inbuf );
155            const int ret = io->canRead( io, io->userData, &piece );
156            const size_t used = oldLen - evbuffer_get_length( io->inbuf );
157            const unsigned int overhead = guessPacketOverhead( used );
158
159            if( piece || (piece!=used) )
160            {
161                if( piece )
162                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, piece, true, now );
163
164                if( used != piece )
165                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, used - piece, false, now );
166            }
167
168            if( overhead > 0 )
169                tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, false, now );
170
171            switch( ret )
172            {
173                case READ_NOW:
174                    if( evbuffer_get_length( io->inbuf ) )
175                        continue;
176                    done = 1;
177                    break;
178
179                case READ_LATER:
180                    done = 1;
181                    break;
182
183                case READ_ERR:
184                    err = 1;
185                    break;
186            }
187
188            assert( tr_isPeerIo( io ) );
189        }
190
191        tr_sessionUnlock( session );
192    }
193
194    tr_peerIoUnref( io );
195}
196
197static void
198event_read_cb( int fd, short event UNUSED, void * vio )
199{
200    int res;
201    int e;
202    tr_peerIo * io = vio;
203
204    /* Limit the input buffer to 256K, so it doesn't grow too large */
205    unsigned int howmuch;
206    unsigned int curlen;
207    const tr_direction dir = TR_DOWN;
208    const unsigned int max = 256 * 1024;
209
210    assert( tr_isPeerIo( io ) );
211    assert( io->socket >= 0 );
212
213    io->pendingEvents &= ~EV_READ;
214
215    curlen = evbuffer_get_length( io->inbuf );
216    howmuch = curlen >= max ? 0 : max - curlen;
217    howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch );
218
219    dbgmsg( io, "libevent says this peer is ready to read" );
220
221    /* if we don't have any bandwidth left, stop reading */
222    if( howmuch < 1 ) {
223        tr_peerIoSetEnabled( io, dir, false );
224        return;
225    }
226
227    EVUTIL_SET_SOCKET_ERROR( 0 );
228    res = evbuffer_read( io->inbuf, fd, (int)howmuch );
229    e = EVUTIL_SOCKET_ERROR( );
230
231    if( res > 0 )
232    {
233        tr_peerIoSetEnabled( io, dir, true );
234
235        /* Invoke the user callback - must always be called last */
236        canReadWrapper( io );
237    }
238    else
239    {
240        char errstr[512];
241        short what = BEV_EVENT_READING;
242
243        if( res == 0 ) /* EOF */
244            what |= BEV_EVENT_EOF;
245        else if( res == -1 ) {
246            if( e == EAGAIN || e == EINTR ) {
247                tr_peerIoSetEnabled( io, dir, true );
248                return;
249            }
250            what |= BEV_EVENT_ERROR;
251        }
252
253        tr_net_strerror( errstr, sizeof( errstr ), e );
254        dbgmsg( io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
255
256        if( io->gotError != NULL )
257            io->gotError( io, what, io->userData );
258    }
259}
260
261static int
262tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
263{
264    int e;
265    int n;
266    char errstr[256];
267
268    EVUTIL_SET_SOCKET_ERROR( 0 );
269    n = evbuffer_write_atmost( io->outbuf, fd, howmuch );
270    e = EVUTIL_SOCKET_ERROR( );
271    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?tr_net_strerror(errstr,sizeof(errstr),e):"") );
272
273    return n;
274}
275
276static void
277event_write_cb( int fd, short event UNUSED, void * vio )
278{
279    int res = 0;
280    int e;
281    short what = BEV_EVENT_WRITING;
282    tr_peerIo * io = vio;
283    size_t howmuch;
284    const tr_direction dir = TR_UP;
285
286    assert( tr_isPeerIo( io ) );
287    assert( io->socket >= 0 );
288
289    io->pendingEvents &= ~EV_WRITE;
290
291    dbgmsg( io, "libevent says this peer is ready to write" );
292
293    /* Write as much as possible, since the socket is non-blocking, write() will
294     * return if it can't write any more data without blocking */
295    howmuch = tr_bandwidthClamp( &io->bandwidth, dir, evbuffer_get_length( io->outbuf ) );
296
297    /* if we don't have any bandwidth left, stop writing */
298    if( howmuch < 1 ) {
299        tr_peerIoSetEnabled( io, dir, false );
300        return;
301    }
302
303    EVUTIL_SET_SOCKET_ERROR( 0 );
304    res = tr_evbuffer_write( io, fd, howmuch );
305    e = EVUTIL_SOCKET_ERROR( );
306
307    if (res == -1) {
308        if (!e || e == EAGAIN || e == EINTR || e == EINPROGRESS)
309            goto reschedule;
310        /* error case */
311        what |= BEV_EVENT_ERROR;
312    } else if (res == 0) {
313        /* eof case */
314        what |= BEV_EVENT_EOF;
315    }
316    if (res <= 0)
317        goto error;
318
319    if( evbuffer_get_length( io->outbuf ) )
320        tr_peerIoSetEnabled( io, dir, true );
321
322    didWriteWrapper( io, res );
323    return;
324
325 reschedule:
326    if( evbuffer_get_length( io->outbuf ) )
327        tr_peerIoSetEnabled( io, dir, true );
328    return;
329
330 error:
331
332    dbgmsg( io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
333
334    if( io->gotError != NULL )
335        io->gotError( io, what, io->userData );
336}
337
338/**
339***
340**/
341
342static void
343maybeSetCongestionAlgorithm( int socket, const char * algorithm )
344{
345    if( algorithm && *algorithm )
346    {
347        const int rc = tr_netSetCongestionControl( socket, algorithm );
348
349        if( rc < 0 )
350            tr_ninf( "Net", "Can't set congestion control algorithm '%s': %s",
351                     algorithm, tr_strerror( errno ));
352    }
353}
354
355#ifdef WITH_UTP
356/* UTP callbacks */
357
358static void
359utp_on_read(void *closure, const unsigned char *buf, size_t buflen)
360{
361    int rc;
362    tr_peerIo *io = closure;
363    assert( tr_isPeerIo( io ) );
364
365    rc = evbuffer_add( io->inbuf, buf, buflen );
366    dbgmsg( io, "utp_on_read got %zu bytes", buflen );
367
368    if( rc < 0 ) {
369        tr_nerr( "UTP", "On read evbuffer_add" );
370        return;
371    }
372
373    tr_peerIoSetEnabled( io, TR_DOWN, true );
374    canReadWrapper( io );
375}
376
377static void
378utp_on_write(void *closure, unsigned char *buf, size_t buflen)
379{
380    int rc;
381    tr_peerIo *io = closure;
382    assert( tr_isPeerIo( io ) );
383
384    rc = evbuffer_remove( io->outbuf, buf, buflen );
385    dbgmsg( io, "utp_on_write sending %zu bytes... evbuffer_remove returned %d", buflen, rc );
386    assert( rc == (int)buflen ); /* if this fails, we've corrupted our bookkeeping somewhere */
387    if( rc < (long)buflen ) {
388        tr_nerr( "UTP", "Short write: %d < %ld", rc, (long)buflen);
389    }
390
391    didWriteWrapper( io, buflen );
392}
393
394static size_t
395utp_get_rb_size(void *closure)
396{
397    size_t bytes;
398    tr_peerIo *io = closure;
399    assert( tr_isPeerIo( io ) );
400
401    bytes = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, UTP_READ_BUFFER_SIZE );
402
403    dbgmsg( io, "utp_get_rb_size is saying it's ready to read %zu bytes", bytes );
404    return UTP_READ_BUFFER_SIZE - bytes;
405}
406
407static void
408utp_on_state_change(void *closure, int state)
409{
410    tr_peerIo *io = closure;
411    assert( tr_isPeerIo( io ) );
412
413    if( state == UTP_STATE_CONNECT ) {
414        dbgmsg( io, "utp_on_state_change -- changed to connected" );
415        io->utpSupported = true;
416    } else if( state == UTP_STATE_WRITABLE ) {
417        dbgmsg( io, "utp_on_state_change -- changed to writable" );
418    } else if( state == UTP_STATE_EOF ) {
419        if( io->gotError )
420            io->gotError( io, BEV_EVENT_EOF, io->userData );
421    } else if( state == UTP_STATE_DESTROYING ) {
422        tr_nerr( "UTP", "Impossible state UTP_STATE_DESTROYING" );
423        return;
424    } else {
425        tr_nerr( "UTP", "Unknown state %d", state );
426    }
427}
428
429static void
430utp_on_error(void *closure, int errcode)
431{
432    tr_peerIo *io = closure;
433    assert( tr_isPeerIo( io ) );
434
435    dbgmsg( io, "utp_on_error -- errcode is %d", errcode );
436
437    if( io->gotError ) {
438        errno = errcode;
439        io->gotError( io, BEV_EVENT_ERROR, io->userData );
440    }
441}
442
443static void
444utp_on_overhead(void *closure, bool send, size_t count, int type UNUSED)
445{
446    tr_peerIo *io = closure;
447    assert( tr_isPeerIo( io ) );
448
449    dbgmsg( io, "utp_on_overhead -- count is %zu", count );
450
451    tr_bandwidthUsed( &io->bandwidth, send ? TR_UP : TR_DOWN,
452                      count, false, tr_time_msec() );
453}
454
455static struct UTPFunctionTable utp_function_table = {
456    .on_read = utp_on_read,
457    .on_write = utp_on_write,
458    .get_rb_size = utp_get_rb_size,
459    .on_state = utp_on_state_change,
460    .on_error = utp_on_error,
461    .on_overhead = utp_on_overhead
462};
463
464
465/* Dummy UTP callbacks. */
466/* We switch a UTP socket to use these after the associated peerIo has been
467   destroyed -- see io_dtor. */
468
469static void
470dummy_read( void * closure UNUSED, const unsigned char *buf UNUSED, size_t buflen UNUSED )
471{
472    /* This cannot happen, as far as I'm aware. */
473    tr_nerr( "UTP", "On_read called on closed socket" );
474
475}
476
477static void
478dummy_write(void * closure UNUSED, unsigned char *buf, size_t buflen)
479{
480    /* This can very well happen if we've shut down a peer connection that
481       had unflushed buffers.  Complain and send zeroes. */
482    tr_ndbg( "UTP", "On_write called on closed socket" );
483    memset( buf, 0, buflen );
484}
485
486static size_t
487dummy_get_rb_size( void * closure UNUSED )
488{
489    return 0;
490}
491
492static void
493dummy_on_state_change(void * closure UNUSED, int state UNUSED )
494{
495    return;
496}
497
498static void
499dummy_on_error( void * closure UNUSED, int errcode UNUSED )
500{
501    return;
502}
503
504static void
505dummy_on_overhead( void *closure UNUSED, bool send UNUSED, size_t count UNUSED, int type UNUSED )
506{
507    return;
508}
509
510static struct UTPFunctionTable dummy_utp_function_table = {
511    .on_read = dummy_read,
512    .on_write = dummy_write,
513    .get_rb_size = dummy_get_rb_size,
514    .on_state = dummy_on_state_change,
515    .on_error = dummy_on_error,
516    .on_overhead = dummy_on_overhead
517};
518
519#endif /* #ifdef WITH_UTP */
520
521static tr_peerIo*
522tr_peerIoNew( tr_session       * session,
523              tr_bandwidth     * parent,
524              const tr_address * addr,
525              tr_port            port,
526              const uint8_t    * torrentHash,
527              bool               isIncoming,
528              bool               isSeed,
529              int                socket,
530              struct UTPSocket * utp_socket)
531{
532    tr_peerIo * io;
533
534    assert( session != NULL );
535    assert( session->events != NULL );
536    assert( tr_isBool( isIncoming ) );
537    assert( tr_isBool( isSeed ) );
538    assert( tr_amInEventThread( session ) );
539    assert( (socket < 0) == (utp_socket != NULL) );
540#ifndef WITH_UTP
541    assert( socket >= 0 );
542#endif
543
544    if( socket >= 0 ) {
545        tr_netSetTOS( socket, session->peerSocketTOS );
546        maybeSetCongestionAlgorithm( socket, session->peer_congestion_algorithm );
547    }
548
549    io = tr_new0( tr_peerIo, 1 );
550    io->magicNumber = PEER_IO_MAGIC_NUMBER;
551    io->refCount = 1;
552    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
553    io->session = session;
554    io->addr = *addr;
555    io->isSeed = isSeed;
556    io->port = port;
557    io->socket = socket;
558    io->utp_socket = utp_socket;
559    io->isIncoming = isIncoming != 0;
560    io->timeCreated = tr_time( );
561    io->inbuf = evbuffer_new( );
562    io->outbuf = evbuffer_new( );
563    tr_bandwidthConstruct( &io->bandwidth, session, parent );
564    tr_bandwidthSetPeer( &io->bandwidth, io );
565    dbgmsg( io, "bandwidth is %p; its parent is %p", &io->bandwidth, parent );
566    dbgmsg( io, "socket is %d, utp_socket is %p", socket, utp_socket );
567
568    if( io->socket >= 0 ) {
569        io->event_read = event_new( session->event_base,
570                                    io->socket, EV_READ, event_read_cb, io );
571        io->event_write = event_new( session->event_base,
572                                     io->socket, EV_WRITE, event_write_cb, io );
573    }
574#ifdef WITH_UTP
575    else {
576        UTP_SetSockopt( utp_socket, SO_RCVBUF, UTP_READ_BUFFER_SIZE );
577        dbgmsg( io, "%s", "calling UTP_SetCallbacks &utp_function_table" );
578        UTP_SetCallbacks( utp_socket,
579                          &utp_function_table,
580                          io );
581        if( !isIncoming ) {
582            dbgmsg( io, "%s", "calling UTP_Connect" );
583            UTP_Connect( utp_socket );
584        }
585    }
586#endif
587
588    return io;
589}
590
591tr_peerIo*
592tr_peerIoNewIncoming( tr_session        * session,
593                      tr_bandwidth      * parent,
594                      const tr_address  * addr,
595                      tr_port             port,
596                      int                 fd,
597                      struct UTPSocket  * utp_socket )
598{
599    assert( session );
600    assert( tr_isAddress( addr ) );
601
602    return tr_peerIoNew( session, parent, addr, port, NULL, true, false,
603                         fd, utp_socket );
604}
605
606tr_peerIo*
607tr_peerIoNewOutgoing( tr_session        * session,
608                      tr_bandwidth      * parent,
609                      const tr_address  * addr,
610                      tr_port             port,
611                      const uint8_t     * torrentHash,
612                      bool                isSeed,
613                      bool                utp )
614{
615    int fd = -1;
616    struct UTPSocket *utp_socket = NULL;
617
618    assert( session );
619    assert( tr_isAddress( addr ) );
620    assert( torrentHash );
621
622    if( utp )
623        utp_socket = tr_netOpenPeerUTPSocket( session, addr, port, isSeed );
624
625    if( !utp_socket ) {
626        fd = tr_netOpenPeerSocket( session, addr, port, isSeed );
627        dbgmsg( NULL, "tr_netOpenPeerSocket returned fd %d", fd );
628    }
629
630    if( fd < 0 && utp_socket == NULL )
631        return NULL;
632
633    return tr_peerIoNew( session, parent, addr, port,
634                         torrentHash, false, isSeed, fd, utp_socket );
635}
636
637/***
638****
639***/
640
641static void
642event_enable( tr_peerIo * io, short event )
643{
644    assert( tr_amInEventThread( io->session ) );
645    assert( io->session != NULL );
646    assert( io->session->events != NULL );
647
648    if( io->socket < 0 )
649        return;
650
651    assert( io->session->events != NULL );
652    assert( event_initialized( io->event_read ) );
653    assert( event_initialized( io->event_write ) );
654
655    if( ( event & EV_READ ) && ! ( io->pendingEvents & EV_READ ) )
656    {
657        dbgmsg( io, "enabling libevent ready-to-read polling" );
658        event_add( io->event_read, NULL );
659        io->pendingEvents |= EV_READ;
660    }
661
662    if( ( event & EV_WRITE ) && ! ( io->pendingEvents & EV_WRITE ) )
663    {
664        dbgmsg( io, "enabling libevent ready-to-write polling" );
665        event_add( io->event_write, NULL );
666        io->pendingEvents |= EV_WRITE;
667    }
668}
669
670static void
671event_disable( struct tr_peerIo * io, short event )
672{
673    assert( tr_amInEventThread( io->session ) );
674    assert( io->session != NULL );
675
676    if( io->socket < 0 )
677        return;
678
679    assert( io->session->events != NULL );
680    assert( event_initialized( io->event_read ) );
681    assert( event_initialized( io->event_write ) );
682
683    if( ( event & EV_READ ) && ( io->pendingEvents & EV_READ ) )
684    {
685        dbgmsg( io, "disabling libevent ready-to-read polling" );
686        event_del( io->event_read );
687        io->pendingEvents &= ~EV_READ;
688    }
689
690    if( ( event & EV_WRITE ) && ( io->pendingEvents & EV_WRITE ) )
691    {
692        dbgmsg( io, "disabling libevent ready-to-write polling" );
693        event_del( io->event_write );
694        io->pendingEvents &= ~EV_WRITE;
695    }
696}
697
698void
699tr_peerIoSetEnabled( tr_peerIo    * io,
700                     tr_direction   dir,
701                     bool           isEnabled )
702{
703    const short event = dir == TR_UP ? EV_WRITE : EV_READ;
704
705    assert( tr_isPeerIo( io ) );
706    assert( tr_isDirection( dir ) );
707    assert( tr_amInEventThread( io->session ) );
708    assert( io->session->events != NULL );
709
710    if( isEnabled )
711        event_enable( io, event );
712    else
713        event_disable( io, event );
714}
715
716/***
717****
718***/
719static void
720io_close_socket( tr_peerIo * io )
721{
722    if( io->socket >= 0 ) {
723        tr_netClose( io->session, io->socket );
724        io->socket = -1;
725        event_free( io->event_read );
726        event_free( io->event_write );
727    }
728
729#ifdef WITH_UTP
730    if( io->utp_socket ) {
731        UTP_SetCallbacks( io->utp_socket,
732                          &dummy_utp_function_table,
733                          NULL );
734        UTP_Close( io->utp_socket );
735
736        io->utp_socket = NULL;
737    }
738#endif
739}
740
741static void
742io_dtor( void * vio )
743{
744    tr_peerIo * io = vio;
745
746    assert( tr_isPeerIo( io ) );
747    assert( tr_amInEventThread( io->session ) );
748    assert( io->session->events != NULL );
749
750    dbgmsg( io, "in tr_peerIo destructor" );
751    event_disable( io, EV_READ | EV_WRITE );
752    tr_bandwidthDestruct( &io->bandwidth );
753    evbuffer_free( io->outbuf );
754    evbuffer_free( io->inbuf );
755    io_close_socket( io );
756    tr_cryptoFree( io->crypto );
757    tr_list_free( &io->outbuf_datatypes, tr_free );
758
759    memset( io, ~0, sizeof( tr_peerIo ) );
760    tr_free( io );
761}
762
763static void
764tr_peerIoFree( tr_peerIo * io )
765{
766    if( io )
767    {
768        dbgmsg( io, "in tr_peerIoFree" );
769        io->canRead = NULL;
770        io->didWrite = NULL;
771        io->gotError = NULL;
772        tr_runInEventThread( io->session, io_dtor, io );
773    }
774}
775
776void
777tr_peerIoRefImpl( const char * file, int line, tr_peerIo * io )
778{
779    assert( tr_isPeerIo( io ) );
780
781    dbgmsg( io, "%s:%d is incrementing the IO's refcount from %d to %d",
782                file, line, io->refCount, io->refCount+1 );
783
784    ++io->refCount;
785}
786
787void
788tr_peerIoUnrefImpl( const char * file, int line, tr_peerIo * io )
789{
790    assert( tr_isPeerIo( io ) );
791
792    dbgmsg( io, "%s:%d is decrementing the IO's refcount from %d to %d",
793                file, line, io->refCount, io->refCount-1 );
794
795    if( !--io->refCount )
796        tr_peerIoFree( io );
797}
798
799const tr_address*
800tr_peerIoGetAddress( const tr_peerIo * io, tr_port   * port )
801{
802    assert( tr_isPeerIo( io ) );
803
804    if( port )
805        *port = io->port;
806
807    return &io->addr;
808}
809
810const char*
811tr_peerIoAddrStr( const tr_address * addr, tr_port port )
812{
813    static char buf[512];
814
815    if( addr->type == TR_AF_INET )
816        tr_snprintf( buf, sizeof( buf ), "%s:%u", tr_ntop_non_ts( addr ), ntohs( port ) );
817    else
818        tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_ntop_non_ts( addr ), ntohs( port ) );
819    return buf;
820}
821
822const char* tr_peerIoGetAddrStr( const tr_peerIo * io )
823{
824    return tr_isPeerIo( io ) ? tr_peerIoAddrStr( &io->addr, io->port ) : "error";
825}
826
827void
828tr_peerIoSetIOFuncs( tr_peerIo        * io,
829                     tr_can_read_cb     readcb,
830                     tr_did_write_cb    writecb,
831                     tr_net_error_cb    errcb,
832                     void             * userData )
833{
834    io->canRead = readcb;
835    io->didWrite = writecb;
836    io->gotError = errcb;
837    io->userData = userData;
838}
839
840void
841tr_peerIoClear( tr_peerIo * io )
842{
843    tr_peerIoSetIOFuncs( io, NULL, NULL, NULL, NULL );
844    tr_peerIoSetEnabled( io, TR_UP, false );
845    tr_peerIoSetEnabled( io, TR_DOWN, false );
846}
847
848int
849tr_peerIoReconnect( tr_peerIo * io )
850{
851    short int pendingEvents;
852    tr_session * session;
853
854    assert( tr_isPeerIo( io ) );
855    assert( !tr_peerIoIsIncoming( io ) );
856
857    session = tr_peerIoGetSession( io );
858
859    pendingEvents = io->pendingEvents;
860    event_disable( io, EV_READ | EV_WRITE );
861
862    io_close_socket( io );
863
864    io->socket = tr_netOpenPeerSocket( session, &io->addr, io->port, io->isSeed );
865    io->event_read = event_new( session->event_base, io->socket, EV_READ, event_read_cb, io );
866    io->event_write = event_new( session->event_base, io->socket, EV_WRITE, event_write_cb, io );
867
868    if( io->socket >= 0 )
869    {
870        event_enable( io, pendingEvents );
871        tr_netSetTOS( io->socket, session->peerSocketTOS );
872        maybeSetCongestionAlgorithm( io->socket, session->peer_congestion_algorithm );
873        return 0;
874    }
875
876    return -1;
877}
878
879/**
880***
881**/
882
883void
884tr_peerIoSetTorrentHash( tr_peerIo *     io,
885                         const uint8_t * hash )
886{
887    assert( tr_isPeerIo( io ) );
888
889    tr_cryptoSetTorrentHash( io->crypto, hash );
890}
891
892const uint8_t*
893tr_peerIoGetTorrentHash( tr_peerIo * io )
894{
895    assert( tr_isPeerIo( io ) );
896    assert( io->crypto );
897
898    return tr_cryptoGetTorrentHash( io->crypto );
899}
900
901int
902tr_peerIoHasTorrentHash( const tr_peerIo * io )
903{
904    assert( tr_isPeerIo( io ) );
905    assert( io->crypto );
906
907    return tr_cryptoHasTorrentHash( io->crypto );
908}
909
910/**
911***
912**/
913
914void
915tr_peerIoSetPeersId( tr_peerIo *     io,
916                     const uint8_t * peer_id )
917{
918    assert( tr_isPeerIo( io ) );
919
920    if( ( io->peerIdIsSet = peer_id != NULL ) )
921        memcpy( io->peerId, peer_id, 20 );
922    else
923        memset( io->peerId, 0, 20 );
924}
925
926/**
927***
928**/
929
930static unsigned int
931getDesiredOutputBufferSize( const tr_peerIo * io, uint64_t now )
932{
933    /* this is all kind of arbitrary, but what seems to work well is
934     * being large enough to hold the next 20 seconds' worth of input,
935     * or a few blocks, whichever is bigger.
936     * It's okay to tweak this as needed */
937    const unsigned int currentSpeed_Bps = tr_bandwidthGetPieceSpeed_Bps( &io->bandwidth, now, TR_UP );
938    const unsigned int period = 15u; /* arbitrary */
939    /* the 3 is arbitrary; the .5 is to leave room for messages */
940    static const unsigned int ceiling =  (unsigned int)( MAX_BLOCK_SIZE * 3.5 );
941    return MAX( ceiling, currentSpeed_Bps*period );
942}
943
944size_t
945tr_peerIoGetWriteBufferSpace( const tr_peerIo * io, uint64_t now )
946{
947    const size_t desiredLen = getDesiredOutputBufferSize( io, now );
948    const size_t currentLen = evbuffer_get_length( io->outbuf );
949    size_t freeSpace = 0;
950
951    if( desiredLen > currentLen )
952        freeSpace = desiredLen - currentLen;
953
954    return freeSpace;
955}
956
957/**
958***
959**/
960
961void
962tr_peerIoSetEncryption( tr_peerIo * io, uint32_t encryptionMode )
963{
964    assert( tr_isPeerIo( io ) );
965    assert( encryptionMode == PEER_ENCRYPTION_NONE
966         || encryptionMode == PEER_ENCRYPTION_RC4 );
967
968    io->encryptionMode = encryptionMode;
969}
970
971/**
972***
973**/
974
975static void
976addDatatype( tr_peerIo * io, size_t byteCount, bool isPieceData )
977{
978    struct tr_datatype * d;
979
980    d = tr_new( struct tr_datatype, 1 );
981    d->isPieceData = isPieceData != 0;
982    d->length = byteCount;
983    tr_list_append( &io->outbuf_datatypes, d );
984}
985
986static struct evbuffer_iovec *
987evbuffer_peek_all( struct evbuffer * buf, size_t * setme_vecCount )
988{
989    const size_t byteCount = evbuffer_get_length( buf );
990    const int vecCount = evbuffer_peek( buf, byteCount, NULL, NULL, 0 );
991    struct evbuffer_iovec * iovec = tr_new0( struct evbuffer_iovec, vecCount );
992    const int n = evbuffer_peek( buf, byteCount, NULL, iovec, vecCount );
993    assert( n == vecCount );
994    *setme_vecCount = n;
995    return iovec;
996}
997
998static void
999maybeEncryptBuffer( tr_peerIo * io, struct evbuffer * buf )
1000{
1001    if( io->encryptionMode == PEER_ENCRYPTION_RC4 )
1002    {
1003        size_t i, n;
1004        struct evbuffer_iovec * iovec = evbuffer_peek_all( buf, &n );
1005
1006        for( i=0; i<n; ++i )
1007            tr_cryptoEncrypt( io->crypto, iovec[i].iov_len, iovec[i].iov_base, iovec[i].iov_base );
1008
1009        tr_free( iovec );
1010    }
1011}
1012
1013void
1014tr_peerIoWriteBuf( tr_peerIo * io, struct evbuffer * buf, bool isPieceData )
1015{
1016    const size_t byteCount = evbuffer_get_length( buf );
1017    maybeEncryptBuffer( io, buf );
1018    evbuffer_add_buffer( io->outbuf, buf );
1019    addDatatype( io, byteCount, isPieceData );
1020}
1021
1022void
1023tr_peerIoWriteBytes( tr_peerIo * io, const void * bytes, size_t byteCount, bool isPieceData )
1024{
1025    struct evbuffer * buf = evbuffer_new( );
1026    evbuffer_add( buf, bytes, byteCount );
1027    tr_peerIoWriteBuf( io, buf, isPieceData );
1028    evbuffer_free( buf );
1029}
1030
1031/***
1032****
1033***/
1034
1035void
1036evbuffer_add_uint8( struct evbuffer * outbuf, uint8_t byte )
1037{
1038    evbuffer_add( outbuf, &byte, 1 );
1039}
1040
1041void
1042evbuffer_add_uint16( struct evbuffer * outbuf, uint16_t addme_hs )
1043{
1044    const uint16_t ns = htons( addme_hs );
1045    evbuffer_add( outbuf, &ns, sizeof( ns ) );
1046}
1047
1048void
1049evbuffer_add_uint32( struct evbuffer * outbuf, uint32_t addme_hl )
1050{
1051    const uint32_t nl = htonl( addme_hl );
1052    evbuffer_add( outbuf, &nl, sizeof( nl ) );
1053}
1054
1055void
1056evbuffer_add_uint64( struct evbuffer * outbuf, uint64_t addme_hll )
1057{
1058    const uint64_t nll = tr_htonll( addme_hll );
1059    evbuffer_add( outbuf, &nll, sizeof( nll ) );
1060}
1061
1062/***
1063****
1064***/
1065
1066void
1067tr_peerIoReadBytes( tr_peerIo * io, struct evbuffer * inbuf, void * bytes, size_t byteCount )
1068{
1069    assert( tr_isPeerIo( io ) );
1070    assert( evbuffer_get_length( inbuf )  >= byteCount );
1071
1072    switch( io->encryptionMode )
1073    {
1074        case PEER_ENCRYPTION_NONE:
1075            evbuffer_remove( inbuf, bytes, byteCount );
1076            break;
1077
1078        case PEER_ENCRYPTION_RC4:
1079            evbuffer_remove( inbuf, bytes, byteCount );
1080            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
1081            break;
1082
1083        default:
1084            assert( 0 );
1085    }
1086}
1087
1088void
1089tr_peerIoReadUint16( tr_peerIo        * io,
1090                     struct evbuffer  * inbuf,
1091                     uint16_t         * setme )
1092{
1093    uint16_t tmp;
1094    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
1095    *setme = ntohs( tmp );
1096}
1097
1098void tr_peerIoReadUint32( tr_peerIo        * io,
1099                          struct evbuffer  * inbuf,
1100                          uint32_t         * setme )
1101{
1102    uint32_t tmp;
1103    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
1104    *setme = ntohl( tmp );
1105}
1106
1107void
1108tr_peerIoDrain( tr_peerIo       * io,
1109                struct evbuffer * inbuf,
1110                size_t            byteCount )
1111{
1112    void * buf = tr_sessionGetBuffer( io->session );
1113    const size_t buflen = SESSION_BUFFER_SIZE;
1114
1115    while( byteCount > 0 )
1116    {
1117        const size_t thisPass = MIN( byteCount, buflen );
1118        tr_peerIoReadBytes( io, inbuf, buf, thisPass );
1119        byteCount -= thisPass;
1120    }
1121
1122    tr_sessionReleaseBuffer( io->session );
1123}
1124
1125/***
1126****
1127***/
1128
1129static int
1130tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
1131{
1132    int res = 0;
1133
1134    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch )))
1135    {
1136        if( io->utp_socket != NULL ) /* utp peer connection */
1137        {
1138            /* UTP_RBDrained notifies libutp that your read buffer is emtpy.
1139             * It opens up the congestion window by sending an ACK (soonish)
1140             * if one was not going to be sent. */
1141            if( evbuffer_get_length( io->inbuf ) == 0 )
1142                UTP_RBDrained( io->utp_socket );
1143        }
1144        else /* tcp peer connection */
1145        {
1146            int e;
1147
1148            EVUTIL_SET_SOCKET_ERROR( 0 );
1149            res = evbuffer_read( io->inbuf, io->socket, (int)howmuch );
1150            e = EVUTIL_SOCKET_ERROR( );
1151
1152            dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(e):"") );
1153
1154            if( evbuffer_get_length( io->inbuf ) )
1155                canReadWrapper( io );
1156
1157            if( ( res <= 0 ) && ( io->gotError ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1158            {
1159                char errstr[512];
1160                short what = BEV_EVENT_READING | BEV_EVENT_ERROR;
1161                if( res == 0 )
1162                    what |= BEV_EVENT_EOF;
1163                tr_net_strerror( errstr, sizeof( errstr ), e );
1164                dbgmsg( io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
1165                io->gotError( io, what, io->userData );
1166            }
1167        }
1168    }
1169
1170    return res;
1171}
1172
1173static int
1174tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
1175{
1176    int n = 0;
1177    const size_t old_len = evbuffer_get_length( io->outbuf );
1178    dbgmsg( io, "in tr_peerIoTryWrite %zu", howmuch );
1179
1180    if( howmuch > old_len )
1181        howmuch = old_len;
1182
1183    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_UP, howmuch )))
1184    {
1185        if( io->utp_socket != NULL ) /* utp peer connection */
1186        {
1187            const size_t old_len = evbuffer_get_length( io->outbuf );
1188            UTP_Write( io->utp_socket, howmuch );
1189            n = old_len - evbuffer_get_length( io->outbuf );
1190        }
1191        else
1192        {
1193            int e;
1194
1195            EVUTIL_SET_SOCKET_ERROR( 0 );
1196            n = tr_evbuffer_write( io, io->socket, howmuch );
1197            e = EVUTIL_SOCKET_ERROR( );
1198
1199            if( n > 0 )
1200                didWriteWrapper( io, n );
1201
1202            if( ( n < 0 ) && ( io->gotError ) && e && ( e != EPIPE ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1203            {
1204                char errstr[512];
1205                const short what = BEV_EVENT_WRITING | BEV_EVENT_ERROR;
1206
1207                tr_net_strerror( errstr, sizeof( errstr ), e );
1208                dbgmsg( io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", n, what, e, errstr );
1209
1210                if( io->gotError != NULL )
1211                    io->gotError( io, what, io->userData );
1212            }
1213        }
1214    }
1215
1216    return n;
1217}
1218
1219int
1220tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
1221{
1222    int bytesUsed = 0;
1223
1224    assert( tr_isPeerIo( io ) );
1225    assert( tr_isDirection( dir ) );
1226
1227    if( dir == TR_DOWN )
1228        bytesUsed = tr_peerIoTryRead( io, limit );
1229    else
1230        bytesUsed = tr_peerIoTryWrite( io, limit );
1231
1232    dbgmsg( io, "flushing peer-io, direction %d, limit %zu, bytesUsed %d", (int)dir, limit, bytesUsed );
1233    return bytesUsed;
1234}
1235
1236int
1237tr_peerIoFlushOutgoingProtocolMsgs( tr_peerIo * io )
1238{
1239    size_t byteCount = 0;
1240    tr_list * it;
1241
1242    /* count up how many bytes are used by non-piece-data messages
1243       at the front of our outbound queue */
1244    for( it=io->outbuf_datatypes; it!=NULL; it=it->next )
1245    {
1246        struct tr_datatype * d = it->data;
1247
1248        if( d->isPieceData )
1249            break;
1250
1251        byteCount += d->length;
1252    }
1253
1254    return tr_peerIoFlush( io, TR_UP, byteCount );
1255}
Note: See TracBrowser for help on using the repository browser.