source: trunk/libtransmission/peer-io.c @ 7486

Last change on this file since 7486 was 7486, checked in by charles, 13 years ago

(trunk libT) fix connectivity error reported by Stargazer. Also, add more debug statements to track down errors like this in the future

  • Property svn:keywords set to Date Rev Author Id
File size: 24.0 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 7486 2008-12-24 02:50:08Z charles $
11 */
12
13#include <assert.h>
14#include <limits.h> /* INT_MAX */
15#include <string.h>
16#include <stdio.h>
17#include <unistd.h>
18
19#ifdef WIN32
20 #include <winsock2.h>
21#else
22 #include <arpa/inet.h> /* inet_ntoa */
23#endif
24
25#include <event.h>
26
27#include "transmission.h"
28#include "session.h"
29#include "bandwidth.h"
30#include "crypto.h"
31#include "list.h"
32#include "net.h"
33#include "peer-io.h"
34#include "trevent.h"
35#include "utils.h"
36
37#define MAGIC_NUMBER 206745
38
39static size_t
40getPacketOverhead( size_t d )
41{
42    /**
43     * http://sd.wareonearth.com/~phil/net/overhead/
44     *
45     * TCP over Ethernet:
46     * Assuming no header compression (e.g. not PPP)
47     * Add 20 IPv4 header or 40 IPv6 header (no options)
48     * Add 20 TCP header
49     * Add 12 bytes optional TCP timestamps
50     * Max TCP Payload data rates over ethernet are thus:
51     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
52     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
53     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
54     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
55     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
56     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
57     */
58    static const double assumed_payload_data_rate = 94.0;
59
60    return (size_t)( d * ( 100.0 / assumed_payload_data_rate ) - d );
61}
62
63/**
64***
65**/
66
67#define dbgmsg( io, ... ) \
68    do { \
69        if( tr_deepLoggingIsActive( ) ) \
70            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
71    } while( 0 )
72
73struct tr_datatype
74{
75    tr_bool  isPieceData;
76    size_t   length;
77    struct __tr_list head;
78};
79
80struct tr_peerIo
81{
82    tr_bool            isEncrypted;
83    tr_bool            isIncoming;
84    tr_bool            peerIdIsSet;
85    tr_bool            extendedProtocolSupported;
86    tr_bool            fastExtensionSupported;
87
88    int                magicNumber;
89
90    uint8_t            encryptionMode;
91    tr_port            port;
92    int                socket;
93
94    uint8_t            peerId[20];
95    time_t             timeCreated;
96
97    tr_session       * session;
98
99    tr_address         addr;
100
101    tr_can_read_cb     canRead;
102    tr_did_write_cb    didWrite;
103    tr_net_error_cb    gotError;
104    void *             userData;
105
106    tr_bandwidth     * bandwidth;
107    tr_crypto        * crypto;
108
109    struct evbuffer  * inbuf;
110    struct evbuffer  * outbuf;
111    struct __tr_list   outbuf_datatypes; /* struct tr_datatype */
112
113    struct event       event_read;
114    struct event       event_write;
115};
116
117/***
118****
119***/
120
121static void
122didWriteWrapper( tr_peerIo * io, size_t bytes_transferred )
123{
124    while( bytes_transferred )
125    {
126        struct tr_datatype * next = __tr_list_entry( io->outbuf_datatypes.next,
127                                                     struct tr_datatype, head );
128        const size_t payload = MIN( next->length, bytes_transferred );
129        const size_t overhead = getPacketOverhead( payload );
130
131        tr_bandwidthUsed( io->bandwidth, TR_UP, payload, next->isPieceData );
132
133        if( overhead > 0 )
134            tr_bandwidthUsed( io->bandwidth, TR_UP, overhead, FALSE );
135
136        if( io->didWrite )
137            io->didWrite( io, payload, next->isPieceData, io->userData );
138
139        bytes_transferred -= payload;
140        next->length -= payload;
141        if( !next->length ) {
142            __tr_list_remove( io->outbuf_datatypes.next );
143            tr_free( next );
144        }
145    }
146}
147
148static void
149canReadWrapper( tr_peerIo * io )
150{
151    tr_bool done = 0;
152    tr_bool err = 0;
153    tr_session * session = io->session;
154
155    dbgmsg( io, "canRead" );
156
157    /* try to consume the input buffer */
158    if( io->canRead )
159    {
160        tr_globalLock( session );
161
162        while( !done && !err )
163        {
164            size_t piece = 0;
165            const size_t oldLen = EVBUFFER_LENGTH( io->inbuf );
166            const int ret = io->canRead( io, io->userData, &piece );
167
168            const size_t used = oldLen - EVBUFFER_LENGTH( io->inbuf );
169
170            if( piece )
171                tr_bandwidthUsed( io->bandwidth, TR_DOWN, piece, TRUE );
172
173            if( used != piece )
174                tr_bandwidthUsed( io->bandwidth, TR_DOWN, used - piece, FALSE );
175
176            switch( ret )
177            {
178                case READ_NOW:
179                    if( EVBUFFER_LENGTH( io->inbuf ) )
180                        continue;
181                    done = 1;
182                    break;
183
184                case READ_LATER:
185                    done = 1;
186                    break;
187
188                case READ_ERR:
189                    err = 1;
190                    break;
191            }
192        }
193
194        tr_globalUnlock( session );
195    }
196}
197
198#define _isBool(b) (((b)==0 || (b)==1))
199
200tr_bool
201tr_isPeerIo( const tr_peerIo * io )
202{
203    return ( io != NULL )
204        && ( io->magicNumber == MAGIC_NUMBER )
205        && ( tr_isAddress( &io->addr ) )
206        && ( _isBool( io->isEncrypted ) )
207        && ( _isBool( io->isIncoming ) )
208        && ( _isBool( io->peerIdIsSet ) )
209        && ( _isBool( io->extendedProtocolSupported ) )
210        && ( _isBool( io->fastExtensionSupported ) );
211}
212
213static void
214event_read_cb( int fd, short event UNUSED, void * vio )
215{
216    int res;
217    int e;
218    tr_peerIo * io = vio;
219
220    /* Limit the input buffer to 256K, so it doesn't grow too large */
221    size_t howmuch;
222    const tr_direction dir = TR_DOWN;
223    const size_t max = 256 * 1024;
224    const size_t curlen = EVBUFFER_LENGTH( io->inbuf );
225
226    howmuch = curlen >= max ? 0 : max - curlen;
227    howmuch = tr_bandwidthClamp( io->bandwidth, TR_DOWN, howmuch );
228
229    assert( tr_isPeerIo( io ) );
230
231    dbgmsg( io, "libevent says this peer is ready to read" );
232
233    /* if we don't have any bandwidth left, stop reading */
234    if( howmuch < 1 ) {
235        tr_peerIoSetEnabled( io, dir, FALSE );
236        return;
237    }
238
239    errno = 0;
240    res = evbuffer_read( io->inbuf, fd, howmuch );
241    e = errno;
242
243    if( res > 0 )
244    {
245        tr_peerIoSetEnabled( io, dir, TRUE );
246
247        /* Invoke the user callback - must always be called last */
248        canReadWrapper( io );
249    }
250    else
251    {
252        short what = EVBUFFER_READ;
253
254        if( res == 0 ) /* EOF */
255            what |= EVBUFFER_EOF;
256        else if( res == -1 ) {
257            if( e == EAGAIN || e == EINTR ) {
258                tr_peerIoSetEnabled( io, dir, TRUE );
259                return;
260            }
261            what |= EVBUFFER_ERROR;
262        }
263
264        dbgmsg( io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
265
266        if( io->gotError != NULL )
267            io->gotError( io, what, io->userData );
268    }
269}
270
271static int
272tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
273{
274    struct evbuffer * buffer = io->outbuf;
275    int n = MIN( EVBUFFER_LENGTH( buffer ), howmuch );
276    int e;
277
278    errno = 0;
279#ifdef WIN32
280    n = send(fd, buffer->buffer, n,  0 );
281#else
282    n = write(fd, buffer->buffer, n );
283#endif
284    e = errno;
285    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?strerror(e):"") );
286
287    if( n == -1 )
288        return -1;
289    if (n == 0)
290        return 0;
291    evbuffer_drain( buffer, n );
292
293    return n;
294}
295
296static void
297event_write_cb( int fd, short event UNUSED, void * vio )
298{
299    int res = 0;
300    int e;
301    short what = EVBUFFER_WRITE;
302    tr_peerIo * io = vio;
303    size_t howmuch;
304    const tr_direction dir = TR_UP;
305
306    assert( tr_isPeerIo( io ) );
307
308    dbgmsg( io, "libevent says this peer is ready to write" );
309
310    /* Write as much as possible, since the socket is non-blocking, write() will
311     * return if it can't write any more data without blocking */
312    howmuch = tr_bandwidthClamp( io->bandwidth, dir, EVBUFFER_LENGTH( io->outbuf ) );
313
314    /* if we don't have any bandwidth left, stop writing */
315    if( howmuch < 1 ) {
316        tr_peerIoSetEnabled( io, dir, FALSE );
317        return;
318    }
319
320    errno = 0;
321    res = tr_evbuffer_write( io, fd, howmuch );
322    e = errno;
323
324    if (res == -1) {
325#ifndef WIN32
326/*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
327 *  *set errno. thus this error checking is not portable*/
328        if (e == EAGAIN || e == EINTR || e == EINPROGRESS)
329            goto reschedule;
330        /* error case */
331        what |= EVBUFFER_ERROR;
332
333#else
334        goto reschedule;
335#endif
336
337    } else if (res == 0) {
338        /* eof case */
339        what |= EVBUFFER_EOF;
340    }
341    if (res <= 0)
342        goto error;
343
344    if( EVBUFFER_LENGTH( io->outbuf ) )
345        tr_peerIoSetEnabled( io, dir, TRUE );
346
347    didWriteWrapper( io, res );
348    return;
349
350 reschedule:
351    if( EVBUFFER_LENGTH( io->outbuf ) )
352        tr_peerIoSetEnabled( io, dir, TRUE );
353    return;
354
355 error:
356
357    dbgmsg( io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
358
359    if( io->gotError != NULL )
360        io->gotError( io, what, io->userData );
361}
362
363/**
364***
365**/
366
367static tr_peerIo*
368tr_peerIoNew( tr_session       * session,
369              const tr_address * addr,
370              tr_port            port,
371              const uint8_t    * torrentHash,
372              int                isIncoming,
373              int                socket )
374{
375    tr_peerIo * io;
376
377    if( socket >= 0 )
378        tr_netSetTOS( socket, session->peerSocketTOS );
379
380    io = tr_new0( tr_peerIo, 1 );
381    io->magicNumber = MAGIC_NUMBER;
382    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
383    io->session = session;
384    io->addr = *addr;
385    io->port = port;
386    io->socket = socket;
387    io->isIncoming = isIncoming != 0;
388    io->timeCreated = time( NULL );
389    io->inbuf = evbuffer_new( );
390    io->outbuf = evbuffer_new( );
391
392    event_set( &io->event_read, io->socket, EV_READ, event_read_cb, io );
393    event_set( &io->event_write, io->socket, EV_WRITE, event_write_cb, io );
394
395    __tr_list_init( &io->outbuf_datatypes );
396
397    tr_peerIoSetBandwidth( io, session->bandwidth );
398
399    return io;
400}
401
402tr_peerIo*
403tr_peerIoNewIncoming( tr_session       * session,
404                      const tr_address * addr,
405                      tr_port            port,
406                      int                socket )
407{
408    assert( session );
409    assert( tr_isAddress( addr ) );
410    assert( socket >= 0 );
411
412    return tr_peerIoNew( session, addr, port, NULL, 1, socket );
413}
414
415tr_peerIo*
416tr_peerIoNewOutgoing( tr_session       * session,
417                      const tr_address * addr,
418                      tr_port            port,
419                      const uint8_t    * torrentHash )
420{
421    int socket;
422
423    assert( session );
424    assert( tr_isAddress( addr ) );
425    assert( torrentHash );
426
427    socket = tr_netOpenTCP( session, addr, port );
428
429    return socket < 0
430           ? NULL
431           : tr_peerIoNew( session, addr, port, torrentHash, 0, socket );
432}
433
434static void
435trDatatypeFree( void * data )
436{
437    struct tr_datatype * dt = __tr_list_entry( data, struct tr_datatype, head );
438    tr_free(dt);
439}
440
441static void
442io_dtor( void * vio )
443{
444    tr_peerIo * io = vio;
445
446    event_del( &io->event_read );
447    event_del( &io->event_write );
448    tr_peerIoSetBandwidth( io, NULL );
449    evbuffer_free( io->outbuf );
450    evbuffer_free( io->inbuf );
451    tr_netClose( io->socket );
452    tr_cryptoFree( io->crypto );
453    __tr_list_destroy( &io->outbuf_datatypes, trDatatypeFree );
454
455    io->magicNumber = 0xDEAD;
456    tr_free( io );
457}
458
459void
460tr_peerIoFree( tr_peerIo * io )
461{
462    if( io )
463    {
464        io->canRead = NULL;
465        io->didWrite = NULL;
466        io->gotError = NULL;
467        tr_runInEventThread( io->session, io_dtor, io );
468    }
469}
470
471tr_session*
472tr_peerIoGetSession( tr_peerIo * io )
473{
474    assert( tr_isPeerIo( io ) );
475    assert( io->session );
476
477    return io->session;
478}
479
480const tr_address*
481tr_peerIoGetAddress( const tr_peerIo * io,
482                           tr_port   * port )
483{
484    assert( tr_isPeerIo( io ) );
485
486    if( port )
487        *port = io->port;
488
489    return &io->addr;
490}
491
492const char*
493tr_peerIoAddrStr( const tr_address * addr, tr_port port )
494{
495    static char buf[512];
496
497    if( addr->type == TR_AF_INET ) 
498        tr_snprintf( buf, sizeof( buf ), "%s:%u", tr_ntop_non_ts( addr ), ntohs( port ) ); 
499    else 
500        tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_ntop_non_ts( addr ), ntohs( port ) ); 
501    return buf;
502}
503
504const char*
505tr_peerIoGetAddrStr( const tr_peerIo * io )
506{
507    return tr_peerIoAddrStr( &io->addr, io->port );
508}
509
510void
511tr_peerIoSetIOFuncs( tr_peerIo        * io,
512                     tr_can_read_cb     readcb,
513                     tr_did_write_cb    writecb,
514                     tr_net_error_cb    errcb,
515                     void             * userData )
516{
517    io->canRead = readcb;
518    io->didWrite = writecb;
519    io->gotError = errcb;
520    io->userData = userData;
521}
522
523tr_bool
524tr_peerIoIsIncoming( const tr_peerIo * c )
525{
526    return c->isIncoming != 0;
527}
528
529int
530tr_peerIoReconnect( tr_peerIo * io )
531{
532    assert( !tr_peerIoIsIncoming( io ) );
533
534    if( io->socket >= 0 )
535        tr_netClose( io->socket );
536
537    io->socket = tr_netOpenTCP( io->session, &io->addr, io->port );
538
539    if( io->socket >= 0 )
540    {
541        tr_bandwidth * bandwidth = io->bandwidth;
542        tr_peerIoSetBandwidth( io, NULL );
543        tr_netSetTOS( io->socket, io->session->peerSocketTOS );
544        tr_peerIoSetBandwidth( io, bandwidth );
545        return 0;
546    }
547
548    return -1;
549}
550
551/**
552***
553**/
554
555void
556tr_peerIoSetTorrentHash( tr_peerIo *     io,
557                         const uint8_t * hash )
558{
559    assert( tr_isPeerIo( io ) );
560
561    tr_cryptoSetTorrentHash( io->crypto, hash );
562}
563
564const uint8_t*
565tr_peerIoGetTorrentHash( tr_peerIo * io )
566{
567    assert( tr_isPeerIo( io ) );
568    assert( io->crypto );
569
570    return tr_cryptoGetTorrentHash( io->crypto );
571}
572
573int
574tr_peerIoHasTorrentHash( const tr_peerIo * io )
575{
576    assert( tr_isPeerIo( io ) );
577    assert( io->crypto );
578
579    return tr_cryptoHasTorrentHash( io->crypto );
580}
581
582/**
583***
584**/
585
586void
587tr_peerIoSetPeersId( tr_peerIo *     io,
588                     const uint8_t * peer_id )
589{
590    assert( tr_isPeerIo( io ) );
591
592    if( ( io->peerIdIsSet = peer_id != NULL ) )
593        memcpy( io->peerId, peer_id, 20 );
594    else
595        memset( io->peerId, 0, 20 );
596}
597
598const uint8_t*
599tr_peerIoGetPeersId( const tr_peerIo * io )
600{
601    assert( tr_isPeerIo( io ) );
602    assert( io->peerIdIsSet );
603
604    return io->peerId;
605}
606
607/**
608***
609**/
610
611void
612tr_peerIoEnableFEXT( tr_peerIo * io,
613                     tr_bool     flag )
614{
615    assert( tr_isPeerIo( io ) );
616    assert( _isBool( flag ) );
617
618    dbgmsg( io, "setting FEXT support flag to %d", (flag!=0) );
619    io->fastExtensionSupported = flag;
620}
621
622tr_bool
623tr_peerIoSupportsFEXT( const tr_peerIo * io )
624{
625    assert( tr_isPeerIo( io ) );
626
627    return io->fastExtensionSupported;
628}
629
630/**
631***
632**/
633
634void
635tr_peerIoEnableLTEP( tr_peerIo  * io,
636                     tr_bool      flag )
637{
638    assert( tr_isPeerIo( io ) );
639    assert( _isBool( flag ) );
640
641    dbgmsg( io, "setting LTEP support flag to %d", (flag!=0) );
642    io->extendedProtocolSupported = flag;
643}
644
645tr_bool
646tr_peerIoSupportsLTEP( const tr_peerIo * io )
647{
648    assert( tr_isPeerIo( io ) );
649
650    return io->extendedProtocolSupported;
651}
652
653/**
654***
655**/
656
657static size_t
658getDesiredOutputBufferSize( const tr_peerIo * io )
659{
660    /* this is all kind of arbitrary, but what seems to work well is
661     * being large enough to hold the next 20 seconds' worth of input,
662     * or a few blocks, whichever is bigger.
663     * It's okay to tweak this as needed */
664    const double maxBlockSize = 16 * 1024; /* 16 KiB is from BT spec */
665    const double currentSpeed = tr_bandwidthGetPieceSpeed( io->bandwidth, TR_UP );
666    const double period = 20; /* arbitrary */
667    const double numBlocks = 5.5; /* the 5 is arbitrary; the .5 is to leave room for messages */
668    return MAX( maxBlockSize*numBlocks, currentSpeed*1024*period );
669}
670
671size_t
672tr_peerIoGetWriteBufferSpace( const tr_peerIo * io )
673{
674    const size_t desiredLen = getDesiredOutputBufferSize( io );
675    const size_t currentLen = EVBUFFER_LENGTH( io->outbuf );
676    size_t freeSpace = 0;
677
678    if( desiredLen > currentLen )
679        freeSpace = desiredLen - currentLen;
680
681    return freeSpace;
682}
683
684void
685tr_peerIoSetBandwidth( tr_peerIo     * io,
686                       tr_bandwidth  * bandwidth )
687{
688    assert( tr_isPeerIo( io ) );
689
690    if( io->bandwidth )
691        tr_bandwidthRemovePeer( io->bandwidth, io );
692
693    io->bandwidth = bandwidth;
694
695    if( io->bandwidth )
696        tr_bandwidthAddPeer( io->bandwidth, io );
697}
698
699/**
700***
701**/
702
703tr_crypto*
704tr_peerIoGetCrypto( tr_peerIo * c )
705{
706    return c->crypto;
707}
708
709void
710tr_peerIoSetEncryption( tr_peerIo * io,
711                        int         encryptionMode )
712{
713    assert( tr_isPeerIo( io ) );
714    assert( encryptionMode == PEER_ENCRYPTION_NONE
715          || encryptionMode == PEER_ENCRYPTION_RC4 );
716
717    io->encryptionMode = encryptionMode;
718}
719
720int
721tr_peerIoIsEncrypted( const tr_peerIo * io )
722{
723    return io != NULL && io->encryptionMode == PEER_ENCRYPTION_RC4;
724}
725
726/**
727***
728**/
729
730void
731tr_peerIoWrite( tr_peerIo   * io,
732                const void  * writeme,
733                size_t        writemeLen,
734                int           isPieceData )
735{
736    struct tr_datatype * datatype;
737
738    assert( tr_amInEventThread( io->session ) );
739    dbgmsg( io, "adding %zu bytes into io->output", writemeLen );
740
741    datatype = tr_new( struct tr_datatype, 1 );
742    datatype->isPieceData = isPieceData != 0;
743    datatype->length = writemeLen;
744
745    __tr_list_init( &datatype->head );
746    __tr_list_append( &io->outbuf_datatypes, &datatype->head );
747
748    evbuffer_add( io->outbuf, writeme, writemeLen );
749}
750
751void
752tr_peerIoWriteBuf( tr_peerIo         * io,
753                   struct evbuffer   * buf,
754                   int                 isPieceData )
755{
756    const size_t n = EVBUFFER_LENGTH( buf );
757
758    tr_peerIoWrite( io, EVBUFFER_DATA( buf ), n, isPieceData );
759    evbuffer_drain( buf, n );
760}
761
762/**
763***
764**/
765
766void
767tr_peerIoWriteBytes( tr_peerIo       * io,
768                     struct evbuffer * outbuf,
769                     const void      * bytes,
770                     size_t            byteCount )
771{
772    uint8_t * tmp;
773
774    switch( io->encryptionMode )
775    {
776        case PEER_ENCRYPTION_NONE:
777            evbuffer_add( outbuf, bytes, byteCount );
778            break;
779
780        case PEER_ENCRYPTION_RC4:
781            tmp = tr_new( uint8_t, byteCount );
782            tr_cryptoEncrypt( io->crypto, byteCount, bytes, tmp );
783            evbuffer_add( outbuf, tmp, byteCount );
784            tr_free( tmp );
785            break;
786
787        default:
788            assert( 0 );
789    }
790}
791
792void
793tr_peerIoWriteUint8( tr_peerIo       * io,
794                     struct evbuffer * outbuf,
795                     uint8_t           writeme )
796{
797    tr_peerIoWriteBytes( io, outbuf, &writeme, sizeof( uint8_t ) );
798}
799
800void
801tr_peerIoWriteUint16( tr_peerIo       * io,
802                      struct evbuffer * outbuf,
803                      uint16_t          writeme )
804{
805    uint16_t tmp = htons( writeme );
806
807    tr_peerIoWriteBytes( io, outbuf, &tmp, sizeof( uint16_t ) );
808}
809
810void
811tr_peerIoWriteUint32( tr_peerIo       * io,
812                      struct evbuffer * outbuf,
813                      uint32_t          writeme )
814{
815    uint32_t tmp = htonl( writeme );
816
817    tr_peerIoWriteBytes( io, outbuf, &tmp, sizeof( uint32_t ) );
818}
819
820/***
821****
822***/
823
824void
825tr_peerIoReadBytes( tr_peerIo       * io,
826                    struct evbuffer * inbuf,
827                    void            * bytes,
828                    size_t            byteCount )
829{
830    assert( EVBUFFER_LENGTH( inbuf ) >= byteCount );
831
832    switch( io->encryptionMode )
833    {
834        case PEER_ENCRYPTION_NONE:
835            evbuffer_remove( inbuf, bytes, byteCount );
836            break;
837
838        case PEER_ENCRYPTION_RC4:
839            evbuffer_remove( inbuf, bytes, byteCount );
840            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
841            break;
842
843        default:
844            assert( 0 );
845    }
846}
847
848void
849tr_peerIoReadUint8( tr_peerIo       * io,
850                    struct evbuffer * inbuf,
851                    uint8_t         * setme )
852{
853    tr_peerIoReadBytes( io, inbuf, setme, sizeof( uint8_t ) );
854}
855
856void
857tr_peerIoReadUint16( tr_peerIo       * io,
858                     struct evbuffer * inbuf,
859                     uint16_t        * setme )
860{
861    uint16_t tmp;
862
863    assert( tr_isPeerIo( io ) );
864
865    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
866    *setme = ntohs( tmp );
867}
868
869void
870tr_peerIoReadUint32( tr_peerIo       * io,
871                     struct evbuffer * inbuf,
872                     uint32_t        * setme )
873{
874    uint32_t tmp;
875
876    assert( tr_isPeerIo( io ) );
877
878    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
879    *setme = ntohl( tmp );
880}
881
882void
883tr_peerIoDrain( tr_peerIo       * io,
884                struct evbuffer * inbuf,
885                size_t            byteCount )
886{
887    uint8_t * tmp;
888
889    assert( tr_isPeerIo( io ) );
890
891    tmp = tr_new( uint8_t, byteCount );
892    tr_peerIoReadBytes( io, inbuf, tmp, byteCount );
893    tr_free( tmp );
894}
895
896int
897tr_peerIoGetAge( const tr_peerIo * io )
898{
899    return time( NULL ) - io->timeCreated;
900}
901
902/***
903****
904***/
905
906static int
907tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
908{
909    int res = 0;
910
911    assert( tr_isPeerIo( io ) );
912
913    if(( howmuch = tr_bandwidthClamp( io->bandwidth, TR_DOWN, howmuch )))
914    {
915        int e;
916        errno = 0;
917        res = evbuffer_read( io->inbuf, io->socket, howmuch );
918        e = errno;
919
920        dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(e):"") );
921
922        if( EVBUFFER_LENGTH( io->inbuf ) )
923            canReadWrapper( io );
924
925        if( ( res <= 0 ) && ( io->gotError ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
926        {
927            short what = EVBUFFER_READ | EVBUFFER_ERROR;
928            if( res == 0 )
929                what |= EVBUFFER_EOF;
930            dbgmsg( io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
931            io->gotError( io, what, io->userData );
932        }
933    }
934
935    return res;
936}
937
938static int
939tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
940{
941    int n;
942
943    assert( tr_isPeerIo( io ) );
944
945    if(( howmuch = tr_bandwidthClamp( io->bandwidth, TR_UP, howmuch )))
946    {
947        int e;
948        errno = 0;
949        n = tr_evbuffer_write( io, io->socket, (int)howmuch );
950        e = errno;
951
952        if( n > 0 )
953            didWriteWrapper( io, n );
954
955        if( ( n < 0 ) && ( io->gotError ) && ( e != EPIPE ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
956        {
957            const short what = EVBUFFER_WRITE | EVBUFFER_ERROR;
958            dbgmsg( io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", n, what, e, strerror( e ) );
959            io->gotError( io, what, io->userData );
960        }
961    }
962
963    return n;
964}
965
966int
967tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
968{
969    int ret;
970
971    assert( tr_isPeerIo( io ) );
972    assert( tr_isDirection( dir ) );
973
974    if( dir==TR_DOWN )
975        ret = tr_peerIoTryRead( io, limit );
976    else
977        ret = tr_peerIoTryWrite( io, limit );
978
979    return ret;
980}
981
982struct evbuffer *
983tr_peerIoGetReadBuffer( tr_peerIo * io )
984{
985    assert( tr_isPeerIo( io ) );
986
987    return io->inbuf;
988}
989
990tr_bool
991tr_peerIoHasBandwidthLeft( const tr_peerIo * io, tr_direction dir )
992{
993    assert( tr_isPeerIo( io ) );
994    assert( tr_isDirection( dir ) );
995
996    return tr_bandwidthClamp( io->bandwidth, dir, 1024 ) > 0;
997}
998
999/***
1000****
1001****/
1002
1003static void
1004event_enable( tr_peerIo * io, short event )
1005{
1006    assert( tr_isPeerIo( io ) );
1007
1008    if( event & EV_READ )
1009        event_add( &io->event_read, NULL );
1010
1011    if( event & EV_WRITE )
1012        event_add( &io->event_write, NULL );
1013}
1014
1015static void
1016event_disable( struct tr_peerIo * io, short event )
1017{
1018    assert( tr_isPeerIo( io ) );
1019
1020    if( event & EV_READ )
1021        event_del( &io->event_read );
1022
1023    if( event & EV_WRITE )
1024        event_del( &io->event_write );
1025}
1026
1027
1028void
1029tr_peerIoSetEnabled( tr_peerIo    * io,
1030                     tr_direction   dir,
1031                     tr_bool        isEnabled )
1032{
1033    short event;
1034
1035    assert( tr_isPeerIo( io ) );
1036    assert( tr_isDirection( dir ) );
1037
1038    event = dir == TR_UP ? EV_WRITE : EV_READ;
1039
1040    if( isEnabled )
1041        event_enable( io, event );
1042    else
1043        event_disable( io, event );
1044}
Note: See TracBrowser for help on using the repository browser.