source: trunk/libtransmission/peer-io.c @ 7154

Last change on this file since 7154 was 7154, checked in by charles, 12 years ago

(libT) yet another stab at getting bandwidth management under control. this version may suck less than previous attempts. It also breaks the mac build until someone adds iobuf.[ch] to xcode...

  • Property svn:keywords set to Date Rev Author Id
File size: 17.3 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 7154 2008-11-25 21:35:17Z charles $
11 */
12
13#include <assert.h>
14#include <limits.h> /* INT_MAX */
15#include <string.h>
16#include <stdio.h>
17#include <unistd.h>
18
19#ifdef WIN32
20 #include <winsock2.h>
21#else
22 #include <netinet/in.h> /* struct in_addr */
23 #include <arpa/inet.h> /* inet_ntoa */
24#endif
25
26#include <event.h>
27
28#include "transmission.h"
29#include "bandwidth.h"
30#include "crypto.h"
31#include "iobuf.h"
32#include "list.h"
33#include "net.h"
34#include "peer-io.h"
35#include "trevent.h"
36#include "utils.h"
37
38#define MAGIC_NUMBER 206745
39#define IO_TIMEOUT_SECS 8
40
41static size_t
42addPacketOverhead( size_t d )
43{
44    /**
45     * http://sd.wareonearth.com/~phil/net/overhead/
46     *
47     * TCP over Ethernet:
48     * Assuming no header compression (e.g. not PPP)
49     * Add 20 IPv4 header or 40 IPv6 header (no options)
50     * Add 20 TCP header
51     * Add 12 bytes optional TCP timestamps
52     * Max TCP Payload data rates over ethernet are thus:
53     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
54     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
55     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
56     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
57     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
58     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
59     */
60    static const double assumed_payload_data_rate = 94.0;
61
62    return (size_t)( d * ( 100.0 / assumed_payload_data_rate ) );
63}
64
65/**
66***
67**/
68
69#define dbgmsg( io, ... ) \
70    do { \
71        if( tr_deepLoggingIsActive( ) ) \
72            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
73    } while( 0 )
74
75struct tr_datatype
76{
77    unsigned int    isPieceData : 1;
78    size_t          length;
79};
80
81struct tr_peerIo
82{
83    unsigned int             isEncrypted               : 1;
84    unsigned int             isIncoming                : 1;
85    unsigned int             peerIdIsSet               : 1;
86    unsigned int             extendedProtocolSupported : 1;
87    unsigned int             fastPeersSupported        : 1;
88
89    int                      magicNumber;
90
91    uint8_t                  encryptionMode;
92    uint8_t                  timeout;
93    uint16_t                 port;
94    int                      socket;
95
96    uint8_t                  peerId[20];
97    time_t                   timeCreated;
98
99    tr_session             * session;
100
101    struct in_addr           in_addr;
102    struct tr_iobuf        * iobuf;
103    tr_list                * output_datatypes; /* struct tr_datatype */
104
105    tr_can_read_cb           canRead;
106    tr_did_write_cb          didWrite;
107    tr_net_error_cb          gotError;
108    void *                   userData;
109
110    size_t                   bufferSize[2];
111
112    tr_bandwidth           * bandwidth;
113    tr_crypto              * crypto;
114};
115
116/***
117****
118***/
119
120static void
121didWriteWrapper( struct tr_iobuf  * iobuf,
122                 size_t             bytes_transferred,
123                 void             * vio )
124{
125    tr_peerIo *  io = vio;
126
127    while( bytes_transferred )
128    {
129        struct tr_datatype * next = io->output_datatypes->data;
130        const size_t chunk_length = MIN( next->length, bytes_transferred );
131        const size_t n = addPacketOverhead( chunk_length );
132
133        tr_bandwidthUsed( io->bandwidth, TR_UP, n, next->isPieceData );
134
135        if( io->didWrite )
136            io->didWrite( io, n, next->isPieceData, io->userData );
137
138        bytes_transferred -= chunk_length;
139        next->length -= chunk_length;
140        if( !next->length )
141            tr_free( tr_list_pop_front( &io->output_datatypes ) );
142    }
143
144    if( EVBUFFER_LENGTH( tr_iobuf_output( iobuf ) ) )
145        tr_iobuf_enable( io->iobuf, EV_WRITE );
146}
147
148static void
149canReadWrapper( struct tr_iobuf  * iobuf,
150                size_t             bytes_transferred UNUSED,
151                void              * vio )
152{
153    int          done = 0;
154    int          err = 0;
155    tr_peerIo *  io = vio;
156    tr_session * session = io->session;
157
158    dbgmsg( io, "canRead" );
159
160    /* try to consume the input buffer */
161    if( io->canRead )
162    {
163        tr_globalLock( session );
164
165        while( !done && !err )
166        {
167            size_t piece = 0;
168            const size_t oldLen = EVBUFFER_LENGTH( tr_iobuf_input( iobuf ) );
169            const int ret = io->canRead( iobuf, io->userData, &piece );
170
171            if( ret != err )
172            {
173                const size_t used = oldLen - EVBUFFER_LENGTH( tr_iobuf_input( iobuf ) );
174                if( piece )
175                    tr_bandwidthUsed( io->bandwidth, TR_DOWN, piece, TRUE );
176                if( used != piece )
177                    tr_bandwidthUsed( io->bandwidth, TR_DOWN, used - piece, FALSE );
178            }
179
180            switch( ret )
181            {
182                case READ_NOW:
183                    if( EVBUFFER_LENGTH( tr_iobuf_input( iobuf )))
184                        continue;
185                    done = 1;
186                    break;
187
188                case READ_LATER:
189                    done = 1;
190                    break;
191
192                case READ_ERR:
193                    err = 1;
194                    break;
195            }
196        }
197
198        tr_globalUnlock( session );
199    }
200}
201
202static void
203gotErrorWrapper( struct tr_iobuf  * iobuf,
204                 short              what,
205                 void             * userData )
206{
207    tr_peerIo * c = userData;
208
209    if( c->gotError )
210        c->gotError( iobuf, what, c->userData );
211}
212
213/**
214***
215**/
216
217static void
218bufevNew( tr_peerIo * io )
219{
220    io->iobuf = tr_iobuf_new( io->session,
221                              io->bandwidth,
222                              io->socket,
223                              EV_READ | EV_WRITE,
224                              canReadWrapper,
225                              didWriteWrapper,
226                              gotErrorWrapper,
227                              io );
228
229    tr_iobuf_settimeout( io->iobuf, io->timeout, io->timeout );
230}
231
232static int
233isPeerIo( const tr_peerIo * io )
234{
235    return ( io != NULL ) && ( io->magicNumber == MAGIC_NUMBER );
236}
237
238static tr_peerIo*
239tr_peerIoNew( tr_session *           session,
240              const struct in_addr * in_addr,
241              uint16_t               port,
242              const uint8_t *        torrentHash,
243              int                    isIncoming,
244              int                    socket )
245{
246    tr_peerIo * io;
247
248    if( socket >= 0 )
249        tr_netSetTOS( socket, session->peerSocketTOS );
250
251    io = tr_new0( tr_peerIo, 1 );
252    io->magicNumber = MAGIC_NUMBER;
253    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
254    io->session = session;
255    io->in_addr = *in_addr;
256    io->port = port;
257    io->socket = socket;
258    io->isIncoming = isIncoming != 0;
259    io->timeout = IO_TIMEOUT_SECS;
260    io->timeCreated = time( NULL );
261    bufevNew( io );
262    tr_peerIoSetBandwidth( io, session->bandwidth );
263    return io;
264}
265
266tr_peerIo*
267tr_peerIoNewIncoming( tr_session *           session,
268                      const struct in_addr * in_addr,
269                      uint16_t               port,
270                      int                    socket )
271{
272    assert( session );
273    assert( in_addr );
274    assert( socket >= 0 );
275
276    return tr_peerIoNew( session, in_addr, port,
277                         NULL, 1,
278                         socket );
279}
280
281tr_peerIo*
282tr_peerIoNewOutgoing( tr_session *           session,
283                      const struct in_addr * in_addr,
284                      int                    port,
285                      const uint8_t *        torrentHash )
286{
287    int socket;
288
289    assert( session );
290    assert( in_addr );
291    assert( port >= 0 );
292    assert( torrentHash );
293
294    socket = tr_netOpenTCP( session, in_addr, port );
295
296    return socket < 0
297           ? NULL
298           : tr_peerIoNew( session, in_addr, port, torrentHash, 0, socket );
299}
300
301static void
302io_dtor( void * vio )
303{
304    tr_peerIo * io = vio;
305
306    tr_peerIoSetBandwidth( io, NULL );
307    tr_iobuf_free( io->iobuf );
308    tr_netClose( io->socket );
309    tr_cryptoFree( io->crypto );
310    tr_list_free( &io->output_datatypes, tr_free );
311
312    io->magicNumber = 0xDEAD;
313    tr_free( io );
314}
315
316void
317tr_peerIoFree( tr_peerIo * io )
318{
319    if( io )
320    {
321        io->canRead = NULL;
322        io->didWrite = NULL;
323        io->gotError = NULL;
324        tr_runInEventThread( io->session, io_dtor, io );
325    }
326}
327
328tr_session*
329tr_peerIoGetSession( tr_peerIo * io )
330{
331    assert( isPeerIo( io ) );
332    assert( io->session );
333
334    return io->session;
335}
336
337const struct in_addr*
338tr_peerIoGetAddress( const tr_peerIo * io,
339                           uint16_t * port )
340{
341    assert( isPeerIo( io ) );
342
343    if( port )
344        *port = io->port;
345
346    return &io->in_addr;
347}
348
349const char*
350tr_peerIoAddrStr( const struct in_addr * addr,
351                  uint16_t               port )
352{
353    static char buf[512];
354
355    tr_snprintf( buf, sizeof( buf ), "%s:%u", inet_ntoa( *addr ),
356                ntohs( port ) );
357    return buf;
358}
359
360const char*
361tr_peerIoGetAddrStr( const tr_peerIo * io )
362{
363    return tr_peerIoAddrStr( &io->in_addr, io->port );
364}
365
366static void
367tr_peerIoTryRead( tr_peerIo * io )
368{
369    if( EVBUFFER_LENGTH( tr_iobuf_input( io->iobuf )))
370        (*canReadWrapper)( io->iobuf, ~0, io );
371}
372
373void
374tr_peerIoSetIOFuncs( tr_peerIo *     io,
375                     tr_can_read_cb  readcb,
376                     tr_did_write_cb writecb,
377                     tr_net_error_cb errcb,
378                     void *          userData )
379{
380    io->canRead = readcb;
381    io->didWrite = writecb;
382    io->gotError = errcb;
383    io->userData = userData;
384
385    tr_peerIoTryRead( io );
386}
387
388int
389tr_peerIoIsIncoming( const tr_peerIo * c )
390{
391    return c->isIncoming ? 1 : 0;
392}
393
394int
395tr_peerIoReconnect( tr_peerIo * io )
396{
397    assert( !tr_peerIoIsIncoming( io ) );
398
399    if( io->socket >= 0 )
400        tr_netClose( io->socket );
401
402    io->socket = tr_netOpenTCP( io->session, &io->in_addr, io->port );
403
404    if( io->socket >= 0 )
405    {
406        tr_bandwidth * bandwidth = io->bandwidth;
407        tr_peerIoSetBandwidth( io, NULL );
408
409        tr_netSetTOS( io->socket, io->session->peerSocketTOS );
410        tr_iobuf_free( io->iobuf );
411        bufevNew( io );
412
413        tr_peerIoSetBandwidth( io, bandwidth );
414        return 0;
415    }
416
417    return -1;
418}
419
420void
421tr_peerIoSetTimeoutSecs( tr_peerIo * io,
422                         int         secs )
423{
424    io->timeout = secs;
425    tr_iobuf_settimeout( io->iobuf, io->timeout, io->timeout );
426    tr_iobuf_enable( io->iobuf, EV_READ | EV_WRITE );
427}
428
429/**
430***
431**/
432
433void
434tr_peerIoSetTorrentHash( tr_peerIo *     io,
435                         const uint8_t * hash )
436{
437    assert( isPeerIo( io ) );
438
439    tr_cryptoSetTorrentHash( io->crypto, hash );
440}
441
442const uint8_t*
443tr_peerIoGetTorrentHash( tr_peerIo * io )
444{
445    assert( isPeerIo( io ) );
446    assert( io->crypto );
447
448    return tr_cryptoGetTorrentHash( io->crypto );
449}
450
451int
452tr_peerIoHasTorrentHash( const tr_peerIo * io )
453{
454    assert( isPeerIo( io ) );
455    assert( io->crypto );
456
457    return tr_cryptoHasTorrentHash( io->crypto );
458}
459
460/**
461***
462**/
463
464void
465tr_peerIoSetPeersId( tr_peerIo *     io,
466                     const uint8_t * peer_id )
467{
468    assert( isPeerIo( io ) );
469
470    if( ( io->peerIdIsSet = peer_id != NULL ) )
471        memcpy( io->peerId, peer_id, 20 );
472    else
473        memset( io->peerId, 0, 20 );
474}
475
476const uint8_t*
477tr_peerIoGetPeersId( const tr_peerIo * io )
478{
479    assert( isPeerIo( io ) );
480    assert( io->peerIdIsSet );
481
482    return io->peerId;
483}
484
485/**
486***
487**/
488
489void
490tr_peerIoEnableLTEP( tr_peerIo * io,
491                     int         flag )
492{
493    assert( isPeerIo( io ) );
494    assert( flag == 0 || flag == 1 );
495
496    io->extendedProtocolSupported = flag;
497}
498
499void
500tr_peerIoEnableFEXT( tr_peerIo * io,
501                     int         flag )
502{
503    assert( isPeerIo( io ) );
504    assert( flag == 0 || flag == 1 );
505
506    io->fastPeersSupported = flag;
507}
508
509int
510tr_peerIoSupportsLTEP( const tr_peerIo * io )
511{
512    assert( isPeerIo( io ) );
513
514    return io->extendedProtocolSupported;
515}
516
517int
518tr_peerIoSupportsFEXT( const tr_peerIo * io )
519{
520    assert( isPeerIo( io ) );
521
522    return io->fastPeersSupported;
523}
524
525/**
526***
527**/
528
529size_t
530tr_peerIoGetWriteBufferSpace( const tr_peerIo * io )
531{
532    const size_t desiredLen = io->session->so_sndbuf * 2; /* FIXME: bigger? */
533    const size_t currentLen = EVBUFFER_LENGTH( tr_iobuf_output( io->iobuf ) );
534    size_t freeSpace = 0;
535
536    if( desiredLen > currentLen )
537        freeSpace = desiredLen - currentLen;
538
539    return freeSpace;
540}
541
542void
543tr_peerIoSetBandwidth( tr_peerIo     * io,
544                       tr_bandwidth  * bandwidth )
545{
546    assert( isPeerIo( io ) );
547
548    if( io->bandwidth )
549        tr_bandwidthRemoveBuffer( io->bandwidth, io->iobuf );
550
551    io->bandwidth = bandwidth;
552    tr_iobuf_set_bandwidth( io->iobuf, bandwidth );
553
554    if( io->bandwidth )
555        tr_bandwidthAddBuffer( io->bandwidth, io->iobuf );
556
557    tr_iobuf_enable( io->iobuf, EV_READ | EV_WRITE );
558}
559
560/**
561***
562**/
563
564tr_crypto*
565tr_peerIoGetCrypto( tr_peerIo * c )
566{
567    return c->crypto;
568}
569
570void
571tr_peerIoSetEncryption( tr_peerIo * io,
572                        int         encryptionMode )
573{
574    assert( isPeerIo( io ) );
575    assert( encryptionMode == PEER_ENCRYPTION_NONE
576          || encryptionMode == PEER_ENCRYPTION_RC4 );
577
578    io->encryptionMode = encryptionMode;
579}
580
581int
582tr_peerIoIsEncrypted( const tr_peerIo * io )
583{
584    return io != NULL && io->encryptionMode == PEER_ENCRYPTION_RC4;
585}
586
587/**
588***
589**/
590
591void
592tr_peerIoWrite( tr_peerIo   * io,
593                const void  * writeme,
594                size_t        writemeLen,
595                int           isPieceData )
596{
597    struct tr_datatype * datatype;
598    assert( tr_amInEventThread( io->session ) );
599    dbgmsg( io, "adding %zu bytes into io->output", writemeLen );
600
601    datatype = tr_new( struct tr_datatype, 1 );
602    datatype->isPieceData = isPieceData != 0;
603    datatype->length = writemeLen;
604    tr_list_append( &io->output_datatypes, datatype );
605
606    evbuffer_add( tr_iobuf_output( io->iobuf ), writeme, writemeLen );
607    tr_iobuf_enable( io->iobuf, EV_WRITE );
608}
609
610void
611tr_peerIoWriteBuf( tr_peerIo         * io,
612                   struct evbuffer   * buf,
613                   int                 isPieceData )
614{
615    const size_t n = EVBUFFER_LENGTH( buf );
616
617    tr_peerIoWrite( io, EVBUFFER_DATA( buf ), n, isPieceData );
618    evbuffer_drain( buf, n );
619}
620
621/**
622***
623**/
624
625void
626tr_peerIoWriteBytes( tr_peerIo *       io,
627                     struct evbuffer * outbuf,
628                     const void *      bytes,
629                     size_t            byteCount )
630{
631    uint8_t * tmp;
632
633    switch( io->encryptionMode )
634    {
635        case PEER_ENCRYPTION_NONE:
636            evbuffer_add( outbuf, bytes, byteCount );
637            break;
638
639        case PEER_ENCRYPTION_RC4:
640            tmp = tr_new( uint8_t, byteCount );
641            tr_cryptoEncrypt( io->crypto, byteCount, bytes, tmp );
642            evbuffer_add( outbuf, tmp, byteCount );
643            tr_free( tmp );
644            break;
645
646        default:
647            assert( 0 );
648    }
649}
650
651void
652tr_peerIoWriteUint8( tr_peerIo *       io,
653                     struct evbuffer * outbuf,
654                     uint8_t           writeme )
655{
656    tr_peerIoWriteBytes( io, outbuf, &writeme, sizeof( uint8_t ) );
657}
658
659void
660tr_peerIoWriteUint16( tr_peerIo *       io,
661                      struct evbuffer * outbuf,
662                      uint16_t          writeme )
663{
664    uint16_t tmp = htons( writeme );
665
666    tr_peerIoWriteBytes( io, outbuf, &tmp, sizeof( uint16_t ) );
667}
668
669void
670tr_peerIoWriteUint32( tr_peerIo *       io,
671                      struct evbuffer * outbuf,
672                      uint32_t          writeme )
673{
674    uint32_t tmp = htonl( writeme );
675
676    tr_peerIoWriteBytes( io, outbuf, &tmp, sizeof( uint32_t ) );
677}
678
679/***
680****
681***/
682
683void
684tr_peerIoReadBytes( tr_peerIo *       io,
685                    struct evbuffer * inbuf,
686                    void *            bytes,
687                    size_t            byteCount )
688{
689    assert( EVBUFFER_LENGTH( inbuf ) >= byteCount );
690
691    switch( io->encryptionMode )
692    {
693        case PEER_ENCRYPTION_NONE:
694            evbuffer_remove( inbuf, bytes, byteCount );
695            break;
696
697        case PEER_ENCRYPTION_RC4:
698            evbuffer_remove( inbuf, bytes, byteCount );
699            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
700            break;
701
702        default:
703            assert( 0 );
704    }
705}
706
707void
708tr_peerIoReadUint8( tr_peerIo *       io,
709                    struct evbuffer * inbuf,
710                    uint8_t *         setme )
711{
712    tr_peerIoReadBytes( io, inbuf, setme, sizeof( uint8_t ) );
713}
714
715void
716tr_peerIoReadUint16( tr_peerIo *       io,
717                     struct evbuffer * inbuf,
718                     uint16_t *        setme )
719{
720    uint16_t tmp;
721
722    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
723    *setme = ntohs( tmp );
724}
725
726void
727tr_peerIoReadUint32( tr_peerIo *       io,
728                     struct evbuffer * inbuf,
729                     uint32_t *        setme )
730{
731    uint32_t tmp;
732
733    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
734    *setme = ntohl( tmp );
735}
736
737void
738tr_peerIoDrain( tr_peerIo *       io,
739                struct evbuffer * inbuf,
740                size_t            byteCount )
741{
742    uint8_t * tmp = tr_new( uint8_t, byteCount );
743
744    tr_peerIoReadBytes( io, inbuf, tmp, byteCount );
745    tr_free( tmp );
746}
747
748int
749tr_peerIoGetAge( const tr_peerIo * io )
750{
751    return time( NULL ) - io->timeCreated;
752}
Note: See TracBrowser for help on using the repository browser.