source: trunk/libtransmission/peer-io.c @ 7446

Last change on this file since 7446 was 7446, checked in by charles, 12 years ago

(trunk libT) comments, tr_bool correctness, better runtime tests.

  • Property svn:keywords set to Date Rev Author Id
File size: 22.7 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 7446 2008-12-21 18:15:00Z charles $
11 */
12
13#include <assert.h>
14#include <limits.h> /* INT_MAX */
15#include <string.h>
16#include <stdio.h>
17#include <unistd.h>
18
19#ifdef WIN32
20 #include <winsock2.h>
21#else
22 #include <arpa/inet.h> /* inet_ntoa */
23#endif
24
25#include <event.h>
26
27#include "transmission.h"
28#include "bandwidth.h"
29#include "crypto.h"
30#include "list.h"
31#include "net.h"
32#include "peer-io.h"
33#include "trevent.h"
34#include "utils.h"
35
36#define MAGIC_NUMBER 206745
37
38static size_t
39getPacketOverhead( size_t d )
40{
41    /**
42     * http://sd.wareonearth.com/~phil/net/overhead/
43     *
44     * TCP over Ethernet:
45     * Assuming no header compression (e.g. not PPP)
46     * Add 20 IPv4 header or 40 IPv6 header (no options)
47     * Add 20 TCP header
48     * Add 12 bytes optional TCP timestamps
49     * Max TCP Payload data rates over ethernet are thus:
50     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
51     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
52     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
53     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
54     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
55     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
56     */
57    static const double assumed_payload_data_rate = 94.0;
58
59    return (size_t)( d * ( 100.0 / assumed_payload_data_rate ) - d );
60}
61
62/**
63***
64**/
65
66#define dbgmsg( io, ... ) \
67    do { \
68        if( tr_deepLoggingIsActive( ) ) \
69            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
70    } while( 0 )
71
72struct tr_datatype
73{
74    tr_bool  isPieceData;
75    size_t   length;
76};
77
78struct tr_peerIo
79{
80    tr_bool            isEncrypted;
81    tr_bool            isIncoming;
82    tr_bool            peerIdIsSet;
83    tr_bool            extendedProtocolSupported;
84    tr_bool            fastExtensionSupported;
85
86    int                magicNumber;
87
88    uint8_t            encryptionMode;
89    tr_port            port;
90    int                socket;
91
92    uint8_t            peerId[20];
93    time_t             timeCreated;
94
95    tr_session       * session;
96
97    tr_address         addr;
98    tr_list          * output_datatypes; /* struct tr_datatype */
99
100    tr_can_read_cb     canRead;
101    tr_did_write_cb    didWrite;
102    tr_net_error_cb    gotError;
103    void *             userData;
104
105    size_t             bufferSize[2];
106
107    tr_bandwidth     * bandwidth;
108    tr_crypto        * crypto;
109
110    struct evbuffer  * inbuf;
111    struct evbuffer  * outbuf;
112
113    struct event       event_read;
114    struct event       event_write;
115};
116
117/***
118****
119***/
120
121static void
122didWriteWrapper( tr_peerIo * io, size_t bytes_transferred )
123{
124    while( bytes_transferred )
125    {
126        struct tr_datatype * next = io->output_datatypes->data;
127        const size_t payload = MIN( next->length, bytes_transferred );
128        const size_t overhead = getPacketOverhead( payload );
129
130        tr_bandwidthUsed( io->bandwidth, TR_UP, payload, next->isPieceData );
131
132        if( overhead > 0 )
133            tr_bandwidthUsed( io->bandwidth, TR_UP, overhead, FALSE );
134
135        if( io->didWrite )
136            io->didWrite( io, payload, next->isPieceData, io->userData );
137
138        bytes_transferred -= payload;
139        next->length -= payload;
140        if( !next->length )
141            tr_free( tr_list_pop_front( &io->output_datatypes ) );
142    }
143}
144
145static void
146canReadWrapper( tr_peerIo * io )
147{
148    tr_bool done = 0;
149    tr_bool err = 0;
150    tr_session * session = io->session;
151
152    dbgmsg( io, "canRead" );
153
154    /* try to consume the input buffer */
155    if( io->canRead )
156    {
157        tr_globalLock( session );
158
159        while( !done && !err )
160        {
161            size_t piece = 0;
162            const size_t oldLen = EVBUFFER_LENGTH( io->inbuf );
163            const int ret = io->canRead( io, io->userData, &piece );
164
165            const size_t used = oldLen - EVBUFFER_LENGTH( io->inbuf );
166
167            if( piece )
168                tr_bandwidthUsed( io->bandwidth, TR_DOWN, piece, TRUE );
169
170            if( used != piece )
171                tr_bandwidthUsed( io->bandwidth, TR_DOWN, used - piece, FALSE );
172
173            switch( ret )
174            {
175                case READ_NOW:
176                    if( EVBUFFER_LENGTH( io->inbuf ) )
177                        continue;
178                    done = 1;
179                    break;
180
181                case READ_LATER:
182                    done = 1;
183                    break;
184
185                case READ_ERR:
186                    err = 1;
187                    break;
188            }
189        }
190
191        tr_globalUnlock( session );
192    }
193}
194
195#define _isBool(b) (((b)==0 || (b)==1))
196
197tr_bool
198tr_isPeerIo( const tr_peerIo * io )
199{
200    return ( io != NULL )
201        && ( io->magicNumber == MAGIC_NUMBER )
202        && ( tr_isAddress( &io->addr ) )
203        && ( _isBool( io->isEncrypted ) )
204        && ( _isBool( io->isIncoming ) )
205        && ( _isBool( io->peerIdIsSet ) )
206        && ( _isBool( io->extendedProtocolSupported ) )
207        && ( _isBool( io->fastExtensionSupported ) );
208}
209
210static void
211event_read_cb( int fd, short event UNUSED, void * vio )
212{
213    int res;
214    short what = EVBUFFER_READ;
215    tr_peerIo * io = vio;
216    const size_t howmuch = tr_bandwidthClamp( io->bandwidth, TR_DOWN, io->session->so_rcvbuf );
217    const tr_direction dir = TR_DOWN;
218
219    assert( tr_isPeerIo( io ) );
220
221    dbgmsg( io, "libevent says this peer is ready to read" );
222
223    /* if we don't have any bandwidth left, stop reading */
224    if( howmuch < 1 ) {
225        tr_peerIoSetEnabled( io, dir, FALSE );
226        return;
227    }
228
229    res = evbuffer_read( io->inbuf, fd, howmuch );
230    if( res == -1 ) {
231        if( errno == EAGAIN || errno == EINTR )
232            goto reschedule;
233        /* error case */
234        what |= EVBUFFER_ERROR;
235    } else if( res == 0 ) {
236        /* eof case */
237        what |= EVBUFFER_EOF;
238    }
239
240    if( res <= 0 )
241        goto error;
242
243    tr_peerIoSetEnabled( io, dir, TRUE );
244
245    /* Invoke the user callback - must always be called last */
246    canReadWrapper( io );
247
248    return;
249
250 reschedule:
251    tr_peerIoSetEnabled( io, dir, TRUE );
252    return;
253
254 error:
255    if( io->gotError != NULL )
256        io->gotError( io, what, io->userData );
257}
258
259static int
260tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
261{
262    struct evbuffer * buffer = io->outbuf;
263    int n = MIN( EVBUFFER_LENGTH( buffer ), howmuch );
264
265#ifdef WIN32
266    n = send(fd, buffer->buffer, n,  0 );
267#else
268    n = write(fd, buffer->buffer, n );
269#endif
270    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?strerror(errno):"") );
271
272    if( n == -1 )
273        return -1;
274    if (n == 0)
275        return 0;
276    evbuffer_drain( buffer, n );
277
278    return n;
279}
280
281static void
282event_write_cb( int fd, short event UNUSED, void * vio )
283{
284    int res = 0;
285    short what = EVBUFFER_WRITE;
286    tr_peerIo * io = vio;
287    size_t howmuch;
288    const tr_direction dir = TR_UP;
289
290    assert( tr_isPeerIo( io ) );
291
292    dbgmsg( io, "libevent says this peer is ready to write" );
293
294    howmuch = MIN( (size_t)io->session->so_sndbuf, EVBUFFER_LENGTH( io->outbuf ) );
295    howmuch = tr_bandwidthClamp( io->bandwidth, dir, howmuch );
296
297    /* if we don't have any bandwidth left, stop writing */
298    if( howmuch < 1 ) {
299        tr_peerIoSetEnabled( io, dir, FALSE );
300        return;
301    }
302
303    res = tr_evbuffer_write( io, fd, howmuch );
304    if (res == -1) {
305#ifndef WIN32
306/*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
307 *  *set errno. thus this error checking is not portable*/
308        if (errno == EAGAIN || errno == EINTR || errno == EINPROGRESS)
309            goto reschedule;
310        /* error case */
311        what |= EVBUFFER_ERROR;
312
313#else
314        goto reschedule;
315#endif
316
317    } else if (res == 0) {
318        /* eof case */
319        what |= EVBUFFER_EOF;
320    }
321    if (res <= 0)
322        goto error;
323
324    if( EVBUFFER_LENGTH( io->outbuf ) )
325        tr_peerIoSetEnabled( io, dir, TRUE );
326
327    didWriteWrapper( io, res );
328    return;
329
330 reschedule:
331    if( EVBUFFER_LENGTH( io->outbuf ) )
332        tr_peerIoSetEnabled( io, dir, TRUE );
333    return;
334
335 error:
336    if( io->gotError != NULL )
337        io->gotError( io, what, io->userData );
338}
339
340/**
341***
342**/
343
344
345static int
346isFlag( int flag )
347{
348    return( ( flag == 0 ) || ( flag == 1 ) );
349}
350
351static tr_peerIo*
352tr_peerIoNew( tr_session       * session,
353              const tr_address * addr,
354              tr_port            port,
355              const uint8_t    * torrentHash,
356              int                isIncoming,
357              int                socket )
358{
359    tr_peerIo * io;
360
361    if( socket >= 0 )
362        tr_netSetTOS( socket, session->peerSocketTOS );
363
364    io = tr_new0( tr_peerIo, 1 );
365    io->magicNumber = MAGIC_NUMBER;
366    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
367    io->session = session;
368    io->addr = *addr;
369    io->port = port;
370    io->socket = socket;
371    io->isIncoming = isIncoming != 0;
372    io->timeCreated = time( NULL );
373    io->inbuf = evbuffer_new( );
374    io->outbuf = evbuffer_new( );
375    event_set( &io->event_read, io->socket, EV_READ, event_read_cb, io );
376    event_set( &io->event_write, io->socket, EV_WRITE, event_write_cb, io );
377#if 0
378    bufevNew( io );
379#endif
380    tr_peerIoSetBandwidth( io, session->bandwidth );
381    return io;
382}
383
384tr_peerIo*
385tr_peerIoNewIncoming( tr_session       * session,
386                      const tr_address * addr,
387                      tr_port            port,
388                      int                socket )
389{
390    assert( session );
391    assert( addr );
392    assert( socket >= 0 );
393
394    return tr_peerIoNew( session, addr, port, NULL, 1, socket );
395}
396
397tr_peerIo*
398tr_peerIoNewOutgoing( tr_session       * session,
399                      const tr_address * addr,
400                      tr_port            port,
401                      const uint8_t    * torrentHash )
402{
403    int socket;
404
405    assert( session );
406    assert( addr );
407    assert( torrentHash );
408
409    socket = tr_netOpenTCP( session, addr, port );
410
411    return socket < 0
412           ? NULL
413           : tr_peerIoNew( session, addr, port, torrentHash, 0, socket );
414}
415
416static void
417io_dtor( void * vio )
418{
419    tr_peerIo * io = vio;
420
421    event_del( &io->event_read );
422    event_del( &io->event_write );
423    tr_peerIoSetBandwidth( io, NULL );
424    evbuffer_free( io->outbuf );
425    evbuffer_free( io->inbuf );
426    tr_netClose( io->socket );
427    tr_cryptoFree( io->crypto );
428    tr_list_free( &io->output_datatypes, tr_free );
429
430    io->magicNumber = 0xDEAD;
431    tr_free( io );
432}
433
434void
435tr_peerIoFree( tr_peerIo * io )
436{
437    if( io )
438    {
439        io->canRead = NULL;
440        io->didWrite = NULL;
441        io->gotError = NULL;
442        tr_runInEventThread( io->session, io_dtor, io );
443    }
444}
445
446tr_session*
447tr_peerIoGetSession( tr_peerIo * io )
448{
449    assert( tr_isPeerIo( io ) );
450    assert( io->session );
451
452    return io->session;
453}
454
455const tr_address*
456tr_peerIoGetAddress( const tr_peerIo * io,
457                           tr_port   * port )
458{
459    assert( tr_isPeerIo( io ) );
460
461    if( port )
462        *port = io->port;
463
464    return &io->addr;
465}
466
467const char*
468tr_peerIoAddrStr( const tr_address * addr,
469                  tr_port            port )
470{
471    static char buf[512];
472
473    if( addr->type == TR_AF_INET ) 
474        tr_snprintf( buf, sizeof( buf ), "%s:%u", tr_ntop_non_ts( addr ), ntohs( port ) ); 
475    else 
476        tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_ntop_non_ts( addr ), ntohs( port ) ); 
477    return buf;
478}
479
480const char*
481tr_peerIoGetAddrStr( const tr_peerIo * io )
482{
483    return tr_peerIoAddrStr( &io->addr, io->port );
484}
485
486void
487tr_peerIoSetIOFuncs( tr_peerIo        * io,
488                     tr_can_read_cb     readcb,
489                     tr_did_write_cb    writecb,
490                     tr_net_error_cb    errcb,
491                     void             * userData )
492{
493    io->canRead = readcb;
494    io->didWrite = writecb;
495    io->gotError = errcb;
496    io->userData = userData;
497}
498
499tr_bool
500tr_peerIoIsIncoming( const tr_peerIo * c )
501{
502    return c->isIncoming != 0;
503}
504
505int
506tr_peerIoReconnect( tr_peerIo * io )
507{
508    assert( !tr_peerIoIsIncoming( io ) );
509
510    if( io->socket >= 0 )
511        tr_netClose( io->socket );
512
513    io->socket = tr_netOpenTCP( io->session, &io->addr, io->port );
514
515    if( io->socket >= 0 )
516    {
517        tr_bandwidth * bandwidth = io->bandwidth;
518        tr_peerIoSetBandwidth( io, NULL );
519
520        tr_netSetTOS( io->socket, io->session->peerSocketTOS );
521#if 0
522        bufevNew( io );
523#endif
524
525        tr_peerIoSetBandwidth( io, bandwidth );
526        return 0;
527    }
528
529    return -1;
530}
531
532/**
533***
534**/
535
536void
537tr_peerIoSetTorrentHash( tr_peerIo *     io,
538                         const uint8_t * hash )
539{
540    assert( tr_isPeerIo( io ) );
541
542    tr_cryptoSetTorrentHash( io->crypto, hash );
543}
544
545const uint8_t*
546tr_peerIoGetTorrentHash( tr_peerIo * io )
547{
548    assert( tr_isPeerIo( io ) );
549    assert( io->crypto );
550
551    return tr_cryptoGetTorrentHash( io->crypto );
552}
553
554int
555tr_peerIoHasTorrentHash( const tr_peerIo * io )
556{
557    assert( tr_isPeerIo( io ) );
558    assert( io->crypto );
559
560    return tr_cryptoHasTorrentHash( io->crypto );
561}
562
563/**
564***
565**/
566
567void
568tr_peerIoSetPeersId( tr_peerIo *     io,
569                     const uint8_t * peer_id )
570{
571    assert( tr_isPeerIo( io ) );
572
573    if( ( io->peerIdIsSet = peer_id != NULL ) )
574        memcpy( io->peerId, peer_id, 20 );
575    else
576        memset( io->peerId, 0, 20 );
577}
578
579const uint8_t*
580tr_peerIoGetPeersId( const tr_peerIo * io )
581{
582    assert( tr_isPeerIo( io ) );
583    assert( io->peerIdIsSet );
584
585    return io->peerId;
586}
587
588/**
589***
590**/
591
592void
593tr_peerIoEnableFEXT( tr_peerIo * io,
594                     tr_bool     flag )
595{
596    assert( tr_isPeerIo( io ) );
597    assert( isFlag( flag ) );
598
599    dbgmsg( io, "setting FEXT support flag to %d", (flag?1:0) );
600    io->fastExtensionSupported = flag;
601}
602
603tr_bool
604tr_peerIoSupportsFEXT( const tr_peerIo * io )
605{
606    assert( tr_isPeerIo( io ) );
607
608    return io->fastExtensionSupported;
609}
610
611/**
612***
613**/
614
615void
616tr_peerIoEnableLTEP( tr_peerIo  * io,
617                     tr_bool      flag )
618{
619    assert( tr_isPeerIo( io ) );
620    assert( isFlag( flag ) );
621
622    dbgmsg( io, "setting LTEP support flag to %d", (flag?1:0) );
623    io->extendedProtocolSupported = flag;
624}
625
626tr_bool
627tr_peerIoSupportsLTEP( const tr_peerIo * io )
628{
629    assert( tr_isPeerIo( io ) );
630
631    return io->extendedProtocolSupported;
632}
633
634/**
635***
636**/
637
638static size_t
639getDesiredOutputBufferSize( const tr_peerIo * io )
640{
641    /* this is all kind of arbitrary, but what seems to work well is
642     * being large enough to hold the next 20 seconds' worth of input,
643     * or a few blocks, whichever is bigger.
644     * It's okay to tweak this as needed */
645    const double maxBlockSize = 16 * 1024; /* 16 KiB is from BT spec */
646    const double currentSpeed = tr_bandwidthGetPieceSpeed( io->bandwidth, TR_UP );
647    const double period = 20; /* arbitrary */
648    const double numBlocks = 5.5; /* the 5 is arbitrary; the .5 is to leave room for messages */
649    return MAX( maxBlockSize*numBlocks, currentSpeed*1024*period );
650}
651
652size_t
653tr_peerIoGetWriteBufferSpace( const tr_peerIo * io )
654{
655    const size_t desiredLen = getDesiredOutputBufferSize( io );
656    const size_t currentLen = EVBUFFER_LENGTH( io->outbuf );
657    size_t freeSpace = 0;
658
659    if( desiredLen > currentLen )
660        freeSpace = desiredLen - currentLen;
661
662    return freeSpace;
663}
664
665void
666tr_peerIoSetBandwidth( tr_peerIo     * io,
667                       tr_bandwidth  * bandwidth )
668{
669    assert( tr_isPeerIo( io ) );
670
671    if( io->bandwidth )
672        tr_bandwidthRemovePeer( io->bandwidth, io );
673
674    io->bandwidth = bandwidth;
675
676    if( io->bandwidth )
677        tr_bandwidthAddPeer( io->bandwidth, io );
678}
679
680/**
681***
682**/
683
684tr_crypto*
685tr_peerIoGetCrypto( tr_peerIo * c )
686{
687    return c->crypto;
688}
689
690void
691tr_peerIoSetEncryption( tr_peerIo * io,
692                        int         encryptionMode )
693{
694    assert( tr_isPeerIo( io ) );
695    assert( encryptionMode == PEER_ENCRYPTION_NONE
696          || encryptionMode == PEER_ENCRYPTION_RC4 );
697
698    io->encryptionMode = encryptionMode;
699}
700
701int
702tr_peerIoIsEncrypted( const tr_peerIo * io )
703{
704    return io != NULL && io->encryptionMode == PEER_ENCRYPTION_RC4;
705}
706
707/**
708***
709**/
710
711void
712tr_peerIoWrite( tr_peerIo   * io,
713                const void  * writeme,
714                size_t        writemeLen,
715                int           isPieceData )
716{
717    struct tr_datatype * datatype;
718    assert( tr_amInEventThread( io->session ) );
719    dbgmsg( io, "adding %zu bytes into io->output", writemeLen );
720
721    datatype = tr_new( struct tr_datatype, 1 );
722    datatype->isPieceData = isPieceData != 0;
723    datatype->length = writemeLen;
724    tr_list_append( &io->output_datatypes, datatype );
725
726    evbuffer_add( io->outbuf, writeme, writemeLen );
727}
728
729void
730tr_peerIoWriteBuf( tr_peerIo         * io,
731                   struct evbuffer   * buf,
732                   int                 isPieceData )
733{
734    const size_t n = EVBUFFER_LENGTH( buf );
735
736    tr_peerIoWrite( io, EVBUFFER_DATA( buf ), n, isPieceData );
737    evbuffer_drain( buf, n );
738}
739
740/**
741***
742**/
743
744void
745tr_peerIoWriteBytes( tr_peerIo *       io,
746                     struct evbuffer * outbuf,
747                     const void *      bytes,
748                     size_t            byteCount )
749{
750    uint8_t * tmp;
751
752    switch( io->encryptionMode )
753    {
754        case PEER_ENCRYPTION_NONE:
755            evbuffer_add( outbuf, bytes, byteCount );
756            break;
757
758        case PEER_ENCRYPTION_RC4:
759            tmp = tr_new( uint8_t, byteCount );
760            tr_cryptoEncrypt( io->crypto, byteCount, bytes, tmp );
761            evbuffer_add( outbuf, tmp, byteCount );
762            tr_free( tmp );
763            break;
764
765        default:
766            assert( 0 );
767    }
768}
769
770void
771tr_peerIoWriteUint8( tr_peerIo *       io,
772                     struct evbuffer * outbuf,
773                     uint8_t           writeme )
774{
775    tr_peerIoWriteBytes( io, outbuf, &writeme, sizeof( uint8_t ) );
776}
777
778void
779tr_peerIoWriteUint16( tr_peerIo *       io,
780                      struct evbuffer * outbuf,
781                      uint16_t          writeme )
782{
783    uint16_t tmp = htons( writeme );
784
785    tr_peerIoWriteBytes( io, outbuf, &tmp, sizeof( uint16_t ) );
786}
787
788void
789tr_peerIoWriteUint32( tr_peerIo *       io,
790                      struct evbuffer * outbuf,
791                      uint32_t          writeme )
792{
793    uint32_t tmp = htonl( writeme );
794
795    tr_peerIoWriteBytes( io, outbuf, &tmp, sizeof( uint32_t ) );
796}
797
798/***
799****
800***/
801
802void
803tr_peerIoReadBytes( tr_peerIo *       io,
804                    struct evbuffer * inbuf,
805                    void *            bytes,
806                    size_t            byteCount )
807{
808    assert( EVBUFFER_LENGTH( inbuf ) >= byteCount );
809
810    switch( io->encryptionMode )
811    {
812        case PEER_ENCRYPTION_NONE:
813            evbuffer_remove( inbuf, bytes, byteCount );
814            break;
815
816        case PEER_ENCRYPTION_RC4:
817            evbuffer_remove( inbuf, bytes, byteCount );
818            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
819            break;
820
821        default:
822            assert( 0 );
823    }
824}
825
826void
827tr_peerIoReadUint8( tr_peerIo *       io,
828                    struct evbuffer * inbuf,
829                    uint8_t *         setme )
830{
831    tr_peerIoReadBytes( io, inbuf, setme, sizeof( uint8_t ) );
832}
833
834void
835tr_peerIoReadUint16( tr_peerIo *       io,
836                     struct evbuffer * inbuf,
837                     uint16_t *        setme )
838{
839    uint16_t tmp;
840
841    assert( tr_isPeerIo( io ) );
842
843    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
844    *setme = ntohs( tmp );
845}
846
847void
848tr_peerIoReadUint32( tr_peerIo *       io,
849                     struct evbuffer * inbuf,
850                     uint32_t *        setme )
851{
852    uint32_t tmp;
853
854    assert( tr_isPeerIo( io ) );
855
856    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
857    *setme = ntohl( tmp );
858}
859
860void
861tr_peerIoDrain( tr_peerIo *       io,
862                struct evbuffer * inbuf,
863                size_t            byteCount )
864{
865    uint8_t * tmp;
866
867    assert( tr_isPeerIo( io ) );
868
869    tmp = tr_new( uint8_t, byteCount );
870    tr_peerIoReadBytes( io, inbuf, tmp, byteCount );
871    tr_free( tmp );
872}
873
874int
875tr_peerIoGetAge( const tr_peerIo * io )
876{
877    return time( NULL ) - io->timeCreated;
878}
879
880/***
881****
882***/
883
884static int
885tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
886{
887    int res;
888
889    assert( tr_isPeerIo( io ) );
890
891    howmuch = tr_bandwidthClamp( io->bandwidth, TR_DOWN, howmuch );
892
893    res = howmuch ? evbuffer_read( io->inbuf, io->socket, howmuch ) : 0;
894
895    dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(errno):"") );
896
897    if( EVBUFFER_LENGTH( io->inbuf ) )
898        canReadWrapper( io );
899
900    if( ( res <= 0 ) && ( io->gotError ) && ( errno != EAGAIN ) && ( errno != EINTR ) && ( errno != EINPROGRESS ) )
901    {
902        short what = EVBUFFER_READ | EVBUFFER_ERROR;
903        if( res == 0 )
904            what |= EVBUFFER_EOF;
905        io->gotError( io, what, io->userData );
906    }
907
908    return res;
909}
910
911static int
912tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
913{
914    int n;
915
916    assert( tr_isPeerIo( io ) );
917
918    howmuch = tr_bandwidthClamp( io->bandwidth, TR_UP, howmuch );
919
920    n = tr_evbuffer_write( io, io->socket, (int)howmuch );
921
922    if( n > 0 )
923        didWriteWrapper( io, n );
924
925    if( ( n < 0 ) && ( io->gotError ) && ( errno != EPIPE ) && ( errno != EAGAIN ) && ( errno != EINTR ) && ( errno != EINPROGRESS ) ) {
926        short what = EVBUFFER_WRITE | EVBUFFER_ERROR;
927        io->gotError( io, what, io->userData );
928    }
929
930    return n;
931}
932
933int
934tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
935{
936    int ret;
937
938    assert( tr_isPeerIo( io ) );
939    assert( tr_isDirection( dir ) );
940
941    if( dir==TR_DOWN )
942        ret = tr_peerIoTryRead( io, limit );
943    else
944        ret = tr_peerIoTryWrite( io, limit );
945
946    return ret;
947}
948
949struct evbuffer *
950tr_peerIoGetReadBuffer( tr_peerIo * io )
951{
952    assert( tr_isPeerIo( io ) );
953
954    return io->inbuf;
955}
956
957tr_bool
958tr_peerIoHasBandwidthLeft( const tr_peerIo * io, tr_direction dir )
959{
960    assert( tr_isPeerIo( io ) );
961    assert( tr_isDirection( dir ) );
962
963    return tr_bandwidthClamp( io->bandwidth, dir, 1024 ) > 0;
964}
965
966/***
967****
968****/
969
970static void
971event_enable( tr_peerIo * io, short event )
972{
973    assert( tr_isPeerIo( io ) );
974
975    if( event & EV_READ )
976        event_add( &io->event_read, NULL );
977
978    if( event & EV_WRITE )
979        event_add( &io->event_write, NULL );
980}
981
982static void
983event_disable( struct tr_peerIo * io, short event )
984{
985    assert( tr_isPeerIo( io ) );
986
987    if( event & EV_READ )
988        event_del( &io->event_read );
989
990    if( event & EV_WRITE )
991        event_del( &io->event_write );
992}
993
994
995void
996tr_peerIoSetEnabled( tr_peerIo    * io,
997                     tr_direction   dir,
998                     tr_bool        isEnabled )
999{
1000    short event;
1001
1002    assert( tr_isPeerIo( io ) );
1003    assert( tr_isDirection( dir ) );
1004
1005    event = dir == TR_UP ? EV_WRITE : EV_READ;
1006
1007    if( isEnabled )
1008        event_enable( io, event );
1009    else
1010        event_disable( io, event );
1011}
Note: See TracBrowser for help on using the repository browser.