source: trunk/libtransmission/peer-io.c @ 7231

Last change on this file since 7231 was 7231, checked in by charles, 13 years ago

(libT) re-apply jhujhiti's IPv6 patch. This merges in my tr_port cleanup, so any new bugs are mine :/

  • Property svn:keywords set to Date Rev Author Id
File size: 17.4 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 7231 2008-12-02 03:41:58Z charles $
11 */
12
13#include <assert.h>
14#include <limits.h> /* INT_MAX */
15#include <string.h>
16#include <stdio.h>
17#include <unistd.h>
18
19#ifdef WIN32
20 #include <winsock2.h>
21#else
22 #include <arpa/inet.h> /* inet_ntoa */
23#endif
24
25#include <event.h>
26
27#include "transmission.h"
28#include "bandwidth.h"
29#include "crypto.h"
30#include "iobuf.h"
31#include "list.h"
32#include "net.h"
33#include "peer-io.h"
34#include "trevent.h"
35#include "utils.h"
36
37#define MAGIC_NUMBER 206745
38#define IO_TIMEOUT_SECS 8
39
40static size_t
41getPacketOverhead( size_t d )
42{
43    /**
44     * http://sd.wareonearth.com/~phil/net/overhead/
45     *
46     * TCP over Ethernet:
47     * Assuming no header compression (e.g. not PPP)
48     * Add 20 IPv4 header or 40 IPv6 header (no options)
49     * Add 20 TCP header
50     * Add 12 bytes optional TCP timestamps
51     * Max TCP Payload data rates over ethernet are thus:
52     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
53     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
54     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
55     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
56     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
57     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
58     */
59    static const double assumed_payload_data_rate = 94.0;
60
61    return (size_t)( d * ( 100.0 / assumed_payload_data_rate ) - d );
62}
63
64/**
65***
66**/
67
68#define dbgmsg( io, ... ) \
69    do { \
70        if( tr_deepLoggingIsActive( ) ) \
71            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
72    } while( 0 )
73
74struct tr_datatype
75{
76    tr_bool  isPieceData;
77    size_t   length;
78};
79
80struct tr_peerIo
81{
82    tr_bool                  isEncrypted;
83    tr_bool                  isIncoming;
84    tr_bool                  peerIdIsSet;
85    tr_bool                  extendedProtocolSupported;
86
87    int                      magicNumber;
88
89    uint8_t                  encryptionMode;
90    uint8_t                  timeout;
91    tr_port                  port;
92    int                      socket;
93
94    uint8_t                  peerId[20];
95    time_t                   timeCreated;
96
97    tr_session             * session;
98
99    tr_address               addr;
100    struct tr_iobuf        * iobuf;
101    tr_list                * output_datatypes; /* struct tr_datatype */
102
103    tr_can_read_cb           canRead;
104    tr_did_write_cb          didWrite;
105    tr_net_error_cb          gotError;
106    void *                   userData;
107
108    size_t                   bufferSize[2];
109
110    tr_bandwidth           * bandwidth;
111    tr_crypto              * crypto;
112};
113
114/***
115****
116***/
117
118static void
119didWriteWrapper( struct tr_iobuf  * iobuf,
120                 size_t             bytes_transferred,
121                 void             * vio )
122{
123    tr_peerIo *  io = vio;
124
125    while( bytes_transferred )
126    {
127        struct tr_datatype * next = io->output_datatypes->data;
128        const size_t payload = MIN( next->length, bytes_transferred );
129        const size_t overhead = getPacketOverhead( payload );
130
131        tr_bandwidthUsed( io->bandwidth, TR_UP, payload, next->isPieceData );
132
133        if( overhead > 0 )
134            tr_bandwidthUsed( io->bandwidth, TR_UP, overhead, FALSE );
135
136        if( io->didWrite )
137            io->didWrite( io, payload, next->isPieceData, io->userData );
138
139        bytes_transferred -= payload;
140        next->length -= payload;
141        if( !next->length )
142            tr_free( tr_list_pop_front( &io->output_datatypes ) );
143    }
144
145    if( EVBUFFER_LENGTH( tr_iobuf_output( iobuf ) ) )
146        tr_iobuf_enable( io->iobuf, EV_WRITE );
147}
148
149static void
150canReadWrapper( struct tr_iobuf  * iobuf,
151                size_t             bytes_transferred UNUSED,
152                void              * vio )
153{
154    int          done = 0;
155    int          err = 0;
156    tr_peerIo *  io = vio;
157    tr_session * session = io->session;
158
159    dbgmsg( io, "canRead" );
160
161    /* try to consume the input buffer */
162    if( io->canRead )
163    {
164        tr_globalLock( session );
165
166        while( !done && !err )
167        {
168            size_t piece = 0;
169            const size_t oldLen = EVBUFFER_LENGTH( tr_iobuf_input( iobuf ) );
170            const int ret = io->canRead( iobuf, io->userData, &piece );
171
172            if( ret != READ_ERR )
173            {
174                const size_t used = oldLen - EVBUFFER_LENGTH( tr_iobuf_input( iobuf ) );
175                if( piece )
176                    tr_bandwidthUsed( io->bandwidth, TR_DOWN, piece, TRUE );
177                if( used != piece )
178                    tr_bandwidthUsed( io->bandwidth, TR_DOWN, used - piece, FALSE );
179            }
180
181            switch( ret )
182            {
183                case READ_NOW:
184                    if( EVBUFFER_LENGTH( tr_iobuf_input( iobuf )))
185                        continue;
186                    done = 1;
187                    break;
188
189                case READ_LATER:
190                    done = 1;
191                    break;
192
193                case READ_ERR:
194                    err = 1;
195                    break;
196            }
197        }
198
199        tr_globalUnlock( session );
200    }
201}
202
203static void
204gotErrorWrapper( struct tr_iobuf  * iobuf,
205                 short              what,
206                 void             * userData )
207{
208    tr_peerIo * c = userData;
209
210    if( c->gotError )
211        c->gotError( iobuf, what, c->userData );
212}
213
214/**
215***
216**/
217
218static void
219bufevNew( tr_peerIo * io )
220{
221    io->iobuf = tr_iobuf_new( io->session,
222                              io->bandwidth,
223                              io->socket,
224                              EV_READ | EV_WRITE,
225                              canReadWrapper,
226                              didWriteWrapper,
227                              gotErrorWrapper,
228                              io );
229
230    tr_iobuf_settimeout( io->iobuf, io->timeout, io->timeout );
231}
232
233static int
234isPeerIo( const tr_peerIo * io )
235{
236    return ( io != NULL ) && ( io->magicNumber == MAGIC_NUMBER );
237}
238
239static tr_peerIo*
240tr_peerIoNew( tr_session       * session,
241              const tr_address * addr,
242              tr_port            port,
243              const uint8_t    * torrentHash,
244              int                isIncoming,
245              int                socket )
246{
247    tr_peerIo * io;
248
249    if( socket >= 0 )
250        tr_netSetTOS( socket, session->peerSocketTOS );
251
252    io = tr_new0( tr_peerIo, 1 );
253    io->magicNumber = MAGIC_NUMBER;
254    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
255    io->session = session;
256    io->addr = *addr;
257    io->port = port;
258    io->socket = socket;
259    io->isIncoming = isIncoming != 0;
260    io->timeout = IO_TIMEOUT_SECS;
261    io->timeCreated = time( NULL );
262    bufevNew( io );
263    tr_peerIoSetBandwidth( io, session->bandwidth );
264    return io;
265}
266
267tr_peerIo*
268tr_peerIoNewIncoming( tr_session       * session,
269                      const tr_address * addr,
270                      tr_port            port,
271                      int                socket )
272{
273    assert( session );
274    assert( addr );
275    assert( socket >= 0 );
276
277    return tr_peerIoNew( session, addr, port, NULL, 1, socket );
278}
279
280tr_peerIo*
281tr_peerIoNewOutgoing( tr_session       * session,
282                      const tr_address * addr,
283                      tr_port            port,
284                      const uint8_t    * torrentHash )
285{
286    int socket;
287
288    assert( session );
289    assert( addr );
290    assert( torrentHash );
291
292    socket = tr_netOpenTCP( session, addr, port );
293
294    return socket < 0
295           ? NULL
296           : tr_peerIoNew( session, addr, port, torrentHash, 0, socket );
297}
298
299static void
300io_dtor( void * vio )
301{
302    tr_peerIo * io = vio;
303
304    tr_peerIoSetBandwidth( io, NULL );
305    tr_iobuf_free( io->iobuf );
306    tr_netClose( io->socket );
307    tr_cryptoFree( io->crypto );
308    tr_list_free( &io->output_datatypes, tr_free );
309
310    io->magicNumber = 0xDEAD;
311    tr_free( io );
312}
313
314void
315tr_peerIoFree( tr_peerIo * io )
316{
317    if( io )
318    {
319        io->canRead = NULL;
320        io->didWrite = NULL;
321        io->gotError = NULL;
322        tr_runInEventThread( io->session, io_dtor, io );
323    }
324}
325
326tr_session*
327tr_peerIoGetSession( tr_peerIo * io )
328{
329    assert( isPeerIo( io ) );
330    assert( io->session );
331
332    return io->session;
333}
334
335const tr_address*
336tr_peerIoGetAddress( const tr_peerIo * io,
337                           tr_port   * port )
338{
339    assert( isPeerIo( io ) );
340
341    if( port )
342        *port = io->port;
343
344    return &io->addr;
345}
346
347const char*
348tr_peerIoAddrStr( const tr_address * addr,
349                  tr_port            port )
350{
351    static char buf[512];
352
353    if( addr->type == TR_AF_INET ) 
354        tr_snprintf( buf, sizeof( buf ), "%s:%u", tr_ntop_non_ts( addr ), ntohs( port ) ); 
355    else 
356        tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_ntop_non_ts( addr ), ntohs( port ) ); 
357    return buf;
358}
359
360const char*
361tr_peerIoGetAddrStr( const tr_peerIo * io )
362{
363    return tr_peerIoAddrStr( &io->addr, io->port );
364}
365
366static void
367tr_peerIoTryRead( tr_peerIo * io )
368{
369    if( EVBUFFER_LENGTH( tr_iobuf_input( io->iobuf )))
370        (*canReadWrapper)( io->iobuf, ~0, io );
371}
372
373void
374tr_peerIoSetIOFuncs( tr_peerIo *     io,
375                     tr_can_read_cb  readcb,
376                     tr_did_write_cb writecb,
377                     tr_net_error_cb errcb,
378                     void *          userData )
379{
380    io->canRead = readcb;
381    io->didWrite = writecb;
382    io->gotError = errcb;
383    io->userData = userData;
384
385    tr_peerIoTryRead( io );
386}
387
388int
389tr_peerIoIsIncoming( const tr_peerIo * c )
390{
391    return c->isIncoming ? 1 : 0;
392}
393
394int
395tr_peerIoReconnect( tr_peerIo * io )
396{
397    assert( !tr_peerIoIsIncoming( io ) );
398
399    if( io->socket >= 0 )
400        tr_netClose( io->socket );
401
402    io->socket = tr_netOpenTCP( io->session, &io->addr, io->port );
403
404    if( io->socket >= 0 )
405    {
406        tr_bandwidth * bandwidth = io->bandwidth;
407        tr_peerIoSetBandwidth( io, NULL );
408
409        tr_netSetTOS( io->socket, io->session->peerSocketTOS );
410        tr_iobuf_free( io->iobuf );
411        bufevNew( io );
412
413        tr_peerIoSetBandwidth( io, bandwidth );
414        return 0;
415    }
416
417    return -1;
418}
419
420void
421tr_peerIoSetTimeoutSecs( tr_peerIo * io,
422                         int         secs )
423{
424    io->timeout = secs;
425    tr_iobuf_settimeout( io->iobuf, io->timeout, io->timeout );
426    tr_iobuf_enable( io->iobuf, EV_READ | EV_WRITE );
427}
428
429/**
430***
431**/
432
433void
434tr_peerIoSetTorrentHash( tr_peerIo *     io,
435                         const uint8_t * hash )
436{
437    assert( isPeerIo( io ) );
438
439    tr_cryptoSetTorrentHash( io->crypto, hash );
440}
441
442const uint8_t*
443tr_peerIoGetTorrentHash( tr_peerIo * io )
444{
445    assert( isPeerIo( io ) );
446    assert( io->crypto );
447
448    return tr_cryptoGetTorrentHash( io->crypto );
449}
450
451int
452tr_peerIoHasTorrentHash( const tr_peerIo * io )
453{
454    assert( isPeerIo( io ) );
455    assert( io->crypto );
456
457    return tr_cryptoHasTorrentHash( io->crypto );
458}
459
460/**
461***
462**/
463
464void
465tr_peerIoSetPeersId( tr_peerIo *     io,
466                     const uint8_t * peer_id )
467{
468    assert( isPeerIo( io ) );
469
470    if( ( io->peerIdIsSet = peer_id != NULL ) )
471        memcpy( io->peerId, peer_id, 20 );
472    else
473        memset( io->peerId, 0, 20 );
474}
475
476const uint8_t*
477tr_peerIoGetPeersId( const tr_peerIo * io )
478{
479    assert( isPeerIo( io ) );
480    assert( io->peerIdIsSet );
481
482    return io->peerId;
483}
484
485/**
486***
487**/
488
489void
490tr_peerIoEnableLTEP( tr_peerIo * io,
491                     int         flag )
492{
493    assert( isPeerIo( io ) );
494    assert( flag == 0 || flag == 1 );
495
496    io->extendedProtocolSupported = flag;
497}
498
499int
500tr_peerIoSupportsLTEP( const tr_peerIo * io )
501{
502    assert( isPeerIo( io ) );
503
504    return io->extendedProtocolSupported;
505}
506
507/**
508***
509**/
510
511static size_t
512getDesiredOutputBufferSize( const tr_peerIo * io )
513{
514    /* this is all kind of arbitrary, but what seems to work well is
515     * being large enough to hold the next 15 seconds' worth of input,
516     * or two and a half blocks, whichever is bigger.
517     * It's okay to tweak this as needed */
518    const double maxBlockSize = 16 * 1024; /* 16 KiB is from BT spec */
519    const double currentSpeed = tr_bandwidthGetPieceSpeed( io->bandwidth, TR_UP );
520    const double period = 20; /* arbitrary */
521    return MAX( maxBlockSize*2.5, currentSpeed*1024*period );
522}
523
524size_t
525tr_peerIoGetWriteBufferSpace( const tr_peerIo * io )
526{
527    const size_t desiredLen = getDesiredOutputBufferSize( io );
528    const size_t currentLen = EVBUFFER_LENGTH( tr_iobuf_output( io->iobuf ) );
529    size_t freeSpace = 0;
530
531    if( desiredLen > currentLen )
532        freeSpace = desiredLen - currentLen;
533
534    return freeSpace;
535}
536
537void
538tr_peerIoSetBandwidth( tr_peerIo     * io,
539                       tr_bandwidth  * bandwidth )
540{
541    assert( isPeerIo( io ) );
542
543    if( io->bandwidth )
544        tr_bandwidthRemoveBuffer( io->bandwidth, io->iobuf );
545
546    io->bandwidth = bandwidth;
547    tr_iobuf_set_bandwidth( io->iobuf, bandwidth );
548
549    if( io->bandwidth )
550        tr_bandwidthAddBuffer( io->bandwidth, io->iobuf );
551
552    tr_iobuf_enable( io->iobuf, EV_READ | EV_WRITE );
553}
554
555/**
556***
557**/
558
559tr_crypto*
560tr_peerIoGetCrypto( tr_peerIo * c )
561{
562    return c->crypto;
563}
564
565void
566tr_peerIoSetEncryption( tr_peerIo * io,
567                        int         encryptionMode )
568{
569    assert( isPeerIo( io ) );
570    assert( encryptionMode == PEER_ENCRYPTION_NONE
571          || encryptionMode == PEER_ENCRYPTION_RC4 );
572
573    io->encryptionMode = encryptionMode;
574}
575
576int
577tr_peerIoIsEncrypted( const tr_peerIo * io )
578{
579    return io != NULL && io->encryptionMode == PEER_ENCRYPTION_RC4;
580}
581
582/**
583***
584**/
585
586void
587tr_peerIoWrite( tr_peerIo   * io,
588                const void  * writeme,
589                size_t        writemeLen,
590                int           isPieceData )
591{
592    struct tr_datatype * datatype;
593    assert( tr_amInEventThread( io->session ) );
594    dbgmsg( io, "adding %zu bytes into io->output", writemeLen );
595
596    datatype = tr_new( struct tr_datatype, 1 );
597    datatype->isPieceData = isPieceData != 0;
598    datatype->length = writemeLen;
599    tr_list_append( &io->output_datatypes, datatype );
600
601    evbuffer_add( tr_iobuf_output( io->iobuf ), writeme, writemeLen );
602    tr_iobuf_enable( io->iobuf, EV_WRITE );
603}
604
605void
606tr_peerIoWriteBuf( tr_peerIo         * io,
607                   struct evbuffer   * buf,
608                   int                 isPieceData )
609{
610    const size_t n = EVBUFFER_LENGTH( buf );
611
612    tr_peerIoWrite( io, EVBUFFER_DATA( buf ), n, isPieceData );
613    evbuffer_drain( buf, n );
614}
615
616/**
617***
618**/
619
620void
621tr_peerIoWriteBytes( tr_peerIo *       io,
622                     struct evbuffer * outbuf,
623                     const void *      bytes,
624                     size_t            byteCount )
625{
626    uint8_t * tmp;
627
628    switch( io->encryptionMode )
629    {
630        case PEER_ENCRYPTION_NONE:
631            evbuffer_add( outbuf, bytes, byteCount );
632            break;
633
634        case PEER_ENCRYPTION_RC4:
635            tmp = tr_new( uint8_t, byteCount );
636            tr_cryptoEncrypt( io->crypto, byteCount, bytes, tmp );
637            evbuffer_add( outbuf, tmp, byteCount );
638            tr_free( tmp );
639            break;
640
641        default:
642            assert( 0 );
643    }
644}
645
646void
647tr_peerIoWriteUint8( tr_peerIo *       io,
648                     struct evbuffer * outbuf,
649                     uint8_t           writeme )
650{
651    tr_peerIoWriteBytes( io, outbuf, &writeme, sizeof( uint8_t ) );
652}
653
654void
655tr_peerIoWriteUint16( tr_peerIo *       io,
656                      struct evbuffer * outbuf,
657                      uint16_t          writeme )
658{
659    uint16_t tmp = htons( writeme );
660
661    tr_peerIoWriteBytes( io, outbuf, &tmp, sizeof( uint16_t ) );
662}
663
664void
665tr_peerIoWriteUint32( tr_peerIo *       io,
666                      struct evbuffer * outbuf,
667                      uint32_t          writeme )
668{
669    uint32_t tmp = htonl( writeme );
670
671    tr_peerIoWriteBytes( io, outbuf, &tmp, sizeof( uint32_t ) );
672}
673
674/***
675****
676***/
677
678void
679tr_peerIoReadBytes( tr_peerIo *       io,
680                    struct evbuffer * inbuf,
681                    void *            bytes,
682                    size_t            byteCount )
683{
684    assert( EVBUFFER_LENGTH( inbuf ) >= byteCount );
685
686    switch( io->encryptionMode )
687    {
688        case PEER_ENCRYPTION_NONE:
689            evbuffer_remove( inbuf, bytes, byteCount );
690            break;
691
692        case PEER_ENCRYPTION_RC4:
693            evbuffer_remove( inbuf, bytes, byteCount );
694            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
695            break;
696
697        default:
698            assert( 0 );
699    }
700}
701
702void
703tr_peerIoReadUint8( tr_peerIo *       io,
704                    struct evbuffer * inbuf,
705                    uint8_t *         setme )
706{
707    tr_peerIoReadBytes( io, inbuf, setme, sizeof( uint8_t ) );
708}
709
710void
711tr_peerIoReadUint16( tr_peerIo *       io,
712                     struct evbuffer * inbuf,
713                     uint16_t *        setme )
714{
715    uint16_t tmp;
716
717    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
718    *setme = ntohs( tmp );
719}
720
721void
722tr_peerIoReadUint32( tr_peerIo *       io,
723                     struct evbuffer * inbuf,
724                     uint32_t *        setme )
725{
726    uint32_t tmp;
727
728    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
729    *setme = ntohl( tmp );
730}
731
732void
733tr_peerIoDrain( tr_peerIo *       io,
734                struct evbuffer * inbuf,
735                size_t            byteCount )
736{
737    uint8_t * tmp = tr_new( uint8_t, byteCount );
738
739    tr_peerIoReadBytes( io, inbuf, tmp, byteCount );
740    tr_free( tmp );
741}
742
743int
744tr_peerIoGetAge( const tr_peerIo * io )
745{
746    return time( NULL ) - io->timeCreated;
747}
Note: See TracBrowser for help on using the repository browser.