source: branches/1.4x/libtransmission/peer-io.c @ 7176

Last change on this file since 7176 was 7176, checked in by charles, 12 years ago

(1.4x libT) backport the bandwidth fixes back to the 1.4x branch, pass #2: most of the bandwidth code, plus autoconf and xcode makefile stuff

  • Property svn:keywords set to Date Rev Author Id
File size: 17.9 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 7176 2008-11-29 16:44:24Z charles $
11 */
12
13#include <assert.h>
14#include <limits.h> /* INT_MAX */
15#include <string.h>
16#include <stdio.h>
17#include <unistd.h>
18
19#ifdef WIN32
20 #include <winsock2.h>
21#else
22 #include <netinet/in.h> /* struct in_addr */
23 #include <arpa/inet.h> /* inet_ntoa */
24#endif
25
26#include <event.h>
27
28#include "transmission.h"
29#include "bandwidth.h"
30#include "crypto.h"
31#include "iobuf.h"
32#include "list.h"
33#include "net.h"
34#include "peer-io.h"
35#include "trevent.h"
36#include "utils.h"
37
38#define MAGIC_NUMBER 206745
39#define IO_TIMEOUT_SECS 8
40
41static size_t
42getPacketOverhead( size_t d )
43{
44    /**
45     * http://sd.wareonearth.com/~phil/net/overhead/
46     *
47     * TCP over Ethernet:
48     * Assuming no header compression (e.g. not PPP)
49     * Add 20 IPv4 header or 40 IPv6 header (no options)
50     * Add 20 TCP header
51     * Add 12 bytes optional TCP timestamps
52     * Max TCP Payload data rates over ethernet are thus:
53     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
54     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
55     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
56     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
57     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
58     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
59     */
60    static const double assumed_payload_data_rate = 94.0;
61
62    return (size_t)( d * ( 100.0 / assumed_payload_data_rate ) - d );
63}
64
65/**
66***
67**/
68
69#define dbgmsg( io, ... ) \
70    do { \
71        if( tr_deepLoggingIsActive( ) ) \
72            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
73    } while( 0 )
74
75struct tr_datatype
76{
77    tr_bool  isPieceData;
78    size_t   length;
79};
80
81struct tr_peerIo
82{
83    tr_bool                  isEncrypted;
84    tr_bool                  isIncoming;
85    tr_bool                  peerIdIsSet;
86    tr_bool                  extendedProtocolSupported;
87    tr_bool                  fastPeersSupported;
88
89    int                      magicNumber;
90
91    uint8_t                  encryptionMode;
92    uint8_t                  timeout;
93    uint16_t                 port;
94    int                      socket;
95
96    uint8_t                  peerId[20];
97    time_t                   timeCreated;
98
99    tr_session             * session;
100
101    struct in_addr           in_addr;
102    struct tr_iobuf        * iobuf;
103    tr_list                * output_datatypes; /* struct tr_datatype */
104
105    tr_can_read_cb           canRead;
106    tr_did_write_cb          didWrite;
107    tr_net_error_cb          gotError;
108    void *                   userData;
109
110    size_t                   bufferSize[2];
111
112    tr_bandwidth           * bandwidth;
113    tr_crypto              * crypto;
114};
115
116/***
117****
118***/
119
120static void
121didWriteWrapper( struct tr_iobuf  * iobuf,
122                 size_t             bytes_transferred,
123                 void             * vio )
124{
125    tr_peerIo *  io = vio;
126
127    while( bytes_transferred )
128    {
129        struct tr_datatype * next = io->output_datatypes->data;
130        const size_t payload = MIN( next->length, bytes_transferred );
131        const size_t overhead = getPacketOverhead( payload );
132
133        tr_bandwidthUsed( io->bandwidth, TR_UP, payload, next->isPieceData );
134
135        if( overhead > 0 )
136            tr_bandwidthUsed( io->bandwidth, TR_UP, overhead, FALSE );
137
138        if( io->didWrite )
139            io->didWrite( io, payload, next->isPieceData, io->userData );
140
141        bytes_transferred -= payload;
142        next->length -= payload;
143        if( !next->length )
144            tr_free( tr_list_pop_front( &io->output_datatypes ) );
145    }
146
147    if( EVBUFFER_LENGTH( tr_iobuf_output( iobuf ) ) )
148        tr_iobuf_enable( io->iobuf, EV_WRITE );
149}
150
151static void
152canReadWrapper( struct tr_iobuf  * iobuf,
153                size_t             bytes_transferred UNUSED,
154                void              * vio )
155{
156    int          done = 0;
157    int          err = 0;
158    tr_peerIo *  io = vio;
159    tr_session * session = io->session;
160
161    dbgmsg( io, "canRead" );
162
163    /* try to consume the input buffer */
164    if( io->canRead )
165    {
166        tr_globalLock( session );
167
168        while( !done && !err )
169        {
170            size_t piece = 0;
171            const size_t oldLen = EVBUFFER_LENGTH( tr_iobuf_input( iobuf ) );
172            const int ret = io->canRead( iobuf, io->userData, &piece );
173
174            if( ret != READ_ERR )
175            {
176                const size_t used = oldLen - EVBUFFER_LENGTH( tr_iobuf_input( iobuf ) );
177                if( piece )
178                    tr_bandwidthUsed( io->bandwidth, TR_DOWN, piece, TRUE );
179                if( used != piece )
180                    tr_bandwidthUsed( io->bandwidth, TR_DOWN, used - piece, FALSE );
181            }
182
183            switch( ret )
184            {
185                case READ_NOW:
186                    if( EVBUFFER_LENGTH( tr_iobuf_input( iobuf )))
187                        continue;
188                    done = 1;
189                    break;
190
191                case READ_LATER:
192                    done = 1;
193                    break;
194
195                case READ_ERR:
196                    err = 1;
197                    break;
198            }
199        }
200
201        tr_globalUnlock( session );
202    }
203}
204
205static void
206gotErrorWrapper( struct tr_iobuf  * iobuf,
207                 short              what,
208                 void             * userData )
209{
210    tr_peerIo * c = userData;
211
212    if( c->gotError )
213        c->gotError( iobuf, what, c->userData );
214}
215
216/**
217***
218**/
219
220static void
221bufevNew( tr_peerIo * io )
222{
223    io->iobuf = tr_iobuf_new( io->session,
224                              io->bandwidth,
225                              io->socket,
226                              EV_READ | EV_WRITE,
227                              canReadWrapper,
228                              didWriteWrapper,
229                              gotErrorWrapper,
230                              io );
231
232    tr_iobuf_settimeout( io->iobuf, io->timeout, io->timeout );
233}
234
235static int
236isPeerIo( const tr_peerIo * io )
237{
238    return ( io != NULL ) && ( io->magicNumber == MAGIC_NUMBER );
239}
240
241static tr_peerIo*
242tr_peerIoNew( tr_session *           session,
243              const struct in_addr * in_addr,
244              uint16_t               port,
245              const uint8_t *        torrentHash,
246              int                    isIncoming,
247              int                    socket )
248{
249    tr_peerIo * io;
250
251    if( socket >= 0 )
252        tr_netSetTOS( socket, session->peerSocketTOS );
253
254    io = tr_new0( tr_peerIo, 1 );
255    io->magicNumber = MAGIC_NUMBER;
256    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
257    io->session = session;
258    io->in_addr = *in_addr;
259    io->port = port;
260    io->socket = socket;
261    io->isIncoming = isIncoming != 0;
262    io->timeout = IO_TIMEOUT_SECS;
263    io->timeCreated = time( NULL );
264    bufevNew( io );
265    tr_peerIoSetBandwidth( io, session->bandwidth );
266    return io;
267}
268
269tr_peerIo*
270tr_peerIoNewIncoming( tr_session *           session,
271                      const struct in_addr * in_addr,
272                      uint16_t               port,
273                      int                    socket )
274{
275    assert( session );
276    assert( in_addr );
277    assert( socket >= 0 );
278
279    return tr_peerIoNew( session, in_addr, port,
280                         NULL, 1,
281                         socket );
282}
283
284tr_peerIo*
285tr_peerIoNewOutgoing( tr_session *           session,
286                      const struct in_addr * in_addr,
287                      int                    port,
288                      const uint8_t *        torrentHash )
289{
290    int socket;
291
292    assert( session );
293    assert( in_addr );
294    assert( port >= 0 );
295    assert( torrentHash );
296
297    socket = tr_netOpenTCP( session, in_addr, port );
298
299    return socket < 0
300           ? NULL
301           : tr_peerIoNew( session, in_addr, port, torrentHash, 0, socket );
302}
303
304static void
305io_dtor( void * vio )
306{
307    tr_peerIo * io = vio;
308
309    tr_peerIoSetBandwidth( io, NULL );
310    tr_iobuf_free( io->iobuf );
311    tr_netClose( io->socket );
312    tr_cryptoFree( io->crypto );
313    tr_list_free( &io->output_datatypes, tr_free );
314
315    io->magicNumber = 0xDEAD;
316    tr_free( io );
317}
318
319void
320tr_peerIoFree( tr_peerIo * io )
321{
322    if( io )
323    {
324        io->canRead = NULL;
325        io->didWrite = NULL;
326        io->gotError = NULL;
327        tr_runInEventThread( io->session, io_dtor, io );
328    }
329}
330
331tr_session*
332tr_peerIoGetSession( tr_peerIo * io )
333{
334    assert( isPeerIo( io ) );
335    assert( io->session );
336
337    return io->session;
338}
339
340const struct in_addr*
341tr_peerIoGetAddress( const tr_peerIo * io,
342                           uint16_t * port )
343{
344    assert( isPeerIo( io ) );
345
346    if( port )
347        *port = io->port;
348
349    return &io->in_addr;
350}
351
352const char*
353tr_peerIoAddrStr( const struct in_addr * addr,
354                  uint16_t               port )
355{
356    static char buf[512];
357
358    tr_snprintf( buf, sizeof( buf ), "%s:%u", inet_ntoa( *addr ),
359                ntohs( port ) );
360    return buf;
361}
362
363const char*
364tr_peerIoGetAddrStr( const tr_peerIo * io )
365{
366    return tr_peerIoAddrStr( &io->in_addr, io->port );
367}
368
369static void
370tr_peerIoTryRead( tr_peerIo * io )
371{
372    if( EVBUFFER_LENGTH( tr_iobuf_input( io->iobuf )))
373        (*canReadWrapper)( io->iobuf, ~0, io );
374}
375
376void
377tr_peerIoSetIOFuncs( tr_peerIo *     io,
378                     tr_can_read_cb  readcb,
379                     tr_did_write_cb writecb,
380                     tr_net_error_cb errcb,
381                     void *          userData )
382{
383    io->canRead = readcb;
384    io->didWrite = writecb;
385    io->gotError = errcb;
386    io->userData = userData;
387
388    tr_peerIoTryRead( io );
389}
390
391int
392tr_peerIoIsIncoming( const tr_peerIo * c )
393{
394    return c->isIncoming ? 1 : 0;
395}
396
397int
398tr_peerIoReconnect( tr_peerIo * io )
399{
400    assert( !tr_peerIoIsIncoming( io ) );
401
402    if( io->socket >= 0 )
403        tr_netClose( io->socket );
404
405    io->socket = tr_netOpenTCP( io->session, &io->in_addr, io->port );
406
407    if( io->socket >= 0 )
408    {
409        tr_bandwidth * bandwidth = io->bandwidth;
410        tr_peerIoSetBandwidth( io, NULL );
411
412        tr_netSetTOS( io->socket, io->session->peerSocketTOS );
413        tr_iobuf_free( io->iobuf );
414        bufevNew( io );
415
416        tr_peerIoSetBandwidth( io, bandwidth );
417        return 0;
418    }
419
420    return -1;
421}
422
423void
424tr_peerIoSetTimeoutSecs( tr_peerIo * io,
425                         int         secs )
426{
427    io->timeout = secs;
428    tr_iobuf_settimeout( io->iobuf, io->timeout, io->timeout );
429    tr_iobuf_enable( io->iobuf, EV_READ | EV_WRITE );
430}
431
432/**
433***
434**/
435
436void
437tr_peerIoSetTorrentHash( tr_peerIo *     io,
438                         const uint8_t * hash )
439{
440    assert( isPeerIo( io ) );
441
442    tr_cryptoSetTorrentHash( io->crypto, hash );
443}
444
445const uint8_t*
446tr_peerIoGetTorrentHash( tr_peerIo * io )
447{
448    assert( isPeerIo( io ) );
449    assert( io->crypto );
450
451    return tr_cryptoGetTorrentHash( io->crypto );
452}
453
454int
455tr_peerIoHasTorrentHash( const tr_peerIo * io )
456{
457    assert( isPeerIo( io ) );
458    assert( io->crypto );
459
460    return tr_cryptoHasTorrentHash( io->crypto );
461}
462
463/**
464***
465**/
466
467void
468tr_peerIoSetPeersId( tr_peerIo *     io,
469                     const uint8_t * peer_id )
470{
471    assert( isPeerIo( io ) );
472
473    if( ( io->peerIdIsSet = peer_id != NULL ) )
474        memcpy( io->peerId, peer_id, 20 );
475    else
476        memset( io->peerId, 0, 20 );
477}
478
479const uint8_t*
480tr_peerIoGetPeersId( const tr_peerIo * io )
481{
482    assert( isPeerIo( io ) );
483    assert( io->peerIdIsSet );
484
485    return io->peerId;
486}
487
488/**
489***
490**/
491
492void
493tr_peerIoEnableLTEP( tr_peerIo * io,
494                     int         flag )
495{
496    assert( isPeerIo( io ) );
497    assert( flag == 0 || flag == 1 );
498
499    io->extendedProtocolSupported = flag;
500}
501
502void
503tr_peerIoEnableFEXT( tr_peerIo * io,
504                     int         flag )
505{
506    assert( isPeerIo( io ) );
507    assert( flag == 0 || flag == 1 );
508
509    io->fastPeersSupported = flag;
510}
511
512int
513tr_peerIoSupportsLTEP( const tr_peerIo * io )
514{
515    assert( isPeerIo( io ) );
516
517    return io->extendedProtocolSupported;
518}
519
520int
521tr_peerIoSupportsFEXT( const tr_peerIo * io )
522{
523    assert( isPeerIo( io ) );
524
525    return io->fastPeersSupported;
526}
527
528/**
529***
530**/
531
532static size_t
533getDesiredOutputBufferSize( const tr_peerIo * io )
534{
535    /* this is all kind of arbitrary, but what seems to work well is
536     * being large enough to hold the next 15 seconds' worth of input,
537     * or two and a half blocks, whichever is bigger.
538     * It's okay to tweak this as needed */
539    const double maxBlockSize = 16 * 1024; /* 16 KiB is from BT spec */
540    const double currentSpeed = tr_bandwidthGetPieceSpeed( io->bandwidth, TR_UP );
541    const double period = 20; /* arbitrary */
542    return MAX( maxBlockSize*2.5, currentSpeed*1024*period );
543}
544
545size_t
546tr_peerIoGetWriteBufferSpace( const tr_peerIo * io )
547{
548    const size_t desiredLen = getDesiredOutputBufferSize( io );
549    const size_t currentLen = EVBUFFER_LENGTH( tr_iobuf_output( io->iobuf ) );
550    size_t freeSpace = 0;
551
552    if( desiredLen > currentLen )
553        freeSpace = desiredLen - currentLen;
554
555    return freeSpace;
556}
557
558void
559tr_peerIoSetBandwidth( tr_peerIo     * io,
560                       tr_bandwidth  * bandwidth )
561{
562    assert( isPeerIo( io ) );
563
564    if( io->bandwidth )
565        tr_bandwidthRemoveBuffer( io->bandwidth, io->iobuf );
566
567    io->bandwidth = bandwidth;
568    tr_iobuf_set_bandwidth( io->iobuf, bandwidth );
569
570    if( io->bandwidth )
571        tr_bandwidthAddBuffer( io->bandwidth, io->iobuf );
572
573    tr_iobuf_enable( io->iobuf, EV_READ | EV_WRITE );
574}
575
576/**
577***
578**/
579
580tr_crypto*
581tr_peerIoGetCrypto( tr_peerIo * c )
582{
583    return c->crypto;
584}
585
586void
587tr_peerIoSetEncryption( tr_peerIo * io,
588                        int         encryptionMode )
589{
590    assert( isPeerIo( io ) );
591    assert( encryptionMode == PEER_ENCRYPTION_NONE
592          || encryptionMode == PEER_ENCRYPTION_RC4 );
593
594    io->encryptionMode = encryptionMode;
595}
596
597int
598tr_peerIoIsEncrypted( const tr_peerIo * io )
599{
600    return io != NULL && io->encryptionMode == PEER_ENCRYPTION_RC4;
601}
602
603/**
604***
605**/
606
607void
608tr_peerIoWrite( tr_peerIo   * io,
609                const void  * writeme,
610                size_t        writemeLen,
611                int           isPieceData )
612{
613    struct tr_datatype * datatype;
614    assert( tr_amInEventThread( io->session ) );
615    dbgmsg( io, "adding %zu bytes into io->output", writemeLen );
616
617    datatype = tr_new( struct tr_datatype, 1 );
618    datatype->isPieceData = isPieceData != 0;
619    datatype->length = writemeLen;
620    tr_list_append( &io->output_datatypes, datatype );
621
622    evbuffer_add( tr_iobuf_output( io->iobuf ), writeme, writemeLen );
623    tr_iobuf_enable( io->iobuf, EV_WRITE );
624}
625
626void
627tr_peerIoWriteBuf( tr_peerIo         * io,
628                   struct evbuffer   * buf,
629                   int                 isPieceData )
630{
631    const size_t n = EVBUFFER_LENGTH( buf );
632
633    tr_peerIoWrite( io, EVBUFFER_DATA( buf ), n, isPieceData );
634    evbuffer_drain( buf, n );
635}
636
637/**
638***
639**/
640
641void
642tr_peerIoWriteBytes( tr_peerIo *       io,
643                     struct evbuffer * outbuf,
644                     const void *      bytes,
645                     size_t            byteCount )
646{
647    uint8_t * tmp;
648
649    switch( io->encryptionMode )
650    {
651        case PEER_ENCRYPTION_NONE:
652            evbuffer_add( outbuf, bytes, byteCount );
653            break;
654
655        case PEER_ENCRYPTION_RC4:
656            tmp = tr_new( uint8_t, byteCount );
657            tr_cryptoEncrypt( io->crypto, byteCount, bytes, tmp );
658            evbuffer_add( outbuf, tmp, byteCount );
659            tr_free( tmp );
660            break;
661
662        default:
663            assert( 0 );
664    }
665}
666
667void
668tr_peerIoWriteUint8( tr_peerIo *       io,
669                     struct evbuffer * outbuf,
670                     uint8_t           writeme )
671{
672    tr_peerIoWriteBytes( io, outbuf, &writeme, sizeof( uint8_t ) );
673}
674
675void
676tr_peerIoWriteUint16( tr_peerIo *       io,
677                      struct evbuffer * outbuf,
678                      uint16_t          writeme )
679{
680    uint16_t tmp = htons( writeme );
681
682    tr_peerIoWriteBytes( io, outbuf, &tmp, sizeof( uint16_t ) );
683}
684
685void
686tr_peerIoWriteUint32( tr_peerIo *       io,
687                      struct evbuffer * outbuf,
688                      uint32_t          writeme )
689{
690    uint32_t tmp = htonl( writeme );
691
692    tr_peerIoWriteBytes( io, outbuf, &tmp, sizeof( uint32_t ) );
693}
694
695/***
696****
697***/
698
699void
700tr_peerIoReadBytes( tr_peerIo *       io,
701                    struct evbuffer * inbuf,
702                    void *            bytes,
703                    size_t            byteCount )
704{
705    assert( EVBUFFER_LENGTH( inbuf ) >= byteCount );
706
707    switch( io->encryptionMode )
708    {
709        case PEER_ENCRYPTION_NONE:
710            evbuffer_remove( inbuf, bytes, byteCount );
711            break;
712
713        case PEER_ENCRYPTION_RC4:
714            evbuffer_remove( inbuf, bytes, byteCount );
715            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
716            break;
717
718        default:
719            assert( 0 );
720    }
721}
722
723void
724tr_peerIoReadUint8( tr_peerIo *       io,
725                    struct evbuffer * inbuf,
726                    uint8_t *         setme )
727{
728    tr_peerIoReadBytes( io, inbuf, setme, sizeof( uint8_t ) );
729}
730
731void
732tr_peerIoReadUint16( tr_peerIo *       io,
733                     struct evbuffer * inbuf,
734                     uint16_t *        setme )
735{
736    uint16_t tmp;
737
738    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
739    *setme = ntohs( tmp );
740}
741
742void
743tr_peerIoReadUint32( tr_peerIo *       io,
744                     struct evbuffer * inbuf,
745                     uint32_t *        setme )
746{
747    uint32_t tmp;
748
749    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
750    *setme = ntohl( tmp );
751}
752
753void
754tr_peerIoDrain( tr_peerIo *       io,
755                struct evbuffer * inbuf,
756                size_t            byteCount )
757{
758    uint8_t * tmp = tr_new( uint8_t, byteCount );
759
760    tr_peerIoReadBytes( io, inbuf, tmp, byteCount );
761    tr_free( tmp );
762}
763
764int
765tr_peerIoGetAge( const tr_peerIo * io )
766{
767    return time( NULL ) - io->timeCreated;
768}
Note: See TracBrowser for help on using the repository browser.