source: trunk/libtransmission/peer-io.c @ 7476

Last change on this file since 7476 was 7476, checked in by charles, 13 years ago

(trunk libT) #include "session.h" cleanup from wereHamster

  • Property svn:keywords set to Date Rev Author Id
File size: 23.2 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 7476 2008-12-23 17:27:15Z charles $
11 */
12
13#include <assert.h>
14#include <limits.h> /* INT_MAX */
15#include <string.h>
16#include <stdio.h>
17#include <unistd.h>
18
19#ifdef WIN32
20 #include <winsock2.h>
21#else
22 #include <arpa/inet.h> /* inet_ntoa */
23#endif
24
25#include <event.h>
26
27#include "transmission.h"
28#include "session.h"
29#include "bandwidth.h"
30#include "crypto.h"
31#include "list.h"
32#include "net.h"
33#include "peer-io.h"
34#include "trevent.h"
35#include "utils.h"
36
37#define MAGIC_NUMBER 206745
38
39static size_t
40getPacketOverhead( size_t d )
41{
42    /**
43     * http://sd.wareonearth.com/~phil/net/overhead/
44     *
45     * TCP over Ethernet:
46     * Assuming no header compression (e.g. not PPP)
47     * Add 20 IPv4 header or 40 IPv6 header (no options)
48     * Add 20 TCP header
49     * Add 12 bytes optional TCP timestamps
50     * Max TCP Payload data rates over ethernet are thus:
51     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
52     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
53     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
54     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
55     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
56     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
57     */
58    static const double assumed_payload_data_rate = 94.0;
59
60    return (size_t)( d * ( 100.0 / assumed_payload_data_rate ) - d );
61}
62
63/**
64***
65**/
66
67#define dbgmsg( io, ... ) \
68    do { \
69        if( tr_deepLoggingIsActive( ) ) \
70            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
71    } while( 0 )
72
73struct tr_datatype
74{
75    tr_bool  isPieceData;
76    size_t   length;
77    struct __tr_list head;
78};
79
80struct tr_peerIo
81{
82    tr_bool            isEncrypted;
83    tr_bool            isIncoming;
84    tr_bool            peerIdIsSet;
85    tr_bool            extendedProtocolSupported;
86    tr_bool            fastExtensionSupported;
87
88    int                magicNumber;
89
90    uint8_t            encryptionMode;
91    tr_port            port;
92    int                socket;
93
94    uint8_t            peerId[20];
95    time_t             timeCreated;
96
97    tr_session       * session;
98
99    tr_address         addr;
100
101    tr_can_read_cb     canRead;
102    tr_did_write_cb    didWrite;
103    tr_net_error_cb    gotError;
104    void *             userData;
105
106    tr_bandwidth     * bandwidth;
107    tr_crypto        * crypto;
108
109    struct evbuffer  * inbuf;
110    struct evbuffer  * outbuf;
111    struct __tr_list   outbuf_datatypes; /* struct tr_datatype */
112
113    struct event       event_read;
114    struct event       event_write;
115};
116
117/***
118****
119***/
120
121static void
122didWriteWrapper( tr_peerIo * io, size_t bytes_transferred )
123{
124    while( bytes_transferred )
125    {
126        struct tr_datatype * next = __tr_list_entry( io->outbuf_datatypes.next,
127                                                     struct tr_datatype, head );
128        const size_t payload = MIN( next->length, bytes_transferred );
129        const size_t overhead = getPacketOverhead( payload );
130
131        tr_bandwidthUsed( io->bandwidth, TR_UP, payload, next->isPieceData );
132
133        if( overhead > 0 )
134            tr_bandwidthUsed( io->bandwidth, TR_UP, overhead, FALSE );
135
136        if( io->didWrite )
137            io->didWrite( io, payload, next->isPieceData, io->userData );
138
139        bytes_transferred -= payload;
140        next->length -= payload;
141        if( !next->length ) {
142            __tr_list_remove( io->outbuf_datatypes.next );
143            tr_free( next );
144        }
145    }
146}
147
148static void
149canReadWrapper( tr_peerIo * io )
150{
151    tr_bool done = 0;
152    tr_bool err = 0;
153    tr_session * session = io->session;
154
155    dbgmsg( io, "canRead" );
156
157    /* try to consume the input buffer */
158    if( io->canRead )
159    {
160        tr_globalLock( session );
161
162        while( !done && !err )
163        {
164            size_t piece = 0;
165            const size_t oldLen = EVBUFFER_LENGTH( io->inbuf );
166            const int ret = io->canRead( io, io->userData, &piece );
167
168            const size_t used = oldLen - EVBUFFER_LENGTH( io->inbuf );
169
170            if( piece )
171                tr_bandwidthUsed( io->bandwidth, TR_DOWN, piece, TRUE );
172
173            if( used != piece )
174                tr_bandwidthUsed( io->bandwidth, TR_DOWN, used - piece, FALSE );
175
176            switch( ret )
177            {
178                case READ_NOW:
179                    if( EVBUFFER_LENGTH( io->inbuf ) )
180                        continue;
181                    done = 1;
182                    break;
183
184                case READ_LATER:
185                    done = 1;
186                    break;
187
188                case READ_ERR:
189                    err = 1;
190                    break;
191            }
192        }
193
194        tr_globalUnlock( session );
195    }
196}
197
198#define _isBool(b) (((b)==0 || (b)==1))
199
200tr_bool
201tr_isPeerIo( const tr_peerIo * io )
202{
203    return ( io != NULL )
204        && ( io->magicNumber == MAGIC_NUMBER )
205        && ( tr_isAddress( &io->addr ) )
206        && ( _isBool( io->isEncrypted ) )
207        && ( _isBool( io->isIncoming ) )
208        && ( _isBool( io->peerIdIsSet ) )
209        && ( _isBool( io->extendedProtocolSupported ) )
210        && ( _isBool( io->fastExtensionSupported ) );
211}
212
213static void
214event_read_cb( int fd, short event UNUSED, void * vio )
215{
216    int res;
217    tr_peerIo * io = vio;
218
219    /* Limit the input buffer to 256K, so it doesn't grow too large */
220    size_t howmuch;
221    const tr_direction dir = TR_DOWN;
222    const size_t max = 256 * 1024;
223    const size_t curlen = EVBUFFER_LENGTH( io->inbuf );
224
225    howmuch = curlen >= max ? 0 : max - curlen;
226    howmuch = tr_bandwidthClamp( io->bandwidth, TR_DOWN, howmuch );
227
228    assert( tr_isPeerIo( io ) );
229
230    dbgmsg( io, "libevent says this peer is ready to read" );
231
232    /* if we don't have any bandwidth left, stop reading */
233    if( howmuch < 1 ) {
234        tr_peerIoSetEnabled( io, dir, FALSE );
235        return;
236    }
237
238    res = evbuffer_read( io->inbuf, fd, howmuch );
239
240    if( res > 0 )
241    {
242        tr_peerIoSetEnabled( io, dir, TRUE );
243
244        /* Invoke the user callback - must always be called last */
245        canReadWrapper( io );
246    }
247    else
248    {
249        short what = EVBUFFER_READ;
250
251        if( res == 0 ) /* EOF */
252            what |= EVBUFFER_EOF;
253        else if( res == -1 ) {
254            if( errno == EAGAIN || errno == EINTR ) {
255                tr_peerIoSetEnabled( io, dir, TRUE );
256                return;
257            }
258            what |= EVBUFFER_ERROR;
259        }
260
261        if( io->gotError != NULL )
262            io->gotError( io, what, io->userData );
263    }
264}
265
266static int
267tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
268{
269    struct evbuffer * buffer = io->outbuf;
270    int n = MIN( EVBUFFER_LENGTH( buffer ), howmuch );
271
272#ifdef WIN32
273    n = send(fd, buffer->buffer, n,  0 );
274#else
275    n = write(fd, buffer->buffer, n );
276#endif
277    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?strerror(errno):"") );
278
279    if( n == -1 )
280        return -1;
281    if (n == 0)
282        return 0;
283    evbuffer_drain( buffer, n );
284
285    return n;
286}
287
288static void
289event_write_cb( int fd, short event UNUSED, void * vio )
290{
291    int res = 0;
292    short what = EVBUFFER_WRITE;
293    tr_peerIo * io = vio;
294    size_t howmuch;
295    const tr_direction dir = TR_UP;
296
297    assert( tr_isPeerIo( io ) );
298
299    dbgmsg( io, "libevent says this peer is ready to write" );
300
301    /* Write as much as possible, since the socket is non-blocking, write() will
302     * return if it can't write any more data without blocking */
303    howmuch = tr_bandwidthClamp( io->bandwidth, dir, EVBUFFER_LENGTH( io->outbuf ) );
304
305    /* if we don't have any bandwidth left, stop writing */
306    if( howmuch < 1 ) {
307        tr_peerIoSetEnabled( io, dir, FALSE );
308        return;
309    }
310
311    res = tr_evbuffer_write( io, fd, howmuch );
312    if (res == -1) {
313#ifndef WIN32
314/*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
315 *  *set errno. thus this error checking is not portable*/
316        if (errno == EAGAIN || errno == EINTR || errno == EINPROGRESS)
317            goto reschedule;
318        /* error case */
319        what |= EVBUFFER_ERROR;
320
321#else
322        goto reschedule;
323#endif
324
325    } else if (res == 0) {
326        /* eof case */
327        what |= EVBUFFER_EOF;
328    }
329    if (res <= 0)
330        goto error;
331
332    if( EVBUFFER_LENGTH( io->outbuf ) )
333        tr_peerIoSetEnabled( io, dir, TRUE );
334
335    didWriteWrapper( io, res );
336    return;
337
338 reschedule:
339    if( EVBUFFER_LENGTH( io->outbuf ) )
340        tr_peerIoSetEnabled( io, dir, TRUE );
341    return;
342
343 error:
344    if( io->gotError != NULL )
345        io->gotError( io, what, io->userData );
346}
347
348/**
349***
350**/
351
352static tr_peerIo*
353tr_peerIoNew( tr_session       * session,
354              const tr_address * addr,
355              tr_port            port,
356              const uint8_t    * torrentHash,
357              int                isIncoming,
358              int                socket )
359{
360    tr_peerIo * io;
361
362    if( socket >= 0 )
363        tr_netSetTOS( socket, session->peerSocketTOS );
364
365    io = tr_new0( tr_peerIo, 1 );
366    io->magicNumber = MAGIC_NUMBER;
367    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
368    io->session = session;
369    io->addr = *addr;
370    io->port = port;
371    io->socket = socket;
372    io->isIncoming = isIncoming != 0;
373    io->timeCreated = time( NULL );
374    io->inbuf = evbuffer_new( );
375    io->outbuf = evbuffer_new( );
376
377    event_set( &io->event_read, io->socket, EV_READ, event_read_cb, io );
378    event_set( &io->event_write, io->socket, EV_WRITE, event_write_cb, io );
379
380    __tr_list_init( &io->outbuf_datatypes );
381
382    tr_peerIoSetBandwidth( io, session->bandwidth );
383
384    return io;
385}
386
387tr_peerIo*
388tr_peerIoNewIncoming( tr_session       * session,
389                      const tr_address * addr,
390                      tr_port            port,
391                      int                socket )
392{
393    assert( session );
394    assert( tr_isAddress( addr ) );
395    assert( socket >= 0 );
396
397    return tr_peerIoNew( session, addr, port, NULL, 1, socket );
398}
399
400tr_peerIo*
401tr_peerIoNewOutgoing( tr_session       * session,
402                      const tr_address * addr,
403                      tr_port            port,
404                      const uint8_t    * torrentHash )
405{
406    int socket;
407
408    assert( session );
409    assert( tr_isAddress( addr ) );
410    assert( torrentHash );
411
412    socket = tr_netOpenTCP( session, addr, port );
413
414    return socket < 0
415           ? NULL
416           : tr_peerIoNew( session, addr, port, torrentHash, 0, socket );
417}
418
419static void
420trDatatypeFree( void * data )
421{
422    struct tr_datatype * dt = __tr_list_entry( data, struct tr_datatype, head );
423    tr_free(dt);
424}
425
426static void
427io_dtor( void * vio )
428{
429    tr_peerIo * io = vio;
430
431    event_del( &io->event_read );
432    event_del( &io->event_write );
433    tr_peerIoSetBandwidth( io, NULL );
434    evbuffer_free( io->outbuf );
435    evbuffer_free( io->inbuf );
436    tr_netClose( io->socket );
437    tr_cryptoFree( io->crypto );
438    __tr_list_destroy( &io->outbuf_datatypes, trDatatypeFree );
439
440    io->magicNumber = 0xDEAD;
441    tr_free( io );
442}
443
444void
445tr_peerIoFree( tr_peerIo * io )
446{
447    if( io )
448    {
449        io->canRead = NULL;
450        io->didWrite = NULL;
451        io->gotError = NULL;
452        tr_runInEventThread( io->session, io_dtor, io );
453    }
454}
455
456tr_session*
457tr_peerIoGetSession( tr_peerIo * io )
458{
459    assert( tr_isPeerIo( io ) );
460    assert( io->session );
461
462    return io->session;
463}
464
465const tr_address*
466tr_peerIoGetAddress( const tr_peerIo * io,
467                           tr_port   * port )
468{
469    assert( tr_isPeerIo( io ) );
470
471    if( port )
472        *port = io->port;
473
474    return &io->addr;
475}
476
477const char*
478tr_peerIoAddrStr( const tr_address * addr, tr_port port )
479{
480    static char buf[512];
481
482    if( addr->type == TR_AF_INET ) 
483        tr_snprintf( buf, sizeof( buf ), "%s:%u", tr_ntop_non_ts( addr ), ntohs( port ) ); 
484    else 
485        tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_ntop_non_ts( addr ), ntohs( port ) ); 
486    return buf;
487}
488
489const char*
490tr_peerIoGetAddrStr( const tr_peerIo * io )
491{
492    return tr_peerIoAddrStr( &io->addr, io->port );
493}
494
495void
496tr_peerIoSetIOFuncs( tr_peerIo        * io,
497                     tr_can_read_cb     readcb,
498                     tr_did_write_cb    writecb,
499                     tr_net_error_cb    errcb,
500                     void             * userData )
501{
502    io->canRead = readcb;
503    io->didWrite = writecb;
504    io->gotError = errcb;
505    io->userData = userData;
506}
507
508tr_bool
509tr_peerIoIsIncoming( const tr_peerIo * c )
510{
511    return c->isIncoming != 0;
512}
513
514int
515tr_peerIoReconnect( tr_peerIo * io )
516{
517    assert( !tr_peerIoIsIncoming( io ) );
518
519    if( io->socket >= 0 )
520        tr_netClose( io->socket );
521
522    io->socket = tr_netOpenTCP( io->session, &io->addr, io->port );
523
524    if( io->socket >= 0 )
525    {
526        tr_bandwidth * bandwidth = io->bandwidth;
527        tr_peerIoSetBandwidth( io, NULL );
528        tr_netSetTOS( io->socket, io->session->peerSocketTOS );
529        tr_peerIoSetBandwidth( io, bandwidth );
530        return 0;
531    }
532
533    return -1;
534}
535
536/**
537***
538**/
539
540void
541tr_peerIoSetTorrentHash( tr_peerIo *     io,
542                         const uint8_t * hash )
543{
544    assert( tr_isPeerIo( io ) );
545
546    tr_cryptoSetTorrentHash( io->crypto, hash );
547}
548
549const uint8_t*
550tr_peerIoGetTorrentHash( tr_peerIo * io )
551{
552    assert( tr_isPeerIo( io ) );
553    assert( io->crypto );
554
555    return tr_cryptoGetTorrentHash( io->crypto );
556}
557
558int
559tr_peerIoHasTorrentHash( const tr_peerIo * io )
560{
561    assert( tr_isPeerIo( io ) );
562    assert( io->crypto );
563
564    return tr_cryptoHasTorrentHash( io->crypto );
565}
566
567/**
568***
569**/
570
571void
572tr_peerIoSetPeersId( tr_peerIo *     io,
573                     const uint8_t * peer_id )
574{
575    assert( tr_isPeerIo( io ) );
576
577    if( ( io->peerIdIsSet = peer_id != NULL ) )
578        memcpy( io->peerId, peer_id, 20 );
579    else
580        memset( io->peerId, 0, 20 );
581}
582
583const uint8_t*
584tr_peerIoGetPeersId( const tr_peerIo * io )
585{
586    assert( tr_isPeerIo( io ) );
587    assert( io->peerIdIsSet );
588
589    return io->peerId;
590}
591
592/**
593***
594**/
595
596void
597tr_peerIoEnableFEXT( tr_peerIo * io,
598                     tr_bool     flag )
599{
600    assert( tr_isPeerIo( io ) );
601    assert( _isBool( flag ) );
602
603    dbgmsg( io, "setting FEXT support flag to %d", (flag!=0) );
604    io->fastExtensionSupported = flag;
605}
606
607tr_bool
608tr_peerIoSupportsFEXT( const tr_peerIo * io )
609{
610    assert( tr_isPeerIo( io ) );
611
612    return io->fastExtensionSupported;
613}
614
615/**
616***
617**/
618
619void
620tr_peerIoEnableLTEP( tr_peerIo  * io,
621                     tr_bool      flag )
622{
623    assert( tr_isPeerIo( io ) );
624    assert( _isBool( flag ) );
625
626    dbgmsg( io, "setting LTEP support flag to %d", (flag!=0) );
627    io->extendedProtocolSupported = flag;
628}
629
630tr_bool
631tr_peerIoSupportsLTEP( const tr_peerIo * io )
632{
633    assert( tr_isPeerIo( io ) );
634
635    return io->extendedProtocolSupported;
636}
637
638/**
639***
640**/
641
642static size_t
643getDesiredOutputBufferSize( const tr_peerIo * io )
644{
645    /* this is all kind of arbitrary, but what seems to work well is
646     * being large enough to hold the next 20 seconds' worth of input,
647     * or a few blocks, whichever is bigger.
648     * It's okay to tweak this as needed */
649    const double maxBlockSize = 16 * 1024; /* 16 KiB is from BT spec */
650    const double currentSpeed = tr_bandwidthGetPieceSpeed( io->bandwidth, TR_UP );
651    const double period = 20; /* arbitrary */
652    const double numBlocks = 5.5; /* the 5 is arbitrary; the .5 is to leave room for messages */
653    return MAX( maxBlockSize*numBlocks, currentSpeed*1024*period );
654}
655
656size_t
657tr_peerIoGetWriteBufferSpace( const tr_peerIo * io )
658{
659    const size_t desiredLen = getDesiredOutputBufferSize( io );
660    const size_t currentLen = EVBUFFER_LENGTH( io->outbuf );
661    size_t freeSpace = 0;
662
663    if( desiredLen > currentLen )
664        freeSpace = desiredLen - currentLen;
665
666    return freeSpace;
667}
668
669void
670tr_peerIoSetBandwidth( tr_peerIo     * io,
671                       tr_bandwidth  * bandwidth )
672{
673    assert( tr_isPeerIo( io ) );
674
675    if( io->bandwidth )
676        tr_bandwidthRemovePeer( io->bandwidth, io );
677
678    io->bandwidth = bandwidth;
679
680    if( io->bandwidth )
681        tr_bandwidthAddPeer( io->bandwidth, io );
682}
683
684/**
685***
686**/
687
688tr_crypto*
689tr_peerIoGetCrypto( tr_peerIo * c )
690{
691    return c->crypto;
692}
693
694void
695tr_peerIoSetEncryption( tr_peerIo * io,
696                        int         encryptionMode )
697{
698    assert( tr_isPeerIo( io ) );
699    assert( encryptionMode == PEER_ENCRYPTION_NONE
700          || encryptionMode == PEER_ENCRYPTION_RC4 );
701
702    io->encryptionMode = encryptionMode;
703}
704
705int
706tr_peerIoIsEncrypted( const tr_peerIo * io )
707{
708    return io != NULL && io->encryptionMode == PEER_ENCRYPTION_RC4;
709}
710
711/**
712***
713**/
714
715void
716tr_peerIoWrite( tr_peerIo   * io,
717                const void  * writeme,
718                size_t        writemeLen,
719                int           isPieceData )
720{
721    struct tr_datatype * datatype;
722
723    assert( tr_amInEventThread( io->session ) );
724    dbgmsg( io, "adding %zu bytes into io->output", writemeLen );
725
726    datatype = tr_new( struct tr_datatype, 1 );
727    datatype->isPieceData = isPieceData != 0;
728    datatype->length = writemeLen;
729
730    __tr_list_init( &datatype->head );
731    __tr_list_append( &io->outbuf_datatypes, &datatype->head );
732
733    evbuffer_add( io->outbuf, writeme, writemeLen );
734}
735
736void
737tr_peerIoWriteBuf( tr_peerIo         * io,
738                   struct evbuffer   * buf,
739                   int                 isPieceData )
740{
741    const size_t n = EVBUFFER_LENGTH( buf );
742
743    tr_peerIoWrite( io, EVBUFFER_DATA( buf ), n, isPieceData );
744    evbuffer_drain( buf, n );
745}
746
747/**
748***
749**/
750
751void
752tr_peerIoWriteBytes( tr_peerIo       * io,
753                     struct evbuffer * outbuf,
754                     const void      * bytes,
755                     size_t            byteCount )
756{
757    uint8_t * tmp;
758
759    switch( io->encryptionMode )
760    {
761        case PEER_ENCRYPTION_NONE:
762            evbuffer_add( outbuf, bytes, byteCount );
763            break;
764
765        case PEER_ENCRYPTION_RC4:
766            tmp = tr_new( uint8_t, byteCount );
767            tr_cryptoEncrypt( io->crypto, byteCount, bytes, tmp );
768            evbuffer_add( outbuf, tmp, byteCount );
769            tr_free( tmp );
770            break;
771
772        default:
773            assert( 0 );
774    }
775}
776
777void
778tr_peerIoWriteUint8( tr_peerIo       * io,
779                     struct evbuffer * outbuf,
780                     uint8_t           writeme )
781{
782    tr_peerIoWriteBytes( io, outbuf, &writeme, sizeof( uint8_t ) );
783}
784
785void
786tr_peerIoWriteUint16( tr_peerIo       * io,
787                      struct evbuffer * outbuf,
788                      uint16_t          writeme )
789{
790    uint16_t tmp = htons( writeme );
791
792    tr_peerIoWriteBytes( io, outbuf, &tmp, sizeof( uint16_t ) );
793}
794
795void
796tr_peerIoWriteUint32( tr_peerIo       * io,
797                      struct evbuffer * outbuf,
798                      uint32_t          writeme )
799{
800    uint32_t tmp = htonl( writeme );
801
802    tr_peerIoWriteBytes( io, outbuf, &tmp, sizeof( uint32_t ) );
803}
804
805/***
806****
807***/
808
809void
810tr_peerIoReadBytes( tr_peerIo       * io,
811                    struct evbuffer * inbuf,
812                    void            * bytes,
813                    size_t            byteCount )
814{
815    assert( EVBUFFER_LENGTH( inbuf ) >= byteCount );
816
817    switch( io->encryptionMode )
818    {
819        case PEER_ENCRYPTION_NONE:
820            evbuffer_remove( inbuf, bytes, byteCount );
821            break;
822
823        case PEER_ENCRYPTION_RC4:
824            evbuffer_remove( inbuf, bytes, byteCount );
825            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
826            break;
827
828        default:
829            assert( 0 );
830    }
831}
832
833void
834tr_peerIoReadUint8( tr_peerIo       * io,
835                    struct evbuffer * inbuf,
836                    uint8_t         * setme )
837{
838    tr_peerIoReadBytes( io, inbuf, setme, sizeof( uint8_t ) );
839}
840
841void
842tr_peerIoReadUint16( tr_peerIo       * io,
843                     struct evbuffer * inbuf,
844                     uint16_t        * setme )
845{
846    uint16_t tmp;
847
848    assert( tr_isPeerIo( io ) );
849
850    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
851    *setme = ntohs( tmp );
852}
853
854void
855tr_peerIoReadUint32( tr_peerIo       * io,
856                     struct evbuffer * inbuf,
857                     uint32_t        * setme )
858{
859    uint32_t tmp;
860
861    assert( tr_isPeerIo( io ) );
862
863    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
864    *setme = ntohl( tmp );
865}
866
867void
868tr_peerIoDrain( tr_peerIo       * io,
869                struct evbuffer * inbuf,
870                size_t            byteCount )
871{
872    uint8_t * tmp;
873
874    assert( tr_isPeerIo( io ) );
875
876    tmp = tr_new( uint8_t, byteCount );
877    tr_peerIoReadBytes( io, inbuf, tmp, byteCount );
878    tr_free( tmp );
879}
880
881int
882tr_peerIoGetAge( const tr_peerIo * io )
883{
884    return time( NULL ) - io->timeCreated;
885}
886
887/***
888****
889***/
890
891static int
892tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
893{
894    int res;
895
896    assert( tr_isPeerIo( io ) );
897
898    howmuch = tr_bandwidthClamp( io->bandwidth, TR_DOWN, howmuch );
899
900    res = howmuch ? evbuffer_read( io->inbuf, io->socket, howmuch ) : 0;
901
902    dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(errno):"") );
903
904    if( EVBUFFER_LENGTH( io->inbuf ) )
905        canReadWrapper( io );
906
907    if( ( res <= 0 ) && ( io->gotError ) && ( errno != EAGAIN ) && ( errno != EINTR ) && ( errno != EINPROGRESS ) )
908    {
909        short what = EVBUFFER_READ | EVBUFFER_ERROR;
910        if( res == 0 )
911            what |= EVBUFFER_EOF;
912        io->gotError( io, what, io->userData );
913    }
914
915    return res;
916}
917
918static int
919tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
920{
921    int n;
922
923    assert( tr_isPeerIo( io ) );
924
925    howmuch = tr_bandwidthClamp( io->bandwidth, TR_UP, howmuch );
926
927    n = tr_evbuffer_write( io, io->socket, (int)howmuch );
928
929    if( n > 0 )
930        didWriteWrapper( io, n );
931
932    if( ( n < 0 ) && ( io->gotError ) && ( errno != EPIPE ) && ( errno != EAGAIN ) && ( errno != EINTR ) && ( errno != EINPROGRESS ) ) {
933        short what = EVBUFFER_WRITE | EVBUFFER_ERROR;
934        io->gotError( io, what, io->userData );
935    }
936
937    return n;
938}
939
940int
941tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
942{
943    int ret;
944
945    assert( tr_isPeerIo( io ) );
946    assert( tr_isDirection( dir ) );
947
948    if( dir==TR_DOWN )
949        ret = tr_peerIoTryRead( io, limit );
950    else
951        ret = tr_peerIoTryWrite( io, limit );
952
953    return ret;
954}
955
956struct evbuffer *
957tr_peerIoGetReadBuffer( tr_peerIo * io )
958{
959    assert( tr_isPeerIo( io ) );
960
961    return io->inbuf;
962}
963
964tr_bool
965tr_peerIoHasBandwidthLeft( const tr_peerIo * io, tr_direction dir )
966{
967    assert( tr_isPeerIo( io ) );
968    assert( tr_isDirection( dir ) );
969
970    return tr_bandwidthClamp( io->bandwidth, dir, 1024 ) > 0;
971}
972
973/***
974****
975****/
976
977static void
978event_enable( tr_peerIo * io, short event )
979{
980    assert( tr_isPeerIo( io ) );
981
982    if( event & EV_READ )
983        event_add( &io->event_read, NULL );
984
985    if( event & EV_WRITE )
986        event_add( &io->event_write, NULL );
987}
988
989static void
990event_disable( struct tr_peerIo * io, short event )
991{
992    assert( tr_isPeerIo( io ) );
993
994    if( event & EV_READ )
995        event_del( &io->event_read );
996
997    if( event & EV_WRITE )
998        event_del( &io->event_write );
999}
1000
1001
1002void
1003tr_peerIoSetEnabled( tr_peerIo    * io,
1004                     tr_direction   dir,
1005                     tr_bool        isEnabled )
1006{
1007    short event;
1008
1009    assert( tr_isPeerIo( io ) );
1010    assert( tr_isDirection( dir ) );
1011
1012    event = dir == TR_UP ? EV_WRITE : EV_READ;
1013
1014    if( isEnabled )
1015        event_enable( io, event );
1016    else
1017        event_disable( io, event );
1018}
Note: See TracBrowser for help on using the repository browser.