source: trunk/libtransmission/peer-io.c @ 7595

Last change on this file since 7595 was 7595, checked in by charles, 12 years ago

(trunk libT) fix r7594 typo

  • Property svn:keywords set to Date Rev Author Id
File size: 20.6 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 7595 2009-01-03 05:07:45Z charles $
11 */
12
13#include <assert.h>
14#include <limits.h> /* INT_MAX */
15#include <string.h>
16#include <stdio.h>
17#include <unistd.h>
18
19#ifdef WIN32
20 #include <winsock2.h>
21#else
22 #include <arpa/inet.h> /* inet_ntoa */
23#endif
24
25#include <event.h>
26
27#include "transmission.h"
28#include "session.h"
29#include "bandwidth.h"
30#include "crypto.h"
31#include "list.h"
32#include "net.h"
33#include "peer-io.h"
34#include "platform.h" /* MAX_STACK_ARRAY_SIZE */
35#include "trevent.h"
36#include "utils.h"
37
38#define MAGIC_NUMBER 206745
39
40static size_t
41getPacketOverhead( size_t d )
42{
43    /**
44     * http://sd.wareonearth.com/~phil/net/overhead/
45     *
46     * TCP over Ethernet:
47     * Assuming no header compression (e.g. not PPP)
48     * Add 20 IPv4 header or 40 IPv6 header (no options)
49     * Add 20 TCP header
50     * Add 12 bytes optional TCP timestamps
51     * Max TCP Payload data rates over ethernet are thus:
52     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
53     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
54     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
55     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
56     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
57     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
58     */
59    static const double assumed_payload_data_rate = 94.0;
60
61    return (size_t)( d * ( 100.0 / assumed_payload_data_rate ) - d );
62}
63
64/**
65***
66**/
67
68#define dbgmsg( io, ... ) \
69    do { \
70        if( tr_deepLoggingIsActive( ) ) \
71            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
72    } while( 0 )
73
74struct tr_datatype
75{
76    tr_bool  isPieceData;
77    size_t   length;
78    struct __tr_list head;
79};
80
81/***
82****
83***/
84
85static void
86didWriteWrapper( tr_peerIo * io, size_t bytes_transferred )
87{
88    while( bytes_transferred )
89    {
90        struct tr_datatype * next = __tr_list_entry( io->outbuf_datatypes.next,
91                                                     struct tr_datatype, head );
92        const size_t payload = MIN( next->length, bytes_transferred );
93        const size_t overhead = getPacketOverhead( payload );
94
95        tr_bandwidthUsed( &io->bandwidth, TR_UP, payload, next->isPieceData );
96
97        if( overhead > 0 )
98            tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, FALSE );
99
100        if( io->didWrite )
101            io->didWrite( io, payload, next->isPieceData, io->userData );
102
103        bytes_transferred -= payload;
104        next->length -= payload;
105        if( !next->length ) {
106            __tr_list_remove( io->outbuf_datatypes.next );
107            tr_free( next );
108        }
109    }
110}
111
112static void
113canReadWrapper( tr_peerIo * io )
114{
115    tr_bool done = 0;
116    tr_bool err = 0;
117    tr_session * session = io->session;
118
119    dbgmsg( io, "canRead" );
120
121    /* try to consume the input buffer */
122    if( io->canRead )
123    {
124        tr_globalLock( session );
125
126        while( !done && !err )
127        {
128            size_t piece = 0;
129            const size_t oldLen = EVBUFFER_LENGTH( io->inbuf );
130            const int ret = io->canRead( io, io->userData, &piece );
131
132            const size_t used = oldLen - EVBUFFER_LENGTH( io->inbuf );
133
134            assert( tr_isPeerIo( io ) );
135
136            if( piece )
137                tr_bandwidthUsed( &io->bandwidth, TR_DOWN, piece, TRUE );
138
139            if( used != piece )
140                tr_bandwidthUsed( &io->bandwidth, TR_DOWN, used - piece, FALSE );
141
142            switch( ret )
143            {
144                case READ_NOW:
145                    if( EVBUFFER_LENGTH( io->inbuf ) )
146                        continue;
147                    done = 1;
148                    break;
149
150                case READ_LATER:
151                    done = 1;
152                    break;
153
154                case READ_ERR:
155                    err = 1;
156                    break;
157            }
158        }
159
160        tr_globalUnlock( session );
161    }
162}
163
164tr_bool
165tr_isPeerIo( const tr_peerIo * io )
166{
167    return ( io != NULL )
168        && ( io->magicNumber == MAGIC_NUMBER )
169        && ( tr_isBandwidth( &io->bandwidth ) )
170        && ( tr_isAddress( &io->addr ) )
171        && ( tr_isBool( io->isEncrypted ) )
172        && ( tr_isBool( io->isIncoming ) )
173        && ( tr_isBool( io->peerIdIsSet ) )
174        && ( tr_isBool( io->extendedProtocolSupported ) )
175        && ( tr_isBool( io->fastExtensionSupported ) );
176}
177
178static void
179event_read_cb( int fd, short event UNUSED, void * vio )
180{
181    int res;
182    int e;
183    tr_peerIo * io = vio;
184
185    /* Limit the input buffer to 256K, so it doesn't grow too large */
186    size_t howmuch;
187    const tr_direction dir = TR_DOWN;
188    const size_t max = 256 * 1024;
189    const size_t curlen = EVBUFFER_LENGTH( io->inbuf );
190
191    howmuch = curlen >= max ? 0 : max - curlen;
192    howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch );
193
194    assert( tr_isPeerIo( io ) );
195
196    dbgmsg( io, "libevent says this peer is ready to read" );
197
198    /* if we don't have any bandwidth left, stop reading */
199    if( howmuch < 1 ) {
200        tr_peerIoSetEnabled( io, dir, FALSE );
201        return;
202    }
203
204    errno = 0;
205    res = evbuffer_read( io->inbuf, fd, howmuch );
206    e = errno;
207
208    if( res > 0 )
209    {
210        tr_peerIoSetEnabled( io, dir, TRUE );
211
212        /* Invoke the user callback - must always be called last */
213        canReadWrapper( io );
214    }
215    else
216    {
217        short what = EVBUFFER_READ;
218
219        if( res == 0 ) /* EOF */
220            what |= EVBUFFER_EOF;
221        else if( res == -1 ) {
222            if( e == EAGAIN || e == EINTR ) {
223                tr_peerIoSetEnabled( io, dir, TRUE );
224                return;
225            }
226            what |= EVBUFFER_ERROR;
227        }
228
229        dbgmsg( io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
230
231        if( io->gotError != NULL )
232            io->gotError( io, what, io->userData );
233    }
234}
235
236static ssize_t
237tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
238{
239    int e;
240    ssize_t n;
241    struct evbuffer * buffer = io->outbuf;
242
243    howmuch = MIN( EVBUFFER_LENGTH( buffer ), howmuch );
244
245    errno = 0;
246#ifdef WIN32
247    n = send(fd, buffer->buffer, howmuch,  0 );
248#else
249    n = write(fd, buffer->buffer, howmuch );
250#endif
251    e = errno;
252    dbgmsg( io, "wrote %zd to peer (%s)", n, (n==-1?strerror(e):"") );
253
254    if( n > 0 )
255        evbuffer_drain( buffer, n );
256
257    return n;
258}
259
260static void
261event_write_cb( int fd, short event UNUSED, void * vio )
262{
263    int res = 0;
264    int e;
265    short what = EVBUFFER_WRITE;
266    tr_peerIo * io = vio;
267    size_t howmuch;
268    const tr_direction dir = TR_UP;
269
270    assert( tr_isPeerIo( io ) );
271
272    dbgmsg( io, "libevent says this peer is ready to write" );
273
274    /* Write as much as possible, since the socket is non-blocking, write() will
275     * return if it can't write any more data without blocking */
276    howmuch = tr_bandwidthClamp( &io->bandwidth, dir, EVBUFFER_LENGTH( io->outbuf ) );
277
278    /* if we don't have any bandwidth left, stop writing */
279    if( howmuch < 1 ) {
280        tr_peerIoSetEnabled( io, dir, FALSE );
281        return;
282    }
283
284    errno = 0;
285    res = tr_evbuffer_write( io, fd, howmuch );
286    e = errno;
287
288    if (res == -1) {
289#ifndef WIN32
290/*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
291 *  *set errno. thus this error checking is not portable*/
292        if (e == EAGAIN || e == EINTR || e == EINPROGRESS)
293            goto reschedule;
294        /* error case */
295        what |= EVBUFFER_ERROR;
296
297#else
298        goto reschedule;
299#endif
300
301    } else if (res == 0) {
302        /* eof case */
303        what |= EVBUFFER_EOF;
304    }
305    if (res <= 0)
306        goto error;
307
308    if( EVBUFFER_LENGTH( io->outbuf ) )
309        tr_peerIoSetEnabled( io, dir, TRUE );
310
311    didWriteWrapper( io, res );
312    return;
313
314 reschedule:
315    if( EVBUFFER_LENGTH( io->outbuf ) )
316        tr_peerIoSetEnabled( io, dir, TRUE );
317    return;
318
319 error:
320
321    dbgmsg( io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
322
323    if( io->gotError != NULL )
324        io->gotError( io, what, io->userData );
325}
326
327/**
328***
329**/
330
331static tr_peerIo*
332tr_peerIoNew( tr_session       * session,
333              tr_bandwidth     * parent,
334              const tr_address * addr,
335              tr_port            port,
336              const uint8_t    * torrentHash,
337              int                isIncoming,
338              int                socket )
339{
340    tr_peerIo * io;
341
342    if( socket >= 0 )
343        tr_netSetTOS( socket, session->peerSocketTOS );
344
345    io = tr_new0( tr_peerIo, 1 );
346    io->magicNumber = MAGIC_NUMBER;
347    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
348    io->session = session;
349    io->addr = *addr;
350    io->port = port;
351    io->socket = socket;
352    io->isIncoming = isIncoming != 0;
353    io->timeCreated = time( NULL );
354    io->inbuf = evbuffer_new( );
355    io->outbuf = evbuffer_new( );
356    tr_bandwidthConstruct( &io->bandwidth, session, parent );
357    tr_bandwidthSetPeer( &io->bandwidth, io );
358    dbgmsg( io, "bandwidth is %p; its parent is %p", &io->bandwidth, parent );
359
360    event_set( &io->event_read, io->socket, EV_READ, event_read_cb, io );
361    event_set( &io->event_write, io->socket, EV_WRITE, event_write_cb, io );
362
363    __tr_list_init( &io->outbuf_datatypes );
364
365    return io;
366}
367
368tr_peerIo*
369tr_peerIoNewIncoming( tr_session        * session,
370                      tr_bandwidth      * parent,
371                      const tr_address  * addr,
372                      tr_port             port,
373                      int                 socket )
374{
375    assert( session );
376    assert( tr_isAddress( addr ) );
377    assert( socket >= 0 );
378
379    return tr_peerIoNew( session, parent, addr, port, NULL, 1, socket );
380}
381
382tr_peerIo*
383tr_peerIoNewOutgoing( tr_session        * session,
384                      tr_bandwidth      * parent,
385                      const tr_address  * addr,
386                      tr_port             port,
387                      const uint8_t     * torrentHash )
388{
389    int socket;
390
391    assert( session );
392    assert( tr_isAddress( addr ) );
393    assert( torrentHash );
394
395    socket = tr_netOpenTCP( session, addr, port );
396
397    return socket < 0
398           ? NULL
399           : tr_peerIoNew( session, parent, addr, port, torrentHash, 0, socket );
400}
401
402static void
403trDatatypeFree( void * data )
404{
405    struct tr_datatype * dt = __tr_list_entry( data, struct tr_datatype, head );
406    tr_free(dt);
407}
408
409static void
410io_dtor( void * vio )
411{
412    tr_peerIo * io = vio;
413
414    event_del( &io->event_read );
415    event_del( &io->event_write );
416    tr_bandwidthDestruct( &io->bandwidth );
417    evbuffer_free( io->outbuf );
418    evbuffer_free( io->inbuf );
419    tr_netClose( io->socket );
420    tr_cryptoFree( io->crypto );
421    __tr_list_destroy( &io->outbuf_datatypes, trDatatypeFree );
422
423    io->magicNumber = 0xDEAD;
424    tr_free( io );
425}
426
427void
428tr_peerIoFree( tr_peerIo * io )
429{
430    if( io )
431    {
432        io->canRead = NULL;
433        io->didWrite = NULL;
434        io->gotError = NULL;
435        tr_runInEventThread( io->session, io_dtor, io );
436    }
437}
438
439const tr_address*
440tr_peerIoGetAddress( const tr_peerIo * io,
441                           tr_port   * port )
442{
443    assert( tr_isPeerIo( io ) );
444
445    if( port )
446        *port = io->port;
447
448    return &io->addr;
449}
450
451const char*
452tr_peerIoAddrStr( const tr_address * addr, tr_port port )
453{
454    static char buf[512];
455
456    if( addr->type == TR_AF_INET ) 
457        tr_snprintf( buf, sizeof( buf ), "%s:%u", tr_ntop_non_ts( addr ), ntohs( port ) ); 
458    else 
459        tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_ntop_non_ts( addr ), ntohs( port ) ); 
460    return buf;
461}
462
463void
464tr_peerIoSetIOFuncs( tr_peerIo        * io,
465                     tr_can_read_cb     readcb,
466                     tr_did_write_cb    writecb,
467                     tr_net_error_cb    errcb,
468                     void             * userData )
469{
470    io->canRead = readcb;
471    io->didWrite = writecb;
472    io->gotError = errcb;
473    io->userData = userData;
474}
475
476int
477tr_peerIoReconnect( tr_peerIo * io )
478{
479    assert( !tr_peerIoIsIncoming( io ) );
480
481    if( io->socket >= 0 )
482        tr_netClose( io->socket );
483
484    io->socket = tr_netOpenTCP( io->session, &io->addr, io->port ); 
485    if( io->socket >= 0 )
486    {
487        tr_netSetTOS( io->socket, io->session->peerSocketTOS );
488        return 0;
489    }
490
491    return -1;
492}
493
494/**
495***
496**/
497
498void
499tr_peerIoSetTorrentHash( tr_peerIo *     io,
500                         const uint8_t * hash )
501{
502    assert( tr_isPeerIo( io ) );
503
504    tr_cryptoSetTorrentHash( io->crypto, hash );
505}
506
507const uint8_t*
508tr_peerIoGetTorrentHash( tr_peerIo * io )
509{
510    assert( tr_isPeerIo( io ) );
511    assert( io->crypto );
512
513    return tr_cryptoGetTorrentHash( io->crypto );
514}
515
516int
517tr_peerIoHasTorrentHash( const tr_peerIo * io )
518{
519    assert( tr_isPeerIo( io ) );
520    assert( io->crypto );
521
522    return tr_cryptoHasTorrentHash( io->crypto );
523}
524
525/**
526***
527**/
528
529void
530tr_peerIoSetPeersId( tr_peerIo *     io,
531                     const uint8_t * peer_id )
532{
533    assert( tr_isPeerIo( io ) );
534
535    if( ( io->peerIdIsSet = peer_id != NULL ) )
536        memcpy( io->peerId, peer_id, 20 );
537    else
538        memset( io->peerId, 0, 20 );
539}
540
541/**
542***
543**/
544
545void
546tr_peerIoEnableFEXT( tr_peerIo * io,
547                     tr_bool     flag )
548{
549    assert( tr_isPeerIo( io ) );
550    assert( tr_isBool( flag ) );
551
552    dbgmsg( io, "setting FEXT support flag to %d", (flag!=0) );
553    io->fastExtensionSupported = flag;
554}
555
556void
557tr_peerIoEnableLTEP( tr_peerIo  * io,
558                     tr_bool      flag )
559{
560    assert( tr_isPeerIo( io ) );
561    assert( tr_isBool( flag ) );
562
563    dbgmsg( io, "setting LTEP support flag to %d", (flag!=0) );
564    io->extendedProtocolSupported = flag;
565}
566
567/**
568***
569**/
570
571static size_t
572getDesiredOutputBufferSize( const tr_peerIo * io )
573{
574    /* this is all kind of arbitrary, but what seems to work well is
575     * being large enough to hold the next 20 seconds' worth of input,
576     * or a few blocks, whichever is bigger.
577     * It's okay to tweak this as needed */
578    const double maxBlockSize = 16 * 1024; /* 16 KiB is from BT spec */
579    const double currentSpeed = tr_bandwidthGetPieceSpeed( &io->bandwidth, TR_UP );
580    const double period = 20; /* arbitrary */
581    const double numBlocks = 5.5; /* the 5 is arbitrary; the .5 is to leave room for messages */
582    return MAX( maxBlockSize*numBlocks, currentSpeed*1024*period );
583}
584
585size_t
586tr_peerIoGetWriteBufferSpace( const tr_peerIo * io )
587{
588    const size_t desiredLen = getDesiredOutputBufferSize( io );
589    const size_t currentLen = EVBUFFER_LENGTH( io->outbuf );
590    size_t freeSpace = 0;
591
592    if( desiredLen > currentLen )
593        freeSpace = desiredLen - currentLen;
594
595    return freeSpace;
596}
597
598/**
599***
600**/
601
602void
603tr_peerIoSetEncryption( tr_peerIo * io,
604                        int         encryptionMode )
605{
606    assert( tr_isPeerIo( io ) );
607    assert( encryptionMode == PEER_ENCRYPTION_NONE
608         || encryptionMode == PEER_ENCRYPTION_RC4 );
609
610    io->encryptionMode = encryptionMode;
611}
612
613/**
614***
615**/
616
617void
618tr_peerIoWrite( tr_peerIo   * io,
619                const void  * writeme,
620                size_t        writemeLen,
621                int           isPieceData )
622{
623    struct tr_datatype * datatype;
624
625    assert( tr_amInEventThread( io->session ) );
626    dbgmsg( io, "adding %zu bytes into io->output", writemeLen );
627
628    datatype = tr_new( struct tr_datatype, 1 );
629    datatype->isPieceData = isPieceData != 0;
630    datatype->length = writemeLen;
631
632    __tr_list_init( &datatype->head );
633    __tr_list_append( &io->outbuf_datatypes, &datatype->head );
634
635    evbuffer_add( io->outbuf, writeme, writemeLen );
636}
637
638void
639tr_peerIoWriteBuf( tr_peerIo         * io,
640                   struct evbuffer   * buf,
641                   int                 isPieceData )
642{
643    const size_t n = EVBUFFER_LENGTH( buf );
644    tr_peerIoWrite( io, EVBUFFER_DATA( buf ), n, isPieceData );
645    evbuffer_drain( buf, n );
646}
647
648/**
649***
650**/
651
652void
653tr_peerIoWriteBytes( tr_peerIo       * io,
654                     struct evbuffer * outbuf,
655                     const void      * bytes,
656                     size_t            byteCount )
657{
658    uint8_t tmp[MAX_STACK_ARRAY_SIZE];
659
660    switch( io->encryptionMode )
661    {
662        case PEER_ENCRYPTION_NONE:
663            evbuffer_add( outbuf, bytes, byteCount );
664            break;
665
666        case PEER_ENCRYPTION_RC4:
667            evbuffer_expand( outbuf, byteCount );
668            while( byteCount > 0 ) {
669                const size_t thisPass = MIN( byteCount, sizeof( tmp ) );
670                tr_cryptoEncrypt( io->crypto, thisPass, bytes, tmp );
671                evbuffer_add( outbuf, tmp, thisPass );
672                bytes += thisPass;
673                byteCount -= thisPass;
674            }
675            break;
676
677        default:
678            assert( 0 );
679    }
680}
681
682/***
683****
684***/
685
686void
687tr_peerIoReadBytes( tr_peerIo       * io,
688                    struct evbuffer * inbuf,
689                    void            * bytes,
690                    size_t            byteCount )
691{
692    assert( tr_isPeerIo( io ) );
693    assert( EVBUFFER_LENGTH( inbuf ) >= byteCount );
694
695    switch( io->encryptionMode )
696    {
697        case PEER_ENCRYPTION_NONE:
698            evbuffer_remove( inbuf, bytes, byteCount );
699            break;
700
701        case PEER_ENCRYPTION_RC4:
702            evbuffer_remove( inbuf, bytes, byteCount );
703            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
704            break;
705
706        default:
707            assert( 0 );
708    }
709}
710
711void
712tr_peerIoDrain( tr_peerIo       * io,
713                struct evbuffer * inbuf,
714                size_t            byteCount )
715{
716    uint8_t tmp[MAX_STACK_ARRAY_SIZE];
717
718    while( byteCount > 0 )
719    {
720        const size_t thisPass = MIN( byteCount, sizeof( tmp ) );
721        tr_peerIoReadBytes( io, inbuf, tmp, thisPass );
722        byteCount -= thisPass;
723    }
724}
725
726/***
727****
728***/
729
730static ssize_t
731tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
732{
733    ssize_t res = 0;
734
735    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch )))
736    {
737        int e;
738        errno = 0;
739        res = evbuffer_read( io->inbuf, io->socket, howmuch );
740        e = errno;
741
742        dbgmsg( io, "read %zd from peer (%s)", res, (res==-1?strerror(e):"") );
743
744        if( EVBUFFER_LENGTH( io->inbuf ) )
745            canReadWrapper( io );
746
747        if( ( res <= 0 ) && ( io->gotError ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
748        {
749            short what = EVBUFFER_READ | EVBUFFER_ERROR;
750            if( res == 0 )
751                what |= EVBUFFER_EOF;
752            dbgmsg( io, "tr_peerIoTryRead got an error. res is %zd, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
753            io->gotError( io, what, io->userData );
754        }
755    }
756
757    return res;
758}
759
760static ssize_t
761tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
762{
763    ssize_t n = 0;
764
765    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_UP, howmuch )))
766    {
767        int e;
768        errno = 0;
769        n = tr_evbuffer_write( io, io->socket, howmuch );
770        e = errno;
771
772        if( n > 0 )
773            didWriteWrapper( io, n );
774
775        if( ( n < 0 ) && ( io->gotError ) && ( e != EPIPE ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
776        {
777            const short what = EVBUFFER_WRITE | EVBUFFER_ERROR;
778            dbgmsg( io, "tr_peerIoTryWrite got an error. res is %zd, what is %hd, errno is %d (%s)", n, what, e, strerror( e ) );
779            io->gotError( io, what, io->userData );
780        }
781    }
782
783    return n;
784}
785
786ssize_t
787tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
788{
789    ssize_t bytesUsed;
790
791    assert( tr_isPeerIo( io ) );
792    assert( tr_isDirection( dir ) );
793
794    if( dir == TR_DOWN )
795        bytesUsed = tr_peerIoTryRead( io, limit );
796    else
797        bytesUsed = tr_peerIoTryWrite( io, limit );
798
799    dbgmsg( io, "flushing peer-io, direction %d, limit %zu, bytesUsed %zd", (int)dir, limit, bytesUsed );
800    return bytesUsed;
801}
802
803/***
804****
805****/
806
807static void
808event_enable( tr_peerIo * io, short event )
809{
810    assert( tr_isPeerIo( io ) );
811
812    if( event & EV_READ )
813        event_add( &io->event_read, NULL );
814
815    if( event & EV_WRITE )
816        event_add( &io->event_write, NULL );
817}
818
819static void
820event_disable( struct tr_peerIo * io, short event )
821{
822    assert( tr_isPeerIo( io ) );
823
824    if( event & EV_READ )
825        event_del( &io->event_read );
826
827    if( event & EV_WRITE )
828        event_del( &io->event_write );
829}
830
831
832void
833tr_peerIoSetEnabled( tr_peerIo    * io,
834                     tr_direction   dir,
835                     tr_bool        isEnabled )
836{
837    short event;
838
839    assert( tr_isPeerIo( io ) );
840    assert( tr_isDirection( dir ) );
841
842    event = dir == TR_UP ? EV_WRITE : EV_READ;
843
844    if( isEnabled )
845        event_enable( io, event );
846    else
847        event_disable( io, event );
848}
Note: See TracBrowser for help on using the repository browser.