source: trunk/libtransmission/peer-io.c @ 7456

Last change on this file since 7456 was 7456, checked in by charles, 12 years ago

(trunk libT) minor cleanups found while diffing for backport to 1.4x in r7455

  • Property svn:keywords set to Date Rev Author Id
File size: 22.7 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 7456 2008-12-22 00:52:44Z charles $
11 */
12
13#include <assert.h>
14#include <limits.h> /* INT_MAX */
15#include <string.h>
16#include <stdio.h>
17#include <unistd.h>
18
19#ifdef WIN32
20 #include <winsock2.h>
21#else
22 #include <arpa/inet.h> /* inet_ntoa */
23#endif
24
25#include <event.h>
26
27#include "transmission.h"
28#include "bandwidth.h"
29#include "crypto.h"
30#include "list.h"
31#include "net.h"
32#include "peer-io.h"
33#include "trevent.h"
34#include "utils.h"
35
36#define MAGIC_NUMBER 206745
37
38static size_t
39getPacketOverhead( size_t d )
40{
41    /**
42     * http://sd.wareonearth.com/~phil/net/overhead/
43     *
44     * TCP over Ethernet:
45     * Assuming no header compression (e.g. not PPP)
46     * Add 20 IPv4 header or 40 IPv6 header (no options)
47     * Add 20 TCP header
48     * Add 12 bytes optional TCP timestamps
49     * Max TCP Payload data rates over ethernet are thus:
50     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
51     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
52     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
53     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
54     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
55     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
56     */
57    static const double assumed_payload_data_rate = 94.0;
58
59    return (size_t)( d * ( 100.0 / assumed_payload_data_rate ) - d );
60}
61
62/**
63***
64**/
65
66#define dbgmsg( io, ... ) \
67    do { \
68        if( tr_deepLoggingIsActive( ) ) \
69            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
70    } while( 0 )
71
72struct tr_datatype
73{
74    tr_bool  isPieceData;
75    size_t   length;
76};
77
78struct tr_peerIo
79{
80    tr_bool            isEncrypted;
81    tr_bool            isIncoming;
82    tr_bool            peerIdIsSet;
83    tr_bool            extendedProtocolSupported;
84    tr_bool            fastExtensionSupported;
85
86    int                magicNumber;
87
88    uint8_t            encryptionMode;
89    tr_port            port;
90    int                socket;
91
92    uint8_t            peerId[20];
93    time_t             timeCreated;
94
95    tr_session       * session;
96
97    tr_address         addr;
98    tr_list          * output_datatypes; /* struct tr_datatype */
99
100    tr_can_read_cb     canRead;
101    tr_did_write_cb    didWrite;
102    tr_net_error_cb    gotError;
103    void *             userData;
104
105    size_t             bufferSize[2];
106
107    tr_bandwidth     * bandwidth;
108    tr_crypto        * crypto;
109
110    struct evbuffer  * inbuf;
111    struct evbuffer  * outbuf;
112
113    struct event       event_read;
114    struct event       event_write;
115};
116
117/***
118****
119***/
120
121static void
122didWriteWrapper( tr_peerIo * io, size_t bytes_transferred )
123{
124    while( bytes_transferred )
125    {
126        struct tr_datatype * next = io->output_datatypes->data;
127        const size_t payload = MIN( next->length, bytes_transferred );
128        const size_t overhead = getPacketOverhead( payload );
129
130        tr_bandwidthUsed( io->bandwidth, TR_UP, payload, next->isPieceData );
131
132        if( overhead > 0 )
133            tr_bandwidthUsed( io->bandwidth, TR_UP, overhead, FALSE );
134
135        if( io->didWrite )
136            io->didWrite( io, payload, next->isPieceData, io->userData );
137
138        bytes_transferred -= payload;
139        next->length -= payload;
140        if( !next->length )
141            tr_free( tr_list_pop_front( &io->output_datatypes ) );
142    }
143}
144
145static void
146canReadWrapper( tr_peerIo * io )
147{
148    tr_bool done = 0;
149    tr_bool err = 0;
150    tr_session * session = io->session;
151
152    dbgmsg( io, "canRead" );
153
154    /* try to consume the input buffer */
155    if( io->canRead )
156    {
157        tr_globalLock( session );
158
159        while( !done && !err )
160        {
161            size_t piece = 0;
162            const size_t oldLen = EVBUFFER_LENGTH( io->inbuf );
163            const int ret = io->canRead( io, io->userData, &piece );
164
165            const size_t used = oldLen - EVBUFFER_LENGTH( io->inbuf );
166
167            if( piece )
168                tr_bandwidthUsed( io->bandwidth, TR_DOWN, piece, TRUE );
169
170            if( used != piece )
171                tr_bandwidthUsed( io->bandwidth, TR_DOWN, used - piece, FALSE );
172
173            switch( ret )
174            {
175                case READ_NOW:
176                    if( EVBUFFER_LENGTH( io->inbuf ) )
177                        continue;
178                    done = 1;
179                    break;
180
181                case READ_LATER:
182                    done = 1;
183                    break;
184
185                case READ_ERR:
186                    err = 1;
187                    break;
188            }
189        }
190
191        tr_globalUnlock( session );
192    }
193}
194
195#define _isBool(b) (((b)==0 || (b)==1))
196
197tr_bool
198tr_isPeerIo( const tr_peerIo * io )
199{
200    return ( io != NULL )
201        && ( io->magicNumber == MAGIC_NUMBER )
202        && ( tr_isAddress( &io->addr ) )
203        && ( _isBool( io->isEncrypted ) )
204        && ( _isBool( io->isIncoming ) )
205        && ( _isBool( io->peerIdIsSet ) )
206        && ( _isBool( io->extendedProtocolSupported ) )
207        && ( _isBool( io->fastExtensionSupported ) );
208}
209
210static void
211event_read_cb( int fd, short event UNUSED, void * vio )
212{
213    int res;
214    short what = EVBUFFER_READ;
215    tr_peerIo * io = vio;
216    const size_t howmuch = tr_bandwidthClamp( io->bandwidth, TR_DOWN, io->session->so_rcvbuf );
217    const tr_direction dir = TR_DOWN;
218
219    assert( tr_isPeerIo( io ) );
220
221    dbgmsg( io, "libevent says this peer is ready to read" );
222
223    /* if we don't have any bandwidth left, stop reading */
224    if( howmuch < 1 ) {
225        tr_peerIoSetEnabled( io, dir, FALSE );
226        return;
227    }
228
229    res = evbuffer_read( io->inbuf, fd, howmuch );
230    if( res == -1 ) {
231        if( errno == EAGAIN || errno == EINTR )
232            goto reschedule;
233        /* error case */
234        what |= EVBUFFER_ERROR;
235    } else if( res == 0 ) {
236        /* eof case */
237        what |= EVBUFFER_EOF;
238    }
239
240    if( res <= 0 )
241        goto error;
242
243    tr_peerIoSetEnabled( io, dir, TRUE );
244
245    /* Invoke the user callback - must always be called last */
246    canReadWrapper( io );
247
248    return;
249
250 reschedule:
251    tr_peerIoSetEnabled( io, dir, TRUE );
252    return;
253
254 error:
255    if( io->gotError != NULL )
256        io->gotError( io, what, io->userData );
257}
258
259static int
260tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
261{
262    struct evbuffer * buffer = io->outbuf;
263    int n = MIN( EVBUFFER_LENGTH( buffer ), howmuch );
264
265#ifdef WIN32
266    n = send(fd, buffer->buffer, n,  0 );
267#else
268    n = write(fd, buffer->buffer, n );
269#endif
270    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?strerror(errno):"") );
271
272    if( n == -1 )
273        return -1;
274    if (n == 0)
275        return 0;
276    evbuffer_drain( buffer, n );
277
278    return n;
279}
280
281static void
282event_write_cb( int fd, short event UNUSED, void * vio )
283{
284    int res = 0;
285    short what = EVBUFFER_WRITE;
286    tr_peerIo * io = vio;
287    size_t howmuch;
288    const tr_direction dir = TR_UP;
289
290    assert( tr_isPeerIo( io ) );
291
292    dbgmsg( io, "libevent says this peer is ready to write" );
293
294    howmuch = MIN( (size_t)io->session->so_sndbuf, EVBUFFER_LENGTH( io->outbuf ) );
295    howmuch = tr_bandwidthClamp( io->bandwidth, dir, howmuch );
296
297    /* if we don't have any bandwidth left, stop writing */
298    if( howmuch < 1 ) {
299        tr_peerIoSetEnabled( io, dir, FALSE );
300        return;
301    }
302
303    res = tr_evbuffer_write( io, fd, howmuch );
304    if (res == -1) {
305#ifndef WIN32
306/*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
307 *  *set errno. thus this error checking is not portable*/
308        if (errno == EAGAIN || errno == EINTR || errno == EINPROGRESS)
309            goto reschedule;
310        /* error case */
311        what |= EVBUFFER_ERROR;
312
313#else
314        goto reschedule;
315#endif
316
317    } else if (res == 0) {
318        /* eof case */
319        what |= EVBUFFER_EOF;
320    }
321    if (res <= 0)
322        goto error;
323
324    if( EVBUFFER_LENGTH( io->outbuf ) )
325        tr_peerIoSetEnabled( io, dir, TRUE );
326
327    didWriteWrapper( io, res );
328    return;
329
330 reschedule:
331    if( EVBUFFER_LENGTH( io->outbuf ) )
332        tr_peerIoSetEnabled( io, dir, TRUE );
333    return;
334
335 error:
336    if( io->gotError != NULL )
337        io->gotError( io, what, io->userData );
338}
339
340/**
341***
342**/
343
344
345static int
346isFlag( int flag )
347{
348    return( ( flag == 0 ) || ( flag == 1 ) );
349}
350
351static tr_peerIo*
352tr_peerIoNew( tr_session       * session,
353              const tr_address * addr,
354              tr_port            port,
355              const uint8_t    * torrentHash,
356              int                isIncoming,
357              int                socket )
358{
359    tr_peerIo * io;
360
361    if( socket >= 0 )
362        tr_netSetTOS( socket, session->peerSocketTOS );
363
364    io = tr_new0( tr_peerIo, 1 );
365    io->magicNumber = MAGIC_NUMBER;
366    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
367    io->session = session;
368    io->addr = *addr;
369    io->port = port;
370    io->socket = socket;
371    io->isIncoming = isIncoming != 0;
372    io->timeCreated = time( NULL );
373    io->inbuf = evbuffer_new( );
374    io->outbuf = evbuffer_new( );
375    event_set( &io->event_read, io->socket, EV_READ, event_read_cb, io );
376    event_set( &io->event_write, io->socket, EV_WRITE, event_write_cb, io );
377#if 0
378    bufevNew( io );
379#endif
380    tr_peerIoSetBandwidth( io, session->bandwidth );
381    return io;
382}
383
384tr_peerIo*
385tr_peerIoNewIncoming( tr_session       * session,
386                      const tr_address * addr,
387                      tr_port            port,
388                      int                socket )
389{
390    assert( session );
391    assert( addr );
392    assert( socket >= 0 );
393
394    return tr_peerIoNew( session, addr, port, NULL, 1, socket );
395}
396
397tr_peerIo*
398tr_peerIoNewOutgoing( tr_session       * session,
399                      const tr_address * addr,
400                      tr_port            port,
401                      const uint8_t    * torrentHash )
402{
403    int socket;
404
405    assert( session );
406    assert( addr );
407    assert( torrentHash );
408
409    socket = tr_netOpenTCP( session, addr, port );
410
411    return socket < 0
412           ? NULL
413           : tr_peerIoNew( session, addr, port, torrentHash, 0, socket );
414}
415
416static void
417io_dtor( void * vio )
418{
419    tr_peerIo * io = vio;
420
421    event_del( &io->event_read );
422    event_del( &io->event_write );
423    tr_peerIoSetBandwidth( io, NULL );
424    evbuffer_free( io->outbuf );
425    evbuffer_free( io->inbuf );
426    tr_netClose( io->socket );
427    tr_cryptoFree( io->crypto );
428    tr_list_free( &io->output_datatypes, tr_free );
429
430    io->magicNumber = 0xDEAD;
431    tr_free( io );
432}
433
434void
435tr_peerIoFree( tr_peerIo * io )
436{
437    if( io )
438    {
439        io->canRead = NULL;
440        io->didWrite = NULL;
441        io->gotError = NULL;
442        tr_runInEventThread( io->session, io_dtor, io );
443    }
444}
445
446tr_session*
447tr_peerIoGetSession( tr_peerIo * io )
448{
449    assert( tr_isPeerIo( io ) );
450    assert( io->session );
451
452    return io->session;
453}
454
455const tr_address*
456tr_peerIoGetAddress( const tr_peerIo * io,
457                           tr_port   * port )
458{
459    assert( tr_isPeerIo( io ) );
460
461    if( port )
462        *port = io->port;
463
464    return &io->addr;
465}
466
467const char*
468tr_peerIoAddrStr( const tr_address * addr, tr_port port )
469{
470    static char buf[512];
471
472    if( addr->type == TR_AF_INET ) 
473        tr_snprintf( buf, sizeof( buf ), "%s:%u", tr_ntop_non_ts( addr ), ntohs( port ) ); 
474    else 
475        tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_ntop_non_ts( addr ), ntohs( port ) ); 
476    return buf;
477}
478
479const char*
480tr_peerIoGetAddrStr( const tr_peerIo * io )
481{
482    return tr_peerIoAddrStr( &io->addr, io->port );
483}
484
485void
486tr_peerIoSetIOFuncs( tr_peerIo        * io,
487                     tr_can_read_cb     readcb,
488                     tr_did_write_cb    writecb,
489                     tr_net_error_cb    errcb,
490                     void             * userData )
491{
492    io->canRead = readcb;
493    io->didWrite = writecb;
494    io->gotError = errcb;
495    io->userData = userData;
496}
497
498tr_bool
499tr_peerIoIsIncoming( const tr_peerIo * c )
500{
501    return c->isIncoming != 0;
502}
503
504int
505tr_peerIoReconnect( tr_peerIo * io )
506{
507    assert( !tr_peerIoIsIncoming( io ) );
508
509    if( io->socket >= 0 )
510        tr_netClose( io->socket );
511
512    io->socket = tr_netOpenTCP( io->session, &io->addr, io->port );
513
514    if( io->socket >= 0 )
515    {
516        tr_bandwidth * bandwidth = io->bandwidth;
517        tr_peerIoSetBandwidth( io, NULL );
518
519        tr_netSetTOS( io->socket, io->session->peerSocketTOS );
520#if 0
521        bufevNew( io );
522#endif
523
524        tr_peerIoSetBandwidth( io, bandwidth );
525        return 0;
526    }
527
528    return -1;
529}
530
531/**
532***
533**/
534
535void
536tr_peerIoSetTorrentHash( tr_peerIo *     io,
537                         const uint8_t * hash )
538{
539    assert( tr_isPeerIo( io ) );
540
541    tr_cryptoSetTorrentHash( io->crypto, hash );
542}
543
544const uint8_t*
545tr_peerIoGetTorrentHash( tr_peerIo * io )
546{
547    assert( tr_isPeerIo( io ) );
548    assert( io->crypto );
549
550    return tr_cryptoGetTorrentHash( io->crypto );
551}
552
553int
554tr_peerIoHasTorrentHash( const tr_peerIo * io )
555{
556    assert( tr_isPeerIo( io ) );
557    assert( io->crypto );
558
559    return tr_cryptoHasTorrentHash( io->crypto );
560}
561
562/**
563***
564**/
565
566void
567tr_peerIoSetPeersId( tr_peerIo *     io,
568                     const uint8_t * peer_id )
569{
570    assert( tr_isPeerIo( io ) );
571
572    if( ( io->peerIdIsSet = peer_id != NULL ) )
573        memcpy( io->peerId, peer_id, 20 );
574    else
575        memset( io->peerId, 0, 20 );
576}
577
578const uint8_t*
579tr_peerIoGetPeersId( const tr_peerIo * io )
580{
581    assert( tr_isPeerIo( io ) );
582    assert( io->peerIdIsSet );
583
584    return io->peerId;
585}
586
587/**
588***
589**/
590
591void
592tr_peerIoEnableFEXT( tr_peerIo * io,
593                     tr_bool     flag )
594{
595    assert( tr_isPeerIo( io ) );
596    assert( isFlag( flag ) );
597
598    dbgmsg( io, "setting FEXT support flag to %d", (flag?1:0) );
599    io->fastExtensionSupported = flag;
600}
601
602tr_bool
603tr_peerIoSupportsFEXT( const tr_peerIo * io )
604{
605    assert( tr_isPeerIo( io ) );
606
607    return io->fastExtensionSupported;
608}
609
610/**
611***
612**/
613
614void
615tr_peerIoEnableLTEP( tr_peerIo  * io,
616                     tr_bool      flag )
617{
618    assert( tr_isPeerIo( io ) );
619    assert( isFlag( flag ) );
620
621    dbgmsg( io, "setting LTEP support flag to %d", (flag?1:0) );
622    io->extendedProtocolSupported = flag;
623}
624
625tr_bool
626tr_peerIoSupportsLTEP( const tr_peerIo * io )
627{
628    assert( tr_isPeerIo( io ) );
629
630    return io->extendedProtocolSupported;
631}
632
633/**
634***
635**/
636
637static size_t
638getDesiredOutputBufferSize( const tr_peerIo * io )
639{
640    /* this is all kind of arbitrary, but what seems to work well is
641     * being large enough to hold the next 20 seconds' worth of input,
642     * or a few blocks, whichever is bigger.
643     * It's okay to tweak this as needed */
644    const double maxBlockSize = 16 * 1024; /* 16 KiB is from BT spec */
645    const double currentSpeed = tr_bandwidthGetPieceSpeed( io->bandwidth, TR_UP );
646    const double period = 20; /* arbitrary */
647    const double numBlocks = 5.5; /* the 5 is arbitrary; the .5 is to leave room for messages */
648    return MAX( maxBlockSize*numBlocks, currentSpeed*1024*period );
649}
650
651size_t
652tr_peerIoGetWriteBufferSpace( const tr_peerIo * io )
653{
654    const size_t desiredLen = getDesiredOutputBufferSize( io );
655    const size_t currentLen = EVBUFFER_LENGTH( io->outbuf );
656    size_t freeSpace = 0;
657
658    if( desiredLen > currentLen )
659        freeSpace = desiredLen - currentLen;
660
661    return freeSpace;
662}
663
664void
665tr_peerIoSetBandwidth( tr_peerIo     * io,
666                       tr_bandwidth  * bandwidth )
667{
668    assert( tr_isPeerIo( io ) );
669
670    if( io->bandwidth )
671        tr_bandwidthRemovePeer( io->bandwidth, io );
672
673    io->bandwidth = bandwidth;
674
675    if( io->bandwidth )
676        tr_bandwidthAddPeer( io->bandwidth, io );
677}
678
679/**
680***
681**/
682
683tr_crypto*
684tr_peerIoGetCrypto( tr_peerIo * c )
685{
686    return c->crypto;
687}
688
689void
690tr_peerIoSetEncryption( tr_peerIo * io,
691                        int         encryptionMode )
692{
693    assert( tr_isPeerIo( io ) );
694    assert( encryptionMode == PEER_ENCRYPTION_NONE
695          || encryptionMode == PEER_ENCRYPTION_RC4 );
696
697    io->encryptionMode = encryptionMode;
698}
699
700int
701tr_peerIoIsEncrypted( const tr_peerIo * io )
702{
703    return io != NULL && io->encryptionMode == PEER_ENCRYPTION_RC4;
704}
705
706/**
707***
708**/
709
710void
711tr_peerIoWrite( tr_peerIo   * io,
712                const void  * writeme,
713                size_t        writemeLen,
714                int           isPieceData )
715{
716    struct tr_datatype * datatype;
717    assert( tr_amInEventThread( io->session ) );
718    dbgmsg( io, "adding %zu bytes into io->output", writemeLen );
719
720    datatype = tr_new( struct tr_datatype, 1 );
721    datatype->isPieceData = isPieceData != 0;
722    datatype->length = writemeLen;
723    tr_list_append( &io->output_datatypes, datatype );
724
725    evbuffer_add( io->outbuf, writeme, writemeLen );
726}
727
728void
729tr_peerIoWriteBuf( tr_peerIo         * io,
730                   struct evbuffer   * buf,
731                   int                 isPieceData )
732{
733    const size_t n = EVBUFFER_LENGTH( buf );
734
735    tr_peerIoWrite( io, EVBUFFER_DATA( buf ), n, isPieceData );
736    evbuffer_drain( buf, n );
737}
738
739/**
740***
741**/
742
743void
744tr_peerIoWriteBytes( tr_peerIo *       io,
745                     struct evbuffer * outbuf,
746                     const void *      bytes,
747                     size_t            byteCount )
748{
749    uint8_t * tmp;
750
751    switch( io->encryptionMode )
752    {
753        case PEER_ENCRYPTION_NONE:
754            evbuffer_add( outbuf, bytes, byteCount );
755            break;
756
757        case PEER_ENCRYPTION_RC4:
758            tmp = tr_new( uint8_t, byteCount );
759            tr_cryptoEncrypt( io->crypto, byteCount, bytes, tmp );
760            evbuffer_add( outbuf, tmp, byteCount );
761            tr_free( tmp );
762            break;
763
764        default:
765            assert( 0 );
766    }
767}
768
769void
770tr_peerIoWriteUint8( tr_peerIo *       io,
771                     struct evbuffer * outbuf,
772                     uint8_t           writeme )
773{
774    tr_peerIoWriteBytes( io, outbuf, &writeme, sizeof( uint8_t ) );
775}
776
777void
778tr_peerIoWriteUint16( tr_peerIo *       io,
779                      struct evbuffer * outbuf,
780                      uint16_t          writeme )
781{
782    uint16_t tmp = htons( writeme );
783
784    tr_peerIoWriteBytes( io, outbuf, &tmp, sizeof( uint16_t ) );
785}
786
787void
788tr_peerIoWriteUint32( tr_peerIo *       io,
789                      struct evbuffer * outbuf,
790                      uint32_t          writeme )
791{
792    uint32_t tmp = htonl( writeme );
793
794    tr_peerIoWriteBytes( io, outbuf, &tmp, sizeof( uint32_t ) );
795}
796
797/***
798****
799***/
800
801void
802tr_peerIoReadBytes( tr_peerIo *       io,
803                    struct evbuffer * inbuf,
804                    void *            bytes,
805                    size_t            byteCount )
806{
807    assert( EVBUFFER_LENGTH( inbuf ) >= byteCount );
808
809    switch( io->encryptionMode )
810    {
811        case PEER_ENCRYPTION_NONE:
812            evbuffer_remove( inbuf, bytes, byteCount );
813            break;
814
815        case PEER_ENCRYPTION_RC4:
816            evbuffer_remove( inbuf, bytes, byteCount );
817            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
818            break;
819
820        default:
821            assert( 0 );
822    }
823}
824
825void
826tr_peerIoReadUint8( tr_peerIo *       io,
827                    struct evbuffer * inbuf,
828                    uint8_t *         setme )
829{
830    tr_peerIoReadBytes( io, inbuf, setme, sizeof( uint8_t ) );
831}
832
833void
834tr_peerIoReadUint16( tr_peerIo *       io,
835                     struct evbuffer * inbuf,
836                     uint16_t *        setme )
837{
838    uint16_t tmp;
839
840    assert( tr_isPeerIo( io ) );
841
842    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
843    *setme = ntohs( tmp );
844}
845
846void
847tr_peerIoReadUint32( tr_peerIo *       io,
848                     struct evbuffer * inbuf,
849                     uint32_t *        setme )
850{
851    uint32_t tmp;
852
853    assert( tr_isPeerIo( io ) );
854
855    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
856    *setme = ntohl( tmp );
857}
858
859void
860tr_peerIoDrain( tr_peerIo *       io,
861                struct evbuffer * inbuf,
862                size_t            byteCount )
863{
864    uint8_t * tmp;
865
866    assert( tr_isPeerIo( io ) );
867
868    tmp = tr_new( uint8_t, byteCount );
869    tr_peerIoReadBytes( io, inbuf, tmp, byteCount );
870    tr_free( tmp );
871}
872
873int
874tr_peerIoGetAge( const tr_peerIo * io )
875{
876    return time( NULL ) - io->timeCreated;
877}
878
879/***
880****
881***/
882
883static int
884tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
885{
886    int res;
887
888    assert( tr_isPeerIo( io ) );
889
890    howmuch = tr_bandwidthClamp( io->bandwidth, TR_DOWN, howmuch );
891
892    res = howmuch ? evbuffer_read( io->inbuf, io->socket, howmuch ) : 0;
893
894    dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(errno):"") );
895
896    if( EVBUFFER_LENGTH( io->inbuf ) )
897        canReadWrapper( io );
898
899    if( ( res <= 0 ) && ( io->gotError ) && ( errno != EAGAIN ) && ( errno != EINTR ) && ( errno != EINPROGRESS ) )
900    {
901        short what = EVBUFFER_READ | EVBUFFER_ERROR;
902        if( res == 0 )
903            what |= EVBUFFER_EOF;
904        io->gotError( io, what, io->userData );
905    }
906
907    return res;
908}
909
910static int
911tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
912{
913    int n;
914
915    assert( tr_isPeerIo( io ) );
916
917    howmuch = tr_bandwidthClamp( io->bandwidth, TR_UP, howmuch );
918
919    n = tr_evbuffer_write( io, io->socket, (int)howmuch );
920
921    if( n > 0 )
922        didWriteWrapper( io, n );
923
924    if( ( n < 0 ) && ( io->gotError ) && ( errno != EPIPE ) && ( errno != EAGAIN ) && ( errno != EINTR ) && ( errno != EINPROGRESS ) ) {
925        short what = EVBUFFER_WRITE | EVBUFFER_ERROR;
926        io->gotError( io, what, io->userData );
927    }
928
929    return n;
930}
931
932int
933tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
934{
935    int ret;
936
937    assert( tr_isPeerIo( io ) );
938    assert( tr_isDirection( dir ) );
939
940    if( dir==TR_DOWN )
941        ret = tr_peerIoTryRead( io, limit );
942    else
943        ret = tr_peerIoTryWrite( io, limit );
944
945    return ret;
946}
947
948struct evbuffer *
949tr_peerIoGetReadBuffer( tr_peerIo * io )
950{
951    assert( tr_isPeerIo( io ) );
952
953    return io->inbuf;
954}
955
956tr_bool
957tr_peerIoHasBandwidthLeft( const tr_peerIo * io, tr_direction dir )
958{
959    assert( tr_isPeerIo( io ) );
960    assert( tr_isDirection( dir ) );
961
962    return tr_bandwidthClamp( io->bandwidth, dir, 1024 ) > 0;
963}
964
965/***
966****
967****/
968
969static void
970event_enable( tr_peerIo * io, short event )
971{
972    assert( tr_isPeerIo( io ) );
973
974    if( event & EV_READ )
975        event_add( &io->event_read, NULL );
976
977    if( event & EV_WRITE )
978        event_add( &io->event_write, NULL );
979}
980
981static void
982event_disable( struct tr_peerIo * io, short event )
983{
984    assert( tr_isPeerIo( io ) );
985
986    if( event & EV_READ )
987        event_del( &io->event_read );
988
989    if( event & EV_WRITE )
990        event_del( &io->event_write );
991}
992
993
994void
995tr_peerIoSetEnabled( tr_peerIo    * io,
996                     tr_direction   dir,
997                     tr_bool        isEnabled )
998{
999    short event;
1000
1001    assert( tr_isPeerIo( io ) );
1002    assert( tr_isDirection( dir ) );
1003
1004    event = dir == TR_UP ? EV_WRITE : EV_READ;
1005
1006    if( isEnabled )
1007        event_enable( io, event );
1008    else
1009        event_disable( io, event );
1010}
Note: See TracBrowser for help on using the repository browser.