source: trunk/libtransmission/peer-io.c @ 7803

Last change on this file since 7803 was 7803, checked in by charles, 13 years ago

(trunk libT) add more assertions to make Biiaru crash moreHHHHHHHHHHHHHHHHhelp track down the cause of Biiaru's crashes

  • Property svn:keywords set to Date Rev Author Id
File size: 22.6 KB
Line 
1/*
2 * This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 7803 2009-01-26 02:51:50Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <string.h>
17#include <stdio.h>
18#include <unistd.h>
19
20#ifdef WIN32
21 #include <winsock2.h>
22#else
23 #include <arpa/inet.h> /* inet_ntoa */
24#endif
25
26#include <event.h>
27
28#include "transmission.h"
29#include "session.h"
30#include "bandwidth.h"
31#include "crypto.h"
32#include "list.h"
33#include "net.h"
34#include "peer-io.h"
35#include "platform.h" /* MAX_STACK_ARRAY_SIZE */
36#include "trevent.h"
37#include "utils.h"
38
39#define MAGIC_NUMBER 206745
40
41static size_t
42getPacketOverhead( size_t d )
43{
44    /**
45     * http://sd.wareonearth.com/~phil/net/overhead/
46     *
47     * TCP over Ethernet:
48     * Assuming no header compression (e.g. not PPP)
49     * Add 20 IPv4 header or 40 IPv6 header (no options)
50     * Add 20 TCP header
51     * Add 12 bytes optional TCP timestamps
52     * Max TCP Payload data rates over ethernet are thus:
53     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
54     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
55     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
56     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
57     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
58     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
59     */
60    static const double assumed_payload_data_rate = 94.0;
61
62    return (size_t)( d * ( 100.0 / assumed_payload_data_rate ) - d );
63}
64
65/**
66***
67**/
68
69#define dbgmsg( io, ... ) \
70    do { \
71        if( tr_deepLoggingIsActive( ) ) \
72            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
73    } while( 0 )
74
75struct tr_datatype
76{
77    tr_bool  isPieceData;
78    size_t   length;
79    struct __tr_list head;
80};
81
82/***
83****
84***/
85
86static void
87didWriteWrapper( tr_peerIo * io, size_t bytes_transferred )
88{
89    while( bytes_transferred )
90    {
91        struct tr_datatype * next = __tr_list_entry( io->outbuf_datatypes.next, struct tr_datatype, head );
92        const size_t payload = MIN( next->length, bytes_transferred );
93        const size_t overhead = getPacketOverhead( payload );
94
95        tr_bandwidthUsed( &io->bandwidth, TR_UP, payload, next->isPieceData );
96
97        if( overhead > 0 )
98            tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, FALSE );
99
100        if( io->didWrite )
101            io->didWrite( io, payload, next->isPieceData, io->userData );
102
103        bytes_transferred -= payload;
104        next->length -= payload;
105        if( !next->length ) {
106            __tr_list_remove( io->outbuf_datatypes.next );
107            tr_free( next );
108        }
109    }
110}
111
112static void
113canReadWrapper( tr_peerIo * io )
114{
115    tr_bool done = 0;
116    tr_bool err = 0;
117    tr_session * session = io->session;
118
119    dbgmsg( io, "canRead" );
120
121    tr_peerIoRef( io );
122
123    /* try to consume the input buffer */
124    if( io->canRead )
125    {
126        tr_globalLock( session );
127
128        while( !done && !err )
129        {
130            size_t piece = 0;
131            const size_t oldLen = EVBUFFER_LENGTH( io->inbuf );
132            const int ret = io->canRead( io, io->userData, &piece );
133
134            const size_t used = oldLen - EVBUFFER_LENGTH( io->inbuf );
135
136            assert( tr_isPeerIo( io ) );
137
138            if( piece )
139                tr_bandwidthUsed( &io->bandwidth, TR_DOWN, piece, TRUE );
140
141            if( used != piece )
142                tr_bandwidthUsed( &io->bandwidth, TR_DOWN, used - piece, FALSE );
143
144            switch( ret )
145            {
146                case READ_NOW:
147                    if( EVBUFFER_LENGTH( io->inbuf ) )
148                        continue;
149                    done = 1;
150                    break;
151
152                case READ_LATER:
153                    done = 1;
154                    break;
155
156                case READ_ERR:
157                    err = 1;
158                    break;
159            }
160        }
161
162        tr_globalUnlock( session );
163    }
164
165    tr_peerIoUnref( io );
166}
167
168tr_bool
169tr_isPeerIo( const tr_peerIo * io )
170{
171    return ( io != NULL )
172        && ( io->magicNumber == MAGIC_NUMBER )
173        && ( io->refCount >= 0 )
174        && ( tr_isBandwidth( &io->bandwidth ) )
175        && ( tr_isAddress( &io->addr ) );
176}
177
178static void
179event_read_cb( int fd, short event UNUSED, void * vio )
180{
181    int res;
182    int e;
183    tr_peerIo * io = vio;
184
185    /* Limit the input buffer to 256K, so it doesn't grow too large */
186    size_t howmuch;
187    const tr_direction dir = TR_DOWN;
188    const size_t max = 256 * 1024;
189    size_t curlen;
190
191    assert( tr_isPeerIo( io ) );
192
193    io->hasFinishedConnecting = TRUE;
194    io->pendingEvents &= ~EV_READ;
195
196    curlen = EVBUFFER_LENGTH( io->inbuf );
197    howmuch = curlen >= max ? 0 : max - curlen;
198    howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch );
199
200    dbgmsg( io, "libevent says this peer is ready to read" );
201
202    /* if we don't have any bandwidth left, stop reading */
203    if( howmuch < 1 ) {
204        tr_peerIoSetEnabled( io, dir, FALSE );
205        return;
206    }
207
208    errno = 0;
209    res = evbuffer_read( io->inbuf, fd, howmuch );
210    e = errno;
211
212    if( res > 0 )
213    {
214        tr_peerIoSetEnabled( io, dir, TRUE );
215
216        /* Invoke the user callback - must always be called last */
217        canReadWrapper( io );
218    }
219    else
220    {
221        short what = EVBUFFER_READ;
222
223        if( res == 0 ) /* EOF */
224            what |= EVBUFFER_EOF;
225        else if( res == -1 ) {
226            if( e == EAGAIN || e == EINTR ) {
227                tr_peerIoSetEnabled( io, dir, TRUE );
228                return;
229            }
230            what |= EVBUFFER_ERROR;
231        }
232
233        dbgmsg( io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
234
235        if( io->gotError != NULL )
236            io->gotError( io, what, io->userData );
237    }
238}
239
240static int
241tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
242{
243    int e;
244    int n;
245    struct evbuffer * buffer = io->outbuf;
246
247    howmuch = MIN( EVBUFFER_LENGTH( buffer ), howmuch );
248
249    errno = 0;
250#ifdef WIN32
251    n = (int) send(fd, buffer->buffer, howmuch,  0 );
252#else
253    n = (int) write(fd, buffer->buffer, howmuch );
254#endif
255    e = errno;
256    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?strerror(e):"") );
257
258    if( n > 0 )
259        evbuffer_drain( buffer, n );
260
261    return n;
262}
263
264static void
265event_write_cb( int fd, short event UNUSED, void * vio )
266{
267    int res = 0;
268    int e;
269    short what = EVBUFFER_WRITE;
270    tr_peerIo * io = vio;
271    size_t howmuch;
272    const tr_direction dir = TR_UP;
273
274    assert( tr_isPeerIo( io ) );
275
276    io->hasFinishedConnecting = TRUE;
277    io->pendingEvents &= ~EV_WRITE;
278
279    dbgmsg( io, "libevent says this peer is ready to write" );
280
281    /* Write as much as possible, since the socket is non-blocking, write() will
282     * return if it can't write any more data without blocking */
283    howmuch = tr_bandwidthClamp( &io->bandwidth, dir, EVBUFFER_LENGTH( io->outbuf ) );
284
285    /* if we don't have any bandwidth left, stop writing */
286    if( howmuch < 1 ) {
287        tr_peerIoSetEnabled( io, dir, FALSE );
288        return;
289    }
290
291    errno = 0;
292    res = tr_evbuffer_write( io, fd, howmuch );
293    e = errno;
294
295    if (res == -1) {
296#ifndef WIN32
297/*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
298 *  *set errno. thus this error checking is not portable*/
299        if (e == EAGAIN || e == EINTR || e == EINPROGRESS)
300            goto reschedule;
301        /* error case */
302        what |= EVBUFFER_ERROR;
303
304#else
305        goto reschedule;
306#endif
307
308    } else if (res == 0) {
309        /* eof case */
310        what |= EVBUFFER_EOF;
311    }
312    if (res <= 0)
313        goto error;
314
315    if( EVBUFFER_LENGTH( io->outbuf ) )
316        tr_peerIoSetEnabled( io, dir, TRUE );
317
318    didWriteWrapper( io, res );
319    return;
320
321 reschedule:
322    if( EVBUFFER_LENGTH( io->outbuf ) )
323        tr_peerIoSetEnabled( io, dir, TRUE );
324    return;
325
326 error:
327
328    dbgmsg( io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
329
330    if( io->gotError != NULL )
331        io->gotError( io, what, io->userData );
332}
333
334/**
335***
336**/
337
338static tr_peerIo*
339tr_peerIoNew( tr_session       * session,
340              tr_bandwidth     * parent,
341              const tr_address * addr,
342              tr_port            port,
343              const uint8_t    * torrentHash,
344              int                isIncoming,
345              int                socket )
346{
347    tr_peerIo * io;
348
349    if( socket >= 0 )
350        tr_netSetTOS( socket, session->peerSocketTOS );
351
352    io = tr_new0( tr_peerIo, 1 );
353    io->magicNumber = MAGIC_NUMBER;
354    io->refCount = 1;
355    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
356    io->session = session;
357    io->addr = *addr;
358    io->port = port;
359    io->socket = socket;
360    io->isIncoming = isIncoming != 0;
361    io->hasFinishedConnecting = FALSE;
362    io->timeCreated = time( NULL );
363    io->inbuf = evbuffer_new( );
364    io->outbuf = evbuffer_new( );
365    tr_bandwidthConstruct( &io->bandwidth, session, parent );
366    tr_bandwidthSetPeer( &io->bandwidth, io );
367    dbgmsg( io, "bandwidth is %p; its parent is %p", &io->bandwidth, parent );
368
369    event_set( &io->event_read, io->socket, EV_READ, event_read_cb, io );
370    event_set( &io->event_write, io->socket, EV_WRITE, event_write_cb, io );
371
372    __tr_list_init( &io->outbuf_datatypes );
373
374    return io;
375}
376
377tr_peerIo*
378tr_peerIoNewIncoming( tr_session        * session,
379                      tr_bandwidth      * parent,
380                      const tr_address  * addr,
381                      tr_port             port,
382                      int                 socket )
383{
384    assert( session );
385    assert( tr_isAddress( addr ) );
386    assert( socket >= 0 );
387
388    return tr_peerIoNew( session, parent, addr, port, NULL, 1, socket );
389}
390
391tr_peerIo*
392tr_peerIoNewOutgoing( tr_session        * session,
393                      tr_bandwidth      * parent,
394                      const tr_address  * addr,
395                      tr_port             port,
396                      const uint8_t     * torrentHash )
397{
398    int socket;
399
400    assert( session );
401    assert( tr_isAddress( addr ) );
402    assert( torrentHash );
403
404    socket = tr_netOpenTCP( session, addr, port );
405    dbgmsg( NULL, "tr_netOpenTCP returned fd %d", socket );
406
407    return socket < 0
408           ? NULL
409           : tr_peerIoNew( session, parent, addr, port, torrentHash, 0, socket );
410}
411
412static void
413trDatatypeFree( void * data )
414{
415    struct tr_datatype * dt = __tr_list_entry( data, struct tr_datatype, head );
416    tr_free(dt);
417}
418
419static void
420io_dtor( void * vio )
421{
422    tr_peerIo * io = vio;
423
424    assert( tr_isPeerIo( io ) );
425    assert( tr_amInEventThread( io->session ) );
426    assert( io->session->events != NULL );
427
428    dbgmsg( io, "in tr_peerIo destructor" );
429    event_del( &io->event_read );
430    event_del( &io->event_write );
431    tr_bandwidthDestruct( &io->bandwidth );
432    evbuffer_free( io->outbuf );
433    evbuffer_free( io->inbuf );
434    tr_netClose( io->socket );
435    tr_cryptoFree( io->crypto );
436    __tr_list_destroy( &io->outbuf_datatypes, trDatatypeFree );
437
438    memset( io, ~0, sizeof( tr_peerIo ) ); 
439    tr_free( io );
440}
441
442static void
443tr_peerIoFree( tr_peerIo * io )
444{
445    if( io )
446    {
447        dbgmsg( io, "in tr_peerIoFree" );
448        io->canRead = NULL;
449        io->didWrite = NULL;
450        io->gotError = NULL;
451        tr_runInEventThread( io->session, io_dtor, io );
452    }
453}
454
455void
456tr_peerIoRefImpl( const char * file, int line, tr_peerIo * io )
457{
458    assert( tr_isPeerIo( io ) );
459
460    dbgmsg( io, "%s:%d is incrementing the IO's refcount from %d to %d",
461                file, line, io->refCount, io->refCount+1 );
462
463    ++io->refCount;
464}
465
466void
467tr_peerIoUnrefImpl( const char * file, int line, tr_peerIo * io )
468{
469    assert( tr_isPeerIo( io ) );
470
471    dbgmsg( io, "%s:%d is decrementing the IO's refcount from %d to %d",
472                file, line, io->refCount, io->refCount-1 );
473
474    if( !--io->refCount )
475        tr_peerIoFree( io );
476}
477
478const tr_address*
479tr_peerIoGetAddress( const tr_peerIo * io, tr_port   * port )
480{
481    assert( tr_isPeerIo( io ) );
482
483    if( port )
484        *port = io->port;
485
486    return &io->addr;
487}
488
489const char*
490tr_peerIoAddrStr( const tr_address * addr, tr_port port )
491{
492    static char buf[512];
493
494    if( addr->type == TR_AF_INET ) 
495        tr_snprintf( buf, sizeof( buf ), "%s:%u", tr_ntop_non_ts( addr ), ntohs( port ) ); 
496    else 
497        tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_ntop_non_ts( addr ), ntohs( port ) ); 
498    return buf;
499}
500
501void
502tr_peerIoSetIOFuncs( tr_peerIo        * io,
503                     tr_can_read_cb     readcb,
504                     tr_did_write_cb    writecb,
505                     tr_net_error_cb    errcb,
506                     void             * userData )
507{
508    io->canRead = readcb;
509    io->didWrite = writecb;
510    io->gotError = errcb;
511    io->userData = userData;
512}
513
514void
515tr_peerIoClear( tr_peerIo * io )
516{
517    tr_peerIoSetIOFuncs( io, NULL, NULL, NULL, NULL );
518    tr_peerIoSetEnabled( io, TR_UP, FALSE );
519    tr_peerIoSetEnabled( io, TR_DOWN, FALSE );
520}
521
522int
523tr_peerIoReconnect( tr_peerIo * io )
524{
525    assert( !tr_peerIoIsIncoming( io ) );
526
527    if( io->socket >= 0 )
528        tr_netClose( io->socket );
529
530    io->socket = tr_netOpenTCP( io->session, &io->addr, io->port ); 
531    if( io->socket >= 0 )
532    {
533        tr_netSetTOS( io->socket, io->session->peerSocketTOS );
534        return 0;
535    }
536
537    return -1;
538}
539
540/**
541***
542**/
543
544void
545tr_peerIoSetTorrentHash( tr_peerIo *     io,
546                         const uint8_t * hash )
547{
548    assert( tr_isPeerIo( io ) );
549
550    tr_cryptoSetTorrentHash( io->crypto, hash );
551}
552
553const uint8_t*
554tr_peerIoGetTorrentHash( tr_peerIo * io )
555{
556    assert( tr_isPeerIo( io ) );
557    assert( io->crypto );
558
559    return tr_cryptoGetTorrentHash( io->crypto );
560}
561
562int
563tr_peerIoHasTorrentHash( const tr_peerIo * io )
564{
565    assert( tr_isPeerIo( io ) );
566    assert( io->crypto );
567
568    return tr_cryptoHasTorrentHash( io->crypto );
569}
570
571/**
572***
573**/
574
575void
576tr_peerIoSetPeersId( tr_peerIo *     io,
577                     const uint8_t * peer_id )
578{
579    assert( tr_isPeerIo( io ) );
580
581    if( ( io->peerIdIsSet = peer_id != NULL ) )
582        memcpy( io->peerId, peer_id, 20 );
583    else
584        memset( io->peerId, 0, 20 );
585}
586
587/**
588***
589**/
590
591void
592tr_peerIoEnableFEXT( tr_peerIo * io,
593                     tr_bool     flag )
594{
595    assert( tr_isPeerIo( io ) );
596    assert( tr_isBool( flag ) );
597
598    dbgmsg( io, "setting FEXT support flag to %d", (flag!=0) );
599    io->fastExtensionSupported = flag;
600}
601
602void
603tr_peerIoEnableLTEP( tr_peerIo  * io,
604                     tr_bool      flag )
605{
606    assert( tr_isPeerIo( io ) );
607    assert( tr_isBool( flag ) );
608
609    dbgmsg( io, "setting LTEP support flag to %d", (flag!=0) );
610    io->extendedProtocolSupported = flag;
611}
612
613/**
614***
615**/
616
617static size_t
618getDesiredOutputBufferSize( const tr_peerIo * io, uint64_t now )
619{
620    /* this is all kind of arbitrary, but what seems to work well is
621     * being large enough to hold the next 20 seconds' worth of input,
622     * or a few blocks, whichever is bigger.
623     * It's okay to tweak this as needed */
624    const double maxBlockSize = 16 * 1024; /* 16 KiB is from BT spec */
625    const double currentSpeed = tr_bandwidthGetPieceSpeed( &io->bandwidth, now, TR_UP );
626    const double period = 20; /* arbitrary */
627    const double numBlocks = 5.5; /* the 5 is arbitrary; the .5 is to leave room for messages */
628    return MAX( maxBlockSize*numBlocks, currentSpeed*1024*period );
629}
630
631size_t
632tr_peerIoGetWriteBufferSpace( const tr_peerIo * io, uint64_t now )
633{
634    const size_t desiredLen = getDesiredOutputBufferSize( io, now );
635    const size_t currentLen = EVBUFFER_LENGTH( io->outbuf );
636    size_t freeSpace = 0;
637
638    if( desiredLen > currentLen )
639        freeSpace = desiredLen - currentLen;
640
641    return freeSpace;
642}
643
644/**
645***
646**/
647
648void
649tr_peerIoSetEncryption( tr_peerIo * io,
650                        int         encryptionMode )
651{
652    assert( tr_isPeerIo( io ) );
653    assert( encryptionMode == PEER_ENCRYPTION_NONE
654         || encryptionMode == PEER_ENCRYPTION_RC4 );
655
656    io->encryptionMode = encryptionMode;
657}
658
659/**
660***
661**/
662
663void
664tr_peerIoWrite( tr_peerIo   * io,
665                const void  * bytes,
666                size_t        byteCount,
667                tr_bool       isPieceData )
668{
669    struct tr_datatype * datatype;
670
671    assert( tr_amInEventThread( io->session ) );
672    dbgmsg( io, "adding %zu bytes into io->output", byteCount );
673
674    datatype = tr_new( struct tr_datatype, 1 );
675    datatype->isPieceData = isPieceData != 0;
676    datatype->length = byteCount;
677
678    __tr_list_init( &datatype->head );
679    __tr_list_append( &io->outbuf_datatypes, &datatype->head );
680
681    switch( io->encryptionMode )
682    {
683        case PEER_ENCRYPTION_RC4:
684        {
685            uint8_t tmp[MAX_STACK_ARRAY_SIZE];
686            const uint8_t * walk = bytes;
687            evbuffer_expand( io->outbuf, byteCount );
688            while( byteCount > 0 )
689            {
690                const size_t thisPass = MIN( byteCount, sizeof( tmp ) );
691                tr_cryptoEncrypt( io->crypto, thisPass, walk, tmp );
692                evbuffer_add( io->outbuf, tmp, thisPass );
693                walk += thisPass;
694                byteCount -= thisPass;
695            }
696            break;
697        }
698
699        case PEER_ENCRYPTION_NONE:
700            evbuffer_add( io->outbuf, bytes, byteCount );
701            break;
702
703        default:
704            assert( 0 );
705            break;
706    }
707}
708
709void
710tr_peerIoWriteBuf( tr_peerIo         * io,
711                   struct evbuffer   * buf,
712                   tr_bool             isPieceData )
713{
714    const size_t n = EVBUFFER_LENGTH( buf );
715    tr_peerIoWrite( io, EVBUFFER_DATA( buf ), n, isPieceData );
716    evbuffer_drain( buf, n );
717}
718
719/***
720****
721***/
722
723void
724tr_peerIoReadBytes( tr_peerIo       * io,
725                    struct evbuffer * inbuf,
726                    void            * bytes,
727                    size_t            byteCount )
728{
729    assert( tr_isPeerIo( io ) );
730    assert( EVBUFFER_LENGTH( inbuf ) >= byteCount );
731
732    switch( io->encryptionMode )
733    {
734        case PEER_ENCRYPTION_NONE:
735            evbuffer_remove( inbuf, bytes, byteCount );
736            break;
737
738        case PEER_ENCRYPTION_RC4:
739            evbuffer_remove( inbuf, bytes, byteCount );
740            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
741            break;
742
743        default:
744            assert( 0 );
745    }
746}
747
748void
749tr_peerIoDrain( tr_peerIo       * io,
750                struct evbuffer * inbuf,
751                size_t            byteCount )
752{
753    uint8_t tmp[MAX_STACK_ARRAY_SIZE];
754
755    while( byteCount > 0 )
756    {
757        const size_t thisPass = MIN( byteCount, sizeof( tmp ) );
758        tr_peerIoReadBytes( io, inbuf, tmp, thisPass );
759        byteCount -= thisPass;
760    }
761}
762
763/***
764****
765***/
766
767static int
768tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
769{
770    int res = 0;
771
772    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch )))
773    {
774        int e;
775        errno = 0;
776        res = evbuffer_read( io->inbuf, io->socket, howmuch );
777        e = errno;
778
779        dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(e):"") );
780
781        if( EVBUFFER_LENGTH( io->inbuf ) )
782            canReadWrapper( io );
783
784        if( ( res <= 0 ) && ( io->gotError ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
785        {
786            short what = EVBUFFER_READ | EVBUFFER_ERROR;
787            if( res == 0 )
788                what |= EVBUFFER_EOF;
789            dbgmsg( io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
790            io->gotError( io, what, io->userData );
791        }
792    }
793
794    return res;
795}
796
797static int
798tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
799{
800    int n = 0;
801
802    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_UP, howmuch )))
803    {
804        int e;
805        errno = 0;
806        n = tr_evbuffer_write( io, io->socket, howmuch );
807        e = errno;
808
809        if( n > 0 )
810            didWriteWrapper( io, n );
811
812        if( ( n < 0 ) && ( io->gotError ) && ( e != EPIPE ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
813        {
814            const short what = EVBUFFER_WRITE | EVBUFFER_ERROR;
815            dbgmsg( io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", n, what, e, strerror( e ) );
816            io->gotError( io, what, io->userData );
817        }
818    }
819
820    return n;
821}
822
823int
824tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
825{
826    int bytesUsed = 0;
827
828    assert( tr_isPeerIo( io ) );
829    assert( tr_isDirection( dir ) );
830
831    if( io->hasFinishedConnecting )
832    {
833        if( dir == TR_DOWN )
834            bytesUsed = tr_peerIoTryRead( io, limit );
835        else
836            bytesUsed = tr_peerIoTryWrite( io, limit );
837    }
838
839    dbgmsg( io, "flushing peer-io, hasFinishedConnecting %d, direction %d, limit %zu, bytesUsed %d", (int)io->hasFinishedConnecting, (int)dir, limit, bytesUsed );
840    return bytesUsed;
841}
842
843/***
844****
845****/
846
847static void
848event_enable( tr_peerIo * io, short event )
849{
850    assert( tr_amInEventThread( io->session ) );
851    assert( io->session->events != NULL );
852    assert( event_initialized( &io->event_read ) );
853    assert( event_initialized( &io->event_write ) );
854
855    if( ( event & EV_READ ) && ! ( io->pendingEvents & EV_READ ) )
856    {
857        dbgmsg( io, "enabling libevent ready-to-read polling" );
858        event_add( &io->event_read, NULL );
859        io->pendingEvents |= EV_READ;
860    }
861
862    if( ( event & EV_WRITE ) && ! ( io->pendingEvents & EV_WRITE ) )
863    {
864        dbgmsg( io, "enabling libevent ready-to-write polling" );
865        event_add( &io->event_write, NULL );
866        io->pendingEvents |= EV_WRITE;
867    }
868}
869
870static void
871event_disable( struct tr_peerIo * io, short event )
872{
873    assert( tr_amInEventThread( io->session ) );
874    assert( io->session->events != NULL );
875    assert( event_initialized( &io->event_read ) );
876    assert( event_initialized( &io->event_write ) );
877
878    if( ( event & EV_READ ) && ( io->pendingEvents & EV_READ ) )
879    {
880        dbgmsg( io, "disabling libevent ready-to-read polling" );
881        event_del( &io->event_read );
882        io->pendingEvents &= ~EV_READ;
883    }
884
885    if( ( event & EV_WRITE ) && ( io->pendingEvents & EV_WRITE ) )
886    {
887        dbgmsg( io, "disabling libevent ready-to-write polling" );
888        event_del( &io->event_write );
889        io->pendingEvents &= ~EV_WRITE;
890    }
891}
892
893
894void
895tr_peerIoSetEnabled( tr_peerIo    * io,
896                     tr_direction   dir,
897                     tr_bool        isEnabled )
898{
899    const short event = dir == TR_UP ? EV_WRITE : EV_READ;
900
901    assert( tr_isPeerIo( io ) );
902    assert( tr_isDirection( dir ) );
903    assert( tr_amInEventThread( io->session ) );
904    assert( io->session->events != NULL );
905
906    if( isEnabled )
907        event_enable( io, event );
908    else
909        event_disable( io, event );
910}
Note: See TracBrowser for help on using the repository browser.