source: branches/1.4x/libtransmission/peer-io.c @ 7455

Last change on this file since 7455 was 7455, checked in by charles, 12 years ago

(1.4x libT) backport handshake, peer, bandwidth, peer-io to 1.4x.

  • Property svn:keywords set to Date Rev Author Id
File size: 22.5 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 7455 2008-12-22 00:51:14Z charles $
11 */
12
13#include <assert.h>
14#include <limits.h> /* INT_MAX */
15#include <string.h>
16#include <stdio.h>
17#include <unistd.h>
18
19#ifdef WIN32
20 #include <winsock2.h>
21#else
22 #include <arpa/inet.h> /* inet_ntoa */
23#endif
24
25#include <event.h>
26
27#include "transmission.h"
28#include "bandwidth.h"
29#include "crypto.h"
30#include "list.h"
31#include "net.h"
32#include "peer-io.h"
33#include "trevent.h"
34#include "utils.h"
35
36#define MAGIC_NUMBER 206745
37
38static size_t
39getPacketOverhead( size_t d )
40{
41    /**
42     * http://sd.wareonearth.com/~phil/net/overhead/
43     *
44     * TCP over Ethernet:
45     * Assuming no header compression (e.g. not PPP)
46     * Add 20 IPv4 header or 40 IPv6 header (no options)
47     * Add 20 TCP header
48     * Add 12 bytes optional TCP timestamps
49     * Max TCP Payload data rates over ethernet are thus:
50     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
51     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
52     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
53     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
54     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
55     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
56     */
57    static const double assumed_payload_data_rate = 94.0;
58
59    return (size_t)( d * ( 100.0 / assumed_payload_data_rate ) - d );
60}
61
62/**
63***
64**/
65
66#define dbgmsg( io, ... ) \
67    do { \
68        if( tr_deepLoggingIsActive( ) ) \
69            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
70    } while( 0 )
71
72struct tr_datatype
73{
74    tr_bool  isPieceData;
75    size_t   length;
76};
77
78struct tr_peerIo
79{
80    tr_bool            isEncrypted;
81    tr_bool            isIncoming;
82    tr_bool            peerIdIsSet;
83    tr_bool            extendedProtocolSupported;
84    tr_bool            fastExtensionSupported;
85
86    int                magicNumber;
87
88    uint8_t            encryptionMode;
89    tr_port            port;
90    int                socket;
91
92    uint8_t            peerId[20];
93    time_t             timeCreated;
94
95    tr_session       * session;
96
97    tr_address         addr;
98    tr_list          * output_datatypes; /* struct tr_datatype */
99
100    tr_can_read_cb     canRead;
101    tr_did_write_cb    didWrite;
102    tr_net_error_cb    gotError;
103    void *             userData;
104
105    size_t             bufferSize[2];
106
107    tr_bandwidth     * bandwidth;
108    tr_crypto        * crypto;
109
110    struct evbuffer  * inbuf;
111    struct evbuffer  * outbuf;
112
113    struct event       event_read;
114    struct event       event_write;
115};
116
117/***
118****
119***/
120
121static void
122didWriteWrapper( tr_peerIo * io, size_t bytes_transferred )
123{
124    while( bytes_transferred )
125    {
126        struct tr_datatype * next = io->output_datatypes->data;
127        const size_t payload = MIN( next->length, bytes_transferred );
128        const size_t overhead = getPacketOverhead( payload );
129
130        tr_bandwidthUsed( io->bandwidth, TR_UP, payload, next->isPieceData );
131
132        if( overhead > 0 )
133            tr_bandwidthUsed( io->bandwidth, TR_UP, overhead, FALSE );
134
135        if( io->didWrite )
136            io->didWrite( io, payload, next->isPieceData, io->userData );
137
138        bytes_transferred -= payload;
139        next->length -= payload;
140        if( !next->length )
141            tr_free( tr_list_pop_front( &io->output_datatypes ) );
142    }
143}
144
145static void
146canReadWrapper( tr_peerIo * io )
147{
148    tr_bool done = 0;
149    tr_bool err = 0;
150    tr_session * session = io->session;
151
152    dbgmsg( io, "canRead" );
153
154    /* try to consume the input buffer */
155    if( io->canRead )
156    {
157        tr_globalLock( session );
158
159        while( !done && !err )
160        {
161            size_t piece = 0;
162            const size_t oldLen = EVBUFFER_LENGTH( io->inbuf );
163            const int ret = io->canRead( io, io->userData, &piece );
164
165            const size_t used = oldLen - EVBUFFER_LENGTH( io->inbuf );
166
167            if( piece )
168                tr_bandwidthUsed( io->bandwidth, TR_DOWN, piece, TRUE );
169
170            if( used != piece )
171                tr_bandwidthUsed( io->bandwidth, TR_DOWN, used - piece, FALSE );
172
173            switch( ret )
174            {
175                case READ_NOW:
176                    if( EVBUFFER_LENGTH( io->inbuf ) )
177                        continue;
178                    done = 1;
179                    break;
180
181                case READ_LATER:
182                    done = 1;
183                    break;
184
185                case READ_ERR:
186                    err = 1;
187                    break;
188            }
189        }
190
191        tr_globalUnlock( session );
192    }
193}
194
195#define _isBool(b) (((b)==0 || (b)==1))
196
197tr_bool
198tr_isPeerIo( const tr_peerIo * io )
199{
200    return ( io != NULL )
201        && ( io->magicNumber == MAGIC_NUMBER )
202        && ( _isBool( io->isEncrypted ) )
203        && ( _isBool( io->isIncoming ) )
204        && ( _isBool( io->peerIdIsSet ) )
205        && ( _isBool( io->extendedProtocolSupported ) )
206        && ( _isBool( io->fastExtensionSupported ) );
207}
208
209static void
210event_read_cb( int fd, short event UNUSED, void * vio )
211{
212    int res;
213    short what = EVBUFFER_READ;
214    tr_peerIo * io = vio;
215    const size_t howmuch = tr_bandwidthClamp( io->bandwidth, TR_DOWN, io->session->so_rcvbuf );
216    const tr_direction dir = TR_DOWN;
217
218    assert( tr_isPeerIo( io ) );
219
220    dbgmsg( io, "libevent says this peer is ready to read" );
221
222    /* if we don't have any bandwidth left, stop reading */
223    if( howmuch < 1 ) {
224        tr_peerIoSetEnabled( io, dir, FALSE );
225        return;
226    }
227
228    res = evbuffer_read( io->inbuf, fd, howmuch );
229    if( res == -1 ) {
230        if( errno == EAGAIN || errno == EINTR )
231            goto reschedule;
232        /* error case */
233        what |= EVBUFFER_ERROR;
234    } else if( res == 0 ) {
235        /* eof case */
236        what |= EVBUFFER_EOF;
237    }
238
239    if( res <= 0 )
240        goto error;
241
242    tr_peerIoSetEnabled( io, dir, TRUE );
243
244    /* Invoke the user callback - must always be called last */
245    canReadWrapper( io );
246
247    return;
248
249 reschedule:
250    tr_peerIoSetEnabled( io, dir, TRUE );
251    return;
252
253 error:
254    if( io->gotError != NULL )
255        io->gotError( io, what, io->userData );
256}
257
258static int
259tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
260{
261    struct evbuffer * buffer = io->outbuf;
262    int n = MIN( EVBUFFER_LENGTH( buffer ), howmuch );
263
264#ifdef WIN32
265    n = send(fd, buffer->buffer, n,  0 );
266#else
267    n = write(fd, buffer->buffer, n );
268#endif
269    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?strerror(errno):"") );
270
271    if( n == -1 )
272        return -1;
273    if (n == 0)
274        return 0;
275    evbuffer_drain( buffer, n );
276
277    return n;
278}
279
280static void
281event_write_cb( int fd, short event UNUSED, void * vio )
282{
283    int res = 0;
284    short what = EVBUFFER_WRITE;
285    tr_peerIo * io = vio;
286    size_t howmuch;
287    const tr_direction dir = TR_UP;
288
289    assert( tr_isPeerIo( io ) );
290
291    dbgmsg( io, "libevent says this peer is ready to write" );
292
293    howmuch = MIN( (size_t)io->session->so_sndbuf, EVBUFFER_LENGTH( io->outbuf ) );
294    howmuch = tr_bandwidthClamp( io->bandwidth, dir, howmuch );
295
296    /* if we don't have any bandwidth left, stop writing */
297    if( howmuch < 1 ) {
298        tr_peerIoSetEnabled( io, dir, FALSE );
299        return;
300    }
301
302    res = tr_evbuffer_write( io, fd, howmuch );
303    if (res == -1) {
304#ifndef WIN32
305/*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
306 *  *set errno. thus this error checking is not portable*/
307        if (errno == EAGAIN || errno == EINTR || errno == EINPROGRESS)
308            goto reschedule;
309        /* error case */
310        what |= EVBUFFER_ERROR;
311
312#else
313        goto reschedule;
314#endif
315
316    } else if (res == 0) {
317        /* eof case */
318        what |= EVBUFFER_EOF;
319    }
320    if (res <= 0)
321        goto error;
322
323    if( EVBUFFER_LENGTH( io->outbuf ) )
324        tr_peerIoSetEnabled( io, dir, TRUE );
325
326    didWriteWrapper( io, res );
327    return;
328
329 reschedule:
330    if( EVBUFFER_LENGTH( io->outbuf ) )
331        tr_peerIoSetEnabled( io, dir, TRUE );
332    return;
333
334 error:
335    if( io->gotError != NULL )
336        io->gotError( io, what, io->userData );
337}
338
339/**
340***
341**/
342
343
344static int
345isFlag( int flag )
346{
347    return( ( flag == 0 ) || ( flag == 1 ) );
348}
349
350static tr_peerIo*
351tr_peerIoNew( tr_session       * session,
352              const tr_address * addr,
353              tr_port            port,
354              const uint8_t    * torrentHash,
355              int                isIncoming,
356              int                socket )
357{
358    tr_peerIo * io;
359
360    if( socket >= 0 )
361        tr_netSetTOS( socket, session->peerSocketTOS );
362
363    io = tr_new0( tr_peerIo, 1 );
364    io->magicNumber = MAGIC_NUMBER;
365    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
366    io->session = session;
367    io->addr = *addr;
368    io->port = port;
369    io->socket = socket;
370    io->isIncoming = isIncoming != 0;
371    io->timeCreated = time( NULL );
372    io->inbuf = evbuffer_new( );
373    io->outbuf = evbuffer_new( );
374    event_set( &io->event_read, io->socket, EV_READ, event_read_cb, io );
375    event_set( &io->event_write, io->socket, EV_WRITE, event_write_cb, io );
376#if 0
377    bufevNew( io );
378#endif
379    tr_peerIoSetBandwidth( io, session->bandwidth );
380    return io;
381}
382
383tr_peerIo*
384tr_peerIoNewIncoming( tr_session       * session,
385                      const tr_address * addr,
386                      tr_port            port,
387                      int                socket )
388{
389    assert( session );
390    assert( addr );
391    assert( socket >= 0 );
392
393    return tr_peerIoNew( session, addr, port, NULL, 1, socket );
394}
395
396tr_peerIo*
397tr_peerIoNewOutgoing( tr_session       * session,
398                      const tr_address * addr,
399                      tr_port            port,
400                      const uint8_t    * torrentHash )
401{
402    int socket;
403
404    assert( session );
405    assert( addr );
406    assert( torrentHash );
407
408    socket = tr_netOpenTCP( session, addr, port );
409
410    return socket < 0
411           ? NULL
412           : tr_peerIoNew( session, addr, port, torrentHash, 0, socket );
413}
414
415static void
416io_dtor( void * vio )
417{
418    tr_peerIo * io = vio;
419
420    event_del( &io->event_read );
421    event_del( &io->event_write );
422    tr_peerIoSetBandwidth( io, NULL );
423    evbuffer_free( io->outbuf );
424    evbuffer_free( io->inbuf );
425    tr_netClose( io->socket );
426    tr_cryptoFree( io->crypto );
427    tr_list_free( &io->output_datatypes, tr_free );
428
429    io->magicNumber = 0xDEAD;
430    tr_free( io );
431}
432
433void
434tr_peerIoFree( tr_peerIo * io )
435{
436    if( io )
437    {
438        io->canRead = NULL;
439        io->didWrite = NULL;
440        io->gotError = NULL;
441        tr_runInEventThread( io->session, io_dtor, io );
442    }
443}
444
445tr_session*
446tr_peerIoGetSession( tr_peerIo * io )
447{
448    assert( tr_isPeerIo( io ) );
449    assert( io->session );
450
451    return io->session;
452}
453
454const tr_address*
455tr_peerIoGetAddress( const tr_peerIo * io,
456                           tr_port   * port )
457{
458    assert( tr_isPeerIo( io ) );
459
460    if( port )
461        *port = io->port;
462
463    return &io->addr;
464}
465
466const char*
467tr_peerIoAddrStr( const tr_address * addr, tr_port port )
468{
469    static char buf[512];
470    tr_snprintf( buf, sizeof( buf ), "%s:%u", inet_ntoa( *addr ), ntohs( port ) );
471    return buf;
472}
473
474const char*
475tr_peerIoGetAddrStr( const tr_peerIo * io )
476{
477    return tr_peerIoAddrStr( &io->addr, io->port );
478}
479
480void
481tr_peerIoSetIOFuncs( tr_peerIo        * io,
482                     tr_can_read_cb     readcb,
483                     tr_did_write_cb    writecb,
484                     tr_net_error_cb    errcb,
485                     void             * userData )
486{
487    io->canRead = readcb;
488    io->didWrite = writecb;
489    io->gotError = errcb;
490    io->userData = userData;
491}
492
493tr_bool
494tr_peerIoIsIncoming( const tr_peerIo * c )
495{
496    return c->isIncoming != 0;
497}
498
499int
500tr_peerIoReconnect( tr_peerIo * io )
501{
502    assert( !tr_peerIoIsIncoming( io ) );
503
504    if( io->socket >= 0 )
505        tr_netClose( io->socket );
506
507    io->socket = tr_netOpenTCP( io->session, &io->addr, io->port );
508
509    if( io->socket >= 0 )
510    {
511        tr_bandwidth * bandwidth = io->bandwidth;
512        tr_peerIoSetBandwidth( io, NULL );
513
514        tr_netSetTOS( io->socket, io->session->peerSocketTOS );
515#if 0
516        bufevNew( io );
517#endif
518
519        tr_peerIoSetBandwidth( io, bandwidth );
520        return 0;
521    }
522
523    return -1;
524}
525
526/**
527***
528**/
529
530void
531tr_peerIoSetTorrentHash( tr_peerIo *     io,
532                         const uint8_t * hash )
533{
534    assert( tr_isPeerIo( io ) );
535
536    tr_cryptoSetTorrentHash( io->crypto, hash );
537}
538
539const uint8_t*
540tr_peerIoGetTorrentHash( tr_peerIo * io )
541{
542    assert( tr_isPeerIo( io ) );
543    assert( io->crypto );
544
545    return tr_cryptoGetTorrentHash( io->crypto );
546}
547
548int
549tr_peerIoHasTorrentHash( const tr_peerIo * io )
550{
551    assert( tr_isPeerIo( io ) );
552    assert( io->crypto );
553
554    return tr_cryptoHasTorrentHash( io->crypto );
555}
556
557/**
558***
559**/
560
561void
562tr_peerIoSetPeersId( tr_peerIo *     io,
563                     const uint8_t * peer_id )
564{
565    assert( tr_isPeerIo( io ) );
566
567    if( ( io->peerIdIsSet = peer_id != NULL ) )
568        memcpy( io->peerId, peer_id, 20 );
569    else
570        memset( io->peerId, 0, 20 );
571}
572
573const uint8_t*
574tr_peerIoGetPeersId( const tr_peerIo * io )
575{
576    assert( tr_isPeerIo( io ) );
577    assert( io->peerIdIsSet );
578
579    return io->peerId;
580}
581
582/**
583***
584**/
585
586void
587tr_peerIoEnableFEXT( tr_peerIo * io,
588                     tr_bool     flag )
589{
590    assert( tr_isPeerIo( io ) );
591    assert( isFlag( flag ) );
592
593    dbgmsg( io, "setting FEXT support flag to %d", (flag?1:0) );
594    io->fastExtensionSupported = flag;
595}
596
597tr_bool
598tr_peerIoSupportsFEXT( const tr_peerIo * io )
599{
600    assert( tr_isPeerIo( io ) );
601
602    return io->fastExtensionSupported;
603}
604
605/**
606***
607**/
608
609void
610tr_peerIoEnableLTEP( tr_peerIo  * io,
611                     tr_bool      flag )
612{
613    assert( tr_isPeerIo( io ) );
614    assert( isFlag( flag ) );
615
616    dbgmsg( io, "setting LTEP support flag to %d", (flag?1:0) );
617    io->extendedProtocolSupported = flag;
618}
619
620tr_bool
621tr_peerIoSupportsLTEP( const tr_peerIo * io )
622{
623    assert( tr_isPeerIo( io ) );
624
625    return io->extendedProtocolSupported;
626}
627
628/**
629***
630**/
631
632static size_t
633getDesiredOutputBufferSize( const tr_peerIo * io )
634{
635    /* this is all kind of arbitrary, but what seems to work well is
636     * being large enough to hold the next 20 seconds' worth of input,
637     * or a few blocks, whichever is bigger.
638     * It's okay to tweak this as needed */
639    const double maxBlockSize = 16 * 1024; /* 16 KiB is from BT spec */
640    const double currentSpeed = tr_bandwidthGetPieceSpeed( io->bandwidth, TR_UP );
641    const double period = 20; /* arbitrary */
642    const double numBlocks = 5.5; /* the 5 is arbitrary; the .5 is to leave room for messages */
643    return MAX( maxBlockSize*numBlocks, currentSpeed*1024*period );
644}
645
646size_t
647tr_peerIoGetWriteBufferSpace( const tr_peerIo * io )
648{
649    const size_t desiredLen = getDesiredOutputBufferSize( io );
650    const size_t currentLen = EVBUFFER_LENGTH( io->outbuf );
651    size_t freeSpace = 0;
652
653    if( desiredLen > currentLen )
654        freeSpace = desiredLen - currentLen;
655
656    return freeSpace;
657}
658
659void
660tr_peerIoSetBandwidth( tr_peerIo     * io,
661                       tr_bandwidth  * bandwidth )
662{
663    assert( tr_isPeerIo( io ) );
664
665    if( io->bandwidth )
666        tr_bandwidthRemovePeer( io->bandwidth, io );
667
668    io->bandwidth = bandwidth;
669
670    if( io->bandwidth )
671        tr_bandwidthAddPeer( io->bandwidth, io );
672}
673
674/**
675***
676**/
677
678tr_crypto*
679tr_peerIoGetCrypto( tr_peerIo * c )
680{
681    return c->crypto;
682}
683
684void
685tr_peerIoSetEncryption( tr_peerIo * io,
686                        int         encryptionMode )
687{
688    assert( tr_isPeerIo( io ) );
689    assert( encryptionMode == PEER_ENCRYPTION_NONE
690          || encryptionMode == PEER_ENCRYPTION_RC4 );
691
692    io->encryptionMode = encryptionMode;
693}
694
695int
696tr_peerIoIsEncrypted( const tr_peerIo * io )
697{
698    return io != NULL && io->encryptionMode == PEER_ENCRYPTION_RC4;
699}
700
701/**
702***
703**/
704
705void
706tr_peerIoWrite( tr_peerIo   * io,
707                const void  * writeme,
708                size_t        writemeLen,
709                int           isPieceData )
710{
711    struct tr_datatype * datatype;
712    assert( tr_amInEventThread( io->session ) );
713    dbgmsg( io, "adding %zu bytes into io->output", writemeLen );
714
715    datatype = tr_new( struct tr_datatype, 1 );
716    datatype->isPieceData = isPieceData != 0;
717    datatype->length = writemeLen;
718    tr_list_append( &io->output_datatypes, datatype );
719
720    evbuffer_add( io->outbuf, writeme, writemeLen );
721}
722
723void
724tr_peerIoWriteBuf( tr_peerIo         * io,
725                   struct evbuffer   * buf,
726                   int                 isPieceData )
727{
728    const size_t n = EVBUFFER_LENGTH( buf );
729
730    tr_peerIoWrite( io, EVBUFFER_DATA( buf ), n, isPieceData );
731    evbuffer_drain( buf, n );
732}
733
734/**
735***
736**/
737
738void
739tr_peerIoWriteBytes( tr_peerIo *       io,
740                     struct evbuffer * outbuf,
741                     const void *      bytes,
742                     size_t            byteCount )
743{
744    uint8_t * tmp;
745
746    switch( io->encryptionMode )
747    {
748        case PEER_ENCRYPTION_NONE:
749            evbuffer_add( outbuf, bytes, byteCount );
750            break;
751
752        case PEER_ENCRYPTION_RC4:
753            tmp = tr_new( uint8_t, byteCount );
754            tr_cryptoEncrypt( io->crypto, byteCount, bytes, tmp );
755            evbuffer_add( outbuf, tmp, byteCount );
756            tr_free( tmp );
757            break;
758
759        default:
760            assert( 0 );
761    }
762}
763
764void
765tr_peerIoWriteUint8( tr_peerIo *       io,
766                     struct evbuffer * outbuf,
767                     uint8_t           writeme )
768{
769    tr_peerIoWriteBytes( io, outbuf, &writeme, sizeof( uint8_t ) );
770}
771
772void
773tr_peerIoWriteUint16( tr_peerIo *       io,
774                      struct evbuffer * outbuf,
775                      uint16_t          writeme )
776{
777    uint16_t tmp = htons( writeme );
778
779    tr_peerIoWriteBytes( io, outbuf, &tmp, sizeof( uint16_t ) );
780}
781
782void
783tr_peerIoWriteUint32( tr_peerIo *       io,
784                      struct evbuffer * outbuf,
785                      uint32_t          writeme )
786{
787    uint32_t tmp = htonl( writeme );
788
789    tr_peerIoWriteBytes( io, outbuf, &tmp, sizeof( uint32_t ) );
790}
791
792/***
793****
794***/
795
796void
797tr_peerIoReadBytes( tr_peerIo *       io,
798                    struct evbuffer * inbuf,
799                    void *            bytes,
800                    size_t            byteCount )
801{
802    assert( EVBUFFER_LENGTH( inbuf ) >= byteCount );
803
804    switch( io->encryptionMode )
805    {
806        case PEER_ENCRYPTION_NONE:
807            evbuffer_remove( inbuf, bytes, byteCount );
808            break;
809
810        case PEER_ENCRYPTION_RC4:
811            evbuffer_remove( inbuf, bytes, byteCount );
812            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
813            break;
814
815        default:
816            assert( 0 );
817    }
818}
819
820void
821tr_peerIoReadUint8( tr_peerIo *       io,
822                    struct evbuffer * inbuf,
823                    uint8_t *         setme )
824{
825    tr_peerIoReadBytes( io, inbuf, setme, sizeof( uint8_t ) );
826}
827
828void
829tr_peerIoReadUint16( tr_peerIo *       io,
830                     struct evbuffer * inbuf,
831                     uint16_t *        setme )
832{
833    uint16_t tmp;
834
835    assert( tr_isPeerIo( io ) );
836
837    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
838    *setme = ntohs( tmp );
839}
840
841void
842tr_peerIoReadUint32( tr_peerIo *       io,
843                     struct evbuffer * inbuf,
844                     uint32_t *        setme )
845{
846    uint32_t tmp;
847
848    assert( tr_isPeerIo( io ) );
849
850    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
851    *setme = ntohl( tmp );
852}
853
854void
855tr_peerIoDrain( tr_peerIo *       io,
856                struct evbuffer * inbuf,
857                size_t            byteCount )
858{
859    uint8_t * tmp;
860
861    assert( tr_isPeerIo( io ) );
862
863    tmp = tr_new( uint8_t, byteCount );
864    tr_peerIoReadBytes( io, inbuf, tmp, byteCount );
865    tr_free( tmp );
866}
867
868int
869tr_peerIoGetAge( const tr_peerIo * io )
870{
871    return time( NULL ) - io->timeCreated;
872}
873
874/***
875****
876***/
877
878static int
879tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
880{
881    int res;
882
883    assert( tr_isPeerIo( io ) );
884
885    howmuch = tr_bandwidthClamp( io->bandwidth, TR_DOWN, howmuch );
886
887    res = howmuch ? evbuffer_read( io->inbuf, io->socket, howmuch ) : 0;
888
889    dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(errno):"") );
890
891    if( EVBUFFER_LENGTH( io->inbuf ) )
892        canReadWrapper( io );
893
894    if( ( res <= 0 ) && ( io->gotError ) && ( errno != EAGAIN ) && ( errno != EINTR ) && ( errno != EINPROGRESS ) )
895    {
896        short what = EVBUFFER_READ | EVBUFFER_ERROR;
897        if( res == 0 )
898            what |= EVBUFFER_EOF;
899        io->gotError( io, what, io->userData );
900    }
901
902    return res;
903}
904
905static int
906tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
907{
908    int n;
909
910    assert( tr_isPeerIo( io ) );
911
912    howmuch = tr_bandwidthClamp( io->bandwidth, TR_UP, howmuch );
913
914    n = tr_evbuffer_write( io, io->socket, (int)howmuch );
915
916    if( n > 0 )
917        didWriteWrapper( io, n );
918
919    if( ( n < 0 ) && ( io->gotError ) && ( errno != EPIPE ) && ( errno != EAGAIN ) && ( errno != EINTR ) && ( errno != EINPROGRESS ) ) {
920        short what = EVBUFFER_WRITE | EVBUFFER_ERROR;
921        io->gotError( io, what, io->userData );
922    }
923
924    return n;
925}
926
927int
928tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
929{
930    int ret;
931
932    assert( tr_isPeerIo( io ) );
933    assert( tr_isDirection( dir ) );
934
935    if( dir==TR_DOWN )
936        ret = tr_peerIoTryRead( io, limit );
937    else
938        ret = tr_peerIoTryWrite( io, limit );
939
940    return ret;
941}
942
943struct evbuffer *
944tr_peerIoGetReadBuffer( tr_peerIo * io )
945{
946    assert( tr_isPeerIo( io ) );
947
948    return io->inbuf;
949}
950
951tr_bool
952tr_peerIoHasBandwidthLeft( const tr_peerIo * io, tr_direction dir )
953{
954    assert( tr_isPeerIo( io ) );
955    assert( tr_isDirection( dir ) );
956
957    return tr_bandwidthClamp( io->bandwidth, dir, 1024 ) > 0;
958}
959
960/***
961****
962****/
963
964static void
965event_enable( tr_peerIo * io, short event )
966{
967    assert( tr_isPeerIo( io ) );
968
969    if( event & EV_READ )
970        event_add( &io->event_read, NULL );
971
972    if( event & EV_WRITE )
973        event_add( &io->event_write, NULL );
974}
975
976static void
977event_disable( struct tr_peerIo * io, short event )
978{
979    assert( tr_isPeerIo( io ) );
980
981    if( event & EV_READ )
982        event_del( &io->event_read );
983
984    if( event & EV_WRITE )
985        event_del( &io->event_write );
986}
987
988
989void
990tr_peerIoSetEnabled( tr_peerIo    * io,
991                     tr_direction   dir,
992                     tr_bool        isEnabled )
993{
994    short event;
995
996    assert( tr_isPeerIo( io ) );
997    assert( tr_isDirection( dir ) );
998
999    event = dir == TR_UP ? EV_WRITE : EV_READ;
1000
1001    if( isEnabled )
1002        event_enable( io, event );
1003    else
1004        event_disable( io, event );
1005}
Note: See TracBrowser for help on using the repository browser.