source: trunk/libtransmission/peer-io.c @ 8050

Last change on this file since 8050 was 7888, checked in by charles, 12 years ago

(trunk) #1787: add support for seeding ratio limiting in libtransmission

  • Property svn:keywords set to Date Rev Author Id
File size: 22.9 KB
Line 
1/*
2 * This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 7888 2009-02-13 18:23:56Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <string.h>
17#include <stdio.h>
18#include <unistd.h>
19
20#ifdef WIN32
21 #include <winsock2.h>
22#else
23 #include <arpa/inet.h> /* inet_ntoa */
24#endif
25
26#include <event.h>
27
28#include "transmission.h"
29#include "session.h"
30#include "bandwidth.h"
31#include "crypto.h"
32#include "list.h"
33#include "net.h"
34#include "peer-io.h"
35#include "platform.h" /* MAX_STACK_ARRAY_SIZE */
36#include "trevent.h"
37#include "utils.h"
38
39#define MAGIC_NUMBER 206745
40
41static size_t
42getPacketOverhead( size_t d )
43{
44    /**
45     * http://sd.wareonearth.com/~phil/net/overhead/
46     *
47     * TCP over Ethernet:
48     * Assuming no header compression (e.g. not PPP)
49     * Add 20 IPv4 header or 40 IPv6 header (no options)
50     * Add 20 TCP header
51     * Add 12 bytes optional TCP timestamps
52     * Max TCP Payload data rates over ethernet are thus:
53     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
54     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
55     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
56     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
57     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
58     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
59     */
60    static const double assumed_payload_data_rate = 94.0;
61
62    return (size_t)( d * ( 100.0 / assumed_payload_data_rate ) - d );
63}
64
65/**
66***
67**/
68
69#define dbgmsg( io, ... ) \
70    do { \
71        if( tr_deepLoggingIsActive( ) ) \
72            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
73    } while( 0 )
74
75struct tr_datatype
76{
77    tr_bool  isPieceData;
78    size_t   length;
79    struct __tr_list head;
80};
81
82/***
83****
84***/
85
86static void
87didWriteWrapper( tr_peerIo * io, size_t bytes_transferred )
88{
89     while( bytes_transferred && tr_isPeerIo( io ) )
90     {
91        struct tr_datatype * next = __tr_list_entry( io->outbuf_datatypes.next, struct tr_datatype, head );
92        const size_t payload = MIN( next->length, bytes_transferred );
93        const size_t overhead = getPacketOverhead( payload );
94
95        tr_bandwidthUsed( &io->bandwidth, TR_UP, payload, next->isPieceData );
96
97        if( overhead > 0 )
98            tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, FALSE );
99
100        if( io->didWrite )
101            io->didWrite( io, payload, next->isPieceData, io->userData );
102       
103        if( tr_isPeerIo( io ) )
104        {
105            bytes_transferred -= payload;
106            next->length -= payload;
107            if( !next->length ) {
108                __tr_list_remove( io->outbuf_datatypes.next );
109                tr_free( next );
110            }
111        }
112    }
113}
114
115static void
116canReadWrapper( tr_peerIo * io )
117{
118    tr_bool done = 0;
119    tr_bool err = 0;
120    tr_session * session = io->session;
121
122    dbgmsg( io, "canRead" );
123
124    tr_peerIoRef( io );
125
126    /* try to consume the input buffer */
127    if( io->canRead )
128    {
129        tr_globalLock( session );
130
131        while( !done && !err )
132        {
133            size_t piece = 0;
134            const size_t oldLen = EVBUFFER_LENGTH( io->inbuf );
135            const int ret = io->canRead( io, io->userData, &piece );
136
137            const size_t used = oldLen - EVBUFFER_LENGTH( io->inbuf );
138
139            assert( tr_isPeerIo( io ) );
140
141            if( piece )
142                tr_bandwidthUsed( &io->bandwidth, TR_DOWN, piece, TRUE );
143
144            if( used != piece )
145                tr_bandwidthUsed( &io->bandwidth, TR_DOWN, used - piece, FALSE );
146
147            switch( ret )
148            {
149                case READ_NOW:
150                    if( EVBUFFER_LENGTH( io->inbuf ) )
151                        continue;
152                    done = 1;
153                    break;
154
155                case READ_LATER:
156                    done = 1;
157                    break;
158
159                case READ_ERR:
160                    err = 1;
161                    break;
162            }
163        }
164
165        tr_globalUnlock( session );
166    }
167
168    tr_peerIoUnref( io );
169}
170
171tr_bool
172tr_isPeerIo( const tr_peerIo * io )
173{
174    return ( io != NULL )
175        && ( io->magicNumber == MAGIC_NUMBER )
176        && ( io->refCount >= 0 )
177        && ( tr_isBandwidth( &io->bandwidth ) )
178        && ( tr_isAddress( &io->addr ) );
179}
180
181static void
182event_read_cb( int fd, short event UNUSED, void * vio )
183{
184    int res;
185    int e;
186    tr_peerIo * io = vio;
187
188    /* Limit the input buffer to 256K, so it doesn't grow too large */
189    size_t howmuch;
190    const tr_direction dir = TR_DOWN;
191    const size_t max = 256 * 1024;
192    size_t curlen;
193
194    assert( tr_isPeerIo( io ) );
195
196    io->hasFinishedConnecting = TRUE;
197    io->pendingEvents &= ~EV_READ;
198
199    curlen = EVBUFFER_LENGTH( io->inbuf );
200    howmuch = curlen >= max ? 0 : max - curlen;
201    howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch );
202
203    dbgmsg( io, "libevent says this peer is ready to read" );
204
205    /* if we don't have any bandwidth left, stop reading */
206    if( howmuch < 1 ) {
207        tr_peerIoSetEnabled( io, dir, FALSE );
208        return;
209    }
210
211    errno = 0;
212    res = evbuffer_read( io->inbuf, fd, howmuch );
213    e = errno;
214
215    if( res > 0 )
216    {
217        tr_peerIoSetEnabled( io, dir, TRUE );
218
219        /* Invoke the user callback - must always be called last */
220        canReadWrapper( io );
221    }
222    else
223    {
224        short what = EVBUFFER_READ;
225
226        if( res == 0 ) /* EOF */
227            what |= EVBUFFER_EOF;
228        else if( res == -1 ) {
229            if( e == EAGAIN || e == EINTR ) {
230                tr_peerIoSetEnabled( io, dir, TRUE );
231                return;
232            }
233            what |= EVBUFFER_ERROR;
234        }
235
236        dbgmsg( io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
237
238        if( io->gotError != NULL )
239            io->gotError( io, what, io->userData );
240    }
241}
242
243static int
244tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
245{
246    int e;
247    int n;
248    struct evbuffer * buffer = io->outbuf;
249
250    howmuch = MIN( EVBUFFER_LENGTH( buffer ), howmuch );
251
252    errno = 0;
253#ifdef WIN32
254    n = (int) send(fd, buffer->buffer, howmuch,  0 );
255#else
256    n = (int) write(fd, buffer->buffer, howmuch );
257#endif
258    e = errno;
259    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?strerror(e):"") );
260
261    if( n > 0 )
262        evbuffer_drain( buffer, n );
263
264    return n;
265}
266
267static void
268event_write_cb( int fd, short event UNUSED, void * vio )
269{
270    int res = 0;
271    int e;
272    short what = EVBUFFER_WRITE;
273    tr_peerIo * io = vio;
274    size_t howmuch;
275    const tr_direction dir = TR_UP;
276
277    assert( tr_isPeerIo( io ) );
278
279    io->hasFinishedConnecting = TRUE;
280    io->pendingEvents &= ~EV_WRITE;
281
282    dbgmsg( io, "libevent says this peer is ready to write" );
283
284    /* Write as much as possible, since the socket is non-blocking, write() will
285     * return if it can't write any more data without blocking */
286    howmuch = tr_bandwidthClamp( &io->bandwidth, dir, EVBUFFER_LENGTH( io->outbuf ) );
287
288    /* if we don't have any bandwidth left, stop writing */
289    if( howmuch < 1 ) {
290        tr_peerIoSetEnabled( io, dir, FALSE );
291        return;
292    }
293
294    errno = 0;
295    res = tr_evbuffer_write( io, fd, howmuch );
296    e = errno;
297
298    if (res == -1) {
299#ifndef WIN32
300/*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
301 *  *set errno. thus this error checking is not portable*/
302        if (e == EAGAIN || e == EINTR || e == EINPROGRESS)
303            goto reschedule;
304        /* error case */
305        what |= EVBUFFER_ERROR;
306
307#else
308        goto reschedule;
309#endif
310
311    } else if (res == 0) {
312        /* eof case */
313        what |= EVBUFFER_EOF;
314    }
315    if (res <= 0)
316        goto error;
317
318    if( EVBUFFER_LENGTH( io->outbuf ) )
319        tr_peerIoSetEnabled( io, dir, TRUE );
320
321    didWriteWrapper( io, res );
322    return;
323
324 reschedule:
325    if( EVBUFFER_LENGTH( io->outbuf ) )
326        tr_peerIoSetEnabled( io, dir, TRUE );
327    return;
328
329 error:
330
331    dbgmsg( io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
332
333    if( io->gotError != NULL )
334        io->gotError( io, what, io->userData );
335}
336
337/**
338***
339**/
340
341static tr_peerIo*
342tr_peerIoNew( tr_session       * session,
343              tr_bandwidth     * parent,
344              const tr_address * addr,
345              tr_port            port,
346              const uint8_t    * torrentHash,
347              int                isIncoming,
348              int                socket )
349{
350    tr_peerIo * io;
351
352    assert( session != NULL );
353    assert( session->events != NULL );
354    assert( tr_amInEventThread( session ) );
355
356    if( socket >= 0 )
357        tr_netSetTOS( socket, session->peerSocketTOS );
358
359    io = tr_new0( tr_peerIo, 1 );
360    io->magicNumber = MAGIC_NUMBER;
361    io->refCount = 1;
362    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
363    io->session = session;
364    io->addr = *addr;
365    io->port = port;
366    io->socket = socket;
367    io->isIncoming = isIncoming != 0;
368    io->hasFinishedConnecting = FALSE;
369    io->timeCreated = time( NULL );
370    io->inbuf = evbuffer_new( );
371    io->outbuf = evbuffer_new( );
372    tr_bandwidthConstruct( &io->bandwidth, session, parent );
373    tr_bandwidthSetPeer( &io->bandwidth, io );
374    dbgmsg( io, "bandwidth is %p; its parent is %p", &io->bandwidth, parent );
375
376    event_set( &io->event_read, io->socket, EV_READ, event_read_cb, io );
377    event_set( &io->event_write, io->socket, EV_WRITE, event_write_cb, io );
378
379    __tr_list_init( &io->outbuf_datatypes );
380
381    return io;
382}
383
384tr_peerIo*
385tr_peerIoNewIncoming( tr_session        * session,
386                      tr_bandwidth      * parent,
387                      const tr_address  * addr,
388                      tr_port             port,
389                      int                 socket )
390{
391    assert( session );
392    assert( tr_isAddress( addr ) );
393    assert( socket >= 0 );
394
395    return tr_peerIoNew( session, parent, addr, port, NULL, 1, socket );
396}
397
398tr_peerIo*
399tr_peerIoNewOutgoing( tr_session        * session,
400                      tr_bandwidth      * parent,
401                      const tr_address  * addr,
402                      tr_port             port,
403                      const uint8_t     * torrentHash )
404{
405    int socket;
406
407    assert( session );
408    assert( tr_isAddress( addr ) );
409    assert( torrentHash );
410
411    socket = tr_netOpenTCP( session, addr, port );
412    dbgmsg( NULL, "tr_netOpenTCP returned fd %d", socket );
413
414    return socket < 0
415           ? NULL
416           : tr_peerIoNew( session, parent, addr, port, torrentHash, 0, socket );
417}
418
419static void
420trDatatypeFree( void * data )
421{
422    struct tr_datatype * dt = __tr_list_entry( data, struct tr_datatype, head );
423    tr_free(dt);
424}
425
426static void
427io_dtor( void * vio )
428{
429    tr_peerIo * io = vio;
430
431    assert( tr_isPeerIo( io ) );
432    assert( tr_amInEventThread( io->session ) );
433    assert( io->session->events != NULL );
434
435    dbgmsg( io, "in tr_peerIo destructor" );
436    event_del( &io->event_read );
437    event_del( &io->event_write );
438    tr_bandwidthDestruct( &io->bandwidth );
439    evbuffer_free( io->outbuf );
440    evbuffer_free( io->inbuf );
441    tr_netClose( io->socket );
442    tr_cryptoFree( io->crypto );
443    __tr_list_destroy( &io->outbuf_datatypes, trDatatypeFree );
444
445    memset( io, ~0, sizeof( tr_peerIo ) ); 
446    tr_free( io );
447}
448
449static void
450tr_peerIoFree( tr_peerIo * io )
451{
452    if( io )
453    {
454        dbgmsg( io, "in tr_peerIoFree" );
455        io->canRead = NULL;
456        io->didWrite = NULL;
457        io->gotError = NULL;
458        tr_runInEventThread( io->session, io_dtor, io );
459    }
460}
461
462void
463tr_peerIoRefImpl( const char * file, int line, tr_peerIo * io )
464{
465    assert( tr_isPeerIo( io ) );
466
467    dbgmsg( io, "%s:%d is incrementing the IO's refcount from %d to %d",
468                file, line, io->refCount, io->refCount+1 );
469
470    ++io->refCount;
471}
472
473void
474tr_peerIoUnrefImpl( const char * file, int line, tr_peerIo * io )
475{
476    assert( tr_isPeerIo( io ) );
477
478    dbgmsg( io, "%s:%d is decrementing the IO's refcount from %d to %d",
479                file, line, io->refCount, io->refCount-1 );
480
481    if( !--io->refCount )
482        tr_peerIoFree( io );
483}
484
485const tr_address*
486tr_peerIoGetAddress( const tr_peerIo * io, tr_port   * port )
487{
488    assert( tr_isPeerIo( io ) );
489
490    if( port )
491        *port = io->port;
492
493    return &io->addr;
494}
495
496const char*
497tr_peerIoAddrStr( const tr_address * addr, tr_port port )
498{
499    static char buf[512];
500
501    if( addr->type == TR_AF_INET ) 
502        tr_snprintf( buf, sizeof( buf ), "%s:%u", tr_ntop_non_ts( addr ), ntohs( port ) ); 
503    else 
504        tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_ntop_non_ts( addr ), ntohs( port ) ); 
505    return buf;
506}
507
508void
509tr_peerIoSetIOFuncs( tr_peerIo        * io,
510                     tr_can_read_cb     readcb,
511                     tr_did_write_cb    writecb,
512                     tr_net_error_cb    errcb,
513                     void             * userData )
514{
515    io->canRead = readcb;
516    io->didWrite = writecb;
517    io->gotError = errcb;
518    io->userData = userData;
519}
520
521void
522tr_peerIoClear( tr_peerIo * io )
523{
524    tr_peerIoSetIOFuncs( io, NULL, NULL, NULL, NULL );
525    tr_peerIoSetEnabled( io, TR_UP, FALSE );
526    tr_peerIoSetEnabled( io, TR_DOWN, FALSE );
527}
528
529int
530tr_peerIoReconnect( tr_peerIo * io )
531{
532    assert( !tr_peerIoIsIncoming( io ) );
533
534    if( io->socket >= 0 )
535        tr_netClose( io->socket );
536
537    io->socket = tr_netOpenTCP( io->session, &io->addr, io->port ); 
538    if( io->socket >= 0 )
539    {
540        tr_netSetTOS( io->socket, io->session->peerSocketTOS );
541        return 0;
542    }
543
544    return -1;
545}
546
547/**
548***
549**/
550
551void
552tr_peerIoSetTorrentHash( tr_peerIo *     io,
553                         const uint8_t * hash )
554{
555    assert( tr_isPeerIo( io ) );
556
557    tr_cryptoSetTorrentHash( io->crypto, hash );
558}
559
560const uint8_t*
561tr_peerIoGetTorrentHash( tr_peerIo * io )
562{
563    assert( tr_isPeerIo( io ) );
564    assert( io->crypto );
565
566    return tr_cryptoGetTorrentHash( io->crypto );
567}
568
569int
570tr_peerIoHasTorrentHash( const tr_peerIo * io )
571{
572    assert( tr_isPeerIo( io ) );
573    assert( io->crypto );
574
575    return tr_cryptoHasTorrentHash( io->crypto );
576}
577
578/**
579***
580**/
581
582void
583tr_peerIoSetPeersId( tr_peerIo *     io,
584                     const uint8_t * peer_id )
585{
586    assert( tr_isPeerIo( io ) );
587
588    if( ( io->peerIdIsSet = peer_id != NULL ) )
589        memcpy( io->peerId, peer_id, 20 );
590    else
591        memset( io->peerId, 0, 20 );
592}
593
594/**
595***
596**/
597
598void
599tr_peerIoEnableFEXT( tr_peerIo * io,
600                     tr_bool     flag )
601{
602    assert( tr_isPeerIo( io ) );
603    assert( tr_isBool( flag ) );
604
605    dbgmsg( io, "setting FEXT support flag to %d", (flag!=0) );
606    io->fastExtensionSupported = flag;
607}
608
609void
610tr_peerIoEnableLTEP( tr_peerIo  * io,
611                     tr_bool      flag )
612{
613    assert( tr_isPeerIo( io ) );
614    assert( tr_isBool( flag ) );
615
616    dbgmsg( io, "setting LTEP support flag to %d", (flag!=0) );
617    io->extendedProtocolSupported = flag;
618}
619
620/**
621***
622**/
623
624static size_t
625getDesiredOutputBufferSize( const tr_peerIo * io, uint64_t now )
626{
627    /* this is all kind of arbitrary, but what seems to work well is
628     * being large enough to hold the next 20 seconds' worth of input,
629     * or a few blocks, whichever is bigger.
630     * It's okay to tweak this as needed */
631    const double maxBlockSize = 16 * 1024; /* 16 KiB is from BT spec */
632    const double currentSpeed = tr_bandwidthGetPieceSpeed( &io->bandwidth, now, TR_UP );
633    const double period = 20; /* arbitrary */
634    const double numBlocks = 5.5; /* the 5 is arbitrary; the .5 is to leave room for messages */
635    return MAX( maxBlockSize*numBlocks, currentSpeed*1024*period );
636}
637
638size_t
639tr_peerIoGetWriteBufferSpace( const tr_peerIo * io, uint64_t now )
640{
641    const size_t desiredLen = getDesiredOutputBufferSize( io, now );
642    const size_t currentLen = EVBUFFER_LENGTH( io->outbuf );
643    size_t freeSpace = 0;
644
645    if( desiredLen > currentLen )
646        freeSpace = desiredLen - currentLen;
647
648    return freeSpace;
649}
650
651/**
652***
653**/
654
655void
656tr_peerIoSetEncryption( tr_peerIo * io,
657                        int         encryptionMode )
658{
659    assert( tr_isPeerIo( io ) );
660    assert( encryptionMode == PEER_ENCRYPTION_NONE
661         || encryptionMode == PEER_ENCRYPTION_RC4 );
662
663    io->encryptionMode = encryptionMode;
664}
665
666/**
667***
668**/
669
670void
671tr_peerIoWrite( tr_peerIo   * io,
672                const void  * bytes,
673                size_t        byteCount,
674                tr_bool       isPieceData )
675{
676    struct tr_datatype * datatype;
677
678    assert( tr_amInEventThread( io->session ) );
679    dbgmsg( io, "adding %zu bytes into io->output", byteCount );
680
681    datatype = tr_new( struct tr_datatype, 1 );
682    datatype->isPieceData = isPieceData != 0;
683    datatype->length = byteCount;
684
685    __tr_list_init( &datatype->head );
686    __tr_list_append( &io->outbuf_datatypes, &datatype->head );
687
688    switch( io->encryptionMode )
689    {
690        case PEER_ENCRYPTION_RC4:
691        {
692            uint8_t tmp[MAX_STACK_ARRAY_SIZE];
693            const uint8_t * walk = bytes;
694            evbuffer_expand( io->outbuf, byteCount );
695            while( byteCount > 0 )
696            {
697                const size_t thisPass = MIN( byteCount, sizeof( tmp ) );
698                tr_cryptoEncrypt( io->crypto, thisPass, walk, tmp );
699                evbuffer_add( io->outbuf, tmp, thisPass );
700                walk += thisPass;
701                byteCount -= thisPass;
702            }
703            break;
704        }
705
706        case PEER_ENCRYPTION_NONE:
707            evbuffer_add( io->outbuf, bytes, byteCount );
708            break;
709
710        default:
711            assert( 0 );
712            break;
713    }
714}
715
716void
717tr_peerIoWriteBuf( tr_peerIo         * io,
718                   struct evbuffer   * buf,
719                   tr_bool             isPieceData )
720{
721    const size_t n = EVBUFFER_LENGTH( buf );
722    tr_peerIoWrite( io, EVBUFFER_DATA( buf ), n, isPieceData );
723    evbuffer_drain( buf, n );
724}
725
726/***
727****
728***/
729
730void
731tr_peerIoReadBytes( tr_peerIo       * io,
732                    struct evbuffer * inbuf,
733                    void            * bytes,
734                    size_t            byteCount )
735{
736    assert( tr_isPeerIo( io ) );
737    assert( EVBUFFER_LENGTH( inbuf ) >= byteCount );
738
739    switch( io->encryptionMode )
740    {
741        case PEER_ENCRYPTION_NONE:
742            evbuffer_remove( inbuf, bytes, byteCount );
743            break;
744
745        case PEER_ENCRYPTION_RC4:
746            evbuffer_remove( inbuf, bytes, byteCount );
747            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
748            break;
749
750        default:
751            assert( 0 );
752    }
753}
754
755void
756tr_peerIoDrain( tr_peerIo       * io,
757                struct evbuffer * inbuf,
758                size_t            byteCount )
759{
760    uint8_t tmp[MAX_STACK_ARRAY_SIZE];
761
762    while( byteCount > 0 )
763    {
764        const size_t thisPass = MIN( byteCount, sizeof( tmp ) );
765        tr_peerIoReadBytes( io, inbuf, tmp, thisPass );
766        byteCount -= thisPass;
767    }
768}
769
770/***
771****
772***/
773
774static int
775tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
776{
777    int res = 0;
778
779    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch )))
780    {
781        int e;
782        errno = 0;
783        res = evbuffer_read( io->inbuf, io->socket, howmuch );
784        e = errno;
785
786        dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(e):"") );
787
788        if( EVBUFFER_LENGTH( io->inbuf ) )
789            canReadWrapper( io );
790
791        if( ( res <= 0 ) && ( io->gotError ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
792        {
793            short what = EVBUFFER_READ | EVBUFFER_ERROR;
794            if( res == 0 )
795                what |= EVBUFFER_EOF;
796            dbgmsg( io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
797            io->gotError( io, what, io->userData );
798        }
799    }
800
801    return res;
802}
803
804static int
805tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
806{
807    int n = 0;
808
809    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_UP, howmuch )))
810    {
811        int e;
812        errno = 0;
813        n = tr_evbuffer_write( io, io->socket, howmuch );
814        e = errno;
815
816        if( n > 0 )
817            didWriteWrapper( io, n );
818
819        if( ( n < 0 ) && ( io->gotError ) && ( e != EPIPE ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
820        {
821            const short what = EVBUFFER_WRITE | EVBUFFER_ERROR;
822            dbgmsg( io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", n, what, e, strerror( e ) );
823            io->gotError( io, what, io->userData );
824        }
825    }
826
827    return n;
828}
829
830int
831tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
832{
833    int bytesUsed = 0;
834
835    assert( tr_isPeerIo( io ) );
836    assert( tr_isDirection( dir ) );
837
838    if( io->hasFinishedConnecting )
839    {
840        if( dir == TR_DOWN )
841            bytesUsed = tr_peerIoTryRead( io, limit );
842        else
843            bytesUsed = tr_peerIoTryWrite( io, limit );
844    }
845
846    dbgmsg( io, "flushing peer-io, hasFinishedConnecting %d, direction %d, limit %zu, bytesUsed %d", (int)io->hasFinishedConnecting, (int)dir, limit, bytesUsed );
847    return bytesUsed;
848}
849
850/***
851****
852****/
853
854static void
855event_enable( tr_peerIo * io, short event )
856{
857    assert( tr_amInEventThread( io->session ) );
858    assert( io->session != NULL );
859    assert( io->session->events != NULL );
860    assert( event_initialized( &io->event_read ) );
861    assert( event_initialized( &io->event_write ) );
862
863    if( ( event & EV_READ ) && ! ( io->pendingEvents & EV_READ ) )
864    {
865        dbgmsg( io, "enabling libevent ready-to-read polling" );
866        event_add( &io->event_read, NULL );
867        io->pendingEvents |= EV_READ;
868    }
869
870    if( ( event & EV_WRITE ) && ! ( io->pendingEvents & EV_WRITE ) )
871    {
872        dbgmsg( io, "enabling libevent ready-to-write polling" );
873        event_add( &io->event_write, NULL );
874        io->pendingEvents |= EV_WRITE;
875    }
876}
877
878static void
879event_disable( struct tr_peerIo * io, short event )
880{
881    assert( tr_amInEventThread( io->session ) );
882    assert( io->session != NULL );
883    assert( io->session->events != NULL );
884    assert( event_initialized( &io->event_read ) );
885    assert( event_initialized( &io->event_write ) );
886
887    if( ( event & EV_READ ) && ( io->pendingEvents & EV_READ ) )
888    {
889        dbgmsg( io, "disabling libevent ready-to-read polling" );
890        event_del( &io->event_read );
891        io->pendingEvents &= ~EV_READ;
892    }
893
894    if( ( event & EV_WRITE ) && ( io->pendingEvents & EV_WRITE ) )
895    {
896        dbgmsg( io, "disabling libevent ready-to-write polling" );
897        event_del( &io->event_write );
898        io->pendingEvents &= ~EV_WRITE;
899    }
900}
901
902
903void
904tr_peerIoSetEnabled( tr_peerIo    * io,
905                     tr_direction   dir,
906                     tr_bool        isEnabled )
907{
908    const short event = dir == TR_UP ? EV_WRITE : EV_READ;
909
910    assert( tr_isPeerIo( io ) );
911    assert( tr_isDirection( dir ) );
912    assert( tr_amInEventThread( io->session ) );
913    assert( io->session->events != NULL );
914
915    if( isEnabled )
916        event_enable( io, event );
917    else
918        event_disable( io, event );
919}
Note: See TracBrowser for help on using the repository browser.