source: trunk/libtransmission/peer-io.c @ 7475

Last change on this file since 7475 was 7475, checked in by charles, 12 years ago

(trunk libT) fix minor r7474 issue

  • Property svn:keywords set to Date Rev Author Id
File size: 23.2 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 7475 2008-12-23 17:23:07Z charles $
11 */
12
13#include <assert.h>
14#include <limits.h> /* INT_MAX */
15#include <string.h>
16#include <stdio.h>
17#include <unistd.h>
18
19#ifdef WIN32
20 #include <winsock2.h>
21#else
22 #include <arpa/inet.h> /* inet_ntoa */
23#endif
24
25#include <event.h>
26
27#include "transmission.h"
28#include "bandwidth.h"
29#include "crypto.h"
30#include "list.h"
31#include "net.h"
32#include "peer-io.h"
33#include "trevent.h"
34#include "utils.h"
35
36#define MAGIC_NUMBER 206745
37
38static size_t
39getPacketOverhead( size_t d )
40{
41    /**
42     * http://sd.wareonearth.com/~phil/net/overhead/
43     *
44     * TCP over Ethernet:
45     * Assuming no header compression (e.g. not PPP)
46     * Add 20 IPv4 header or 40 IPv6 header (no options)
47     * Add 20 TCP header
48     * Add 12 bytes optional TCP timestamps
49     * Max TCP Payload data rates over ethernet are thus:
50     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
51     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
52     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
53     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
54     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
55     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
56     */
57    static const double assumed_payload_data_rate = 94.0;
58
59    return (size_t)( d * ( 100.0 / assumed_payload_data_rate ) - d );
60}
61
62/**
63***
64**/
65
66#define dbgmsg( io, ... ) \
67    do { \
68        if( tr_deepLoggingIsActive( ) ) \
69            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
70    } while( 0 )
71
72struct tr_datatype
73{
74    tr_bool  isPieceData;
75    size_t   length;
76    struct __tr_list head;
77};
78
79struct tr_peerIo
80{
81    tr_bool            isEncrypted;
82    tr_bool            isIncoming;
83    tr_bool            peerIdIsSet;
84    tr_bool            extendedProtocolSupported;
85    tr_bool            fastExtensionSupported;
86
87    int                magicNumber;
88
89    uint8_t            encryptionMode;
90    tr_port            port;
91    int                socket;
92
93    uint8_t            peerId[20];
94    time_t             timeCreated;
95
96    tr_session       * session;
97
98    tr_address         addr;
99
100    tr_can_read_cb     canRead;
101    tr_did_write_cb    didWrite;
102    tr_net_error_cb    gotError;
103    void *             userData;
104
105    tr_bandwidth     * bandwidth;
106    tr_crypto        * crypto;
107
108    struct evbuffer  * inbuf;
109    struct evbuffer  * outbuf;
110    struct __tr_list   outbuf_datatypes; /* struct tr_datatype */
111
112    struct event       event_read;
113    struct event       event_write;
114};
115
116/***
117****
118***/
119
120static void
121didWriteWrapper( tr_peerIo * io, size_t bytes_transferred )
122{
123    while( bytes_transferred )
124    {
125        struct tr_datatype * next = __tr_list_entry( io->outbuf_datatypes.next,
126                                                     struct tr_datatype, head );
127        const size_t payload = MIN( next->length, bytes_transferred );
128        const size_t overhead = getPacketOverhead( payload );
129
130        tr_bandwidthUsed( io->bandwidth, TR_UP, payload, next->isPieceData );
131
132        if( overhead > 0 )
133            tr_bandwidthUsed( io->bandwidth, TR_UP, overhead, FALSE );
134
135        if( io->didWrite )
136            io->didWrite( io, payload, next->isPieceData, io->userData );
137
138        bytes_transferred -= payload;
139        next->length -= payload;
140        if( !next->length ) {
141            __tr_list_remove( io->outbuf_datatypes.next );
142            tr_free( next );
143        }
144    }
145}
146
147static void
148canReadWrapper( tr_peerIo * io )
149{
150    tr_bool done = 0;
151    tr_bool err = 0;
152    tr_session * session = io->session;
153
154    dbgmsg( io, "canRead" );
155
156    /* try to consume the input buffer */
157    if( io->canRead )
158    {
159        tr_globalLock( session );
160
161        while( !done && !err )
162        {
163            size_t piece = 0;
164            const size_t oldLen = EVBUFFER_LENGTH( io->inbuf );
165            const int ret = io->canRead( io, io->userData, &piece );
166
167            const size_t used = oldLen - EVBUFFER_LENGTH( io->inbuf );
168
169            if( piece )
170                tr_bandwidthUsed( io->bandwidth, TR_DOWN, piece, TRUE );
171
172            if( used != piece )
173                tr_bandwidthUsed( io->bandwidth, TR_DOWN, used - piece, FALSE );
174
175            switch( ret )
176            {
177                case READ_NOW:
178                    if( EVBUFFER_LENGTH( io->inbuf ) )
179                        continue;
180                    done = 1;
181                    break;
182
183                case READ_LATER:
184                    done = 1;
185                    break;
186
187                case READ_ERR:
188                    err = 1;
189                    break;
190            }
191        }
192
193        tr_globalUnlock( session );
194    }
195}
196
197#define _isBool(b) (((b)==0 || (b)==1))
198
199tr_bool
200tr_isPeerIo( const tr_peerIo * io )
201{
202    return ( io != NULL )
203        && ( io->magicNumber == MAGIC_NUMBER )
204        && ( tr_isAddress( &io->addr ) )
205        && ( _isBool( io->isEncrypted ) )
206        && ( _isBool( io->isIncoming ) )
207        && ( _isBool( io->peerIdIsSet ) )
208        && ( _isBool( io->extendedProtocolSupported ) )
209        && ( _isBool( io->fastExtensionSupported ) );
210}
211
212static void
213event_read_cb( int fd, short event UNUSED, void * vio )
214{
215    int res;
216    tr_peerIo * io = vio;
217
218    /* Limit the input buffer to 256K, so it doesn't grow too large */
219    size_t howmuch;
220    const tr_direction dir = TR_DOWN;
221    const size_t max = 256 * 1024;
222    const size_t curlen = EVBUFFER_LENGTH( io->inbuf );
223
224    howmuch = curlen >= max ? 0 : max - curlen;
225    howmuch = tr_bandwidthClamp( io->bandwidth, TR_DOWN, howmuch );
226
227    assert( tr_isPeerIo( io ) );
228
229    dbgmsg( io, "libevent says this peer is ready to read" );
230
231    /* if we don't have any bandwidth left, stop reading */
232    if( howmuch < 1 ) {
233        tr_peerIoSetEnabled( io, dir, FALSE );
234        return;
235    }
236
237    res = evbuffer_read( io->inbuf, fd, howmuch );
238
239    if( res > 0 )
240    {
241        tr_peerIoSetEnabled( io, dir, TRUE );
242
243        /* Invoke the user callback - must always be called last */
244        canReadWrapper( io );
245    }
246    else
247    {
248        short what = EVBUFFER_READ;
249
250        if( res == 0 ) /* EOF */
251            what |= EVBUFFER_EOF;
252        else if( res == -1 ) {
253            if( errno == EAGAIN || errno == EINTR ) {
254                tr_peerIoSetEnabled( io, dir, TRUE );
255                return;
256            }
257            what |= EVBUFFER_ERROR;
258        }
259
260        if( io->gotError != NULL )
261            io->gotError( io, what, io->userData );
262    }
263}
264
265static int
266tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
267{
268    struct evbuffer * buffer = io->outbuf;
269    int n = MIN( EVBUFFER_LENGTH( buffer ), howmuch );
270
271#ifdef WIN32
272    n = send(fd, buffer->buffer, n,  0 );
273#else
274    n = write(fd, buffer->buffer, n );
275#endif
276    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?strerror(errno):"") );
277
278    if( n == -1 )
279        return -1;
280    if (n == 0)
281        return 0;
282    evbuffer_drain( buffer, n );
283
284    return n;
285}
286
287static void
288event_write_cb( int fd, short event UNUSED, void * vio )
289{
290    int res = 0;
291    short what = EVBUFFER_WRITE;
292    tr_peerIo * io = vio;
293    size_t howmuch;
294    const tr_direction dir = TR_UP;
295
296    assert( tr_isPeerIo( io ) );
297
298    dbgmsg( io, "libevent says this peer is ready to write" );
299
300    /* Write as much as possible, since the socket is non-blocking, write() will
301     * return if it can't write any more data without blocking */
302    howmuch = tr_bandwidthClamp( io->bandwidth, dir, EVBUFFER_LENGTH( io->outbuf ) );
303
304    /* if we don't have any bandwidth left, stop writing */
305    if( howmuch < 1 ) {
306        tr_peerIoSetEnabled( io, dir, FALSE );
307        return;
308    }
309
310    res = tr_evbuffer_write( io, fd, howmuch );
311    if (res == -1) {
312#ifndef WIN32
313/*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
314 *  *set errno. thus this error checking is not portable*/
315        if (errno == EAGAIN || errno == EINTR || errno == EINPROGRESS)
316            goto reschedule;
317        /* error case */
318        what |= EVBUFFER_ERROR;
319
320#else
321        goto reschedule;
322#endif
323
324    } else if (res == 0) {
325        /* eof case */
326        what |= EVBUFFER_EOF;
327    }
328    if (res <= 0)
329        goto error;
330
331    if( EVBUFFER_LENGTH( io->outbuf ) )
332        tr_peerIoSetEnabled( io, dir, TRUE );
333
334    didWriteWrapper( io, res );
335    return;
336
337 reschedule:
338    if( EVBUFFER_LENGTH( io->outbuf ) )
339        tr_peerIoSetEnabled( io, dir, TRUE );
340    return;
341
342 error:
343    if( io->gotError != NULL )
344        io->gotError( io, what, io->userData );
345}
346
347/**
348***
349**/
350
351static tr_peerIo*
352tr_peerIoNew( tr_session       * session,
353              const tr_address * addr,
354              tr_port            port,
355              const uint8_t    * torrentHash,
356              int                isIncoming,
357              int                socket )
358{
359    tr_peerIo * io;
360
361    if( socket >= 0 )
362        tr_netSetTOS( socket, session->peerSocketTOS );
363
364    io = tr_new0( tr_peerIo, 1 );
365    io->magicNumber = MAGIC_NUMBER;
366    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
367    io->session = session;
368    io->addr = *addr;
369    io->port = port;
370    io->socket = socket;
371    io->isIncoming = isIncoming != 0;
372    io->timeCreated = time( NULL );
373    io->inbuf = evbuffer_new( );
374    io->outbuf = evbuffer_new( );
375
376    event_set( &io->event_read, io->socket, EV_READ, event_read_cb, io );
377    event_set( &io->event_write, io->socket, EV_WRITE, event_write_cb, io );
378
379    __tr_list_init( &io->outbuf_datatypes );
380
381    tr_peerIoSetBandwidth( io, session->bandwidth );
382
383    return io;
384}
385
386tr_peerIo*
387tr_peerIoNewIncoming( tr_session       * session,
388                      const tr_address * addr,
389                      tr_port            port,
390                      int                socket )
391{
392    assert( session );
393    assert( tr_isAddress( addr ) );
394    assert( socket >= 0 );
395
396    return tr_peerIoNew( session, addr, port, NULL, 1, socket );
397}
398
399tr_peerIo*
400tr_peerIoNewOutgoing( tr_session       * session,
401                      const tr_address * addr,
402                      tr_port            port,
403                      const uint8_t    * torrentHash )
404{
405    int socket;
406
407    assert( session );
408    assert( tr_isAddress( addr ) );
409    assert( torrentHash );
410
411    socket = tr_netOpenTCP( session, addr, port );
412
413    return socket < 0
414           ? NULL
415           : tr_peerIoNew( session, addr, port, torrentHash, 0, socket );
416}
417
418static void
419trDatatypeFree( void * data )
420{
421    struct tr_datatype * dt = __tr_list_entry( data, struct tr_datatype, head );
422    tr_free(dt);
423}
424
425static void
426io_dtor( void * vio )
427{
428    tr_peerIo * io = vio;
429
430    event_del( &io->event_read );
431    event_del( &io->event_write );
432    tr_peerIoSetBandwidth( io, NULL );
433    evbuffer_free( io->outbuf );
434    evbuffer_free( io->inbuf );
435    tr_netClose( io->socket );
436    tr_cryptoFree( io->crypto );
437    __tr_list_destroy( &io->outbuf_datatypes, trDatatypeFree );
438
439    io->magicNumber = 0xDEAD;
440    tr_free( io );
441}
442
443void
444tr_peerIoFree( tr_peerIo * io )
445{
446    if( io )
447    {
448        io->canRead = NULL;
449        io->didWrite = NULL;
450        io->gotError = NULL;
451        tr_runInEventThread( io->session, io_dtor, io );
452    }
453}
454
455tr_session*
456tr_peerIoGetSession( tr_peerIo * io )
457{
458    assert( tr_isPeerIo( io ) );
459    assert( io->session );
460
461    return io->session;
462}
463
464const tr_address*
465tr_peerIoGetAddress( const tr_peerIo * io,
466                           tr_port   * port )
467{
468    assert( tr_isPeerIo( io ) );
469
470    if( port )
471        *port = io->port;
472
473    return &io->addr;
474}
475
476const char*
477tr_peerIoAddrStr( const tr_address * addr, tr_port port )
478{
479    static char buf[512];
480
481    if( addr->type == TR_AF_INET ) 
482        tr_snprintf( buf, sizeof( buf ), "%s:%u", tr_ntop_non_ts( addr ), ntohs( port ) ); 
483    else 
484        tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_ntop_non_ts( addr ), ntohs( port ) ); 
485    return buf;
486}
487
488const char*
489tr_peerIoGetAddrStr( const tr_peerIo * io )
490{
491    return tr_peerIoAddrStr( &io->addr, io->port );
492}
493
494void
495tr_peerIoSetIOFuncs( tr_peerIo        * io,
496                     tr_can_read_cb     readcb,
497                     tr_did_write_cb    writecb,
498                     tr_net_error_cb    errcb,
499                     void             * userData )
500{
501    io->canRead = readcb;
502    io->didWrite = writecb;
503    io->gotError = errcb;
504    io->userData = userData;
505}
506
507tr_bool
508tr_peerIoIsIncoming( const tr_peerIo * c )
509{
510    return c->isIncoming != 0;
511}
512
513int
514tr_peerIoReconnect( tr_peerIo * io )
515{
516    assert( !tr_peerIoIsIncoming( io ) );
517
518    if( io->socket >= 0 )
519        tr_netClose( io->socket );
520
521    io->socket = tr_netOpenTCP( io->session, &io->addr, io->port );
522
523    if( io->socket >= 0 )
524    {
525        tr_bandwidth * bandwidth = io->bandwidth;
526        tr_peerIoSetBandwidth( io, NULL );
527        tr_netSetTOS( io->socket, io->session->peerSocketTOS );
528        tr_peerIoSetBandwidth( io, bandwidth );
529        return 0;
530    }
531
532    return -1;
533}
534
535/**
536***
537**/
538
539void
540tr_peerIoSetTorrentHash( tr_peerIo *     io,
541                         const uint8_t * hash )
542{
543    assert( tr_isPeerIo( io ) );
544
545    tr_cryptoSetTorrentHash( io->crypto, hash );
546}
547
548const uint8_t*
549tr_peerIoGetTorrentHash( tr_peerIo * io )
550{
551    assert( tr_isPeerIo( io ) );
552    assert( io->crypto );
553
554    return tr_cryptoGetTorrentHash( io->crypto );
555}
556
557int
558tr_peerIoHasTorrentHash( const tr_peerIo * io )
559{
560    assert( tr_isPeerIo( io ) );
561    assert( io->crypto );
562
563    return tr_cryptoHasTorrentHash( io->crypto );
564}
565
566/**
567***
568**/
569
570void
571tr_peerIoSetPeersId( tr_peerIo *     io,
572                     const uint8_t * peer_id )
573{
574    assert( tr_isPeerIo( io ) );
575
576    if( ( io->peerIdIsSet = peer_id != NULL ) )
577        memcpy( io->peerId, peer_id, 20 );
578    else
579        memset( io->peerId, 0, 20 );
580}
581
582const uint8_t*
583tr_peerIoGetPeersId( const tr_peerIo * io )
584{
585    assert( tr_isPeerIo( io ) );
586    assert( io->peerIdIsSet );
587
588    return io->peerId;
589}
590
591/**
592***
593**/
594
595void
596tr_peerIoEnableFEXT( tr_peerIo * io,
597                     tr_bool     flag )
598{
599    assert( tr_isPeerIo( io ) );
600    assert( _isBool( flag ) );
601
602    dbgmsg( io, "setting FEXT support flag to %d", (flag!=0) );
603    io->fastExtensionSupported = flag;
604}
605
606tr_bool
607tr_peerIoSupportsFEXT( const tr_peerIo * io )
608{
609    assert( tr_isPeerIo( io ) );
610
611    return io->fastExtensionSupported;
612}
613
614/**
615***
616**/
617
618void
619tr_peerIoEnableLTEP( tr_peerIo  * io,
620                     tr_bool      flag )
621{
622    assert( tr_isPeerIo( io ) );
623    assert( _isBool( flag ) );
624
625    dbgmsg( io, "setting LTEP support flag to %d", (flag!=0) );
626    io->extendedProtocolSupported = flag;
627}
628
629tr_bool
630tr_peerIoSupportsLTEP( const tr_peerIo * io )
631{
632    assert( tr_isPeerIo( io ) );
633
634    return io->extendedProtocolSupported;
635}
636
637/**
638***
639**/
640
641static size_t
642getDesiredOutputBufferSize( const tr_peerIo * io )
643{
644    /* this is all kind of arbitrary, but what seems to work well is
645     * being large enough to hold the next 20 seconds' worth of input,
646     * or a few blocks, whichever is bigger.
647     * It's okay to tweak this as needed */
648    const double maxBlockSize = 16 * 1024; /* 16 KiB is from BT spec */
649    const double currentSpeed = tr_bandwidthGetPieceSpeed( io->bandwidth, TR_UP );
650    const double period = 20; /* arbitrary */
651    const double numBlocks = 5.5; /* the 5 is arbitrary; the .5 is to leave room for messages */
652    return MAX( maxBlockSize*numBlocks, currentSpeed*1024*period );
653}
654
655size_t
656tr_peerIoGetWriteBufferSpace( const tr_peerIo * io )
657{
658    const size_t desiredLen = getDesiredOutputBufferSize( io );
659    const size_t currentLen = EVBUFFER_LENGTH( io->outbuf );
660    size_t freeSpace = 0;
661
662    if( desiredLen > currentLen )
663        freeSpace = desiredLen - currentLen;
664
665    return freeSpace;
666}
667
668void
669tr_peerIoSetBandwidth( tr_peerIo     * io,
670                       tr_bandwidth  * bandwidth )
671{
672    assert( tr_isPeerIo( io ) );
673
674    if( io->bandwidth )
675        tr_bandwidthRemovePeer( io->bandwidth, io );
676
677    io->bandwidth = bandwidth;
678
679    if( io->bandwidth )
680        tr_bandwidthAddPeer( io->bandwidth, io );
681}
682
683/**
684***
685**/
686
687tr_crypto*
688tr_peerIoGetCrypto( tr_peerIo * c )
689{
690    return c->crypto;
691}
692
693void
694tr_peerIoSetEncryption( tr_peerIo * io,
695                        int         encryptionMode )
696{
697    assert( tr_isPeerIo( io ) );
698    assert( encryptionMode == PEER_ENCRYPTION_NONE
699          || encryptionMode == PEER_ENCRYPTION_RC4 );
700
701    io->encryptionMode = encryptionMode;
702}
703
704int
705tr_peerIoIsEncrypted( const tr_peerIo * io )
706{
707    return io != NULL && io->encryptionMode == PEER_ENCRYPTION_RC4;
708}
709
710/**
711***
712**/
713
714void
715tr_peerIoWrite( tr_peerIo   * io,
716                const void  * writeme,
717                size_t        writemeLen,
718                int           isPieceData )
719{
720    struct tr_datatype * datatype;
721
722    assert( tr_amInEventThread( io->session ) );
723    dbgmsg( io, "adding %zu bytes into io->output", writemeLen );
724
725    datatype = tr_new( struct tr_datatype, 1 );
726    datatype->isPieceData = isPieceData != 0;
727    datatype->length = writemeLen;
728
729    __tr_list_init( &datatype->head );
730    __tr_list_append( &io->outbuf_datatypes, &datatype->head );
731
732    evbuffer_add( io->outbuf, writeme, writemeLen );
733}
734
735void
736tr_peerIoWriteBuf( tr_peerIo         * io,
737                   struct evbuffer   * buf,
738                   int                 isPieceData )
739{
740    const size_t n = EVBUFFER_LENGTH( buf );
741
742    tr_peerIoWrite( io, EVBUFFER_DATA( buf ), n, isPieceData );
743    evbuffer_drain( buf, n );
744}
745
746/**
747***
748**/
749
750void
751tr_peerIoWriteBytes( tr_peerIo       * io,
752                     struct evbuffer * outbuf,
753                     const void      * bytes,
754                     size_t            byteCount )
755{
756    uint8_t * tmp;
757
758    switch( io->encryptionMode )
759    {
760        case PEER_ENCRYPTION_NONE:
761            evbuffer_add( outbuf, bytes, byteCount );
762            break;
763
764        case PEER_ENCRYPTION_RC4:
765            tmp = tr_new( uint8_t, byteCount );
766            tr_cryptoEncrypt( io->crypto, byteCount, bytes, tmp );
767            evbuffer_add( outbuf, tmp, byteCount );
768            tr_free( tmp );
769            break;
770
771        default:
772            assert( 0 );
773    }
774}
775
776void
777tr_peerIoWriteUint8( tr_peerIo       * io,
778                     struct evbuffer * outbuf,
779                     uint8_t           writeme )
780{
781    tr_peerIoWriteBytes( io, outbuf, &writeme, sizeof( uint8_t ) );
782}
783
784void
785tr_peerIoWriteUint16( tr_peerIo       * io,
786                      struct evbuffer * outbuf,
787                      uint16_t          writeme )
788{
789    uint16_t tmp = htons( writeme );
790
791    tr_peerIoWriteBytes( io, outbuf, &tmp, sizeof( uint16_t ) );
792}
793
794void
795tr_peerIoWriteUint32( tr_peerIo       * io,
796                      struct evbuffer * outbuf,
797                      uint32_t          writeme )
798{
799    uint32_t tmp = htonl( writeme );
800
801    tr_peerIoWriteBytes( io, outbuf, &tmp, sizeof( uint32_t ) );
802}
803
804/***
805****
806***/
807
808void
809tr_peerIoReadBytes( tr_peerIo       * io,
810                    struct evbuffer * inbuf,
811                    void            * bytes,
812                    size_t            byteCount )
813{
814    assert( EVBUFFER_LENGTH( inbuf ) >= byteCount );
815
816    switch( io->encryptionMode )
817    {
818        case PEER_ENCRYPTION_NONE:
819            evbuffer_remove( inbuf, bytes, byteCount );
820            break;
821
822        case PEER_ENCRYPTION_RC4:
823            evbuffer_remove( inbuf, bytes, byteCount );
824            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
825            break;
826
827        default:
828            assert( 0 );
829    }
830}
831
832void
833tr_peerIoReadUint8( tr_peerIo       * io,
834                    struct evbuffer * inbuf,
835                    uint8_t         * setme )
836{
837    tr_peerIoReadBytes( io, inbuf, setme, sizeof( uint8_t ) );
838}
839
840void
841tr_peerIoReadUint16( tr_peerIo       * io,
842                     struct evbuffer * inbuf,
843                     uint16_t        * setme )
844{
845    uint16_t tmp;
846
847    assert( tr_isPeerIo( io ) );
848
849    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
850    *setme = ntohs( tmp );
851}
852
853void
854tr_peerIoReadUint32( tr_peerIo       * io,
855                     struct evbuffer * inbuf,
856                     uint32_t        * setme )
857{
858    uint32_t tmp;
859
860    assert( tr_isPeerIo( io ) );
861
862    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
863    *setme = ntohl( tmp );
864}
865
866void
867tr_peerIoDrain( tr_peerIo       * io,
868                struct evbuffer * inbuf,
869                size_t            byteCount )
870{
871    uint8_t * tmp;
872
873    assert( tr_isPeerIo( io ) );
874
875    tmp = tr_new( uint8_t, byteCount );
876    tr_peerIoReadBytes( io, inbuf, tmp, byteCount );
877    tr_free( tmp );
878}
879
880int
881tr_peerIoGetAge( const tr_peerIo * io )
882{
883    return time( NULL ) - io->timeCreated;
884}
885
886/***
887****
888***/
889
890static int
891tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
892{
893    int res;
894
895    assert( tr_isPeerIo( io ) );
896
897    howmuch = tr_bandwidthClamp( io->bandwidth, TR_DOWN, howmuch );
898
899    res = howmuch ? evbuffer_read( io->inbuf, io->socket, howmuch ) : 0;
900
901    dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(errno):"") );
902
903    if( EVBUFFER_LENGTH( io->inbuf ) )
904        canReadWrapper( io );
905
906    if( ( res <= 0 ) && ( io->gotError ) && ( errno != EAGAIN ) && ( errno != EINTR ) && ( errno != EINPROGRESS ) )
907    {
908        short what = EVBUFFER_READ | EVBUFFER_ERROR;
909        if( res == 0 )
910            what |= EVBUFFER_EOF;
911        io->gotError( io, what, io->userData );
912    }
913
914    return res;
915}
916
917static int
918tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
919{
920    int n;
921
922    assert( tr_isPeerIo( io ) );
923
924    howmuch = tr_bandwidthClamp( io->bandwidth, TR_UP, howmuch );
925
926    n = tr_evbuffer_write( io, io->socket, (int)howmuch );
927
928    if( n > 0 )
929        didWriteWrapper( io, n );
930
931    if( ( n < 0 ) && ( io->gotError ) && ( errno != EPIPE ) && ( errno != EAGAIN ) && ( errno != EINTR ) && ( errno != EINPROGRESS ) ) {
932        short what = EVBUFFER_WRITE | EVBUFFER_ERROR;
933        io->gotError( io, what, io->userData );
934    }
935
936    return n;
937}
938
939int
940tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
941{
942    int ret;
943
944    assert( tr_isPeerIo( io ) );
945    assert( tr_isDirection( dir ) );
946
947    if( dir==TR_DOWN )
948        ret = tr_peerIoTryRead( io, limit );
949    else
950        ret = tr_peerIoTryWrite( io, limit );
951
952    return ret;
953}
954
955struct evbuffer *
956tr_peerIoGetReadBuffer( tr_peerIo * io )
957{
958    assert( tr_isPeerIo( io ) );
959
960    return io->inbuf;
961}
962
963tr_bool
964tr_peerIoHasBandwidthLeft( const tr_peerIo * io, tr_direction dir )
965{
966    assert( tr_isPeerIo( io ) );
967    assert( tr_isDirection( dir ) );
968
969    return tr_bandwidthClamp( io->bandwidth, dir, 1024 ) > 0;
970}
971
972/***
973****
974****/
975
976static void
977event_enable( tr_peerIo * io, short event )
978{
979    assert( tr_isPeerIo( io ) );
980
981    if( event & EV_READ )
982        event_add( &io->event_read, NULL );
983
984    if( event & EV_WRITE )
985        event_add( &io->event_write, NULL );
986}
987
988static void
989event_disable( struct tr_peerIo * io, short event )
990{
991    assert( tr_isPeerIo( io ) );
992
993    if( event & EV_READ )
994        event_del( &io->event_read );
995
996    if( event & EV_WRITE )
997        event_del( &io->event_write );
998}
999
1000
1001void
1002tr_peerIoSetEnabled( tr_peerIo    * io,
1003                     tr_direction   dir,
1004                     tr_bool        isEnabled )
1005{
1006    short event;
1007
1008    assert( tr_isPeerIo( io ) );
1009    assert( tr_isDirection( dir ) );
1010
1011    event = dir == TR_UP ? EV_WRITE : EV_READ;
1012
1013    if( isEnabled )
1014        event_enable( io, event );
1015    else
1016        event_disable( io, event );
1017}
Note: See TracBrowser for help on using the repository browser.