source: trunk/libtransmission/peer-io.c @ 7474

Last change on this file since 7474 was 7474, checked in by charles, 12 years ago

(trunk libT) socket performance tweak by wereHamster

  • Property svn:keywords set to Date Rev Author Id
File size: 23.2 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 7474 2008-12-23 17:11:31Z charles $
11 */
12
13#include <assert.h>
14#include <limits.h> /* INT_MAX */
15#include <string.h>
16#include <stdio.h>
17#include <unistd.h>
18
19#ifdef WIN32
20 #include <winsock2.h>
21#else
22 #include <arpa/inet.h> /* inet_ntoa */
23#endif
24
25#include <event.h>
26
27#include "transmission.h"
28#include "bandwidth.h"
29#include "crypto.h"
30#include "list.h"
31#include "net.h"
32#include "peer-io.h"
33#include "trevent.h"
34#include "utils.h"
35
36#define MAGIC_NUMBER 206745
37
38static size_t
39getPacketOverhead( size_t d )
40{
41    /**
42     * http://sd.wareonearth.com/~phil/net/overhead/
43     *
44     * TCP over Ethernet:
45     * Assuming no header compression (e.g. not PPP)
46     * Add 20 IPv4 header or 40 IPv6 header (no options)
47     * Add 20 TCP header
48     * Add 12 bytes optional TCP timestamps
49     * Max TCP Payload data rates over ethernet are thus:
50     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
51     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
52     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
53     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
54     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
55     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
56     */
57    static const double assumed_payload_data_rate = 94.0;
58
59    return (size_t)( d * ( 100.0 / assumed_payload_data_rate ) - d );
60}
61
62/**
63***
64**/
65
66#define dbgmsg( io, ... ) \
67    do { \
68        if( tr_deepLoggingIsActive( ) ) \
69            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
70    } while( 0 )
71
72struct tr_datatype
73{
74    tr_bool  isPieceData;
75    size_t   length;
76    struct __tr_list head;
77};
78
79struct tr_peerIo
80{
81    tr_bool            isEncrypted;
82    tr_bool            isIncoming;
83    tr_bool            peerIdIsSet;
84    tr_bool            extendedProtocolSupported;
85    tr_bool            fastExtensionSupported;
86
87    int                magicNumber;
88
89    uint8_t            encryptionMode;
90    tr_port            port;
91    int                socket;
92
93    uint8_t            peerId[20];
94    time_t             timeCreated;
95
96    tr_session       * session;
97
98    tr_address         addr;
99
100    tr_can_read_cb     canRead;
101    tr_did_write_cb    didWrite;
102    tr_net_error_cb    gotError;
103    void *             userData;
104
105    tr_bandwidth     * bandwidth;
106    tr_crypto        * crypto;
107
108    struct evbuffer  * inbuf;
109    struct evbuffer  * outbuf;
110    struct __tr_list   outbuf_datatypes; /* struct tr_datatype */
111
112    struct event       event_read;
113    struct event       event_write;
114};
115
116/***
117****
118***/
119
120static void
121didWriteWrapper( tr_peerIo * io, size_t bytes_transferred )
122{
123    while( bytes_transferred )
124    {
125        struct tr_datatype * next = __tr_list_entry( io->outbuf_datatypes.next,
126                                                     struct tr_datatype, head );
127        const size_t payload = MIN( next->length, bytes_transferred );
128        const size_t overhead = getPacketOverhead( payload );
129
130        tr_bandwidthUsed( io->bandwidth, TR_UP, payload, next->isPieceData );
131
132        if( overhead > 0 )
133            tr_bandwidthUsed( io->bandwidth, TR_UP, overhead, FALSE );
134
135        if( io->didWrite )
136            io->didWrite( io, payload, next->isPieceData, io->userData );
137
138        bytes_transferred -= payload;
139        next->length -= payload;
140        if( !next->length ) {
141            __tr_list_remove( io->outbuf_datatypes.next );
142            tr_free( next );
143        }
144    }
145}
146
147static void
148canReadWrapper( tr_peerIo * io )
149{
150    tr_bool done = 0;
151    tr_bool err = 0;
152    tr_session * session = io->session;
153
154    dbgmsg( io, "canRead" );
155
156    /* try to consume the input buffer */
157    if( io->canRead )
158    {
159        tr_globalLock( session );
160
161        while( !done && !err )
162        {
163            size_t piece = 0;
164            const size_t oldLen = EVBUFFER_LENGTH( io->inbuf );
165            const int ret = io->canRead( io, io->userData, &piece );
166
167            const size_t used = oldLen - EVBUFFER_LENGTH( io->inbuf );
168
169            if( piece )
170                tr_bandwidthUsed( io->bandwidth, TR_DOWN, piece, TRUE );
171
172            if( used != piece )
173                tr_bandwidthUsed( io->bandwidth, TR_DOWN, used - piece, FALSE );
174
175            switch( ret )
176            {
177                case READ_NOW:
178                    if( EVBUFFER_LENGTH( io->inbuf ) )
179                        continue;
180                    done = 1;
181                    break;
182
183                case READ_LATER:
184                    done = 1;
185                    break;
186
187                case READ_ERR:
188                    err = 1;
189                    break;
190            }
191        }
192
193        tr_globalUnlock( session );
194    }
195}
196
197#define _isBool(b) (((b)==0 || (b)==1))
198
199tr_bool
200tr_isPeerIo( const tr_peerIo * io )
201{
202    return ( io != NULL )
203        && ( io->magicNumber == MAGIC_NUMBER )
204        && ( tr_isAddress( &io->addr ) )
205        && ( _isBool( io->isEncrypted ) )
206        && ( _isBool( io->isIncoming ) )
207        && ( _isBool( io->peerIdIsSet ) )
208        && ( _isBool( io->extendedProtocolSupported ) )
209        && ( _isBool( io->fastExtensionSupported ) );
210}
211
212static void
213event_read_cb( int fd, short event UNUSED, void * vio )
214{
215    int res;
216    tr_peerIo * io = vio;
217
218    /* Limit the input buffer to 256K, so it doesn't grow too large */
219    const size_t canread = 256 * 1024 - EVBUFFER_LENGTH( io->inbuf );
220    const size_t howmuch = tr_bandwidthClamp( io->bandwidth, TR_DOWN, canread );
221    const tr_direction dir = TR_DOWN;
222
223    assert( tr_isPeerIo( io ) );
224
225    dbgmsg( io, "libevent says this peer is ready to read" );
226
227    /* if we don't have any bandwidth left, stop reading */
228    if( howmuch < 1 ) {
229        tr_peerIoSetEnabled( io, dir, FALSE );
230        return;
231    }
232
233    res = evbuffer_read( io->inbuf, fd, howmuch );
234
235    if( res > 0 )
236    {
237        tr_peerIoSetEnabled( io, dir, TRUE );
238
239        /* Invoke the user callback - must always be called last */
240        canReadWrapper( io );
241    }
242    else
243    {
244        short what = EVBUFFER_READ;
245
246        if( res == 0 ) /* EOF */
247            what |= EVBUFFER_EOF;
248        else if( res == -1 ) {
249            if( errno == EAGAIN || errno == EINTR ) {
250                tr_peerIoSetEnabled( io, dir, TRUE );
251                return;
252            }
253            what |= EVBUFFER_ERROR;
254        }
255
256        if( io->gotError != NULL )
257            io->gotError( io, what, io->userData );
258    }
259}
260
261static int
262tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
263{
264    struct evbuffer * buffer = io->outbuf;
265    int n = MIN( EVBUFFER_LENGTH( buffer ), howmuch );
266
267#ifdef WIN32
268    n = send(fd, buffer->buffer, n,  0 );
269#else
270    n = write(fd, buffer->buffer, n );
271#endif
272    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?strerror(errno):"") );
273
274    if( n == -1 )
275        return -1;
276    if (n == 0)
277        return 0;
278    evbuffer_drain( buffer, n );
279
280    return n;
281}
282
283static void
284event_write_cb( int fd, short event UNUSED, void * vio )
285{
286    int res = 0;
287    short what = EVBUFFER_WRITE;
288    tr_peerIo * io = vio;
289    size_t howmuch;
290    const tr_direction dir = TR_UP;
291
292    assert( tr_isPeerIo( io ) );
293
294    dbgmsg( io, "libevent says this peer is ready to write" );
295
296    /* Write as much as possible, since the socket is non-blocking, write() will
297     * return if it can't write any more data without blocking */
298    howmuch = tr_bandwidthClamp( io->bandwidth, dir, EVBUFFER_LENGTH( io->outbuf ) );
299
300    /* if we don't have any bandwidth left, stop writing */
301    if( howmuch < 1 ) {
302        tr_peerIoSetEnabled( io, dir, FALSE );
303        return;
304    }
305
306    res = tr_evbuffer_write( io, fd, howmuch );
307    if (res == -1) {
308#ifndef WIN32
309/*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
310 *  *set errno. thus this error checking is not portable*/
311        if (errno == EAGAIN || errno == EINTR || errno == EINPROGRESS)
312            goto reschedule;
313        /* error case */
314        what |= EVBUFFER_ERROR;
315
316#else
317        goto reschedule;
318#endif
319
320    } else if (res == 0) {
321        /* eof case */
322        what |= EVBUFFER_EOF;
323    }
324    if (res <= 0)
325        goto error;
326
327    if( EVBUFFER_LENGTH( io->outbuf ) )
328        tr_peerIoSetEnabled( io, dir, TRUE );
329
330    didWriteWrapper( io, res );
331    return;
332
333 reschedule:
334    if( EVBUFFER_LENGTH( io->outbuf ) )
335        tr_peerIoSetEnabled( io, dir, TRUE );
336    return;
337
338 error:
339    if( io->gotError != NULL )
340        io->gotError( io, what, io->userData );
341}
342
343/**
344***
345**/
346
347static tr_peerIo*
348tr_peerIoNew( tr_session       * session,
349              const tr_address * addr,
350              tr_port            port,
351              const uint8_t    * torrentHash,
352              int                isIncoming,
353              int                socket )
354{
355    tr_peerIo * io;
356
357    if( socket >= 0 )
358        tr_netSetTOS( socket, session->peerSocketTOS );
359
360    io = tr_new0( tr_peerIo, 1 );
361    io->magicNumber = MAGIC_NUMBER;
362    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
363    io->session = session;
364    io->addr = *addr;
365    io->port = port;
366    io->socket = socket;
367    io->isIncoming = isIncoming != 0;
368    io->timeCreated = time( NULL );
369    io->inbuf = evbuffer_new( );
370    io->outbuf = evbuffer_new( );
371
372    event_set( &io->event_read, io->socket, EV_READ, event_read_cb, io );
373    event_set( &io->event_write, io->socket, EV_WRITE, event_write_cb, io );
374
375    __tr_list_init( &io->outbuf_datatypes );
376
377    tr_peerIoSetBandwidth( io, session->bandwidth );
378
379    return io;
380}
381
382tr_peerIo*
383tr_peerIoNewIncoming( tr_session       * session,
384                      const tr_address * addr,
385                      tr_port            port,
386                      int                socket )
387{
388    assert( session );
389    assert( tr_isAddress( addr ) );
390    assert( socket >= 0 );
391
392    return tr_peerIoNew( session, addr, port, NULL, 1, socket );
393}
394
395tr_peerIo*
396tr_peerIoNewOutgoing( tr_session       * session,
397                      const tr_address * addr,
398                      tr_port            port,
399                      const uint8_t    * torrentHash )
400{
401    int socket;
402
403    assert( session );
404    assert( tr_isAddress( addr ) );
405    assert( torrentHash );
406
407    socket = tr_netOpenTCP( session, addr, port );
408
409    return socket < 0
410           ? NULL
411           : tr_peerIoNew( session, addr, port, torrentHash, 0, socket );
412}
413
414static void
415trDatatypeFree( void * data )
416{
417    struct tr_datatype * dt = __tr_list_entry( data, struct tr_datatype, head );
418    tr_free(dt);
419}
420
421static void
422io_dtor( void * vio )
423{
424    tr_peerIo * io = vio;
425
426    event_del( &io->event_read );
427    event_del( &io->event_write );
428    tr_peerIoSetBandwidth( io, NULL );
429    evbuffer_free( io->outbuf );
430    evbuffer_free( io->inbuf );
431    tr_netClose( io->socket );
432    tr_cryptoFree( io->crypto );
433    __tr_list_destroy( &io->outbuf_datatypes, trDatatypeFree );
434
435    io->magicNumber = 0xDEAD;
436    tr_free( io );
437}
438
439void
440tr_peerIoFree( tr_peerIo * io )
441{
442    if( io )
443    {
444        io->canRead = NULL;
445        io->didWrite = NULL;
446        io->gotError = NULL;
447        tr_runInEventThread( io->session, io_dtor, io );
448    }
449}
450
451tr_session*
452tr_peerIoGetSession( tr_peerIo * io )
453{
454    assert( tr_isPeerIo( io ) );
455    assert( io->session );
456
457    return io->session;
458}
459
460const tr_address*
461tr_peerIoGetAddress( const tr_peerIo * io,
462                           tr_port   * port )
463{
464    assert( tr_isPeerIo( io ) );
465
466    if( port )
467        *port = io->port;
468
469    return &io->addr;
470}
471
472const char*
473tr_peerIoAddrStr( const tr_address * addr, tr_port port )
474{
475    static char buf[512];
476
477    if( addr->type == TR_AF_INET ) 
478        tr_snprintf( buf, sizeof( buf ), "%s:%u", tr_ntop_non_ts( addr ), ntohs( port ) ); 
479    else 
480        tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_ntop_non_ts( addr ), ntohs( port ) ); 
481    return buf;
482}
483
484const char*
485tr_peerIoGetAddrStr( const tr_peerIo * io )
486{
487    return tr_peerIoAddrStr( &io->addr, io->port );
488}
489
490void
491tr_peerIoSetIOFuncs( tr_peerIo        * io,
492                     tr_can_read_cb     readcb,
493                     tr_did_write_cb    writecb,
494                     tr_net_error_cb    errcb,
495                     void             * userData )
496{
497    io->canRead = readcb;
498    io->didWrite = writecb;
499    io->gotError = errcb;
500    io->userData = userData;
501}
502
503tr_bool
504tr_peerIoIsIncoming( const tr_peerIo * c )
505{
506    return c->isIncoming != 0;
507}
508
509int
510tr_peerIoReconnect( tr_peerIo * io )
511{
512    assert( !tr_peerIoIsIncoming( io ) );
513
514    if( io->socket >= 0 )
515        tr_netClose( io->socket );
516
517    io->socket = tr_netOpenTCP( io->session, &io->addr, io->port );
518
519    if( io->socket >= 0 )
520    {
521        tr_bandwidth * bandwidth = io->bandwidth;
522        tr_peerIoSetBandwidth( io, NULL );
523        tr_netSetTOS( io->socket, io->session->peerSocketTOS );
524        tr_peerIoSetBandwidth( io, bandwidth );
525        return 0;
526    }
527
528    return -1;
529}
530
531/**
532***
533**/
534
535void
536tr_peerIoSetTorrentHash( tr_peerIo *     io,
537                         const uint8_t * hash )
538{
539    assert( tr_isPeerIo( io ) );
540
541    tr_cryptoSetTorrentHash( io->crypto, hash );
542}
543
544const uint8_t*
545tr_peerIoGetTorrentHash( tr_peerIo * io )
546{
547    assert( tr_isPeerIo( io ) );
548    assert( io->crypto );
549
550    return tr_cryptoGetTorrentHash( io->crypto );
551}
552
553int
554tr_peerIoHasTorrentHash( const tr_peerIo * io )
555{
556    assert( tr_isPeerIo( io ) );
557    assert( io->crypto );
558
559    return tr_cryptoHasTorrentHash( io->crypto );
560}
561
562/**
563***
564**/
565
566void
567tr_peerIoSetPeersId( tr_peerIo *     io,
568                     const uint8_t * peer_id )
569{
570    assert( tr_isPeerIo( io ) );
571
572    if( ( io->peerIdIsSet = peer_id != NULL ) )
573        memcpy( io->peerId, peer_id, 20 );
574    else
575        memset( io->peerId, 0, 20 );
576}
577
578const uint8_t*
579tr_peerIoGetPeersId( const tr_peerIo * io )
580{
581    assert( tr_isPeerIo( io ) );
582    assert( io->peerIdIsSet );
583
584    return io->peerId;
585}
586
587/**
588***
589**/
590
591void
592tr_peerIoEnableFEXT( tr_peerIo * io,
593                     tr_bool     flag )
594{
595    assert( tr_isPeerIo( io ) );
596    assert( _isBool( flag ) );
597
598    dbgmsg( io, "setting FEXT support flag to %d", (flag!=0) );
599    io->fastExtensionSupported = flag;
600}
601
602tr_bool
603tr_peerIoSupportsFEXT( const tr_peerIo * io )
604{
605    assert( tr_isPeerIo( io ) );
606
607    return io->fastExtensionSupported;
608}
609
610/**
611***
612**/
613
614void
615tr_peerIoEnableLTEP( tr_peerIo  * io,
616                     tr_bool      flag )
617{
618    assert( tr_isPeerIo( io ) );
619    assert( _isBool( flag ) );
620
621    dbgmsg( io, "setting LTEP support flag to %d", (flag!=0) );
622    io->extendedProtocolSupported = flag;
623}
624
625tr_bool
626tr_peerIoSupportsLTEP( const tr_peerIo * io )
627{
628    assert( tr_isPeerIo( io ) );
629
630    return io->extendedProtocolSupported;
631}
632
633/**
634***
635**/
636
637static size_t
638getDesiredOutputBufferSize( const tr_peerIo * io )
639{
640    /* this is all kind of arbitrary, but what seems to work well is
641     * being large enough to hold the next 20 seconds' worth of input,
642     * or a few blocks, whichever is bigger.
643     * It's okay to tweak this as needed */
644    const double maxBlockSize = 16 * 1024; /* 16 KiB is from BT spec */
645    const double currentSpeed = tr_bandwidthGetPieceSpeed( io->bandwidth, TR_UP );
646    const double period = 20; /* arbitrary */
647    const double numBlocks = 5.5; /* the 5 is arbitrary; the .5 is to leave room for messages */
648    return MAX( maxBlockSize*numBlocks, currentSpeed*1024*period );
649}
650
651size_t
652tr_peerIoGetWriteBufferSpace( const tr_peerIo * io )
653{
654    const size_t desiredLen = getDesiredOutputBufferSize( io );
655    const size_t currentLen = EVBUFFER_LENGTH( io->outbuf );
656    size_t freeSpace = 0;
657
658    if( desiredLen > currentLen )
659        freeSpace = desiredLen - currentLen;
660
661    return freeSpace;
662}
663
664void
665tr_peerIoSetBandwidth( tr_peerIo     * io,
666                       tr_bandwidth  * bandwidth )
667{
668    assert( tr_isPeerIo( io ) );
669
670    if( io->bandwidth )
671        tr_bandwidthRemovePeer( io->bandwidth, io );
672
673    io->bandwidth = bandwidth;
674
675    if( io->bandwidth )
676        tr_bandwidthAddPeer( io->bandwidth, io );
677}
678
679/**
680***
681**/
682
683tr_crypto*
684tr_peerIoGetCrypto( tr_peerIo * c )
685{
686    return c->crypto;
687}
688
689void
690tr_peerIoSetEncryption( tr_peerIo * io,
691                        int         encryptionMode )
692{
693    assert( tr_isPeerIo( io ) );
694    assert( encryptionMode == PEER_ENCRYPTION_NONE
695          || encryptionMode == PEER_ENCRYPTION_RC4 );
696
697    io->encryptionMode = encryptionMode;
698}
699
700int
701tr_peerIoIsEncrypted( const tr_peerIo * io )
702{
703    return io != NULL && io->encryptionMode == PEER_ENCRYPTION_RC4;
704}
705
706/**
707***
708**/
709
710void
711tr_peerIoWrite( tr_peerIo   * io,
712                const void  * writeme,
713                size_t        writemeLen,
714                int           isPieceData )
715{
716    struct tr_datatype * datatype;
717
718    assert( tr_amInEventThread( io->session ) );
719    dbgmsg( io, "adding %zu bytes into io->output", writemeLen );
720
721    datatype = tr_new( struct tr_datatype, 1 );
722    datatype->isPieceData = isPieceData != 0;
723    datatype->length = writemeLen;
724
725    __tr_list_init( &datatype->head );
726    __tr_list_append( &io->outbuf_datatypes, &datatype->head );
727
728    evbuffer_add( io->outbuf, writeme, writemeLen );
729}
730
731void
732tr_peerIoWriteBuf( tr_peerIo         * io,
733                   struct evbuffer   * buf,
734                   int                 isPieceData )
735{
736    const size_t n = EVBUFFER_LENGTH( buf );
737
738    tr_peerIoWrite( io, EVBUFFER_DATA( buf ), n, isPieceData );
739    evbuffer_drain( buf, n );
740}
741
742/**
743***
744**/
745
746void
747tr_peerIoWriteBytes( tr_peerIo       * io,
748                     struct evbuffer * outbuf,
749                     const void      * bytes,
750                     size_t            byteCount )
751{
752    uint8_t * tmp;
753
754    switch( io->encryptionMode )
755    {
756        case PEER_ENCRYPTION_NONE:
757            evbuffer_add( outbuf, bytes, byteCount );
758            break;
759
760        case PEER_ENCRYPTION_RC4:
761            tmp = tr_new( uint8_t, byteCount );
762            tr_cryptoEncrypt( io->crypto, byteCount, bytes, tmp );
763            evbuffer_add( outbuf, tmp, byteCount );
764            tr_free( tmp );
765            break;
766
767        default:
768            assert( 0 );
769    }
770}
771
772void
773tr_peerIoWriteUint8( tr_peerIo       * io,
774                     struct evbuffer * outbuf,
775                     uint8_t           writeme )
776{
777    tr_peerIoWriteBytes( io, outbuf, &writeme, sizeof( uint8_t ) );
778}
779
780void
781tr_peerIoWriteUint16( tr_peerIo       * io,
782                      struct evbuffer * outbuf,
783                      uint16_t          writeme )
784{
785    uint16_t tmp = htons( writeme );
786
787    tr_peerIoWriteBytes( io, outbuf, &tmp, sizeof( uint16_t ) );
788}
789
790void
791tr_peerIoWriteUint32( tr_peerIo       * io,
792                      struct evbuffer * outbuf,
793                      uint32_t          writeme )
794{
795    uint32_t tmp = htonl( writeme );
796
797    tr_peerIoWriteBytes( io, outbuf, &tmp, sizeof( uint32_t ) );
798}
799
800/***
801****
802***/
803
804void
805tr_peerIoReadBytes( tr_peerIo       * io,
806                    struct evbuffer * inbuf,
807                    void            * bytes,
808                    size_t            byteCount )
809{
810    assert( EVBUFFER_LENGTH( inbuf ) >= byteCount );
811
812    switch( io->encryptionMode )
813    {
814        case PEER_ENCRYPTION_NONE:
815            evbuffer_remove( inbuf, bytes, byteCount );
816            break;
817
818        case PEER_ENCRYPTION_RC4:
819            evbuffer_remove( inbuf, bytes, byteCount );
820            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
821            break;
822
823        default:
824            assert( 0 );
825    }
826}
827
828void
829tr_peerIoReadUint8( tr_peerIo       * io,
830                    struct evbuffer * inbuf,
831                    uint8_t         * setme )
832{
833    tr_peerIoReadBytes( io, inbuf, setme, sizeof( uint8_t ) );
834}
835
836void
837tr_peerIoReadUint16( tr_peerIo       * io,
838                     struct evbuffer * inbuf,
839                     uint16_t        * setme )
840{
841    uint16_t tmp;
842
843    assert( tr_isPeerIo( io ) );
844
845    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
846    *setme = ntohs( tmp );
847}
848
849void
850tr_peerIoReadUint32( tr_peerIo       * io,
851                     struct evbuffer * inbuf,
852                     uint32_t        * setme )
853{
854    uint32_t tmp;
855
856    assert( tr_isPeerIo( io ) );
857
858    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
859    *setme = ntohl( tmp );
860}
861
862void
863tr_peerIoDrain( tr_peerIo       * io,
864                struct evbuffer * inbuf,
865                size_t            byteCount )
866{
867    uint8_t * tmp;
868
869    assert( tr_isPeerIo( io ) );
870
871    tmp = tr_new( uint8_t, byteCount );
872    tr_peerIoReadBytes( io, inbuf, tmp, byteCount );
873    tr_free( tmp );
874}
875
876int
877tr_peerIoGetAge( const tr_peerIo * io )
878{
879    return time( NULL ) - io->timeCreated;
880}
881
882/***
883****
884***/
885
886static int
887tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
888{
889    int res;
890
891    assert( tr_isPeerIo( io ) );
892
893    howmuch = tr_bandwidthClamp( io->bandwidth, TR_DOWN, howmuch );
894
895    res = howmuch ? evbuffer_read( io->inbuf, io->socket, howmuch ) : 0;
896
897    dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(errno):"") );
898
899    if( EVBUFFER_LENGTH( io->inbuf ) )
900        canReadWrapper( io );
901
902    if( ( res <= 0 ) && ( io->gotError ) && ( errno != EAGAIN ) && ( errno != EINTR ) && ( errno != EINPROGRESS ) )
903    {
904        short what = EVBUFFER_READ | EVBUFFER_ERROR;
905        if( res == 0 )
906            what |= EVBUFFER_EOF;
907        io->gotError( io, what, io->userData );
908    }
909
910    return res;
911}
912
913static int
914tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
915{
916    int n;
917
918    assert( tr_isPeerIo( io ) );
919
920    howmuch = tr_bandwidthClamp( io->bandwidth, TR_UP, howmuch );
921
922    n = tr_evbuffer_write( io, io->socket, (int)howmuch );
923
924    if( n > 0 )
925        didWriteWrapper( io, n );
926
927    if( ( n < 0 ) && ( io->gotError ) && ( errno != EPIPE ) && ( errno != EAGAIN ) && ( errno != EINTR ) && ( errno != EINPROGRESS ) ) {
928        short what = EVBUFFER_WRITE | EVBUFFER_ERROR;
929        io->gotError( io, what, io->userData );
930    }
931
932    return n;
933}
934
935int
936tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
937{
938    int ret;
939
940    assert( tr_isPeerIo( io ) );
941    assert( tr_isDirection( dir ) );
942
943    if( dir==TR_DOWN )
944        ret = tr_peerIoTryRead( io, limit );
945    else
946        ret = tr_peerIoTryWrite( io, limit );
947
948    return ret;
949}
950
951struct evbuffer *
952tr_peerIoGetReadBuffer( tr_peerIo * io )
953{
954    assert( tr_isPeerIo( io ) );
955
956    return io->inbuf;
957}
958
959tr_bool
960tr_peerIoHasBandwidthLeft( const tr_peerIo * io, tr_direction dir )
961{
962    assert( tr_isPeerIo( io ) );
963    assert( tr_isDirection( dir ) );
964
965    return tr_bandwidthClamp( io->bandwidth, dir, 1024 ) > 0;
966}
967
968/***
969****
970****/
971
972static void
973event_enable( tr_peerIo * io, short event )
974{
975    assert( tr_isPeerIo( io ) );
976
977    if( event & EV_READ )
978        event_add( &io->event_read, NULL );
979
980    if( event & EV_WRITE )
981        event_add( &io->event_write, NULL );
982}
983
984static void
985event_disable( struct tr_peerIo * io, short event )
986{
987    assert( tr_isPeerIo( io ) );
988
989    if( event & EV_READ )
990        event_del( &io->event_read );
991
992    if( event & EV_WRITE )
993        event_del( &io->event_write );
994}
995
996
997void
998tr_peerIoSetEnabled( tr_peerIo    * io,
999                     tr_direction   dir,
1000                     tr_bool        isEnabled )
1001{
1002    short event;
1003
1004    assert( tr_isPeerIo( io ) );
1005    assert( tr_isDirection( dir ) );
1006
1007    event = dir == TR_UP ? EV_WRITE : EV_READ;
1008
1009    if( isEnabled )
1010        event_enable( io, event );
1011    else
1012        event_disable( io, event );
1013}
Note: See TracBrowser for help on using the repository browser.