source: trunk/libtransmission/peer-io.c @ 7811

Last change on this file since 7811 was 7811, checked in by charles, 13 years ago

(trunk libT) add more assertions to ensure that the libevent calls are all being made from the same thread

  • Property svn:keywords set to Date Rev Author Id
File size: 22.8 KB
Line 
1/*
2 * This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 7811 2009-01-28 19:35:39Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <string.h>
17#include <stdio.h>
18#include <unistd.h>
19
20#ifdef WIN32
21 #include <winsock2.h>
22#else
23 #include <arpa/inet.h> /* inet_ntoa */
24#endif
25
26#include <event.h>
27
28#include "transmission.h"
29#include "session.h"
30#include "bandwidth.h"
31#include "crypto.h"
32#include "list.h"
33#include "net.h"
34#include "peer-io.h"
35#include "platform.h" /* MAX_STACK_ARRAY_SIZE */
36#include "trevent.h"
37#include "utils.h"
38
39#define MAGIC_NUMBER 206745
40
41static size_t
42getPacketOverhead( size_t d )
43{
44    /**
45     * http://sd.wareonearth.com/~phil/net/overhead/
46     *
47     * TCP over Ethernet:
48     * Assuming no header compression (e.g. not PPP)
49     * Add 20 IPv4 header or 40 IPv6 header (no options)
50     * Add 20 TCP header
51     * Add 12 bytes optional TCP timestamps
52     * Max TCP Payload data rates over ethernet are thus:
53     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
54     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
55     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
56     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
57     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
58     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
59     */
60    static const double assumed_payload_data_rate = 94.0;
61
62    return (size_t)( d * ( 100.0 / assumed_payload_data_rate ) - d );
63}
64
65/**
66***
67**/
68
69#define dbgmsg( io, ... ) \
70    do { \
71        if( tr_deepLoggingIsActive( ) ) \
72            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
73    } while( 0 )
74
75struct tr_datatype
76{
77    tr_bool  isPieceData;
78    size_t   length;
79    struct __tr_list head;
80};
81
82/***
83****
84***/
85
86static void
87didWriteWrapper( tr_peerIo * io, size_t bytes_transferred )
88{
89    while( bytes_transferred )
90    {
91        struct tr_datatype * next = __tr_list_entry( io->outbuf_datatypes.next, struct tr_datatype, head );
92        const size_t payload = MIN( next->length, bytes_transferred );
93        const size_t overhead = getPacketOverhead( payload );
94
95        tr_bandwidthUsed( &io->bandwidth, TR_UP, payload, next->isPieceData );
96
97        if( overhead > 0 )
98            tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, FALSE );
99
100        if( io->didWrite )
101            io->didWrite( io, payload, next->isPieceData, io->userData );
102
103        bytes_transferred -= payload;
104        next->length -= payload;
105        if( !next->length ) {
106            __tr_list_remove( io->outbuf_datatypes.next );
107            tr_free( next );
108        }
109    }
110}
111
112static void
113canReadWrapper( tr_peerIo * io )
114{
115    tr_bool done = 0;
116    tr_bool err = 0;
117    tr_session * session = io->session;
118
119    dbgmsg( io, "canRead" );
120
121    tr_peerIoRef( io );
122
123    /* try to consume the input buffer */
124    if( io->canRead )
125    {
126        tr_globalLock( session );
127
128        while( !done && !err )
129        {
130            size_t piece = 0;
131            const size_t oldLen = EVBUFFER_LENGTH( io->inbuf );
132            const int ret = io->canRead( io, io->userData, &piece );
133
134            const size_t used = oldLen - EVBUFFER_LENGTH( io->inbuf );
135
136            assert( tr_isPeerIo( io ) );
137
138            if( piece )
139                tr_bandwidthUsed( &io->bandwidth, TR_DOWN, piece, TRUE );
140
141            if( used != piece )
142                tr_bandwidthUsed( &io->bandwidth, TR_DOWN, used - piece, FALSE );
143
144            switch( ret )
145            {
146                case READ_NOW:
147                    if( EVBUFFER_LENGTH( io->inbuf ) )
148                        continue;
149                    done = 1;
150                    break;
151
152                case READ_LATER:
153                    done = 1;
154                    break;
155
156                case READ_ERR:
157                    err = 1;
158                    break;
159            }
160        }
161
162        tr_globalUnlock( session );
163    }
164
165    tr_peerIoUnref( io );
166}
167
168tr_bool
169tr_isPeerIo( const tr_peerIo * io )
170{
171    return ( io != NULL )
172        && ( io->magicNumber == MAGIC_NUMBER )
173        && ( io->refCount >= 0 )
174        && ( tr_isBandwidth( &io->bandwidth ) )
175        && ( tr_isAddress( &io->addr ) );
176}
177
178static void
179event_read_cb( int fd, short event UNUSED, void * vio )
180{
181    int res;
182    int e;
183    tr_peerIo * io = vio;
184
185    /* Limit the input buffer to 256K, so it doesn't grow too large */
186    size_t howmuch;
187    const tr_direction dir = TR_DOWN;
188    const size_t max = 256 * 1024;
189    size_t curlen;
190
191    assert( tr_isPeerIo( io ) );
192
193    io->hasFinishedConnecting = TRUE;
194    io->pendingEvents &= ~EV_READ;
195
196    curlen = EVBUFFER_LENGTH( io->inbuf );
197    howmuch = curlen >= max ? 0 : max - curlen;
198    howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch );
199
200    dbgmsg( io, "libevent says this peer is ready to read" );
201
202    /* if we don't have any bandwidth left, stop reading */
203    if( howmuch < 1 ) {
204        tr_peerIoSetEnabled( io, dir, FALSE );
205        return;
206    }
207
208    errno = 0;
209    res = evbuffer_read( io->inbuf, fd, howmuch );
210    e = errno;
211
212    if( res > 0 )
213    {
214        tr_peerIoSetEnabled( io, dir, TRUE );
215
216        /* Invoke the user callback - must always be called last */
217        canReadWrapper( io );
218    }
219    else
220    {
221        short what = EVBUFFER_READ;
222
223        if( res == 0 ) /* EOF */
224            what |= EVBUFFER_EOF;
225        else if( res == -1 ) {
226            if( e == EAGAIN || e == EINTR ) {
227                tr_peerIoSetEnabled( io, dir, TRUE );
228                return;
229            }
230            what |= EVBUFFER_ERROR;
231        }
232
233        dbgmsg( io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
234
235        if( io->gotError != NULL )
236            io->gotError( io, what, io->userData );
237    }
238}
239
240static int
241tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
242{
243    int e;
244    int n;
245    struct evbuffer * buffer = io->outbuf;
246
247    howmuch = MIN( EVBUFFER_LENGTH( buffer ), howmuch );
248
249    errno = 0;
250#ifdef WIN32
251    n = (int) send(fd, buffer->buffer, howmuch,  0 );
252#else
253    n = (int) write(fd, buffer->buffer, howmuch );
254#endif
255    e = errno;
256    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?strerror(e):"") );
257
258    if( n > 0 )
259        evbuffer_drain( buffer, n );
260
261    return n;
262}
263
264static void
265event_write_cb( int fd, short event UNUSED, void * vio )
266{
267    int res = 0;
268    int e;
269    short what = EVBUFFER_WRITE;
270    tr_peerIo * io = vio;
271    size_t howmuch;
272    const tr_direction dir = TR_UP;
273
274    assert( tr_isPeerIo( io ) );
275
276    io->hasFinishedConnecting = TRUE;
277    io->pendingEvents &= ~EV_WRITE;
278
279    dbgmsg( io, "libevent says this peer is ready to write" );
280
281    /* Write as much as possible, since the socket is non-blocking, write() will
282     * return if it can't write any more data without blocking */
283    howmuch = tr_bandwidthClamp( &io->bandwidth, dir, EVBUFFER_LENGTH( io->outbuf ) );
284
285    /* if we don't have any bandwidth left, stop writing */
286    if( howmuch < 1 ) {
287        tr_peerIoSetEnabled( io, dir, FALSE );
288        return;
289    }
290
291    errno = 0;
292    res = tr_evbuffer_write( io, fd, howmuch );
293    e = errno;
294
295    if (res == -1) {
296#ifndef WIN32
297/*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
298 *  *set errno. thus this error checking is not portable*/
299        if (e == EAGAIN || e == EINTR || e == EINPROGRESS)
300            goto reschedule;
301        /* error case */
302        what |= EVBUFFER_ERROR;
303
304#else
305        goto reschedule;
306#endif
307
308    } else if (res == 0) {
309        /* eof case */
310        what |= EVBUFFER_EOF;
311    }
312    if (res <= 0)
313        goto error;
314
315    if( EVBUFFER_LENGTH( io->outbuf ) )
316        tr_peerIoSetEnabled( io, dir, TRUE );
317
318    didWriteWrapper( io, res );
319    return;
320
321 reschedule:
322    if( EVBUFFER_LENGTH( io->outbuf ) )
323        tr_peerIoSetEnabled( io, dir, TRUE );
324    return;
325
326 error:
327
328    dbgmsg( io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
329
330    if( io->gotError != NULL )
331        io->gotError( io, what, io->userData );
332}
333
334/**
335***
336**/
337
338static tr_peerIo*
339tr_peerIoNew( tr_session       * session,
340              tr_bandwidth     * parent,
341              const tr_address * addr,
342              tr_port            port,
343              const uint8_t    * torrentHash,
344              int                isIncoming,
345              int                socket )
346{
347    tr_peerIo * io;
348
349    assert( session != NULL );
350    assert( session->events != NULL );
351    assert( tr_amInEventThread( session ) );
352
353    if( socket >= 0 )
354        tr_netSetTOS( socket, session->peerSocketTOS );
355
356    io = tr_new0( tr_peerIo, 1 );
357    io->magicNumber = MAGIC_NUMBER;
358    io->refCount = 1;
359    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
360    io->session = session;
361    io->addr = *addr;
362    io->port = port;
363    io->socket = socket;
364    io->isIncoming = isIncoming != 0;
365    io->hasFinishedConnecting = FALSE;
366    io->timeCreated = time( NULL );
367    io->inbuf = evbuffer_new( );
368    io->outbuf = evbuffer_new( );
369    tr_bandwidthConstruct( &io->bandwidth, session, parent );
370    tr_bandwidthSetPeer( &io->bandwidth, io );
371    dbgmsg( io, "bandwidth is %p; its parent is %p", &io->bandwidth, parent );
372
373    event_set( &io->event_read, io->socket, EV_READ, event_read_cb, io );
374    event_set( &io->event_write, io->socket, EV_WRITE, event_write_cb, io );
375
376    __tr_list_init( &io->outbuf_datatypes );
377
378    return io;
379}
380
381tr_peerIo*
382tr_peerIoNewIncoming( tr_session        * session,
383                      tr_bandwidth      * parent,
384                      const tr_address  * addr,
385                      tr_port             port,
386                      int                 socket )
387{
388    assert( session );
389    assert( tr_isAddress( addr ) );
390    assert( socket >= 0 );
391
392    return tr_peerIoNew( session, parent, addr, port, NULL, 1, socket );
393}
394
395tr_peerIo*
396tr_peerIoNewOutgoing( tr_session        * session,
397                      tr_bandwidth      * parent,
398                      const tr_address  * addr,
399                      tr_port             port,
400                      const uint8_t     * torrentHash )
401{
402    int socket;
403
404    assert( session );
405    assert( tr_isAddress( addr ) );
406    assert( torrentHash );
407
408    socket = tr_netOpenTCP( session, addr, port );
409    dbgmsg( NULL, "tr_netOpenTCP returned fd %d", socket );
410
411    return socket < 0
412           ? NULL
413           : tr_peerIoNew( session, parent, addr, port, torrentHash, 0, socket );
414}
415
416static void
417trDatatypeFree( void * data )
418{
419    struct tr_datatype * dt = __tr_list_entry( data, struct tr_datatype, head );
420    tr_free(dt);
421}
422
423static void
424io_dtor( void * vio )
425{
426    tr_peerIo * io = vio;
427
428    assert( tr_isPeerIo( io ) );
429    assert( tr_amInEventThread( io->session ) );
430    assert( io->session->events != NULL );
431
432    dbgmsg( io, "in tr_peerIo destructor" );
433    event_del( &io->event_read );
434    event_del( &io->event_write );
435    tr_bandwidthDestruct( &io->bandwidth );
436    evbuffer_free( io->outbuf );
437    evbuffer_free( io->inbuf );
438    tr_netClose( io->socket );
439    tr_cryptoFree( io->crypto );
440    __tr_list_destroy( &io->outbuf_datatypes, trDatatypeFree );
441
442    memset( io, ~0, sizeof( tr_peerIo ) ); 
443    tr_free( io );
444}
445
446static void
447tr_peerIoFree( tr_peerIo * io )
448{
449    if( io )
450    {
451        dbgmsg( io, "in tr_peerIoFree" );
452        io->canRead = NULL;
453        io->didWrite = NULL;
454        io->gotError = NULL;
455        tr_runInEventThread( io->session, io_dtor, io );
456    }
457}
458
459void
460tr_peerIoRefImpl( const char * file, int line, tr_peerIo * io )
461{
462    assert( tr_isPeerIo( io ) );
463
464    dbgmsg( io, "%s:%d is incrementing the IO's refcount from %d to %d",
465                file, line, io->refCount, io->refCount+1 );
466
467    ++io->refCount;
468}
469
470void
471tr_peerIoUnrefImpl( const char * file, int line, tr_peerIo * io )
472{
473    assert( tr_isPeerIo( io ) );
474
475    dbgmsg( io, "%s:%d is decrementing the IO's refcount from %d to %d",
476                file, line, io->refCount, io->refCount-1 );
477
478    if( !--io->refCount )
479        tr_peerIoFree( io );
480}
481
482const tr_address*
483tr_peerIoGetAddress( const tr_peerIo * io, tr_port   * port )
484{
485    assert( tr_isPeerIo( io ) );
486
487    if( port )
488        *port = io->port;
489
490    return &io->addr;
491}
492
493const char*
494tr_peerIoAddrStr( const tr_address * addr, tr_port port )
495{
496    static char buf[512];
497
498    if( addr->type == TR_AF_INET ) 
499        tr_snprintf( buf, sizeof( buf ), "%s:%u", tr_ntop_non_ts( addr ), ntohs( port ) ); 
500    else 
501        tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_ntop_non_ts( addr ), ntohs( port ) ); 
502    return buf;
503}
504
505void
506tr_peerIoSetIOFuncs( tr_peerIo        * io,
507                     tr_can_read_cb     readcb,
508                     tr_did_write_cb    writecb,
509                     tr_net_error_cb    errcb,
510                     void             * userData )
511{
512    io->canRead = readcb;
513    io->didWrite = writecb;
514    io->gotError = errcb;
515    io->userData = userData;
516}
517
518void
519tr_peerIoClear( tr_peerIo * io )
520{
521    tr_peerIoSetIOFuncs( io, NULL, NULL, NULL, NULL );
522    tr_peerIoSetEnabled( io, TR_UP, FALSE );
523    tr_peerIoSetEnabled( io, TR_DOWN, FALSE );
524}
525
526int
527tr_peerIoReconnect( tr_peerIo * io )
528{
529    assert( !tr_peerIoIsIncoming( io ) );
530
531    if( io->socket >= 0 )
532        tr_netClose( io->socket );
533
534    io->socket = tr_netOpenTCP( io->session, &io->addr, io->port ); 
535    if( io->socket >= 0 )
536    {
537        tr_netSetTOS( io->socket, io->session->peerSocketTOS );
538        return 0;
539    }
540
541    return -1;
542}
543
544/**
545***
546**/
547
548void
549tr_peerIoSetTorrentHash( tr_peerIo *     io,
550                         const uint8_t * hash )
551{
552    assert( tr_isPeerIo( io ) );
553
554    tr_cryptoSetTorrentHash( io->crypto, hash );
555}
556
557const uint8_t*
558tr_peerIoGetTorrentHash( tr_peerIo * io )
559{
560    assert( tr_isPeerIo( io ) );
561    assert( io->crypto );
562
563    return tr_cryptoGetTorrentHash( io->crypto );
564}
565
566int
567tr_peerIoHasTorrentHash( const tr_peerIo * io )
568{
569    assert( tr_isPeerIo( io ) );
570    assert( io->crypto );
571
572    return tr_cryptoHasTorrentHash( io->crypto );
573}
574
575/**
576***
577**/
578
579void
580tr_peerIoSetPeersId( tr_peerIo *     io,
581                     const uint8_t * peer_id )
582{
583    assert( tr_isPeerIo( io ) );
584
585    if( ( io->peerIdIsSet = peer_id != NULL ) )
586        memcpy( io->peerId, peer_id, 20 );
587    else
588        memset( io->peerId, 0, 20 );
589}
590
591/**
592***
593**/
594
595void
596tr_peerIoEnableFEXT( tr_peerIo * io,
597                     tr_bool     flag )
598{
599    assert( tr_isPeerIo( io ) );
600    assert( tr_isBool( flag ) );
601
602    dbgmsg( io, "setting FEXT support flag to %d", (flag!=0) );
603    io->fastExtensionSupported = flag;
604}
605
606void
607tr_peerIoEnableLTEP( tr_peerIo  * io,
608                     tr_bool      flag )
609{
610    assert( tr_isPeerIo( io ) );
611    assert( tr_isBool( flag ) );
612
613    dbgmsg( io, "setting LTEP support flag to %d", (flag!=0) );
614    io->extendedProtocolSupported = flag;
615}
616
617/**
618***
619**/
620
621static size_t
622getDesiredOutputBufferSize( const tr_peerIo * io, uint64_t now )
623{
624    /* this is all kind of arbitrary, but what seems to work well is
625     * being large enough to hold the next 20 seconds' worth of input,
626     * or a few blocks, whichever is bigger.
627     * It's okay to tweak this as needed */
628    const double maxBlockSize = 16 * 1024; /* 16 KiB is from BT spec */
629    const double currentSpeed = tr_bandwidthGetPieceSpeed( &io->bandwidth, now, TR_UP );
630    const double period = 20; /* arbitrary */
631    const double numBlocks = 5.5; /* the 5 is arbitrary; the .5 is to leave room for messages */
632    return MAX( maxBlockSize*numBlocks, currentSpeed*1024*period );
633}
634
635size_t
636tr_peerIoGetWriteBufferSpace( const tr_peerIo * io, uint64_t now )
637{
638    const size_t desiredLen = getDesiredOutputBufferSize( io, now );
639    const size_t currentLen = EVBUFFER_LENGTH( io->outbuf );
640    size_t freeSpace = 0;
641
642    if( desiredLen > currentLen )
643        freeSpace = desiredLen - currentLen;
644
645    return freeSpace;
646}
647
648/**
649***
650**/
651
652void
653tr_peerIoSetEncryption( tr_peerIo * io,
654                        int         encryptionMode )
655{
656    assert( tr_isPeerIo( io ) );
657    assert( encryptionMode == PEER_ENCRYPTION_NONE
658         || encryptionMode == PEER_ENCRYPTION_RC4 );
659
660    io->encryptionMode = encryptionMode;
661}
662
663/**
664***
665**/
666
667void
668tr_peerIoWrite( tr_peerIo   * io,
669                const void  * bytes,
670                size_t        byteCount,
671                tr_bool       isPieceData )
672{
673    struct tr_datatype * datatype;
674
675    assert( tr_amInEventThread( io->session ) );
676    dbgmsg( io, "adding %zu bytes into io->output", byteCount );
677
678    datatype = tr_new( struct tr_datatype, 1 );
679    datatype->isPieceData = isPieceData != 0;
680    datatype->length = byteCount;
681
682    __tr_list_init( &datatype->head );
683    __tr_list_append( &io->outbuf_datatypes, &datatype->head );
684
685    switch( io->encryptionMode )
686    {
687        case PEER_ENCRYPTION_RC4:
688        {
689            uint8_t tmp[MAX_STACK_ARRAY_SIZE];
690            const uint8_t * walk = bytes;
691            evbuffer_expand( io->outbuf, byteCount );
692            while( byteCount > 0 )
693            {
694                const size_t thisPass = MIN( byteCount, sizeof( tmp ) );
695                tr_cryptoEncrypt( io->crypto, thisPass, walk, tmp );
696                evbuffer_add( io->outbuf, tmp, thisPass );
697                walk += thisPass;
698                byteCount -= thisPass;
699            }
700            break;
701        }
702
703        case PEER_ENCRYPTION_NONE:
704            evbuffer_add( io->outbuf, bytes, byteCount );
705            break;
706
707        default:
708            assert( 0 );
709            break;
710    }
711}
712
713void
714tr_peerIoWriteBuf( tr_peerIo         * io,
715                   struct evbuffer   * buf,
716                   tr_bool             isPieceData )
717{
718    const size_t n = EVBUFFER_LENGTH( buf );
719    tr_peerIoWrite( io, EVBUFFER_DATA( buf ), n, isPieceData );
720    evbuffer_drain( buf, n );
721}
722
723/***
724****
725***/
726
727void
728tr_peerIoReadBytes( tr_peerIo       * io,
729                    struct evbuffer * inbuf,
730                    void            * bytes,
731                    size_t            byteCount )
732{
733    assert( tr_isPeerIo( io ) );
734    assert( EVBUFFER_LENGTH( inbuf ) >= byteCount );
735
736    switch( io->encryptionMode )
737    {
738        case PEER_ENCRYPTION_NONE:
739            evbuffer_remove( inbuf, bytes, byteCount );
740            break;
741
742        case PEER_ENCRYPTION_RC4:
743            evbuffer_remove( inbuf, bytes, byteCount );
744            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
745            break;
746
747        default:
748            assert( 0 );
749    }
750}
751
752void
753tr_peerIoDrain( tr_peerIo       * io,
754                struct evbuffer * inbuf,
755                size_t            byteCount )
756{
757    uint8_t tmp[MAX_STACK_ARRAY_SIZE];
758
759    while( byteCount > 0 )
760    {
761        const size_t thisPass = MIN( byteCount, sizeof( tmp ) );
762        tr_peerIoReadBytes( io, inbuf, tmp, thisPass );
763        byteCount -= thisPass;
764    }
765}
766
767/***
768****
769***/
770
771static int
772tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
773{
774    int res = 0;
775
776    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch )))
777    {
778        int e;
779        errno = 0;
780        res = evbuffer_read( io->inbuf, io->socket, howmuch );
781        e = errno;
782
783        dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(e):"") );
784
785        if( EVBUFFER_LENGTH( io->inbuf ) )
786            canReadWrapper( io );
787
788        if( ( res <= 0 ) && ( io->gotError ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
789        {
790            short what = EVBUFFER_READ | EVBUFFER_ERROR;
791            if( res == 0 )
792                what |= EVBUFFER_EOF;
793            dbgmsg( io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
794            io->gotError( io, what, io->userData );
795        }
796    }
797
798    return res;
799}
800
801static int
802tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
803{
804    int n = 0;
805
806    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_UP, howmuch )))
807    {
808        int e;
809        errno = 0;
810        n = tr_evbuffer_write( io, io->socket, howmuch );
811        e = errno;
812
813        if( n > 0 )
814            didWriteWrapper( io, n );
815
816        if( ( n < 0 ) && ( io->gotError ) && ( e != EPIPE ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
817        {
818            const short what = EVBUFFER_WRITE | EVBUFFER_ERROR;
819            dbgmsg( io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", n, what, e, strerror( e ) );
820            io->gotError( io, what, io->userData );
821        }
822    }
823
824    return n;
825}
826
827int
828tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
829{
830    int bytesUsed = 0;
831
832    assert( tr_isPeerIo( io ) );
833    assert( tr_isDirection( dir ) );
834
835    if( io->hasFinishedConnecting )
836    {
837        if( dir == TR_DOWN )
838            bytesUsed = tr_peerIoTryRead( io, limit );
839        else
840            bytesUsed = tr_peerIoTryWrite( io, limit );
841    }
842
843    dbgmsg( io, "flushing peer-io, hasFinishedConnecting %d, direction %d, limit %zu, bytesUsed %d", (int)io->hasFinishedConnecting, (int)dir, limit, bytesUsed );
844    return bytesUsed;
845}
846
847/***
848****
849****/
850
851static void
852event_enable( tr_peerIo * io, short event )
853{
854    assert( tr_amInEventThread( io->session ) );
855    assert( io->session != NULL );
856    assert( io->session->events != NULL );
857    assert( event_initialized( &io->event_read ) );
858    assert( event_initialized( &io->event_write ) );
859
860    if( ( event & EV_READ ) && ! ( io->pendingEvents & EV_READ ) )
861    {
862        dbgmsg( io, "enabling libevent ready-to-read polling" );
863        event_add( &io->event_read, NULL );
864        io->pendingEvents |= EV_READ;
865    }
866
867    if( ( event & EV_WRITE ) && ! ( io->pendingEvents & EV_WRITE ) )
868    {
869        dbgmsg( io, "enabling libevent ready-to-write polling" );
870        event_add( &io->event_write, NULL );
871        io->pendingEvents |= EV_WRITE;
872    }
873}
874
875static void
876event_disable( struct tr_peerIo * io, short event )
877{
878    assert( tr_amInEventThread( io->session ) );
879    assert( io->session != NULL );
880    assert( io->session->events != NULL );
881    assert( event_initialized( &io->event_read ) );
882    assert( event_initialized( &io->event_write ) );
883
884    if( ( event & EV_READ ) && ( io->pendingEvents & EV_READ ) )
885    {
886        dbgmsg( io, "disabling libevent ready-to-read polling" );
887        event_del( &io->event_read );
888        io->pendingEvents &= ~EV_READ;
889    }
890
891    if( ( event & EV_WRITE ) && ( io->pendingEvents & EV_WRITE ) )
892    {
893        dbgmsg( io, "disabling libevent ready-to-write polling" );
894        event_del( &io->event_write );
895        io->pendingEvents &= ~EV_WRITE;
896    }
897}
898
899
900void
901tr_peerIoSetEnabled( tr_peerIo    * io,
902                     tr_direction   dir,
903                     tr_bool        isEnabled )
904{
905    const short event = dir == TR_UP ? EV_WRITE : EV_READ;
906
907    assert( tr_isPeerIo( io ) );
908    assert( tr_isDirection( dir ) );
909    assert( tr_amInEventThread( io->session ) );
910    assert( io->session->events != NULL );
911
912    if( isEnabled )
913        event_enable( io, event );
914    else
915        event_disable( io, event );
916}
Note: See TracBrowser for help on using the repository browser.