source: trunk/libtransmission/peer-io.c @ 7404

Last change on this file since 7404 was 7404, checked in by charles, 12 years ago

updated email address

  • Property svn:keywords set to Date Rev Author Id
File size: 18.0 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 7404 2008-12-16 00:20:44Z charles $
11 */
12
13#include <assert.h>
14#include <limits.h> /* INT_MAX */
15#include <string.h>
16#include <stdio.h>
17#include <unistd.h>
18
19#ifdef WIN32
20 #include <winsock2.h>
21#else
22 #include <arpa/inet.h> /* inet_ntoa */
23#endif
24
25#include <event.h>
26
27#include "transmission.h"
28#include "bandwidth.h"
29#include "crypto.h"
30#include "iobuf.h"
31#include "list.h"
32#include "net.h"
33#include "peer-io.h"
34#include "trevent.h"
35#include "utils.h"
36
37#define MAGIC_NUMBER 206745
38#define IO_TIMEOUT_SECS 8
39
40static size_t
41getPacketOverhead( size_t d )
42{
43    /**
44     * http://sd.wareonearth.com/~phil/net/overhead/
45     *
46     * TCP over Ethernet:
47     * Assuming no header compression (e.g. not PPP)
48     * Add 20 IPv4 header or 40 IPv6 header (no options)
49     * Add 20 TCP header
50     * Add 12 bytes optional TCP timestamps
51     * Max TCP Payload data rates over ethernet are thus:
52     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
53     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
54     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
55     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
56     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
57     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
58     */
59    static const double assumed_payload_data_rate = 94.0;
60
61    return (size_t)( d * ( 100.0 / assumed_payload_data_rate ) - d );
62}
63
64/**
65***
66**/
67
68#define dbgmsg( io, ... ) \
69    do { \
70        if( tr_deepLoggingIsActive( ) ) \
71            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
72    } while( 0 )
73
74struct tr_datatype
75{
76    tr_bool  isPieceData;
77    size_t   length;
78};
79
80struct tr_peerIo
81{
82    tr_bool                  isEncrypted;
83    tr_bool                  isIncoming;
84    tr_bool                  peerIdIsSet;
85    tr_bool                  extendedProtocolSupported;
86    tr_bool                  fastExtensionSupported;
87
88    int                      magicNumber;
89
90    uint8_t                  encryptionMode;
91    uint8_t                  timeout;
92    tr_port                  port;
93    int                      socket;
94
95    uint8_t                  peerId[20];
96    time_t                   timeCreated;
97
98    tr_session             * session;
99
100    tr_address               addr;
101    struct tr_iobuf        * iobuf;
102    tr_list                * output_datatypes; /* struct tr_datatype */
103
104    tr_can_read_cb           canRead;
105    tr_did_write_cb          didWrite;
106    tr_net_error_cb          gotError;
107    void *                   userData;
108
109    size_t                   bufferSize[2];
110
111    tr_bandwidth           * bandwidth;
112    tr_crypto              * crypto;
113};
114
115/***
116****
117***/
118
119static void
120didWriteWrapper( struct tr_iobuf  * iobuf,
121                 size_t             bytes_transferred,
122                 void             * vio )
123{
124    tr_peerIo *  io = vio;
125
126    while( bytes_transferred )
127    {
128        struct tr_datatype * next = io->output_datatypes->data;
129        const size_t payload = MIN( next->length, bytes_transferred );
130        const size_t overhead = getPacketOverhead( payload );
131
132        tr_bandwidthUsed( io->bandwidth, TR_UP, payload, next->isPieceData );
133
134        if( overhead > 0 )
135            tr_bandwidthUsed( io->bandwidth, TR_UP, overhead, FALSE );
136
137        if( io->didWrite )
138            io->didWrite( io, payload, next->isPieceData, io->userData );
139
140        bytes_transferred -= payload;
141        next->length -= payload;
142        if( !next->length )
143            tr_free( tr_list_pop_front( &io->output_datatypes ) );
144    }
145
146    if( EVBUFFER_LENGTH( tr_iobuf_output( iobuf ) ) )
147        tr_iobuf_enable( io->iobuf, EV_WRITE );
148}
149
150static void
151canReadWrapper( struct tr_iobuf  * iobuf,
152                size_t             bytes_transferred UNUSED,
153                void              * vio )
154{
155    int          done = 0;
156    int          err = 0;
157    tr_peerIo *  io = vio;
158    tr_session * session = io->session;
159
160    dbgmsg( io, "canRead" );
161
162    /* try to consume the input buffer */
163    if( io->canRead )
164    {
165        tr_globalLock( session );
166
167        while( !done && !err )
168        {
169            size_t piece = 0;
170            const size_t oldLen = EVBUFFER_LENGTH( tr_iobuf_input( iobuf ) );
171            const int ret = io->canRead( iobuf, io->userData, &piece );
172
173            if( ret != READ_ERR )
174            {
175                const size_t used = oldLen - EVBUFFER_LENGTH( tr_iobuf_input( iobuf ) );
176                if( piece )
177                    tr_bandwidthUsed( io->bandwidth, TR_DOWN, piece, TRUE );
178                if( used != piece )
179                    tr_bandwidthUsed( io->bandwidth, TR_DOWN, used - piece, FALSE );
180            }
181
182            switch( ret )
183            {
184                case READ_NOW:
185                    if( EVBUFFER_LENGTH( tr_iobuf_input( iobuf )))
186                        continue;
187                    done = 1;
188                    break;
189
190                case READ_LATER:
191                    done = 1;
192                    break;
193
194                case READ_ERR:
195                    err = 1;
196                    break;
197            }
198        }
199
200        tr_globalUnlock( session );
201    }
202}
203
204static void
205gotErrorWrapper( struct tr_iobuf  * iobuf,
206                 short              what,
207                 void             * userData )
208{
209    tr_peerIo * c = userData;
210
211    if( c->gotError )
212        c->gotError( iobuf, what, c->userData );
213}
214
215/**
216***
217**/
218
219static void
220bufevNew( tr_peerIo * io )
221{
222    io->iobuf = tr_iobuf_new( io->session,
223                              io->bandwidth,
224                              io->socket,
225                              EV_READ | EV_WRITE,
226                              canReadWrapper,
227                              didWriteWrapper,
228                              gotErrorWrapper,
229                              io );
230
231    tr_iobuf_settimeout( io->iobuf, io->timeout, io->timeout );
232}
233
234static int
235isPeerIo( const tr_peerIo * io )
236{
237    return ( io != NULL ) && ( io->magicNumber == MAGIC_NUMBER );
238}
239
240static int
241isFlag( int flag )
242{
243    return( ( flag == 0 ) || ( flag == 1 ) );
244}
245
246static tr_peerIo*
247tr_peerIoNew( tr_session       * session,
248              const tr_address * addr,
249              tr_port            port,
250              const uint8_t    * torrentHash,
251              int                isIncoming,
252              int                socket )
253{
254    tr_peerIo * io;
255
256    if( socket >= 0 )
257        tr_netSetTOS( socket, session->peerSocketTOS );
258
259    io = tr_new0( tr_peerIo, 1 );
260    io->magicNumber = MAGIC_NUMBER;
261    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
262    io->session = session;
263    io->addr = *addr;
264    io->port = port;
265    io->socket = socket;
266    io->isIncoming = isIncoming != 0;
267    io->timeout = IO_TIMEOUT_SECS;
268    io->timeCreated = time( NULL );
269    bufevNew( io );
270    tr_peerIoSetBandwidth( io, session->bandwidth );
271    return io;
272}
273
274tr_peerIo*
275tr_peerIoNewIncoming( tr_session       * session,
276                      const tr_address * addr,
277                      tr_port            port,
278                      int                socket )
279{
280    assert( session );
281    assert( addr );
282    assert( socket >= 0 );
283
284    return tr_peerIoNew( session, addr, port, NULL, 1, socket );
285}
286
287tr_peerIo*
288tr_peerIoNewOutgoing( tr_session       * session,
289                      const tr_address * addr,
290                      tr_port            port,
291                      const uint8_t    * torrentHash )
292{
293    int socket;
294
295    assert( session );
296    assert( addr );
297    assert( torrentHash );
298
299    socket = tr_netOpenTCP( session, addr, port );
300
301    return socket < 0
302           ? NULL
303           : tr_peerIoNew( session, addr, port, torrentHash, 0, socket );
304}
305
306static void
307io_dtor( void * vio )
308{
309    tr_peerIo * io = vio;
310
311    tr_peerIoSetBandwidth( io, NULL );
312    tr_iobuf_free( io->iobuf );
313    tr_netClose( io->socket );
314    tr_cryptoFree( io->crypto );
315    tr_list_free( &io->output_datatypes, tr_free );
316
317    io->magicNumber = 0xDEAD;
318    tr_free( io );
319}
320
321void
322tr_peerIoFree( tr_peerIo * io )
323{
324    if( io )
325    {
326        io->canRead = NULL;
327        io->didWrite = NULL;
328        io->gotError = NULL;
329        tr_runInEventThread( io->session, io_dtor, io );
330    }
331}
332
333tr_session*
334tr_peerIoGetSession( tr_peerIo * io )
335{
336    assert( isPeerIo( io ) );
337    assert( io->session );
338
339    return io->session;
340}
341
342const tr_address*
343tr_peerIoGetAddress( const tr_peerIo * io,
344                           tr_port   * port )
345{
346    assert( isPeerIo( io ) );
347
348    if( port )
349        *port = io->port;
350
351    return &io->addr;
352}
353
354const char*
355tr_peerIoAddrStr( const tr_address * addr,
356                  tr_port            port )
357{
358    static char buf[512];
359
360    if( addr->type == TR_AF_INET ) 
361        tr_snprintf( buf, sizeof( buf ), "%s:%u", tr_ntop_non_ts( addr ), ntohs( port ) ); 
362    else 
363        tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_ntop_non_ts( addr ), ntohs( port ) ); 
364    return buf;
365}
366
367const char*
368tr_peerIoGetAddrStr( const tr_peerIo * io )
369{
370    return tr_peerIoAddrStr( &io->addr, io->port );
371}
372
373static void
374tr_peerIoTryRead( tr_peerIo * io )
375{
376    if( EVBUFFER_LENGTH( tr_iobuf_input( io->iobuf )))
377        (*canReadWrapper)( io->iobuf, ~0, io );
378}
379
380void
381tr_peerIoSetIOFuncs( tr_peerIo *     io,
382                     tr_can_read_cb  readcb,
383                     tr_did_write_cb writecb,
384                     tr_net_error_cb errcb,
385                     void *          userData )
386{
387    io->canRead = readcb;
388    io->didWrite = writecb;
389    io->gotError = errcb;
390    io->userData = userData;
391
392    tr_peerIoTryRead( io );
393}
394
395int
396tr_peerIoIsIncoming( const tr_peerIo * c )
397{
398    return c->isIncoming ? 1 : 0;
399}
400
401int
402tr_peerIoReconnect( tr_peerIo * io )
403{
404    assert( !tr_peerIoIsIncoming( io ) );
405
406    if( io->socket >= 0 )
407        tr_netClose( io->socket );
408
409    io->socket = tr_netOpenTCP( io->session, &io->addr, io->port );
410
411    if( io->socket >= 0 )
412    {
413        tr_bandwidth * bandwidth = io->bandwidth;
414        tr_peerIoSetBandwidth( io, NULL );
415
416        tr_netSetTOS( io->socket, io->session->peerSocketTOS );
417        tr_iobuf_free( io->iobuf );
418        bufevNew( io );
419
420        tr_peerIoSetBandwidth( io, bandwidth );
421        return 0;
422    }
423
424    return -1;
425}
426
427void
428tr_peerIoSetTimeoutSecs( tr_peerIo * io,
429                         int         secs )
430{
431    io->timeout = secs;
432    tr_iobuf_settimeout( io->iobuf, io->timeout, io->timeout );
433    tr_iobuf_enable( io->iobuf, EV_READ | EV_WRITE );
434}
435
436/**
437***
438**/
439
440void
441tr_peerIoSetTorrentHash( tr_peerIo *     io,
442                         const uint8_t * hash )
443{
444    assert( isPeerIo( io ) );
445
446    tr_cryptoSetTorrentHash( io->crypto, hash );
447}
448
449const uint8_t*
450tr_peerIoGetTorrentHash( tr_peerIo * io )
451{
452    assert( isPeerIo( io ) );
453    assert( io->crypto );
454
455    return tr_cryptoGetTorrentHash( io->crypto );
456}
457
458int
459tr_peerIoHasTorrentHash( const tr_peerIo * io )
460{
461    assert( isPeerIo( io ) );
462    assert( io->crypto );
463
464    return tr_cryptoHasTorrentHash( io->crypto );
465}
466
467/**
468***
469**/
470
471void
472tr_peerIoSetPeersId( tr_peerIo *     io,
473                     const uint8_t * peer_id )
474{
475    assert( isPeerIo( io ) );
476
477    if( ( io->peerIdIsSet = peer_id != NULL ) )
478        memcpy( io->peerId, peer_id, 20 );
479    else
480        memset( io->peerId, 0, 20 );
481}
482
483const uint8_t*
484tr_peerIoGetPeersId( const tr_peerIo * io )
485{
486    assert( isPeerIo( io ) );
487    assert( io->peerIdIsSet );
488
489    return io->peerId;
490}
491
492/**
493***
494**/
495
496void
497tr_peerIoEnableFEXT( tr_peerIo * io,
498                     tr_bool     flag )
499{
500    assert( isPeerIo( io ) );
501    assert( isFlag( flag ) );
502
503    dbgmsg( io, "setting FEXT support flag to %d", (flag?1:0) );
504    io->fastExtensionSupported = flag;
505}
506
507tr_bool
508tr_peerIoSupportsFEXT( const tr_peerIo * io )
509{
510    assert( isPeerIo( io ) );
511
512    return io->fastExtensionSupported;
513}
514
515/**
516***
517**/
518
519void
520tr_peerIoEnableLTEP( tr_peerIo  * io,
521                     tr_bool      flag )
522{
523    assert( isPeerIo( io ) );
524    assert( isFlag( flag ) );
525
526    dbgmsg( io, "setting LTEP support flag to %d", (flag?1:0) );
527    io->extendedProtocolSupported = flag;
528}
529
530tr_bool
531tr_peerIoSupportsLTEP( const tr_peerIo * io )
532{
533    assert( isPeerIo( io ) );
534
535    return io->extendedProtocolSupported;
536}
537
538/**
539***
540**/
541
542static size_t
543getDesiredOutputBufferSize( const tr_peerIo * io )
544{
545    /* this is all kind of arbitrary, but what seems to work well is
546     * being large enough to hold the next 20 seconds' worth of input,
547     * or a few blocks, whichever is bigger.
548     * It's okay to tweak this as needed */
549    const double maxBlockSize = 16 * 1024; /* 16 KiB is from BT spec */
550    const double currentSpeed = tr_bandwidthGetPieceSpeed( io->bandwidth, TR_UP );
551    const double period = 20; /* arbitrary */
552    return MAX( maxBlockSize*5.5, currentSpeed*1024*period );
553}
554
555size_t
556tr_peerIoGetWriteBufferSpace( const tr_peerIo * io )
557{
558    const size_t desiredLen = getDesiredOutputBufferSize( io );
559    const size_t currentLen = EVBUFFER_LENGTH( tr_iobuf_output( io->iobuf ) );
560    size_t freeSpace = 0;
561
562    if( desiredLen > currentLen )
563        freeSpace = desiredLen - currentLen;
564
565    return freeSpace;
566}
567
568void
569tr_peerIoSetBandwidth( tr_peerIo     * io,
570                       tr_bandwidth  * bandwidth )
571{
572    assert( isPeerIo( io ) );
573
574    if( io->bandwidth )
575        tr_bandwidthRemoveBuffer( io->bandwidth, io->iobuf );
576
577    io->bandwidth = bandwidth;
578    tr_iobuf_set_bandwidth( io->iobuf, bandwidth );
579
580    if( io->bandwidth )
581        tr_bandwidthAddBuffer( io->bandwidth, io->iobuf );
582
583    tr_iobuf_enable( io->iobuf, EV_READ | EV_WRITE );
584}
585
586/**
587***
588**/
589
590tr_crypto*
591tr_peerIoGetCrypto( tr_peerIo * c )
592{
593    return c->crypto;
594}
595
596void
597tr_peerIoSetEncryption( tr_peerIo * io,
598                        int         encryptionMode )
599{
600    assert( isPeerIo( io ) );
601    assert( encryptionMode == PEER_ENCRYPTION_NONE
602          || encryptionMode == PEER_ENCRYPTION_RC4 );
603
604    io->encryptionMode = encryptionMode;
605}
606
607int
608tr_peerIoIsEncrypted( const tr_peerIo * io )
609{
610    return io != NULL && io->encryptionMode == PEER_ENCRYPTION_RC4;
611}
612
613/**
614***
615**/
616
617void
618tr_peerIoWrite( tr_peerIo   * io,
619                const void  * writeme,
620                size_t        writemeLen,
621                int           isPieceData )
622{
623    struct tr_datatype * datatype;
624    assert( tr_amInEventThread( io->session ) );
625    dbgmsg( io, "adding %zu bytes into io->output", writemeLen );
626
627    datatype = tr_new( struct tr_datatype, 1 );
628    datatype->isPieceData = isPieceData != 0;
629    datatype->length = writemeLen;
630    tr_list_append( &io->output_datatypes, datatype );
631
632    evbuffer_add( tr_iobuf_output( io->iobuf ), writeme, writemeLen );
633    tr_iobuf_enable( io->iobuf, EV_WRITE );
634}
635
636void
637tr_peerIoWriteBuf( tr_peerIo         * io,
638                   struct evbuffer   * buf,
639                   int                 isPieceData )
640{
641    const size_t n = EVBUFFER_LENGTH( buf );
642
643    tr_peerIoWrite( io, EVBUFFER_DATA( buf ), n, isPieceData );
644    evbuffer_drain( buf, n );
645}
646
647/**
648***
649**/
650
651void
652tr_peerIoWriteBytes( tr_peerIo *       io,
653                     struct evbuffer * outbuf,
654                     const void *      bytes,
655                     size_t            byteCount )
656{
657    uint8_t * tmp;
658
659    switch( io->encryptionMode )
660    {
661        case PEER_ENCRYPTION_NONE:
662            evbuffer_add( outbuf, bytes, byteCount );
663            break;
664
665        case PEER_ENCRYPTION_RC4:
666            tmp = tr_new( uint8_t, byteCount );
667            tr_cryptoEncrypt( io->crypto, byteCount, bytes, tmp );
668            evbuffer_add( outbuf, tmp, byteCount );
669            tr_free( tmp );
670            break;
671
672        default:
673            assert( 0 );
674    }
675}
676
677void
678tr_peerIoWriteUint8( tr_peerIo *       io,
679                     struct evbuffer * outbuf,
680                     uint8_t           writeme )
681{
682    tr_peerIoWriteBytes( io, outbuf, &writeme, sizeof( uint8_t ) );
683}
684
685void
686tr_peerIoWriteUint16( tr_peerIo *       io,
687                      struct evbuffer * outbuf,
688                      uint16_t          writeme )
689{
690    uint16_t tmp = htons( writeme );
691
692    tr_peerIoWriteBytes( io, outbuf, &tmp, sizeof( uint16_t ) );
693}
694
695void
696tr_peerIoWriteUint32( tr_peerIo *       io,
697                      struct evbuffer * outbuf,
698                      uint32_t          writeme )
699{
700    uint32_t tmp = htonl( writeme );
701
702    tr_peerIoWriteBytes( io, outbuf, &tmp, sizeof( uint32_t ) );
703}
704
705/***
706****
707***/
708
709void
710tr_peerIoReadBytes( tr_peerIo *       io,
711                    struct evbuffer * inbuf,
712                    void *            bytes,
713                    size_t            byteCount )
714{
715    assert( EVBUFFER_LENGTH( inbuf ) >= byteCount );
716
717    switch( io->encryptionMode )
718    {
719        case PEER_ENCRYPTION_NONE:
720            evbuffer_remove( inbuf, bytes, byteCount );
721            break;
722
723        case PEER_ENCRYPTION_RC4:
724            evbuffer_remove( inbuf, bytes, byteCount );
725            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
726            break;
727
728        default:
729            assert( 0 );
730    }
731}
732
733void
734tr_peerIoReadUint8( tr_peerIo *       io,
735                    struct evbuffer * inbuf,
736                    uint8_t *         setme )
737{
738    tr_peerIoReadBytes( io, inbuf, setme, sizeof( uint8_t ) );
739}
740
741void
742tr_peerIoReadUint16( tr_peerIo *       io,
743                     struct evbuffer * inbuf,
744                     uint16_t *        setme )
745{
746    uint16_t tmp;
747
748    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
749    *setme = ntohs( tmp );
750}
751
752void
753tr_peerIoReadUint32( tr_peerIo *       io,
754                     struct evbuffer * inbuf,
755                     uint32_t *        setme )
756{
757    uint32_t tmp;
758
759    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
760    *setme = ntohl( tmp );
761}
762
763void
764tr_peerIoDrain( tr_peerIo *       io,
765                struct evbuffer * inbuf,
766                size_t            byteCount )
767{
768    uint8_t * tmp = tr_new( uint8_t, byteCount );
769
770    tr_peerIoReadBytes( io, inbuf, tmp, byteCount );
771    tr_free( tmp );
772}
773
774int
775tr_peerIoGetAge( const tr_peerIo * io )
776{
777    return time( NULL ) - io->timeCreated;
778}
Note: See TracBrowser for help on using the repository browser.