source: trunk/libtransmission/peer-io.c @ 7541

Last change on this file since 7541 was 7541, checked in by charles, 12 years ago

(trunk libT) minor cleanup: use symbolic name instead of magic numbers; use tr_bool instead of int

  • Property svn:keywords set to Date Rev Author Id
File size: 24.4 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 7541 2008-12-30 02:43:51Z charles $
11 */
12
13#include <assert.h>
14#include <limits.h> /* INT_MAX */
15#include <string.h>
16#include <stdio.h>
17#include <unistd.h>
18
19#ifdef WIN32
20 #include <winsock2.h>
21#else
22 #include <arpa/inet.h> /* inet_ntoa */
23#endif
24
25#include <event.h>
26
27#include "transmission.h"
28#include "session.h"
29#include "bandwidth.h"
30#include "crypto.h"
31#include "list.h"
32#include "net.h"
33#include "peer-io.h"
34#include "platform.h" /* MAX_STACK_ARRAY_SIZE */
35#include "trevent.h"
36#include "utils.h"
37
38#define MAGIC_NUMBER 206745
39
40static size_t
41getPacketOverhead( size_t d )
42{
43    /**
44     * http://sd.wareonearth.com/~phil/net/overhead/
45     *
46     * TCP over Ethernet:
47     * Assuming no header compression (e.g. not PPP)
48     * Add 20 IPv4 header or 40 IPv6 header (no options)
49     * Add 20 TCP header
50     * Add 12 bytes optional TCP timestamps
51     * Max TCP Payload data rates over ethernet are thus:
52     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
53     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
54     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
55     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
56     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
57     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
58     */
59    static const double assumed_payload_data_rate = 94.0;
60
61    return (size_t)( d * ( 100.0 / assumed_payload_data_rate ) - d );
62}
63
64/**
65***
66**/
67
68#define dbgmsg( io, ... ) \
69    do { \
70        if( tr_deepLoggingIsActive( ) ) \
71            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
72    } while( 0 )
73
74struct tr_datatype
75{
76    tr_bool  isPieceData;
77    size_t   length;
78    struct __tr_list head;
79};
80
81struct tr_peerIo
82{
83    tr_bool            isEncrypted;
84    tr_bool            isIncoming;
85    tr_bool            peerIdIsSet;
86    tr_bool            extendedProtocolSupported;
87    tr_bool            fastExtensionSupported;
88
89    int                magicNumber;
90
91    uint8_t            encryptionMode;
92    tr_port            port;
93    int                socket;
94
95    uint8_t            peerId[SHA_DIGEST_LENGTH];
96    time_t             timeCreated;
97
98    tr_session       * session;
99
100    tr_address         addr;
101
102    tr_can_read_cb     canRead;
103    tr_did_write_cb    didWrite;
104    tr_net_error_cb    gotError;
105    void *             userData;
106
107    tr_bandwidth     * bandwidth;
108    tr_crypto        * crypto;
109
110    struct evbuffer  * inbuf;
111    struct evbuffer  * outbuf;
112    struct __tr_list   outbuf_datatypes; /* struct tr_datatype */
113
114    struct event       event_read;
115    struct event       event_write;
116};
117
118/***
119****
120***/
121
122static void
123didWriteWrapper( tr_peerIo * io, size_t bytes_transferred )
124{
125    while( bytes_transferred )
126    {
127        struct tr_datatype * next = __tr_list_entry( io->outbuf_datatypes.next,
128                                                     struct tr_datatype, head );
129        const size_t payload = MIN( next->length, bytes_transferred );
130        const size_t overhead = getPacketOverhead( payload );
131
132        tr_bandwidthUsed( io->bandwidth, TR_UP, payload, next->isPieceData );
133
134        if( overhead > 0 )
135            tr_bandwidthUsed( io->bandwidth, TR_UP, overhead, FALSE );
136
137        if( io->didWrite )
138            io->didWrite( io, payload, next->isPieceData, io->userData );
139
140        bytes_transferred -= payload;
141        next->length -= payload;
142        if( !next->length ) {
143            __tr_list_remove( io->outbuf_datatypes.next );
144            tr_free( next );
145        }
146    }
147}
148
149static void
150canReadWrapper( tr_peerIo * io )
151{
152    tr_bool done = 0;
153    tr_bool err = 0;
154    tr_session * session = io->session;
155
156    dbgmsg( io, "canRead" );
157
158    /* try to consume the input buffer */
159    if( io->canRead )
160    {
161        tr_globalLock( session );
162
163        while( !done && !err )
164        {
165            size_t piece = 0;
166            const size_t oldLen = EVBUFFER_LENGTH( io->inbuf );
167            const int ret = io->canRead( io, io->userData, &piece );
168
169            const size_t used = oldLen - EVBUFFER_LENGTH( io->inbuf );
170
171            if( piece )
172                tr_bandwidthUsed( io->bandwidth, TR_DOWN, piece, TRUE );
173
174            if( used != piece )
175                tr_bandwidthUsed( io->bandwidth, TR_DOWN, used - piece, FALSE );
176
177            switch( ret )
178            {
179                case READ_NOW:
180                    if( EVBUFFER_LENGTH( io->inbuf ) )
181                        continue;
182                    done = 1;
183                    break;
184
185                case READ_LATER:
186                    done = 1;
187                    break;
188
189                case READ_ERR:
190                    err = 1;
191                    break;
192            }
193        }
194
195        tr_globalUnlock( session );
196    }
197}
198
199#define _isBool(b) (((b)==0 || (b)==1))
200
201tr_bool
202tr_isPeerIo( const tr_peerIo * io )
203{
204    return ( io != NULL )
205        && ( io->magicNumber == MAGIC_NUMBER )
206        && ( tr_isAddress( &io->addr ) )
207        && ( _isBool( io->isEncrypted ) )
208        && ( _isBool( io->isIncoming ) )
209        && ( _isBool( io->peerIdIsSet ) )
210        && ( _isBool( io->extendedProtocolSupported ) )
211        && ( _isBool( io->fastExtensionSupported ) );
212}
213
214static void
215event_read_cb( int fd, short event UNUSED, void * vio )
216{
217    int res;
218    int e;
219    tr_peerIo * io = vio;
220
221    /* Limit the input buffer to 256K, so it doesn't grow too large */
222    size_t howmuch;
223    const tr_direction dir = TR_DOWN;
224    const size_t max = 256 * 1024;
225    const size_t curlen = EVBUFFER_LENGTH( io->inbuf );
226
227    howmuch = curlen >= max ? 0 : max - curlen;
228    howmuch = tr_bandwidthClamp( io->bandwidth, TR_DOWN, howmuch );
229
230    assert( tr_isPeerIo( io ) );
231
232    dbgmsg( io, "libevent says this peer is ready to read" );
233
234    /* if we don't have any bandwidth left, stop reading */
235    if( howmuch < 1 ) {
236        tr_peerIoSetEnabled( io, dir, FALSE );
237        return;
238    }
239
240    errno = 0;
241    res = evbuffer_read( io->inbuf, fd, howmuch );
242    e = errno;
243
244    if( res > 0 )
245    {
246        tr_peerIoSetEnabled( io, dir, TRUE );
247
248        /* Invoke the user callback - must always be called last */
249        canReadWrapper( io );
250    }
251    else
252    {
253        short what = EVBUFFER_READ;
254
255        if( res == 0 ) /* EOF */
256            what |= EVBUFFER_EOF;
257        else if( res == -1 ) {
258            if( e == EAGAIN || e == EINTR ) {
259                tr_peerIoSetEnabled( io, dir, TRUE );
260                return;
261            }
262            what |= EVBUFFER_ERROR;
263        }
264
265        dbgmsg( io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
266
267        if( io->gotError != NULL )
268            io->gotError( io, what, io->userData );
269    }
270}
271
272static int
273tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
274{
275    struct evbuffer * buffer = io->outbuf;
276    int n = MIN( EVBUFFER_LENGTH( buffer ), howmuch );
277    int e;
278
279    errno = 0;
280#ifdef WIN32
281    n = send(fd, buffer->buffer, n,  0 );
282#else
283    n = write(fd, buffer->buffer, n );
284#endif
285    e = errno;
286    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?strerror(e):"") );
287
288    if( n == -1 )
289        return -1;
290    if (n == 0)
291        return 0;
292    evbuffer_drain( buffer, n );
293
294    return n;
295}
296
297static void
298event_write_cb( int fd, short event UNUSED, void * vio )
299{
300    int res = 0;
301    int e;
302    short what = EVBUFFER_WRITE;
303    tr_peerIo * io = vio;
304    size_t howmuch;
305    const tr_direction dir = TR_UP;
306
307    assert( tr_isPeerIo( io ) );
308
309    dbgmsg( io, "libevent says this peer is ready to write" );
310
311    /* Write as much as possible, since the socket is non-blocking, write() will
312     * return if it can't write any more data without blocking */
313    howmuch = tr_bandwidthClamp( io->bandwidth, dir, EVBUFFER_LENGTH( io->outbuf ) );
314
315    /* if we don't have any bandwidth left, stop writing */
316    if( howmuch < 1 ) {
317        tr_peerIoSetEnabled( io, dir, FALSE );
318        return;
319    }
320
321    errno = 0;
322    res = tr_evbuffer_write( io, fd, howmuch );
323    e = errno;
324
325    if (res == -1) {
326#ifndef WIN32
327/*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
328 *  *set errno. thus this error checking is not portable*/
329        if (e == EAGAIN || e == EINTR || e == EINPROGRESS)
330            goto reschedule;
331        /* error case */
332        what |= EVBUFFER_ERROR;
333
334#else
335        goto reschedule;
336#endif
337
338    } else if (res == 0) {
339        /* eof case */
340        what |= EVBUFFER_EOF;
341    }
342    if (res <= 0)
343        goto error;
344
345    if( EVBUFFER_LENGTH( io->outbuf ) )
346        tr_peerIoSetEnabled( io, dir, TRUE );
347
348    didWriteWrapper( io, res );
349    return;
350
351 reschedule:
352    if( EVBUFFER_LENGTH( io->outbuf ) )
353        tr_peerIoSetEnabled( io, dir, TRUE );
354    return;
355
356 error:
357
358    dbgmsg( io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
359
360    if( io->gotError != NULL )
361        io->gotError( io, what, io->userData );
362}
363
364/**
365***
366**/
367
368static tr_peerIo*
369tr_peerIoNew( tr_session       * session,
370              const tr_address * addr,
371              tr_port            port,
372              const uint8_t    * torrentHash,
373              int                isIncoming,
374              int                socket )
375{
376    tr_peerIo * io;
377
378    if( socket >= 0 )
379        tr_netSetTOS( socket, session->peerSocketTOS );
380
381    io = tr_new0( tr_peerIo, 1 );
382    io->magicNumber = MAGIC_NUMBER;
383    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
384    io->session = session;
385    io->addr = *addr;
386    io->port = port;
387    io->socket = socket;
388    io->isIncoming = isIncoming != 0;
389    io->timeCreated = time( NULL );
390    io->inbuf = evbuffer_new( );
391    io->outbuf = evbuffer_new( );
392
393    event_set( &io->event_read, io->socket, EV_READ, event_read_cb, io );
394    event_set( &io->event_write, io->socket, EV_WRITE, event_write_cb, io );
395
396    __tr_list_init( &io->outbuf_datatypes );
397
398    tr_peerIoSetBandwidth( io, session->bandwidth );
399
400    return io;
401}
402
403tr_peerIo*
404tr_peerIoNewIncoming( tr_session       * session,
405                      const tr_address * addr,
406                      tr_port            port,
407                      int                socket )
408{
409    assert( session );
410    assert( tr_isAddress( addr ) );
411    assert( socket >= 0 );
412
413    return tr_peerIoNew( session, addr, port, NULL, 1, socket );
414}
415
416tr_peerIo*
417tr_peerIoNewOutgoing( tr_session       * session,
418                      const tr_address * addr,
419                      tr_port            port,
420                      const uint8_t    * torrentHash )
421{
422    int socket;
423
424    assert( session );
425    assert( tr_isAddress( addr ) );
426    assert( torrentHash );
427
428    socket = tr_netOpenTCP( session, addr, port );
429
430    return socket < 0
431           ? NULL
432           : tr_peerIoNew( session, addr, port, torrentHash, 0, socket );
433}
434
435static void
436trDatatypeFree( void * data )
437{
438    struct tr_datatype * dt = __tr_list_entry( data, struct tr_datatype, head );
439    tr_free(dt);
440}
441
442static void
443io_dtor( void * vio )
444{
445    tr_peerIo * io = vio;
446
447    event_del( &io->event_read );
448    event_del( &io->event_write );
449    tr_peerIoSetBandwidth( io, NULL );
450    evbuffer_free( io->outbuf );
451    evbuffer_free( io->inbuf );
452    tr_netClose( io->socket );
453    tr_cryptoFree( io->crypto );
454    __tr_list_destroy( &io->outbuf_datatypes, trDatatypeFree );
455
456    io->magicNumber = 0xDEAD;
457    tr_free( io );
458}
459
460void
461tr_peerIoFree( tr_peerIo * io )
462{
463    if( io )
464    {
465        io->canRead = NULL;
466        io->didWrite = NULL;
467        io->gotError = NULL;
468        tr_runInEventThread( io->session, io_dtor, io );
469    }
470}
471
472tr_session*
473tr_peerIoGetSession( tr_peerIo * io )
474{
475    assert( tr_isPeerIo( io ) );
476    assert( io->session );
477
478    return io->session;
479}
480
481const tr_address*
482tr_peerIoGetAddress( const tr_peerIo * io,
483                           tr_port   * port )
484{
485    assert( tr_isPeerIo( io ) );
486
487    if( port )
488        *port = io->port;
489
490    return &io->addr;
491}
492
493const char*
494tr_peerIoAddrStr( const tr_address * addr, tr_port port )
495{
496    static char buf[512];
497
498    if( addr->type == TR_AF_INET ) 
499        tr_snprintf( buf, sizeof( buf ), "%s:%u", tr_ntop_non_ts( addr ), ntohs( port ) ); 
500    else 
501        tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_ntop_non_ts( addr ), ntohs( port ) ); 
502    return buf;
503}
504
505const char*
506tr_peerIoGetAddrStr( const tr_peerIo * io )
507{
508    return tr_peerIoAddrStr( &io->addr, io->port );
509}
510
511void
512tr_peerIoSetIOFuncs( tr_peerIo        * io,
513                     tr_can_read_cb     readcb,
514                     tr_did_write_cb    writecb,
515                     tr_net_error_cb    errcb,
516                     void             * userData )
517{
518    io->canRead = readcb;
519    io->didWrite = writecb;
520    io->gotError = errcb;
521    io->userData = userData;
522}
523
524tr_bool
525tr_peerIoIsIncoming( const tr_peerIo * c )
526{
527    return c->isIncoming != 0;
528}
529
530int
531tr_peerIoReconnect( tr_peerIo * io )
532{
533    assert( !tr_peerIoIsIncoming( io ) );
534
535    if( io->socket >= 0 )
536        tr_netClose( io->socket );
537
538    io->socket = tr_netOpenTCP( io->session, &io->addr, io->port );
539
540    if( io->socket >= 0 )
541    {
542        tr_bandwidth * bandwidth = io->bandwidth;
543        tr_peerIoSetBandwidth( io, NULL );
544        tr_netSetTOS( io->socket, io->session->peerSocketTOS );
545        tr_peerIoSetBandwidth( io, bandwidth );
546        return 0;
547    }
548
549    return -1;
550}
551
552/**
553***
554**/
555
556void
557tr_peerIoSetTorrentHash( tr_peerIo *     io,
558                         const uint8_t * hash )
559{
560    assert( tr_isPeerIo( io ) );
561
562    tr_cryptoSetTorrentHash( io->crypto, hash );
563}
564
565const uint8_t*
566tr_peerIoGetTorrentHash( tr_peerIo * io )
567{
568    assert( tr_isPeerIo( io ) );
569    assert( io->crypto );
570
571    return tr_cryptoGetTorrentHash( io->crypto );
572}
573
574int
575tr_peerIoHasTorrentHash( const tr_peerIo * io )
576{
577    assert( tr_isPeerIo( io ) );
578    assert( io->crypto );
579
580    return tr_cryptoHasTorrentHash( io->crypto );
581}
582
583/**
584***
585**/
586
587void
588tr_peerIoSetPeersId( tr_peerIo *     io,
589                     const uint8_t * peer_id )
590{
591    assert( tr_isPeerIo( io ) );
592
593    if( ( io->peerIdIsSet = peer_id != NULL ) )
594        memcpy( io->peerId, peer_id, 20 );
595    else
596        memset( io->peerId, 0, 20 );
597}
598
599const uint8_t*
600tr_peerIoGetPeersId( const tr_peerIo * io )
601{
602    assert( tr_isPeerIo( io ) );
603    assert( io->peerIdIsSet );
604
605    return io->peerId;
606}
607
608/**
609***
610**/
611
612void
613tr_peerIoEnableFEXT( tr_peerIo * io,
614                     tr_bool     flag )
615{
616    assert( tr_isPeerIo( io ) );
617    assert( _isBool( flag ) );
618
619    dbgmsg( io, "setting FEXT support flag to %d", (flag!=0) );
620    io->fastExtensionSupported = flag;
621}
622
623tr_bool
624tr_peerIoSupportsFEXT( const tr_peerIo * io )
625{
626    assert( tr_isPeerIo( io ) );
627
628    return io->fastExtensionSupported;
629}
630
631/**
632***
633**/
634
635void
636tr_peerIoEnableLTEP( tr_peerIo  * io,
637                     tr_bool      flag )
638{
639    assert( tr_isPeerIo( io ) );
640    assert( _isBool( flag ) );
641
642    dbgmsg( io, "setting LTEP support flag to %d", (flag!=0) );
643    io->extendedProtocolSupported = flag;
644}
645
646tr_bool
647tr_peerIoSupportsLTEP( const tr_peerIo * io )
648{
649    assert( tr_isPeerIo( io ) );
650
651    return io->extendedProtocolSupported;
652}
653
654/**
655***
656**/
657
658static size_t
659getDesiredOutputBufferSize( const tr_peerIo * io )
660{
661    /* this is all kind of arbitrary, but what seems to work well is
662     * being large enough to hold the next 20 seconds' worth of input,
663     * or a few blocks, whichever is bigger.
664     * It's okay to tweak this as needed */
665    const double maxBlockSize = 16 * 1024; /* 16 KiB is from BT spec */
666    const double currentSpeed = tr_bandwidthGetPieceSpeed( io->bandwidth, TR_UP );
667    const double period = 20; /* arbitrary */
668    const double numBlocks = 5.5; /* the 5 is arbitrary; the .5 is to leave room for messages */
669    return MAX( maxBlockSize*numBlocks, currentSpeed*1024*period );
670}
671
672size_t
673tr_peerIoGetWriteBufferSpace( const tr_peerIo * io )
674{
675    const size_t desiredLen = getDesiredOutputBufferSize( io );
676    const size_t currentLen = EVBUFFER_LENGTH( io->outbuf );
677    size_t freeSpace = 0;
678
679    if( desiredLen > currentLen )
680        freeSpace = desiredLen - currentLen;
681
682    return freeSpace;
683}
684
685void
686tr_peerIoSetBandwidth( tr_peerIo     * io,
687                       tr_bandwidth  * bandwidth )
688{
689    assert( tr_isPeerIo( io ) );
690
691    if( io->bandwidth )
692        tr_bandwidthRemovePeer( io->bandwidth, io );
693
694    io->bandwidth = bandwidth;
695
696    if( io->bandwidth )
697        tr_bandwidthAddPeer( io->bandwidth, io );
698}
699
700/**
701***
702**/
703
704tr_crypto*
705tr_peerIoGetCrypto( tr_peerIo * c )
706{
707    return c->crypto;
708}
709
710void
711tr_peerIoSetEncryption( tr_peerIo * io,
712                        int         encryptionMode )
713{
714    assert( tr_isPeerIo( io ) );
715    assert( encryptionMode == PEER_ENCRYPTION_NONE
716          || encryptionMode == PEER_ENCRYPTION_RC4 );
717
718    io->encryptionMode = encryptionMode;
719}
720
721int
722tr_peerIoIsEncrypted( const tr_peerIo * io )
723{
724    return io != NULL && io->encryptionMode == PEER_ENCRYPTION_RC4;
725}
726
727/**
728***
729**/
730
731void
732tr_peerIoWrite( tr_peerIo   * io,
733                const void  * writeme,
734                size_t        writemeLen,
735                int           isPieceData )
736{
737    struct tr_datatype * datatype;
738
739    assert( tr_amInEventThread( io->session ) );
740    dbgmsg( io, "adding %zu bytes into io->output", writemeLen );
741
742    datatype = tr_new( struct tr_datatype, 1 );
743    datatype->isPieceData = isPieceData != 0;
744    datatype->length = writemeLen;
745
746    __tr_list_init( &datatype->head );
747    __tr_list_append( &io->outbuf_datatypes, &datatype->head );
748
749    evbuffer_add( io->outbuf, writeme, writemeLen );
750}
751
752void
753tr_peerIoWriteBuf( tr_peerIo         * io,
754                   struct evbuffer   * buf,
755                   int                 isPieceData )
756{
757    const size_t n = EVBUFFER_LENGTH( buf );
758
759    tr_peerIoWrite( io, EVBUFFER_DATA( buf ), n, isPieceData );
760    evbuffer_drain( buf, n );
761}
762
763/**
764***
765**/
766
767void
768tr_peerIoWriteBytes( tr_peerIo       * io,
769                     struct evbuffer * outbuf,
770                     const void      * bytes,
771                     size_t            byteCount )
772{
773    uint8_t tmp[MAX_STACK_ARRAY_SIZE];
774
775    switch( io->encryptionMode )
776    {
777        case PEER_ENCRYPTION_NONE:
778            evbuffer_add( outbuf, bytes, byteCount );
779            break;
780
781        case PEER_ENCRYPTION_RC4:
782            evbuffer_expand( outbuf, byteCount );
783            while( byteCount > 0 ) {
784                const size_t thisPass = MIN( byteCount, sizeof( tmp ) );
785                tr_cryptoEncrypt( io->crypto, thisPass, bytes, tmp );
786                evbuffer_add( outbuf, tmp, thisPass );
787                bytes += thisPass;
788                byteCount -= thisPass;
789            }
790            break;
791
792        default:
793            assert( 0 );
794    }
795}
796
797void
798tr_peerIoWriteUint8( tr_peerIo       * io,
799                     struct evbuffer * outbuf,
800                     uint8_t           writeme )
801{
802    tr_peerIoWriteBytes( io, outbuf, &writeme, sizeof( uint8_t ) );
803}
804
805void
806tr_peerIoWriteUint16( tr_peerIo       * io,
807                      struct evbuffer * outbuf,
808                      uint16_t          writeme )
809{
810    uint16_t tmp = htons( writeme );
811
812    tr_peerIoWriteBytes( io, outbuf, &tmp, sizeof( uint16_t ) );
813}
814
815void
816tr_peerIoWriteUint32( tr_peerIo       * io,
817                      struct evbuffer * outbuf,
818                      uint32_t          writeme )
819{
820    uint32_t tmp = htonl( writeme );
821
822    tr_peerIoWriteBytes( io, outbuf, &tmp, sizeof( uint32_t ) );
823}
824
825/***
826****
827***/
828
829void
830tr_peerIoReadBytes( tr_peerIo       * io,
831                    struct evbuffer * inbuf,
832                    void            * bytes,
833                    size_t            byteCount )
834{
835    assert( EVBUFFER_LENGTH( inbuf ) >= byteCount );
836
837    switch( io->encryptionMode )
838    {
839        case PEER_ENCRYPTION_NONE:
840            evbuffer_remove( inbuf, bytes, byteCount );
841            break;
842
843        case PEER_ENCRYPTION_RC4:
844            evbuffer_remove( inbuf, bytes, byteCount );
845            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
846            break;
847
848        default:
849            assert( 0 );
850    }
851}
852
853void
854tr_peerIoReadUint8( tr_peerIo       * io,
855                    struct evbuffer * inbuf,
856                    uint8_t         * setme )
857{
858    tr_peerIoReadBytes( io, inbuf, setme, sizeof( uint8_t ) );
859}
860
861void
862tr_peerIoReadUint16( tr_peerIo       * io,
863                     struct evbuffer * inbuf,
864                     uint16_t        * setme )
865{
866    uint16_t tmp;
867
868    assert( tr_isPeerIo( io ) );
869
870    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
871    *setme = ntohs( tmp );
872}
873
874void
875tr_peerIoReadUint32( tr_peerIo       * io,
876                     struct evbuffer * inbuf,
877                     uint32_t        * setme )
878{
879    uint32_t tmp;
880
881    assert( tr_isPeerIo( io ) );
882
883    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
884    *setme = ntohl( tmp );
885}
886
887void
888tr_peerIoDrain( tr_peerIo       * io,
889                struct evbuffer * inbuf,
890                size_t            byteCount )
891{
892    uint8_t tmp[MAX_STACK_ARRAY_SIZE];
893
894    assert( tr_isPeerIo( io ) );
895
896    while( byteCount > 0 )
897    {
898        const size_t thisPass = MIN( byteCount, sizeof( tmp ) );
899        tr_peerIoReadBytes( io, inbuf, tmp, thisPass );
900        byteCount -= thisPass;
901    }
902}
903
904int
905tr_peerIoGetAge( const tr_peerIo * io )
906{
907    return time( NULL ) - io->timeCreated;
908}
909
910/***
911****
912***/
913
914static int
915tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
916{
917    int res = 0;
918
919    assert( tr_isPeerIo( io ) );
920
921    if(( howmuch = tr_bandwidthClamp( io->bandwidth, TR_DOWN, howmuch )))
922    {
923        int e;
924        errno = 0;
925        res = evbuffer_read( io->inbuf, io->socket, howmuch );
926        e = errno;
927
928        dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(e):"") );
929
930        if( EVBUFFER_LENGTH( io->inbuf ) )
931            canReadWrapper( io );
932
933        if( ( res <= 0 ) && ( io->gotError ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
934        {
935            short what = EVBUFFER_READ | EVBUFFER_ERROR;
936            if( res == 0 )
937                what |= EVBUFFER_EOF;
938            dbgmsg( io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
939            io->gotError( io, what, io->userData );
940        }
941    }
942
943    return res;
944}
945
946static int
947tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
948{
949    int n;
950
951    assert( tr_isPeerIo( io ) );
952
953    if(( howmuch = tr_bandwidthClamp( io->bandwidth, TR_UP, howmuch )))
954    {
955        int e;
956        errno = 0;
957        n = tr_evbuffer_write( io, io->socket, (int)howmuch );
958        e = errno;
959
960        if( n > 0 )
961            didWriteWrapper( io, n );
962
963        if( ( n < 0 ) && ( io->gotError ) && ( e != EPIPE ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
964        {
965            const short what = EVBUFFER_WRITE | EVBUFFER_ERROR;
966            dbgmsg( io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", n, what, e, strerror( e ) );
967            io->gotError( io, what, io->userData );
968        }
969    }
970
971    return n;
972}
973
974int
975tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
976{
977    int ret;
978
979    assert( tr_isPeerIo( io ) );
980    assert( tr_isDirection( dir ) );
981
982    if( dir==TR_DOWN )
983        ret = tr_peerIoTryRead( io, limit );
984    else
985        ret = tr_peerIoTryWrite( io, limit );
986
987    return ret;
988}
989
990struct evbuffer *
991tr_peerIoGetReadBuffer( tr_peerIo * io )
992{
993    assert( tr_isPeerIo( io ) );
994
995    return io->inbuf;
996}
997
998tr_bool
999tr_peerIoHasBandwidthLeft( const tr_peerIo * io, tr_direction dir )
1000{
1001    assert( tr_isPeerIo( io ) );
1002    assert( tr_isDirection( dir ) );
1003
1004    return tr_bandwidthClamp( io->bandwidth, dir, 1024 ) > 0;
1005}
1006
1007/***
1008****
1009****/
1010
1011static void
1012event_enable( tr_peerIo * io, short event )
1013{
1014    assert( tr_isPeerIo( io ) );
1015
1016    if( event & EV_READ )
1017        event_add( &io->event_read, NULL );
1018
1019    if( event & EV_WRITE )
1020        event_add( &io->event_write, NULL );
1021}
1022
1023static void
1024event_disable( struct tr_peerIo * io, short event )
1025{
1026    assert( tr_isPeerIo( io ) );
1027
1028    if( event & EV_READ )
1029        event_del( &io->event_read );
1030
1031    if( event & EV_WRITE )
1032        event_del( &io->event_write );
1033}
1034
1035
1036void
1037tr_peerIoSetEnabled( tr_peerIo    * io,
1038                     tr_direction   dir,
1039                     tr_bool        isEnabled )
1040{
1041    short event;
1042
1043    assert( tr_isPeerIo( io ) );
1044    assert( tr_isDirection( dir ) );
1045
1046    event = dir == TR_UP ? EV_WRITE : EV_READ;
1047
1048    if( isEnabled )
1049        event_enable( io, event );
1050    else
1051        event_disable( io, event );
1052}
Note: See TracBrowser for help on using the repository browser.