source: trunk/libtransmission/peer-io.c @ 7147

Last change on this file since 7147 was 7147, checked in by charles, 12 years ago

(libT) #1468: another stab at getting the peer transfer speeds both fast and a little more consistent.

  • Property svn:keywords set to Date Rev Author Id
File size: 18.2 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 7147 2008-11-24 04:21:23Z charles $
11 */
12
13#include <assert.h>
14#include <limits.h> /* INT_MAX */
15#include <string.h>
16#include <stdio.h>
17#include <unistd.h>
18
19#ifdef WIN32
20 #include <winsock2.h>
21#else
22 #include <netinet/in.h> /* struct in_addr */
23 #include <arpa/inet.h> /* inet_ntoa */
24#endif
25
26#include <event.h>
27
28#include "transmission.h"
29#include "bandwidth.h"
30#include "crypto.h"
31#include "list.h"
32#include "net.h"
33#include "peer-io.h"
34#include "trevent.h"
35#include "utils.h"
36
37#define IO_TIMEOUT_SECS 8
38
39static size_t
40addPacketOverhead( size_t d )
41{
42    /**
43     * http://sd.wareonearth.com/~phil/net/overhead/
44     *
45     * TCP over Ethernet:
46     * Assuming no header compression (e.g. not PPP)
47     * Add 20 IPv4 header or 40 IPv6 header (no options)
48     * Add 20 TCP header
49     * Add 12 bytes optional TCP timestamps
50     * Max TCP Payload data rates over ethernet are thus:
51     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
52     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
53     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
54     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
55     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
56     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
57     */
58    static const double assumed_payload_data_rate = 94.0;
59
60    return (size_t)( d * ( 100.0 / assumed_payload_data_rate ) );
61}
62
63/**
64***
65**/
66
67#define dbgmsg( io, ... ) \
68    do { \
69        if( tr_deepLoggingIsActive( ) ) \
70            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
71    } while( 0 )
72
73struct tr_datatype
74{
75    unsigned int    isPieceData : 1;
76    size_t          length;
77};
78
79struct tr_peerIo
80{
81    unsigned int           isEncrypted               : 1;
82    unsigned int           isIncoming                : 1;
83    unsigned int           peerIdIsSet               : 1;
84    unsigned int           extendedProtocolSupported : 1;
85    unsigned int           fastPeersSupported        : 1;
86
87    uint8_t                encryptionMode;
88    uint8_t                timeout;
89    uint16_t               port;
90    int                    socket;
91
92    uint8_t                peerId[20];
93    time_t                 timeCreated;
94
95    tr_session           * session;
96
97    struct in_addr         in_addr;
98    struct bufferevent   * bufev;
99    struct evbuffer      * output;
100    tr_list              * output_datatypes; /* struct tr_datatype */
101
102    tr_can_read_cb         canRead;
103    tr_did_write_cb        didWrite;
104    tr_net_error_cb        gotError;
105    void *                 userData;
106
107    size_t                 bufferSize[2];
108
109    tr_bandwidth         * bandwidth[2];
110    tr_crypto            * crypto;
111};
112
113/**
114***
115**/
116
117static void
118adjustOutputBuffer( tr_peerIo * io )
119{
120    struct evbuffer * live = EVBUFFER_OUTPUT( io->bufev );
121    size_t curLive = EVBUFFER_LENGTH( live );
122    size_t maxLive = tr_bandwidthClamp( io->bandwidth[TR_UP], io->session->so_sndbuf );
123
124    if( ( curLive < maxLive ) && EVBUFFER_LENGTH( io->output ) )
125    {
126        size_t freeSpace = maxLive - curLive;
127        size_t n = MIN( freeSpace, EVBUFFER_LENGTH( io->output ) );
128        bufferevent_write( io->bufev, EVBUFFER_DATA( io->output ), n );
129        evbuffer_drain( io->output, n );
130        curLive += n;
131    }
132
133    io->bufferSize[TR_UP] = curLive;
134
135    if( curLive )
136        bufferevent_enable( io->bufev, EV_WRITE );
137
138    dbgmsg( io, "after adjusting the output buffer, its size is now %zu", curLive );
139}
140
141static void
142adjustInputBuffer( tr_peerIo * io )
143{
144    /* FIXME: the max read size probably needs to vary depending on the
145     * number of peers that we have connected...  1024 is going to force
146     * us way over the limit when there are lots of peers */
147    static const int maxBufSize = 1024;
148    const size_t n = tr_bandwidthClamp( io->bandwidth[TR_DOWN], maxBufSize );
149
150    if( !n )
151    {
152        dbgmsg( io, "disabling reads because we've hit our limit" );
153        bufferevent_disable( io->bufev, EV_READ );
154    }
155    else
156    {
157        dbgmsg( io, "enabling reading of %zu more bytes", n );
158        bufferevent_setwatermark( io->bufev, EV_READ, 0, n );
159        bufferevent_enable( io->bufev, EV_READ );
160    }
161
162    io->bufferSize[TR_DOWN] = EVBUFFER_LENGTH( EVBUFFER_INPUT( io->bufev ) );
163}
164
165/***
166****
167***/
168
169static void
170didWriteWrapper( struct bufferevent * e,
171                 void *               vio )
172{
173    tr_peerIo *  io = vio;
174    const size_t len = EVBUFFER_LENGTH( EVBUFFER_OUTPUT( e ) );
175
176    dbgmsg( io, "didWrite... io->outputBufferSize was %zu, is now %zu",
177            io->bufferSize[TR_UP], len );
178
179    if( len < io->bufferSize[TR_UP] )
180    {
181        size_t payload = io->bufferSize[TR_UP] - len;
182
183        while( payload )
184        {
185            struct tr_datatype * next = io->output_datatypes->data;
186            const size_t chunk_length = MIN( next->length, payload );
187            const size_t n = addPacketOverhead( chunk_length );
188
189            if( io->didWrite )
190                io->didWrite( io, n, next->isPieceData, io->userData );
191
192            payload -= chunk_length;
193            next->length -= chunk_length;
194            if( !next->length )
195                tr_free( tr_list_pop_front( &io->output_datatypes ) );
196        }
197    }
198
199    adjustOutputBuffer( io );
200
201}
202
203static void
204canReadWrapper( struct bufferevent * e,
205                void *               vio )
206{
207    int          done = 0;
208    int          err = 0;
209    tr_peerIo *  io = vio;
210    tr_session * session = io->session;
211
212    dbgmsg( io, "canRead" );
213
214    /* try to consume the input buffer */
215    if( io->canRead )
216    {
217        tr_globalLock( session );
218
219        while( !done && !err )
220        {
221            const int ret = io->canRead( e, io->userData );
222
223            switch( ret )
224            {
225                case READ_NOW:
226                    if( EVBUFFER_LENGTH( e->input ) )
227                        continue;
228                    done = 1;
229                    break;
230
231                case READ_LATER:
232                    done = 1;
233                    break;
234
235                case READ_ERR:
236                    err = 1;
237                    break;
238            }
239        }
240
241        tr_globalUnlock( session );
242    }
243
244    if( !err )
245        adjustInputBuffer( io );
246}
247
248static void
249gotErrorWrapper( struct bufferevent * e,
250                 short                what,
251                 void *               userData )
252{
253    tr_peerIo * c = userData;
254
255    if( c->gotError )
256        c->gotError( e, what, c->userData );
257}
258
259/**
260***
261**/
262
263static void
264bufevNew( tr_peerIo * io )
265{
266    io->bufev = bufferevent_new( io->socket,
267                                 canReadWrapper,
268                                 didWriteWrapper,
269                                 gotErrorWrapper,
270                                 io );
271
272    /* tell libevent to call didWriteWrapper after every write,
273     * not just when the write buffer is empty */
274    bufferevent_setwatermark( io->bufev, EV_WRITE, INT_MAX, 0 );
275
276    bufferevent_settimeout( io->bufev, io->timeout, io->timeout );
277
278    bufferevent_enable( io->bufev, EV_READ | EV_WRITE );
279}
280
281static tr_peerIo*
282tr_peerIoNew( tr_session *           session,
283              const struct in_addr * in_addr,
284              uint16_t               port,
285              const uint8_t *        torrentHash,
286              int                    isIncoming,
287              int                    socket )
288{
289    tr_peerIo * io;
290
291    if( socket >= 0 )
292        tr_netSetTOS( socket, session->peerSocketTOS );
293
294    io = tr_new0( tr_peerIo, 1 );
295    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
296    io->session = session;
297    io->in_addr = *in_addr;
298    io->port = port;
299    io->socket = socket;
300    io->isIncoming = isIncoming != 0;
301    io->timeout = IO_TIMEOUT_SECS;
302    io->timeCreated = time( NULL );
303    io->output = evbuffer_new( );
304    bufevNew( io );
305    return io;
306}
307
308tr_peerIo*
309tr_peerIoNewIncoming( tr_session *           session,
310                      const struct in_addr * in_addr,
311                      uint16_t               port,
312                      int                    socket )
313{
314    assert( session );
315    assert( in_addr );
316    assert( socket >= 0 );
317
318    return tr_peerIoNew( session, in_addr, port,
319                         NULL, 1,
320                         socket );
321}
322
323tr_peerIo*
324tr_peerIoNewOutgoing( tr_session *           session,
325                      const struct in_addr * in_addr,
326                      int                    port,
327                      const uint8_t *        torrentHash )
328{
329    int socket;
330
331    assert( session );
332    assert( in_addr );
333    assert( port >= 0 );
334    assert( torrentHash );
335
336    socket = tr_netOpenTCP( session, in_addr, port );
337
338    return socket < 0
339           ? NULL
340           : tr_peerIoNew( session, in_addr, port, torrentHash, 0, socket );
341}
342
343static void
344io_dtor( void * vio )
345{
346    tr_peerIo * io = vio;
347
348    evbuffer_free( io->output );
349    bufferevent_free( io->bufev );
350    tr_netClose( io->socket );
351    tr_cryptoFree( io->crypto );
352    tr_list_free( &io->output_datatypes, tr_free );
353    tr_free( io );
354}
355
356void
357tr_peerIoFree( tr_peerIo * io )
358{
359    if( io )
360    {
361        io->canRead = NULL;
362        io->didWrite = NULL;
363        io->gotError = NULL;
364        tr_runInEventThread( io->session, io_dtor, io );
365    }
366}
367
368tr_session*
369tr_peerIoGetSession( tr_peerIo * io )
370{
371    assert( io );
372    assert( io->session );
373
374    return io->session;
375}
376
377const struct in_addr*
378tr_peerIoGetAddress( const tr_peerIo * io,
379                           uint16_t * port )
380{
381    assert( io );
382
383    if( port )
384        *port = io->port;
385
386    return &io->in_addr;
387}
388
389const char*
390tr_peerIoAddrStr( const struct in_addr * addr,
391                  uint16_t               port )
392{
393    static char buf[512];
394
395    tr_snprintf( buf, sizeof( buf ), "%s:%u", inet_ntoa( *addr ),
396                ntohs( port ) );
397    return buf;
398}
399
400const char*
401tr_peerIoGetAddrStr( const tr_peerIo * io )
402{
403    return tr_peerIoAddrStr( &io->in_addr, io->port );
404}
405
406static void
407tr_peerIoTryRead( tr_peerIo * io )
408{
409    if( EVBUFFER_LENGTH( io->bufev->input ) )
410        canReadWrapper( io->bufev, io );
411}
412
413void
414tr_peerIoSetIOFuncs( tr_peerIo *     io,
415                     tr_can_read_cb  readcb,
416                     tr_did_write_cb writecb,
417                     tr_net_error_cb errcb,
418                     void *          userData )
419{
420    io->canRead = readcb;
421    io->didWrite = writecb;
422    io->gotError = errcb;
423    io->userData = userData;
424
425    tr_peerIoTryRead( io );
426}
427
428int
429tr_peerIoIsIncoming( const tr_peerIo * c )
430{
431    return c->isIncoming ? 1 : 0;
432}
433
434int
435tr_peerIoReconnect( tr_peerIo * io )
436{
437    assert( !tr_peerIoIsIncoming( io ) );
438
439    if( io->socket >= 0 )
440        tr_netClose( io->socket );
441
442    io->socket = tr_netOpenTCP( io->session, &io->in_addr, io->port );
443
444    if( io->socket >= 0 )
445    {
446        tr_netSetTOS( io->socket, io->session->peerSocketTOS );
447
448        bufferevent_free( io->bufev );
449        bufevNew( io );
450        return 0;
451    }
452
453    return -1;
454}
455
456void
457tr_peerIoSetTimeoutSecs( tr_peerIo * io,
458                         int         secs )
459{
460    io->timeout = secs;
461    bufferevent_settimeout( io->bufev, io->timeout, io->timeout );
462    bufferevent_enable( io->bufev, EV_READ | EV_WRITE );
463}
464
465/**
466***
467**/
468
469void
470tr_peerIoSetTorrentHash( tr_peerIo *     io,
471                         const uint8_t * hash )
472{
473    assert( io );
474
475    tr_cryptoSetTorrentHash( io->crypto, hash );
476}
477
478const uint8_t*
479tr_peerIoGetTorrentHash( tr_peerIo * io )
480{
481    assert( io );
482    assert( io->crypto );
483
484    return tr_cryptoGetTorrentHash( io->crypto );
485}
486
487int
488tr_peerIoHasTorrentHash( const tr_peerIo * io )
489{
490    assert( io );
491    assert( io->crypto );
492
493    return tr_cryptoHasTorrentHash( io->crypto );
494}
495
496/**
497***
498**/
499
500void
501tr_peerIoSetPeersId( tr_peerIo *     io,
502                     const uint8_t * peer_id )
503{
504    assert( io );
505
506    if( ( io->peerIdIsSet = peer_id != NULL ) )
507        memcpy( io->peerId, peer_id, 20 );
508    else
509        memset( io->peerId, 0, 20 );
510}
511
512const uint8_t*
513tr_peerIoGetPeersId( const tr_peerIo * io )
514{
515    assert( io );
516    assert( io->peerIdIsSet );
517
518    return io->peerId;
519}
520
521/**
522***
523**/
524
525void
526tr_peerIoEnableLTEP( tr_peerIo * io,
527                     int         flag )
528{
529    assert( io );
530    assert( flag == 0 || flag == 1 );
531
532    io->extendedProtocolSupported = flag;
533}
534
535void
536tr_peerIoEnableFEXT( tr_peerIo * io,
537                     int         flag )
538{
539    assert( io );
540    assert( flag == 0 || flag == 1 );
541
542    io->fastPeersSupported = flag;
543}
544
545int
546tr_peerIoSupportsLTEP( const tr_peerIo * io )
547{
548    assert( io );
549
550    return io->extendedProtocolSupported;
551}
552
553int
554tr_peerIoSupportsFEXT( const tr_peerIo * io )
555{
556    assert( io );
557
558    return io->fastPeersSupported;
559}
560
561/**
562***
563**/
564
565size_t
566tr_peerIoGetWriteBufferSpace( const tr_peerIo * io )
567{
568    const size_t desiredLiveLen = tr_bandwidthClamp( io->bandwidth[TR_UP], io->session->so_rcvbuf );
569    const size_t currentLiveLen = EVBUFFER_LENGTH( EVBUFFER_OUTPUT( io->bufev ) );
570    const size_t desiredQueueLen = io->session->so_sndbuf;
571    const size_t currentQueueLen = EVBUFFER_LENGTH( io->output );
572    const size_t desiredLen = desiredLiveLen + desiredQueueLen;
573    const size_t currentLen = currentLiveLen + currentQueueLen;
574    size_t freeSpace = 0;
575
576    if( desiredLen > currentLen )
577        freeSpace = desiredLen - currentLen;
578
579    return freeSpace;
580}
581
582void
583tr_peerIoSetBandwidth( tr_peerIo     * io,
584                       tr_direction    direction,
585                       tr_bandwidth  * bandwidth )
586{
587    assert( io );
588    assert( direction == TR_UP || direction == TR_DOWN );
589
590    io->bandwidth[direction] = bandwidth;
591
592    if( direction ==  TR_UP )
593        adjustOutputBuffer( io );
594    else
595        adjustInputBuffer( io );
596}
597
598/**
599***
600**/
601
602tr_crypto*
603tr_peerIoGetCrypto( tr_peerIo * c )
604{
605    return c->crypto;
606}
607
608void
609tr_peerIoSetEncryption( tr_peerIo * io,
610                        int         encryptionMode )
611{
612    assert( io );
613    assert( encryptionMode == PEER_ENCRYPTION_NONE
614          || encryptionMode == PEER_ENCRYPTION_RC4 );
615
616    io->encryptionMode = encryptionMode;
617}
618
619int
620tr_peerIoIsEncrypted( const tr_peerIo * io )
621{
622    return io != NULL && io->encryptionMode == PEER_ENCRYPTION_RC4;
623}
624
625/**
626***
627**/
628
629void
630tr_peerIoWrite( tr_peerIo   * io,
631                const void  * writeme,
632                size_t        writemeLen,
633                int           isPieceData )
634{
635    struct tr_datatype * datatype;
636    assert( tr_amInEventThread( io->session ) );
637    dbgmsg( io, "adding %zu bytes into io->output", writemeLen );
638
639    evbuffer_add( io->output, writeme, writemeLen );
640
641    datatype = tr_new( struct tr_datatype, 1 );
642    datatype->isPieceData = isPieceData != 0;
643    datatype->length = writemeLen;
644    tr_list_append( &io->output_datatypes, datatype );
645
646    adjustOutputBuffer( io );
647}
648
649void
650tr_peerIoWriteBuf( tr_peerIo         * io,
651                   struct evbuffer   * buf,
652                   int                 isPieceData )
653{
654    const size_t n = EVBUFFER_LENGTH( buf );
655
656    tr_peerIoWrite( io, EVBUFFER_DATA( buf ), n, isPieceData );
657    evbuffer_drain( buf, n );
658}
659
660/**
661***
662**/
663
664void
665tr_peerIoWriteBytes( tr_peerIo *       io,
666                     struct evbuffer * outbuf,
667                     const void *      bytes,
668                     size_t            byteCount )
669{
670    uint8_t * tmp;
671
672    switch( io->encryptionMode )
673    {
674        case PEER_ENCRYPTION_NONE:
675            evbuffer_add( outbuf, bytes, byteCount );
676            break;
677
678        case PEER_ENCRYPTION_RC4:
679            tmp = tr_new( uint8_t, byteCount );
680            tr_cryptoEncrypt( io->crypto, byteCount, bytes, tmp );
681            evbuffer_add( outbuf, tmp, byteCount );
682            tr_free( tmp );
683            break;
684
685        default:
686            assert( 0 );
687    }
688}
689
690void
691tr_peerIoWriteUint8( tr_peerIo *       io,
692                     struct evbuffer * outbuf,
693                     uint8_t           writeme )
694{
695    tr_peerIoWriteBytes( io, outbuf, &writeme, sizeof( uint8_t ) );
696}
697
698void
699tr_peerIoWriteUint16( tr_peerIo *       io,
700                      struct evbuffer * outbuf,
701                      uint16_t          writeme )
702{
703    uint16_t tmp = htons( writeme );
704
705    tr_peerIoWriteBytes( io, outbuf, &tmp, sizeof( uint16_t ) );
706}
707
708void
709tr_peerIoWriteUint32( tr_peerIo *       io,
710                      struct evbuffer * outbuf,
711                      uint32_t          writeme )
712{
713    uint32_t tmp = htonl( writeme );
714
715    tr_peerIoWriteBytes( io, outbuf, &tmp, sizeof( uint32_t ) );
716}
717
718/***
719****
720***/
721
722void
723tr_peerIoReadBytes( tr_peerIo *       io,
724                    struct evbuffer * inbuf,
725                    void *            bytes,
726                    size_t            byteCount )
727{
728    assert( EVBUFFER_LENGTH( inbuf ) >= byteCount );
729
730    switch( io->encryptionMode )
731    {
732        case PEER_ENCRYPTION_NONE:
733            evbuffer_remove( inbuf, bytes, byteCount );
734            break;
735
736        case PEER_ENCRYPTION_RC4:
737            evbuffer_remove( inbuf, bytes, byteCount );
738            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
739            break;
740
741        default:
742            assert( 0 );
743    }
744}
745
746void
747tr_peerIoReadUint8( tr_peerIo *       io,
748                    struct evbuffer * inbuf,
749                    uint8_t *         setme )
750{
751    tr_peerIoReadBytes( io, inbuf, setme, sizeof( uint8_t ) );
752}
753
754void
755tr_peerIoReadUint16( tr_peerIo *       io,
756                     struct evbuffer * inbuf,
757                     uint16_t *        setme )
758{
759    uint16_t tmp;
760
761    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
762    *setme = ntohs( tmp );
763}
764
765void
766tr_peerIoReadUint32( tr_peerIo *       io,
767                     struct evbuffer * inbuf,
768                     uint32_t *        setme )
769{
770    uint32_t tmp;
771
772    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
773    *setme = ntohl( tmp );
774}
775
776void
777tr_peerIoDrain( tr_peerIo *       io,
778                struct evbuffer * inbuf,
779                size_t            byteCount )
780{
781    uint8_t * tmp = tr_new( uint8_t, byteCount );
782
783    tr_peerIoReadBytes( io, inbuf, tmp, byteCount );
784    tr_free( tmp );
785}
786
787int
788tr_peerIoGetAge( const tr_peerIo * io )
789{
790    return time( NULL ) - io->timeCreated;
791}
Note: See TracBrowser for help on using the repository browser.