source: trunk/libtransmission/peer-io.c @ 11718

Last change on this file since 11718 was 11718, checked in by jordan, 11 years ago

(trunk libT) #3921 "no overhead for reads" -- fixed. thanks to jch for reporting this.

  • Property svn:keywords set to Date Rev Author Id
File size: 25.7 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 11718 2011-01-20 00:31:46Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <string.h>
17#include <stdio.h>
18#include <unistd.h>
19
20#ifdef WIN32
21 #include <winsock2.h>
22#else
23 #include <arpa/inet.h> /* inet_ntoa */
24#endif
25
26#include <event2/event.h>
27#include <event2/bufferevent.h>
28
29#include "transmission.h"
30#include "session.h"
31#include "bandwidth.h"
32#include "crypto.h"
33#include "list.h"
34#include "net.h"
35#include "peer-common.h" /* MAX_BLOCK_SIZE */
36#include "peer-io.h"
37#include "trevent.h" /* tr_runInEventThread() */
38#include "utils.h"
39
40#define MAGIC_NUMBER 206745
41
42#ifdef WIN32
43 #define EAGAIN       WSAEWOULDBLOCK
44 #define EINTR        WSAEINTR
45 #define EINPROGRESS  WSAEINPROGRESS
46 #define EPIPE        WSAECONNRESET
47#endif
48
49static size_t
50guessPacketOverhead( size_t d )
51{
52    /**
53     * http://sd.wareonearth.com/~phil/net/overhead/
54     *
55     * TCP over Ethernet:
56     * Assuming no header compression (e.g. not PPP)
57     * Add 20 IPv4 header or 40 IPv6 header (no options)
58     * Add 20 TCP header
59     * Add 12 bytes optional TCP timestamps
60     * Max TCP Payload data rates over ethernet are thus:
61     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
62     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
63     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
64     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
65     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
66     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
67     */
68    const double assumed_payload_data_rate = 94.0;
69
70    return (unsigned int)( d * ( 100.0 / assumed_payload_data_rate ) - d );
71}
72
73/**
74***
75**/
76
77#define dbgmsg( io, ... ) \
78    do { \
79        if( tr_deepLoggingIsActive( ) ) \
80            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
81    } while( 0 )
82
83struct tr_datatype
84{
85    tr_bool  isPieceData;
86    size_t   length;
87};
88
89/***
90****
91***/
92
93static void
94didWriteWrapper( tr_peerIo * io, unsigned int bytes_transferred )
95{
96     while( bytes_transferred && tr_isPeerIo( io ) )
97     {
98        struct tr_datatype * next = io->outbuf_datatypes->data;
99
100        const unsigned int payload = MIN( next->length, bytes_transferred );
101        const unsigned int overhead = guessPacketOverhead( payload );
102        const uint64_t now = tr_time_msec( );
103
104        tr_bandwidthUsed( &io->bandwidth, TR_UP, payload, next->isPieceData, now );
105
106        if( overhead > 0 )
107            tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, FALSE, now );
108
109        if( io->didWrite )
110            io->didWrite( io, payload, next->isPieceData, io->userData );
111
112        if( tr_isPeerIo( io ) )
113        {
114            bytes_transferred -= payload;
115            next->length -= payload;
116            if( !next->length ) {
117                tr_list_pop_front( &io->outbuf_datatypes );
118                tr_free( next );
119            }
120        }
121    }
122}
123
124static void
125canReadWrapper( tr_peerIo * io )
126{
127    tr_bool err = 0;
128    tr_bool done = 0;
129    tr_session * session;
130
131    dbgmsg( io, "canRead" );
132
133    assert( tr_isPeerIo( io ) );
134    assert( tr_isSession( io->session ) );
135    tr_peerIoRef( io );
136
137    session = io->session;
138
139    /* try to consume the input buffer */
140    if( io->canRead )
141    {
142        tr_sessionLock( session );
143
144        while( !done && !err )
145        {
146            size_t piece = 0;
147            const size_t oldLen = evbuffer_get_length( io->inbuf );
148            const int ret = io->canRead( io, io->userData, &piece );
149            const size_t used = oldLen - evbuffer_get_length( io->inbuf );
150            const unsigned int overhead = guessPacketOverhead( used );
151            const uint64_t now = tr_time_msec( );
152
153            assert( tr_isPeerIo( io ) );
154
155            if( piece || (piece!=used) )
156            {
157                if( piece )
158                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, piece, TRUE, now );
159
160                if( used != piece )
161                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, used - piece, FALSE, now );
162            }
163
164            if( overhead > 0 )
165                tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, FALSE, now );
166
167            switch( ret )
168            {
169                case READ_NOW:
170                    if( evbuffer_get_length( io->inbuf ) )
171                        continue;
172                    done = 1;
173                    break;
174
175                case READ_LATER:
176                    done = 1;
177                    break;
178
179                case READ_ERR:
180                    err = 1;
181                    break;
182            }
183
184            assert( tr_isPeerIo( io ) );
185        }
186
187        tr_sessionUnlock( session );
188    }
189
190    assert( tr_isPeerIo( io ) );
191    tr_peerIoUnref( io );
192}
193
194tr_bool
195tr_isPeerIo( const tr_peerIo * io )
196{
197    return ( io != NULL )
198        && ( io->magicNumber == MAGIC_NUMBER )
199        && ( io->refCount >= 0 )
200        && ( tr_isBandwidth( &io->bandwidth ) )
201        && ( tr_isAddress( &io->addr ) );
202}
203
204static void
205event_read_cb( int fd, short event UNUSED, void * vio )
206{
207    int res;
208    int e;
209    tr_peerIo * io = vio;
210
211    /* Limit the input buffer to 256K, so it doesn't grow too large */
212    unsigned int howmuch;
213    unsigned int curlen;
214    const tr_direction dir = TR_DOWN;
215    const unsigned int max = 256 * 1024;
216
217    assert( tr_isPeerIo( io ) );
218
219    io->pendingEvents &= ~EV_READ;
220
221    curlen = evbuffer_get_length( io->inbuf );
222    howmuch = curlen >= max ? 0 : max - curlen;
223    howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch );
224
225    dbgmsg( io, "libevent says this peer is ready to read" );
226
227    /* if we don't have any bandwidth left, stop reading */
228    if( howmuch < 1 ) {
229        tr_peerIoSetEnabled( io, dir, FALSE );
230        return;
231    }
232
233    EVUTIL_SET_SOCKET_ERROR( 0 );
234    res = evbuffer_read( io->inbuf, fd, (int)howmuch );
235    e = EVUTIL_SOCKET_ERROR( );
236
237    if( res > 0 )
238    {
239        tr_peerIoSetEnabled( io, dir, TRUE );
240
241        /* Invoke the user callback - must always be called last */
242        canReadWrapper( io );
243    }
244    else
245    {
246        char errstr[512];
247        short what = BEV_EVENT_READING;
248
249        if( res == 0 ) /* EOF */
250            what |= BEV_EVENT_EOF;
251        else if( res == -1 ) {
252            if( e == EAGAIN || e == EINTR ) {
253                tr_peerIoSetEnabled( io, dir, TRUE );
254                return;
255            }
256            what |= BEV_EVENT_ERROR;
257        }
258
259        tr_net_strerror( errstr, sizeof( errstr ), e );
260        dbgmsg( io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
261
262        if( io->gotError != NULL )
263            io->gotError( io, what, io->userData );
264    }
265}
266
267static int
268tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
269{
270    int e;
271    int n;
272    char errstr[256];
273
274    EVUTIL_SET_SOCKET_ERROR( 0 );
275    n = evbuffer_write_atmost( io->outbuf, fd, howmuch );
276    e = EVUTIL_SOCKET_ERROR( );
277    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?tr_net_strerror(errstr,sizeof(errstr),e):"") );
278
279    return n;
280}
281
282static void
283event_write_cb( int fd, short event UNUSED, void * vio )
284{
285    int res = 0;
286    int e;
287    short what = BEV_EVENT_WRITING;
288    tr_peerIo * io = vio;
289    size_t howmuch;
290    const tr_direction dir = TR_UP;
291
292    assert( tr_isPeerIo( io ) );
293
294    io->pendingEvents &= ~EV_WRITE;
295
296    dbgmsg( io, "libevent says this peer is ready to write" );
297
298    /* Write as much as possible, since the socket is non-blocking, write() will
299     * return if it can't write any more data without blocking */
300    howmuch = tr_bandwidthClamp( &io->bandwidth, dir, evbuffer_get_length( io->outbuf ) );
301
302    /* if we don't have any bandwidth left, stop writing */
303    if( howmuch < 1 ) {
304        tr_peerIoSetEnabled( io, dir, FALSE );
305        return;
306    }
307
308    EVUTIL_SET_SOCKET_ERROR( 0 );
309    res = tr_evbuffer_write( io, fd, howmuch );
310    e = EVUTIL_SOCKET_ERROR( );
311
312    if (res == -1) {
313        if (!e || e == EAGAIN || e == EINTR || e == EINPROGRESS)
314            goto reschedule;
315        /* error case */
316        what |= BEV_EVENT_ERROR;
317    } else if (res == 0) {
318        /* eof case */
319        what |= BEV_EVENT_EOF;
320    }
321    if (res <= 0)
322        goto error;
323
324    if( evbuffer_get_length( io->outbuf ) )
325        tr_peerIoSetEnabled( io, dir, TRUE );
326
327    didWriteWrapper( io, res );
328    return;
329
330 reschedule:
331    if( evbuffer_get_length( io->outbuf ) )
332        tr_peerIoSetEnabled( io, dir, TRUE );
333    return;
334
335 error:
336
337    dbgmsg( io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
338
339    if( io->gotError != NULL )
340        io->gotError( io, what, io->userData );
341}
342
343/**
344***
345**/
346
347static void
348maybeSetCongestionAlgorithm( int socket, const char * algorithm )
349{
350    if( algorithm && *algorithm )
351    {
352        const int rc = tr_netSetCongestionControl( socket, algorithm );
353
354        if( rc < 0 )
355            tr_ninf( "Net", "Can't set congestion control algorithm '%s': %s",
356                     algorithm, tr_strerror( errno ));
357    }
358}
359
360static tr_peerIo*
361tr_peerIoNew( tr_session       * session,
362              tr_bandwidth     * parent,
363              const tr_address * addr,
364              tr_port            port,
365              const uint8_t    * torrentHash,
366              tr_bool            isIncoming,
367              tr_bool            isSeed,
368              int                socket )
369{
370    tr_peerIo * io;
371
372    assert( session != NULL );
373    assert( session->events != NULL );
374    assert( tr_isBool( isIncoming ) );
375    assert( tr_isBool( isSeed ) );
376    assert( tr_amInEventThread( session ) );
377
378    if( socket >= 0 ) {
379        tr_netSetTOS( socket, session->peerSocketTOS );
380        maybeSetCongestionAlgorithm( socket, session->peer_congestion_algorithm );
381    }
382
383    io = tr_new0( tr_peerIo, 1 );
384    io->magicNumber = MAGIC_NUMBER;
385    io->refCount = 1;
386    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
387    io->session = session;
388    io->addr = *addr;
389    io->isSeed = isSeed;
390    io->port = port;
391    io->socket = socket;
392    io->isIncoming = isIncoming != 0;
393    io->timeCreated = tr_time( );
394    io->inbuf = evbuffer_new( );
395    io->outbuf = evbuffer_new( );
396    io->event_read = event_new( session->event_base, io->socket, EV_READ, event_read_cb, io );
397    io->event_write = event_new( session->event_base, io->socket, EV_WRITE, event_write_cb, io );
398    tr_bandwidthConstruct( &io->bandwidth, session, parent );
399    tr_bandwidthSetPeer( &io->bandwidth, io );
400    dbgmsg( io, "bandwidth is %p; its parent is %p", &io->bandwidth, parent );
401
402    return io;
403}
404
405tr_peerIo*
406tr_peerIoNewIncoming( tr_session        * session,
407                      tr_bandwidth      * parent,
408                      const tr_address  * addr,
409                      tr_port             port,
410                      int                 fd )
411{
412    assert( session );
413    assert( tr_isAddress( addr ) );
414    assert( fd >= 0 );
415
416    return tr_peerIoNew( session, parent, addr, port, NULL, TRUE, FALSE, fd );
417}
418
419tr_peerIo*
420tr_peerIoNewOutgoing( tr_session        * session,
421                      tr_bandwidth      * parent,
422                      const tr_address  * addr,
423                      tr_port             port,
424                      const uint8_t     * torrentHash,
425                      tr_bool             isSeed )
426{
427    int fd;
428
429    assert( session );
430    assert( tr_isAddress( addr ) );
431    assert( torrentHash );
432
433    fd = tr_netOpenPeerSocket( session, addr, port, isSeed );
434    dbgmsg( NULL, "tr_netOpenPeerSocket returned fd %d", fd );
435
436    return fd < 0 ? NULL
437                  : tr_peerIoNew( session, parent, addr, port, torrentHash, FALSE, isSeed, fd );
438}
439
440/***
441****
442***/
443
444static void
445event_enable( tr_peerIo * io, short event )
446{
447    assert( tr_amInEventThread( io->session ) );
448    assert( io->session != NULL );
449    assert( io->session->events != NULL );
450    assert( event_initialized( io->event_read ) );
451    assert( event_initialized( io->event_write ) );
452
453    if( io->socket < 0 )
454        return;
455
456    if( ( event & EV_READ ) && ! ( io->pendingEvents & EV_READ ) )
457    {
458        dbgmsg( io, "enabling libevent ready-to-read polling" );
459        event_add( io->event_read, NULL );
460        io->pendingEvents |= EV_READ;
461    }
462
463    if( ( event & EV_WRITE ) && ! ( io->pendingEvents & EV_WRITE ) )
464    {
465        dbgmsg( io, "enabling libevent ready-to-write polling" );
466        event_add( io->event_write, NULL );
467        io->pendingEvents |= EV_WRITE;
468    }
469}
470
471static void
472event_disable( struct tr_peerIo * io, short event )
473{
474    assert( tr_amInEventThread( io->session ) );
475    assert( io->session != NULL );
476    assert( io->session->events != NULL );
477    assert( event_initialized( io->event_read ) );
478    assert( event_initialized( io->event_write ) );
479
480    if( ( event & EV_READ ) && ( io->pendingEvents & EV_READ ) )
481    {
482        dbgmsg( io, "disabling libevent ready-to-read polling" );
483        event_del( io->event_read );
484        io->pendingEvents &= ~EV_READ;
485    }
486
487    if( ( event & EV_WRITE ) && ( io->pendingEvents & EV_WRITE ) )
488    {
489        dbgmsg( io, "disabling libevent ready-to-write polling" );
490        event_del( io->event_write );
491        io->pendingEvents &= ~EV_WRITE;
492    }
493}
494
495void
496tr_peerIoSetEnabled( tr_peerIo    * io,
497                     tr_direction   dir,
498                     tr_bool        isEnabled )
499{
500    const short event = dir == TR_UP ? EV_WRITE : EV_READ;
501
502    assert( tr_isPeerIo( io ) );
503    assert( tr_isDirection( dir ) );
504    assert( tr_amInEventThread( io->session ) );
505    assert( io->session->events != NULL );
506
507    if( isEnabled )
508        event_enable( io, event );
509    else
510        event_disable( io, event );
511}
512
513/***
514****
515***/
516
517static void
518io_dtor( void * vio )
519{
520    tr_peerIo * io = vio;
521
522    assert( tr_isPeerIo( io ) );
523    assert( tr_amInEventThread( io->session ) );
524    assert( io->session->events != NULL );
525
526    dbgmsg( io, "in tr_peerIo destructor" );
527    event_disable( io, EV_READ | EV_WRITE );
528    event_free( io->event_read );
529    event_free( io->event_write );
530    tr_bandwidthDestruct( &io->bandwidth );
531    evbuffer_free( io->outbuf );
532    evbuffer_free( io->inbuf );
533    tr_netClose( io->session, io->socket );
534    tr_cryptoFree( io->crypto );
535    tr_list_free( &io->outbuf_datatypes, tr_free );
536
537    memset( io, ~0, sizeof( tr_peerIo ) );
538    tr_free( io );
539}
540
541static void
542tr_peerIoFree( tr_peerIo * io )
543{
544    if( io )
545    {
546        dbgmsg( io, "in tr_peerIoFree" );
547        io->canRead = NULL;
548        io->didWrite = NULL;
549        io->gotError = NULL;
550        tr_runInEventThread( io->session, io_dtor, io );
551    }
552}
553
554void
555tr_peerIoRefImpl( const char * file, int line, tr_peerIo * io )
556{
557    assert( tr_isPeerIo( io ) );
558
559    dbgmsg( io, "%s:%d is incrementing the IO's refcount from %d to %d",
560                file, line, io->refCount, io->refCount+1 );
561
562    ++io->refCount;
563}
564
565void
566tr_peerIoUnrefImpl( const char * file, int line, tr_peerIo * io )
567{
568    assert( tr_isPeerIo( io ) );
569
570    dbgmsg( io, "%s:%d is decrementing the IO's refcount from %d to %d",
571                file, line, io->refCount, io->refCount-1 );
572
573    if( !--io->refCount )
574        tr_peerIoFree( io );
575}
576
577const tr_address*
578tr_peerIoGetAddress( const tr_peerIo * io, tr_port   * port )
579{
580    assert( tr_isPeerIo( io ) );
581
582    if( port )
583        *port = io->port;
584
585    return &io->addr;
586}
587
588const char*
589tr_peerIoAddrStr( const tr_address * addr, tr_port port )
590{
591    static char buf[512];
592
593    if( addr->type == TR_AF_INET )
594        tr_snprintf( buf, sizeof( buf ), "%s:%u", tr_ntop_non_ts( addr ), ntohs( port ) );
595    else
596        tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_ntop_non_ts( addr ), ntohs( port ) );
597    return buf;
598}
599
600const char* tr_peerIoGetAddrStr( const tr_peerIo * io )
601{
602    return tr_isPeerIo( io ) ? tr_peerIoAddrStr( &io->addr, io->port ) : "error";
603}
604
605void
606tr_peerIoSetIOFuncs( tr_peerIo        * io,
607                     tr_can_read_cb     readcb,
608                     tr_did_write_cb    writecb,
609                     tr_net_error_cb    errcb,
610                     void             * userData )
611{
612    io->canRead = readcb;
613    io->didWrite = writecb;
614    io->gotError = errcb;
615    io->userData = userData;
616}
617
618void
619tr_peerIoClear( tr_peerIo * io )
620{
621    tr_peerIoSetIOFuncs( io, NULL, NULL, NULL, NULL );
622    tr_peerIoSetEnabled( io, TR_UP, FALSE );
623    tr_peerIoSetEnabled( io, TR_DOWN, FALSE );
624}
625
626int
627tr_peerIoReconnect( tr_peerIo * io )
628{
629    short int pendingEvents;
630    tr_session * session;
631
632    assert( tr_isPeerIo( io ) );
633    assert( !tr_peerIoIsIncoming( io ) );
634
635    session = tr_peerIoGetSession( io );
636
637    pendingEvents = io->pendingEvents;
638    event_disable( io, EV_READ | EV_WRITE );
639
640    if( io->socket >= 0 )
641        tr_netClose( session, io->socket );
642
643    event_del( io->event_read );
644    event_del( io->event_write );
645    io->socket = tr_netOpenPeerSocket( session, &io->addr, io->port, io->isSeed );
646    io->event_read = event_new( session->event_base, io->socket, EV_READ, event_read_cb, io );
647    io->event_write = event_new( session->event_base, io->socket, EV_WRITE, event_write_cb, io );
648
649    if( io->socket >= 0 )
650    {
651        event_enable( io, pendingEvents );
652        tr_netSetTOS( io->socket, session->peerSocketTOS );
653        maybeSetCongestionAlgorithm( io->socket, session->peer_congestion_algorithm );
654        return 0;
655    }
656
657    return -1;
658}
659
660/**
661***
662**/
663
664void
665tr_peerIoSetTorrentHash( tr_peerIo *     io,
666                         const uint8_t * hash )
667{
668    assert( tr_isPeerIo( io ) );
669
670    tr_cryptoSetTorrentHash( io->crypto, hash );
671}
672
673const uint8_t*
674tr_peerIoGetTorrentHash( tr_peerIo * io )
675{
676    assert( tr_isPeerIo( io ) );
677    assert( io->crypto );
678
679    return tr_cryptoGetTorrentHash( io->crypto );
680}
681
682int
683tr_peerIoHasTorrentHash( const tr_peerIo * io )
684{
685    assert( tr_isPeerIo( io ) );
686    assert( io->crypto );
687
688    return tr_cryptoHasTorrentHash( io->crypto );
689}
690
691/**
692***
693**/
694
695void
696tr_peerIoSetPeersId( tr_peerIo *     io,
697                     const uint8_t * peer_id )
698{
699    assert( tr_isPeerIo( io ) );
700
701    if( ( io->peerIdIsSet = peer_id != NULL ) )
702        memcpy( io->peerId, peer_id, 20 );
703    else
704        memset( io->peerId, 0, 20 );
705}
706
707/**
708***
709**/
710
711static unsigned int
712getDesiredOutputBufferSize( const tr_peerIo * io, uint64_t now )
713{
714    /* this is all kind of arbitrary, but what seems to work well is
715     * being large enough to hold the next 20 seconds' worth of input,
716     * or a few blocks, whichever is bigger.
717     * It's okay to tweak this as needed */
718    const unsigned int currentSpeed_Bps = tr_bandwidthGetPieceSpeed_Bps( &io->bandwidth, now, TR_UP );
719    const unsigned int period = 15u; /* arbitrary */
720    /* the 3 is arbitrary; the .5 is to leave room for messages */
721    static const unsigned int ceiling =  (unsigned int)( MAX_BLOCK_SIZE * 3.5 );
722    return MAX( ceiling, currentSpeed_Bps*period );
723}
724
725size_t
726tr_peerIoGetWriteBufferSpace( const tr_peerIo * io, uint64_t now )
727{
728    const size_t desiredLen = getDesiredOutputBufferSize( io, now );
729    const size_t currentLen = evbuffer_get_length( io->outbuf );
730    size_t freeSpace = 0;
731
732    if( desiredLen > currentLen )
733        freeSpace = desiredLen - currentLen;
734
735    return freeSpace;
736}
737
738/**
739***
740**/
741
742void
743tr_peerIoSetEncryption( tr_peerIo * io, uint32_t encryptionMode )
744{
745    assert( tr_isPeerIo( io ) );
746    assert( encryptionMode == PEER_ENCRYPTION_NONE
747         || encryptionMode == PEER_ENCRYPTION_RC4 );
748
749    io->encryptionMode = encryptionMode;
750}
751
752/**
753***
754**/
755
756static void
757addDatatype( tr_peerIo * io, size_t byteCount, tr_bool isPieceData )
758{
759    struct tr_datatype * d;
760
761    d = tr_new( struct tr_datatype, 1 );
762    d->isPieceData = isPieceData != 0;
763    d->length = byteCount;
764    tr_list_append( &io->outbuf_datatypes, d );
765}
766
767static struct evbuffer_iovec *
768evbuffer_peek_all( struct evbuffer * buf, size_t * setme_vecCount )
769{
770    const size_t byteCount = evbuffer_get_length( buf );
771    const int vecCount = evbuffer_peek( buf, byteCount, NULL, NULL, 0 );
772    struct evbuffer_iovec * iovec = tr_new0( struct evbuffer_iovec, vecCount );
773    const int n = evbuffer_peek( buf, byteCount, NULL, iovec, vecCount );
774    assert( vecCount == n );
775    *setme_vecCount = vecCount;
776    return iovec;
777}
778
779static void
780maybeEncryptBuffer( tr_peerIo * io, struct evbuffer * buf )
781{
782    if( io->encryptionMode == PEER_ENCRYPTION_RC4 )
783    {
784        size_t i, n;
785        struct evbuffer_iovec * iovec = evbuffer_peek_all( buf, &n );
786
787        for( i=0; i<n; ++i )
788            tr_cryptoEncrypt( io->crypto, iovec[i].iov_len, iovec[i].iov_base, iovec[i].iov_base );
789
790        tr_free( iovec );
791    }
792}
793
794void
795tr_peerIoWriteBuf( tr_peerIo * io, struct evbuffer * buf, tr_bool isPieceData )
796{
797    const size_t byteCount = evbuffer_get_length( buf );
798    maybeEncryptBuffer( io, buf );
799    evbuffer_add_buffer( io->outbuf, buf );
800    addDatatype( io, byteCount, isPieceData );
801}
802
803void
804tr_peerIoWriteBytes( tr_peerIo * io, const void * bytes, size_t byteCount, tr_bool isPieceData )
805{
806    struct evbuffer * buf = evbuffer_new( );
807    evbuffer_add( buf, bytes, byteCount );
808    tr_peerIoWriteBuf( io, buf, isPieceData );
809    evbuffer_free( buf );
810}
811
812/***
813****
814***/
815
816void
817evbuffer_add_uint16( struct evbuffer * outbuf, uint16_t addme_hs )
818{
819    const uint16_t ns = htons( addme_hs );
820    evbuffer_add( outbuf, &ns, sizeof( ns ) );
821}
822
823void
824evbuffer_add_uint32( struct evbuffer * outbuf, uint32_t addme_hl )
825{
826    const uint32_t nl = htonl( addme_hl );
827    evbuffer_add( outbuf, &nl, sizeof( nl ) );
828}
829
830/***
831****
832***/
833
834void
835tr_peerIoReadBytes( tr_peerIo * io, struct evbuffer * inbuf, void * bytes, size_t byteCount )
836{
837    assert( tr_isPeerIo( io ) );
838    assert( evbuffer_get_length( inbuf )  >= byteCount );
839
840    switch( io->encryptionMode )
841    {
842        case PEER_ENCRYPTION_NONE:
843            evbuffer_remove( inbuf, bytes, byteCount );
844            break;
845
846        case PEER_ENCRYPTION_RC4:
847            evbuffer_remove( inbuf, bytes, byteCount );
848            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
849            break;
850
851        default:
852            assert( 0 );
853    }
854}
855
856void
857tr_peerIoReadUint16( tr_peerIo        * io,
858                     struct evbuffer  * inbuf,
859                     uint16_t         * setme )
860{
861    uint16_t tmp;
862    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
863    *setme = ntohs( tmp );
864}
865
866void tr_peerIoReadUint32( tr_peerIo        * io,
867                          struct evbuffer  * inbuf,
868                          uint32_t         * setme )
869{
870    uint32_t tmp;
871    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
872    *setme = ntohl( tmp );
873}
874
875void
876tr_peerIoDrain( tr_peerIo       * io,
877                struct evbuffer * inbuf,
878                size_t            byteCount )
879{
880    void * buf = tr_sessionGetBuffer( io->session );
881    const size_t buflen = SESSION_BUFFER_SIZE;
882
883    while( byteCount > 0 )
884    {
885        const size_t thisPass = MIN( byteCount, buflen );
886        tr_peerIoReadBytes( io, inbuf, buf, thisPass );
887        byteCount -= thisPass;
888    }
889
890    tr_sessionReleaseBuffer( io->session );
891}
892
893/***
894****
895***/
896
897static int
898tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
899{
900    int res = 0;
901
902    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch )))
903    {
904        int e;
905
906        EVUTIL_SET_SOCKET_ERROR( 0 );
907        res = evbuffer_read( io->inbuf, io->socket, (int)howmuch );
908        e = EVUTIL_SOCKET_ERROR( );
909
910        dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(e):"") );
911
912        if( evbuffer_get_length( io->inbuf ) )
913            canReadWrapper( io );
914
915        if( ( res <= 0 ) && ( io->gotError ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
916        {
917            char errstr[512];
918            short what = BEV_EVENT_READING | BEV_EVENT_ERROR;
919            if( res == 0 )
920                what |= BEV_EVENT_EOF;
921            tr_net_strerror( errstr, sizeof( errstr ), e );
922            dbgmsg( io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
923            io->gotError( io, what, io->userData );
924        }
925    }
926
927    return res;
928}
929
930static int
931tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
932{
933    int n = 0;
934
935    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_UP, howmuch )))
936    {
937        int e;
938        EVUTIL_SET_SOCKET_ERROR( 0 );
939        n = tr_evbuffer_write( io, io->socket, howmuch );
940        e = EVUTIL_SOCKET_ERROR( );
941
942        if( n > 0 )
943            didWriteWrapper( io, n );
944
945        if( ( n < 0 ) && ( io->gotError ) && e && ( e != EPIPE ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
946        {
947            char errstr[512];
948            const short what = BEV_EVENT_WRITING | BEV_EVENT_ERROR;
949
950            tr_net_strerror( errstr, sizeof( errstr ), e );
951            dbgmsg( io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", n, what, e, errstr );
952
953            if( io->gotError != NULL )
954                io->gotError( io, what, io->userData );
955        }
956    }
957
958    return n;
959}
960
961int
962tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
963{
964    int bytesUsed = 0;
965
966    assert( tr_isPeerIo( io ) );
967    assert( tr_isDirection( dir ) );
968
969    if( dir == TR_DOWN )
970        bytesUsed = tr_peerIoTryRead( io, limit );
971    else
972        bytesUsed = tr_peerIoTryWrite( io, limit );
973
974    dbgmsg( io, "flushing peer-io, direction %d, limit %zu, bytesUsed %d", (int)dir, limit, bytesUsed );
975    return bytesUsed;
976}
977
978int
979tr_peerIoFlushOutgoingProtocolMsgs( tr_peerIo * io )
980{
981    size_t byteCount = 0;
982    tr_list * it;
983
984    /* count up how many bytes are used by non-piece-data messages
985       at the front of our outbound queue */
986    for( it=io->outbuf_datatypes; it!=NULL; it=it->next )
987    {
988        struct tr_datatype * d = it->data;
989
990        if( d->isPieceData )
991            break;
992
993        byteCount += d->length;
994    }
995
996    return tr_peerIoFlush( io, TR_UP, byteCount );
997}
Note: See TracBrowser for help on using the repository browser.