source: trunk/libtransmission/peer-io.c @ 11857

Last change on this file since 11857 was 11857, checked in by jordan, 11 years ago

(trunk libT) fix compiler warning: "peer-io.h:282: warning: inlining failed in call to ‘evbuffer_add_uint8’: optimizing for size and code size would grow"

  • Property svn:keywords set to Date Rev Author Id
File size: 25.8 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 11857 2011-02-09 02:35:16Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <string.h>
17#include <stdio.h>
18#include <unistd.h>
19
20#ifdef WIN32
21 #include <winsock2.h>
22#else
23 #include <arpa/inet.h> /* inet_ntoa */
24#endif
25
26#include <event2/event.h>
27#include <event2/bufferevent.h>
28
29#include "transmission.h"
30#include "session.h"
31#include "bandwidth.h"
32#include "crypto.h"
33#include "list.h"
34#include "net.h"
35#include "peer-common.h" /* MAX_BLOCK_SIZE */
36#include "peer-io.h"
37#include "trevent.h" /* tr_runInEventThread() */
38#include "utils.h"
39
40#define MAGIC_NUMBER 206745
41
42#ifdef WIN32
43 #define EAGAIN       WSAEWOULDBLOCK
44 #define EINTR        WSAEINTR
45 #define EINPROGRESS  WSAEINPROGRESS
46 #define EPIPE        WSAECONNRESET
47#endif
48
49static size_t
50guessPacketOverhead( size_t d )
51{
52    /**
53     * http://sd.wareonearth.com/~phil/net/overhead/
54     *
55     * TCP over Ethernet:
56     * Assuming no header compression (e.g. not PPP)
57     * Add 20 IPv4 header or 40 IPv6 header (no options)
58     * Add 20 TCP header
59     * Add 12 bytes optional TCP timestamps
60     * Max TCP Payload data rates over ethernet are thus:
61     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
62     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
63     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
64     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
65     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
66     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
67     */
68    const double assumed_payload_data_rate = 94.0;
69
70    return (unsigned int)( d * ( 100.0 / assumed_payload_data_rate ) - d );
71}
72
73/**
74***
75**/
76
77#define dbgmsg( io, ... ) \
78    do { \
79        if( tr_deepLoggingIsActive( ) ) \
80            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
81    } while( 0 )
82
83struct tr_datatype
84{
85    tr_bool  isPieceData;
86    size_t   length;
87};
88
89
90/***
91****
92***/
93
94static void
95didWriteWrapper( tr_peerIo * io, unsigned int bytes_transferred )
96{
97     while( bytes_transferred && tr_isPeerIo( io ) )
98     {
99        struct tr_datatype * next = io->outbuf_datatypes->data;
100
101        const unsigned int payload = MIN( next->length, bytes_transferred );
102        const unsigned int overhead = guessPacketOverhead( payload );
103        const uint64_t now = tr_sessionGetTimeMsec( io->session );
104
105        tr_bandwidthUsed( &io->bandwidth, TR_UP, payload, next->isPieceData, now );
106
107        if( overhead > 0 )
108            tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, FALSE, now );
109
110        if( io->didWrite )
111            io->didWrite( io, payload, next->isPieceData, io->userData );
112
113        if( tr_isPeerIo( io ) )
114        {
115            bytes_transferred -= payload;
116            next->length -= payload;
117            if( !next->length ) {
118                tr_list_pop_front( &io->outbuf_datatypes );
119                tr_free( next );
120            }
121        }
122    }
123}
124
125static void
126canReadWrapper( tr_peerIo * io )
127{
128    tr_bool err = 0;
129    tr_bool done = 0;
130    tr_session * session;
131
132    dbgmsg( io, "canRead" );
133
134    assert( tr_isPeerIo( io ) );
135    assert( tr_isSession( io->session ) );
136    tr_peerIoRef( io );
137
138    session = io->session;
139
140    /* try to consume the input buffer */
141    if( io->canRead )
142    {
143        const uint64_t now = tr_sessionGetTimeMsec( io->session );
144
145        tr_sessionLock( session );
146
147        while( !done && !err )
148        {
149            size_t piece = 0;
150            const size_t oldLen = evbuffer_get_length( io->inbuf );
151            const int ret = io->canRead( io, io->userData, &piece );
152            const size_t used = oldLen - evbuffer_get_length( io->inbuf );
153            const unsigned int overhead = guessPacketOverhead( used );
154
155            assert( tr_isPeerIo( io ) );
156
157            if( piece || (piece!=used) )
158            {
159                if( piece )
160                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, piece, TRUE, now );
161
162                if( used != piece )
163                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, used - piece, FALSE, now );
164            }
165
166            if( overhead > 0 )
167                tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, FALSE, now );
168
169            switch( ret )
170            {
171                case READ_NOW:
172                    if( evbuffer_get_length( io->inbuf ) )
173                        continue;
174                    done = 1;
175                    break;
176
177                case READ_LATER:
178                    done = 1;
179                    break;
180
181                case READ_ERR:
182                    err = 1;
183                    break;
184            }
185
186            assert( tr_isPeerIo( io ) );
187        }
188
189        tr_sessionUnlock( session );
190    }
191
192    assert( tr_isPeerIo( io ) );
193    tr_peerIoUnref( io );
194}
195
196tr_bool
197tr_isPeerIo( const tr_peerIo * io )
198{
199    return ( io != NULL )
200        && ( io->magicNumber == MAGIC_NUMBER )
201        && ( io->refCount >= 0 )
202        && ( tr_isBandwidth( &io->bandwidth ) )
203        && ( tr_isAddress( &io->addr ) );
204}
205
206static void
207event_read_cb( int fd, short event UNUSED, void * vio )
208{
209    int res;
210    int e;
211    tr_peerIo * io = vio;
212
213    /* Limit the input buffer to 256K, so it doesn't grow too large */
214    unsigned int howmuch;
215    unsigned int curlen;
216    const tr_direction dir = TR_DOWN;
217    const unsigned int max = 256 * 1024;
218
219    assert( tr_isPeerIo( io ) );
220
221    io->pendingEvents &= ~EV_READ;
222
223    curlen = evbuffer_get_length( io->inbuf );
224    howmuch = curlen >= max ? 0 : max - curlen;
225    howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch );
226
227    dbgmsg( io, "libevent says this peer is ready to read" );
228
229    /* if we don't have any bandwidth left, stop reading */
230    if( howmuch < 1 ) {
231        tr_peerIoSetEnabled( io, dir, FALSE );
232        return;
233    }
234
235    EVUTIL_SET_SOCKET_ERROR( 0 );
236    res = evbuffer_read( io->inbuf, fd, (int)howmuch );
237    e = EVUTIL_SOCKET_ERROR( );
238
239    if( res > 0 )
240    {
241        tr_peerIoSetEnabled( io, dir, TRUE );
242
243        /* Invoke the user callback - must always be called last */
244        canReadWrapper( io );
245    }
246    else
247    {
248        char errstr[512];
249        short what = BEV_EVENT_READING;
250
251        if( res == 0 ) /* EOF */
252            what |= BEV_EVENT_EOF;
253        else if( res == -1 ) {
254            if( e == EAGAIN || e == EINTR ) {
255                tr_peerIoSetEnabled( io, dir, TRUE );
256                return;
257            }
258            what |= BEV_EVENT_ERROR;
259        }
260
261        tr_net_strerror( errstr, sizeof( errstr ), e );
262        dbgmsg( io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
263
264        if( io->gotError != NULL )
265            io->gotError( io, what, io->userData );
266    }
267}
268
269static int
270tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
271{
272    int e;
273    int n;
274    char errstr[256];
275
276    EVUTIL_SET_SOCKET_ERROR( 0 );
277    n = evbuffer_write_atmost( io->outbuf, fd, howmuch );
278    e = EVUTIL_SOCKET_ERROR( );
279    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?tr_net_strerror(errstr,sizeof(errstr),e):"") );
280
281    return n;
282}
283
284static void
285event_write_cb( int fd, short event UNUSED, void * vio )
286{
287    int res = 0;
288    int e;
289    short what = BEV_EVENT_WRITING;
290    tr_peerIo * io = vio;
291    size_t howmuch;
292    const tr_direction dir = TR_UP;
293
294    assert( tr_isPeerIo( io ) );
295
296    io->pendingEvents &= ~EV_WRITE;
297
298    dbgmsg( io, "libevent says this peer is ready to write" );
299
300    /* Write as much as possible, since the socket is non-blocking, write() will
301     * return if it can't write any more data without blocking */
302    howmuch = tr_bandwidthClamp( &io->bandwidth, dir, evbuffer_get_length( io->outbuf ) );
303
304    /* if we don't have any bandwidth left, stop writing */
305    if( howmuch < 1 ) {
306        tr_peerIoSetEnabled( io, dir, FALSE );
307        return;
308    }
309
310    EVUTIL_SET_SOCKET_ERROR( 0 );
311    res = tr_evbuffer_write( io, fd, howmuch );
312    e = EVUTIL_SOCKET_ERROR( );
313
314    if (res == -1) {
315        if (!e || e == EAGAIN || e == EINTR || e == EINPROGRESS)
316            goto reschedule;
317        /* error case */
318        what |= BEV_EVENT_ERROR;
319    } else if (res == 0) {
320        /* eof case */
321        what |= BEV_EVENT_EOF;
322    }
323    if (res <= 0)
324        goto error;
325
326    if( evbuffer_get_length( io->outbuf ) )
327        tr_peerIoSetEnabled( io, dir, TRUE );
328
329    didWriteWrapper( io, res );
330    return;
331
332 reschedule:
333    if( evbuffer_get_length( io->outbuf ) )
334        tr_peerIoSetEnabled( io, dir, TRUE );
335    return;
336
337 error:
338
339    dbgmsg( io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
340
341    if( io->gotError != NULL )
342        io->gotError( io, what, io->userData );
343}
344
345/**
346***
347**/
348
349static void
350maybeSetCongestionAlgorithm( int socket, const char * algorithm )
351{
352    if( algorithm && *algorithm )
353    {
354        const int rc = tr_netSetCongestionControl( socket, algorithm );
355
356        if( rc < 0 )
357            tr_ninf( "Net", "Can't set congestion control algorithm '%s': %s",
358                     algorithm, tr_strerror( errno ));
359    }
360}
361
362static tr_peerIo*
363tr_peerIoNew( tr_session       * session,
364              tr_bandwidth     * parent,
365              const tr_address * addr,
366              tr_port            port,
367              const uint8_t    * torrentHash,
368              tr_bool            isIncoming,
369              tr_bool            isSeed,
370              int                socket )
371{
372    tr_peerIo * io;
373
374    assert( session != NULL );
375    assert( session->events != NULL );
376    assert( tr_isBool( isIncoming ) );
377    assert( tr_isBool( isSeed ) );
378    assert( tr_amInEventThread( session ) );
379
380    if( socket >= 0 ) {
381        tr_netSetTOS( socket, session->peerSocketTOS );
382        maybeSetCongestionAlgorithm( socket, session->peer_congestion_algorithm );
383    }
384
385    io = tr_new0( tr_peerIo, 1 );
386    io->magicNumber = MAGIC_NUMBER;
387    io->refCount = 1;
388    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
389    io->session = session;
390    io->addr = *addr;
391    io->isSeed = isSeed;
392    io->port = port;
393    io->socket = socket;
394    io->isIncoming = isIncoming != 0;
395    io->timeCreated = tr_time( );
396    io->inbuf = evbuffer_new( );
397    io->outbuf = evbuffer_new( );
398    io->event_read = event_new( session->event_base, io->socket, EV_READ, event_read_cb, io );
399    io->event_write = event_new( session->event_base, io->socket, EV_WRITE, event_write_cb, io );
400    tr_bandwidthConstruct( &io->bandwidth, session, parent );
401    tr_bandwidthSetPeer( &io->bandwidth, io );
402    dbgmsg( io, "bandwidth is %p; its parent is %p", &io->bandwidth, parent );
403
404    return io;
405}
406
407tr_peerIo*
408tr_peerIoNewIncoming( tr_session        * session,
409                      tr_bandwidth      * parent,
410                      const tr_address  * addr,
411                      tr_port             port,
412                      int                 fd )
413{
414    assert( session );
415    assert( tr_isAddress( addr ) );
416    assert( fd >= 0 );
417
418    return tr_peerIoNew( session, parent, addr, port, NULL, TRUE, FALSE, fd );
419}
420
421tr_peerIo*
422tr_peerIoNewOutgoing( tr_session        * session,
423                      tr_bandwidth      * parent,
424                      const tr_address  * addr,
425                      tr_port             port,
426                      const uint8_t     * torrentHash,
427                      tr_bool             isSeed )
428{
429    int fd;
430
431    assert( session );
432    assert( tr_isAddress( addr ) );
433    assert( torrentHash );
434
435    fd = tr_netOpenPeerSocket( session, addr, port, isSeed );
436    dbgmsg( NULL, "tr_netOpenPeerSocket returned fd %d", fd );
437
438    return fd < 0 ? NULL
439                  : tr_peerIoNew( session, parent, addr, port, torrentHash, FALSE, isSeed, fd );
440}
441
442/***
443****
444***/
445
446static void
447event_enable( tr_peerIo * io, short event )
448{
449    assert( tr_amInEventThread( io->session ) );
450    assert( io->session != NULL );
451    assert( io->session->events != NULL );
452    assert( event_initialized( io->event_read ) );
453    assert( event_initialized( io->event_write ) );
454
455    if( io->socket < 0 )
456        return;
457
458    if( ( event & EV_READ ) && ! ( io->pendingEvents & EV_READ ) )
459    {
460        dbgmsg( io, "enabling libevent ready-to-read polling" );
461        event_add( io->event_read, NULL );
462        io->pendingEvents |= EV_READ;
463    }
464
465    if( ( event & EV_WRITE ) && ! ( io->pendingEvents & EV_WRITE ) )
466    {
467        dbgmsg( io, "enabling libevent ready-to-write polling" );
468        event_add( io->event_write, NULL );
469        io->pendingEvents |= EV_WRITE;
470    }
471}
472
473static void
474event_disable( struct tr_peerIo * io, short event )
475{
476    assert( tr_amInEventThread( io->session ) );
477    assert( io->session != NULL );
478    assert( io->session->events != NULL );
479    assert( event_initialized( io->event_read ) );
480    assert( event_initialized( io->event_write ) );
481
482    if( ( event & EV_READ ) && ( io->pendingEvents & EV_READ ) )
483    {
484        dbgmsg( io, "disabling libevent ready-to-read polling" );
485        event_del( io->event_read );
486        io->pendingEvents &= ~EV_READ;
487    }
488
489    if( ( event & EV_WRITE ) && ( io->pendingEvents & EV_WRITE ) )
490    {
491        dbgmsg( io, "disabling libevent ready-to-write polling" );
492        event_del( io->event_write );
493        io->pendingEvents &= ~EV_WRITE;
494    }
495}
496
497void
498tr_peerIoSetEnabled( tr_peerIo    * io,
499                     tr_direction   dir,
500                     tr_bool        isEnabled )
501{
502    const short event = dir == TR_UP ? EV_WRITE : EV_READ;
503
504    assert( tr_isPeerIo( io ) );
505    assert( tr_isDirection( dir ) );
506    assert( tr_amInEventThread( io->session ) );
507    assert( io->session->events != NULL );
508
509    if( isEnabled )
510        event_enable( io, event );
511    else
512        event_disable( io, event );
513}
514
515/***
516****
517***/
518
519static void
520io_dtor( void * vio )
521{
522    tr_peerIo * io = vio;
523
524    assert( tr_isPeerIo( io ) );
525    assert( tr_amInEventThread( io->session ) );
526    assert( io->session->events != NULL );
527
528    dbgmsg( io, "in tr_peerIo destructor" );
529    event_disable( io, EV_READ | EV_WRITE );
530    event_free( io->event_read );
531    event_free( io->event_write );
532    tr_bandwidthDestruct( &io->bandwidth );
533    evbuffer_free( io->outbuf );
534    evbuffer_free( io->inbuf );
535    tr_netClose( io->session, io->socket );
536    tr_cryptoFree( io->crypto );
537    tr_list_free( &io->outbuf_datatypes, tr_free );
538
539    memset( io, ~0, sizeof( tr_peerIo ) );
540    tr_free( io );
541}
542
543static void
544tr_peerIoFree( tr_peerIo * io )
545{
546    if( io )
547    {
548        dbgmsg( io, "in tr_peerIoFree" );
549        io->canRead = NULL;
550        io->didWrite = NULL;
551        io->gotError = NULL;
552        tr_runInEventThread( io->session, io_dtor, io );
553    }
554}
555
556void
557tr_peerIoRefImpl( const char * file, int line, tr_peerIo * io )
558{
559    assert( tr_isPeerIo( io ) );
560
561    dbgmsg( io, "%s:%d is incrementing the IO's refcount from %d to %d",
562                file, line, io->refCount, io->refCount+1 );
563
564    ++io->refCount;
565}
566
567void
568tr_peerIoUnrefImpl( const char * file, int line, tr_peerIo * io )
569{
570    assert( tr_isPeerIo( io ) );
571
572    dbgmsg( io, "%s:%d is decrementing the IO's refcount from %d to %d",
573                file, line, io->refCount, io->refCount-1 );
574
575    if( !--io->refCount )
576        tr_peerIoFree( io );
577}
578
579const tr_address*
580tr_peerIoGetAddress( const tr_peerIo * io, tr_port   * port )
581{
582    assert( tr_isPeerIo( io ) );
583
584    if( port )
585        *port = io->port;
586
587    return &io->addr;
588}
589
590const char*
591tr_peerIoAddrStr( const tr_address * addr, tr_port port )
592{
593    static char buf[512];
594
595    if( addr->type == TR_AF_INET )
596        tr_snprintf( buf, sizeof( buf ), "%s:%u", tr_ntop_non_ts( addr ), ntohs( port ) );
597    else
598        tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_ntop_non_ts( addr ), ntohs( port ) );
599    return buf;
600}
601
602const char* tr_peerIoGetAddrStr( const tr_peerIo * io )
603{
604    return tr_isPeerIo( io ) ? tr_peerIoAddrStr( &io->addr, io->port ) : "error";
605}
606
607void
608tr_peerIoSetIOFuncs( tr_peerIo        * io,
609                     tr_can_read_cb     readcb,
610                     tr_did_write_cb    writecb,
611                     tr_net_error_cb    errcb,
612                     void             * userData )
613{
614    io->canRead = readcb;
615    io->didWrite = writecb;
616    io->gotError = errcb;
617    io->userData = userData;
618}
619
620void
621tr_peerIoClear( tr_peerIo * io )
622{
623    tr_peerIoSetIOFuncs( io, NULL, NULL, NULL, NULL );
624    tr_peerIoSetEnabled( io, TR_UP, FALSE );
625    tr_peerIoSetEnabled( io, TR_DOWN, FALSE );
626}
627
628int
629tr_peerIoReconnect( tr_peerIo * io )
630{
631    short int pendingEvents;
632    tr_session * session;
633
634    assert( tr_isPeerIo( io ) );
635    assert( !tr_peerIoIsIncoming( io ) );
636
637    session = tr_peerIoGetSession( io );
638
639    pendingEvents = io->pendingEvents;
640    event_disable( io, EV_READ | EV_WRITE );
641
642    if( io->socket >= 0 )
643        tr_netClose( session, io->socket );
644
645    event_free( io->event_read );
646    event_free( io->event_write );
647    io->socket = tr_netOpenPeerSocket( session, &io->addr, io->port, io->isSeed );
648    io->event_read = event_new( session->event_base, io->socket, EV_READ, event_read_cb, io );
649    io->event_write = event_new( session->event_base, io->socket, EV_WRITE, event_write_cb, io );
650
651    if( io->socket >= 0 )
652    {
653        event_enable( io, pendingEvents );
654        tr_netSetTOS( io->socket, session->peerSocketTOS );
655        maybeSetCongestionAlgorithm( io->socket, session->peer_congestion_algorithm );
656        return 0;
657    }
658
659    return -1;
660}
661
662/**
663***
664**/
665
666void
667tr_peerIoSetTorrentHash( tr_peerIo *     io,
668                         const uint8_t * hash )
669{
670    assert( tr_isPeerIo( io ) );
671
672    tr_cryptoSetTorrentHash( io->crypto, hash );
673}
674
675const uint8_t*
676tr_peerIoGetTorrentHash( tr_peerIo * io )
677{
678    assert( tr_isPeerIo( io ) );
679    assert( io->crypto );
680
681    return tr_cryptoGetTorrentHash( io->crypto );
682}
683
684int
685tr_peerIoHasTorrentHash( const tr_peerIo * io )
686{
687    assert( tr_isPeerIo( io ) );
688    assert( io->crypto );
689
690    return tr_cryptoHasTorrentHash( io->crypto );
691}
692
693/**
694***
695**/
696
697void
698tr_peerIoSetPeersId( tr_peerIo *     io,
699                     const uint8_t * peer_id )
700{
701    assert( tr_isPeerIo( io ) );
702
703    if( ( io->peerIdIsSet = peer_id != NULL ) )
704        memcpy( io->peerId, peer_id, 20 );
705    else
706        memset( io->peerId, 0, 20 );
707}
708
709/**
710***
711**/
712
713static unsigned int
714getDesiredOutputBufferSize( const tr_peerIo * io, uint64_t now )
715{
716    /* this is all kind of arbitrary, but what seems to work well is
717     * being large enough to hold the next 20 seconds' worth of input,
718     * or a few blocks, whichever is bigger.
719     * It's okay to tweak this as needed */
720    const unsigned int currentSpeed_Bps = tr_bandwidthGetPieceSpeed_Bps( &io->bandwidth, now, TR_UP );
721    const unsigned int period = 15u; /* arbitrary */
722    /* the 3 is arbitrary; the .5 is to leave room for messages */
723    static const unsigned int ceiling =  (unsigned int)( MAX_BLOCK_SIZE * 3.5 );
724    return MAX( ceiling, currentSpeed_Bps*period );
725}
726
727size_t
728tr_peerIoGetWriteBufferSpace( const tr_peerIo * io, uint64_t now )
729{
730    const size_t desiredLen = getDesiredOutputBufferSize( io, now );
731    const size_t currentLen = evbuffer_get_length( io->outbuf );
732    size_t freeSpace = 0;
733
734    if( desiredLen > currentLen )
735        freeSpace = desiredLen - currentLen;
736
737    return freeSpace;
738}
739
740/**
741***
742**/
743
744void
745tr_peerIoSetEncryption( tr_peerIo * io, uint32_t encryptionMode )
746{
747    assert( tr_isPeerIo( io ) );
748    assert( encryptionMode == PEER_ENCRYPTION_NONE
749         || encryptionMode == PEER_ENCRYPTION_RC4 );
750
751    io->encryptionMode = encryptionMode;
752}
753
754/**
755***
756**/
757
758static void
759addDatatype( tr_peerIo * io, size_t byteCount, tr_bool isPieceData )
760{
761    struct tr_datatype * d;
762
763    d = tr_new( struct tr_datatype, 1 );
764    d->isPieceData = isPieceData != 0;
765    d->length = byteCount;
766    tr_list_append( &io->outbuf_datatypes, d );
767}
768
769static struct evbuffer_iovec *
770evbuffer_peek_all( struct evbuffer * buf, size_t * setme_vecCount )
771{
772    const size_t byteCount = evbuffer_get_length( buf );
773    const int vecCount = evbuffer_peek( buf, byteCount, NULL, NULL, 0 );
774    struct evbuffer_iovec * iovec = tr_new0( struct evbuffer_iovec, vecCount );
775    const int n = evbuffer_peek( buf, byteCount, NULL, iovec, vecCount );
776    assert( n == vecCount );
777    *setme_vecCount = n;
778    return iovec;
779}
780
781static void
782maybeEncryptBuffer( tr_peerIo * io, struct evbuffer * buf )
783{
784    if( io->encryptionMode == PEER_ENCRYPTION_RC4 )
785    {
786        size_t i, n;
787        struct evbuffer_iovec * iovec = evbuffer_peek_all( buf, &n );
788
789        for( i=0; i<n; ++i )
790            tr_cryptoEncrypt( io->crypto, iovec[i].iov_len, iovec[i].iov_base, iovec[i].iov_base );
791
792        tr_free( iovec );
793    }
794}
795
796void
797tr_peerIoWriteBuf( tr_peerIo * io, struct evbuffer * buf, tr_bool isPieceData )
798{
799    const size_t byteCount = evbuffer_get_length( buf );
800    maybeEncryptBuffer( io, buf );
801    evbuffer_add_buffer( io->outbuf, buf );
802    addDatatype( io, byteCount, isPieceData );
803}
804
805void
806tr_peerIoWriteBytes( tr_peerIo * io, const void * bytes, size_t byteCount, tr_bool isPieceData )
807{
808    struct evbuffer * buf = evbuffer_new( );
809    evbuffer_add( buf, bytes, byteCount );
810    tr_peerIoWriteBuf( io, buf, isPieceData );
811    evbuffer_free( buf );
812}
813
814/***
815****
816***/
817
818void
819evbuffer_add_uint8( struct evbuffer * outbuf, uint8_t byte )
820{
821    evbuffer_add( outbuf, &byte, 1 );
822}
823
824void
825evbuffer_add_uint16( struct evbuffer * outbuf, uint16_t addme_hs )
826{
827    const uint16_t ns = htons( addme_hs );
828    evbuffer_add( outbuf, &ns, sizeof( ns ) );
829}
830
831void
832evbuffer_add_uint32( struct evbuffer * outbuf, uint32_t addme_hl )
833{
834    const uint32_t nl = htonl( addme_hl );
835    evbuffer_add( outbuf, &nl, sizeof( nl ) );
836}
837
838/***
839****
840***/
841
842void
843tr_peerIoReadBytes( tr_peerIo * io, struct evbuffer * inbuf, void * bytes, size_t byteCount )
844{
845    assert( tr_isPeerIo( io ) );
846    assert( evbuffer_get_length( inbuf )  >= byteCount );
847
848    switch( io->encryptionMode )
849    {
850        case PEER_ENCRYPTION_NONE:
851            evbuffer_remove( inbuf, bytes, byteCount );
852            break;
853
854        case PEER_ENCRYPTION_RC4:
855            evbuffer_remove( inbuf, bytes, byteCount );
856            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
857            break;
858
859        default:
860            assert( 0 );
861    }
862}
863
864void
865tr_peerIoReadUint16( tr_peerIo        * io,
866                     struct evbuffer  * inbuf,
867                     uint16_t         * setme )
868{
869    uint16_t tmp;
870    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
871    *setme = ntohs( tmp );
872}
873
874void tr_peerIoReadUint32( tr_peerIo        * io,
875                          struct evbuffer  * inbuf,
876                          uint32_t         * setme )
877{
878    uint32_t tmp;
879    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
880    *setme = ntohl( tmp );
881}
882
883void
884tr_peerIoDrain( tr_peerIo       * io,
885                struct evbuffer * inbuf,
886                size_t            byteCount )
887{
888    void * buf = tr_sessionGetBuffer( io->session );
889    const size_t buflen = SESSION_BUFFER_SIZE;
890
891    while( byteCount > 0 )
892    {
893        const size_t thisPass = MIN( byteCount, buflen );
894        tr_peerIoReadBytes( io, inbuf, buf, thisPass );
895        byteCount -= thisPass;
896    }
897
898    tr_sessionReleaseBuffer( io->session );
899}
900
901/***
902****
903***/
904
905static int
906tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
907{
908    int res = 0;
909
910    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch )))
911    {
912        int e;
913
914        EVUTIL_SET_SOCKET_ERROR( 0 );
915        res = evbuffer_read( io->inbuf, io->socket, (int)howmuch );
916        e = EVUTIL_SOCKET_ERROR( );
917
918        dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(e):"") );
919
920        if( evbuffer_get_length( io->inbuf ) )
921            canReadWrapper( io );
922
923        if( ( res <= 0 ) && ( io->gotError ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
924        {
925            char errstr[512];
926            short what = BEV_EVENT_READING | BEV_EVENT_ERROR;
927            if( res == 0 )
928                what |= BEV_EVENT_EOF;
929            tr_net_strerror( errstr, sizeof( errstr ), e );
930            dbgmsg( io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
931            io->gotError( io, what, io->userData );
932        }
933    }
934
935    return res;
936}
937
938static int
939tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
940{
941    int n = 0;
942
943    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_UP, howmuch )))
944    {
945        int e;
946        EVUTIL_SET_SOCKET_ERROR( 0 );
947        n = tr_evbuffer_write( io, io->socket, howmuch );
948        e = EVUTIL_SOCKET_ERROR( );
949
950        if( n > 0 )
951            didWriteWrapper( io, n );
952
953        if( ( n < 0 ) && ( io->gotError ) && e && ( e != EPIPE ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
954        {
955            char errstr[512];
956            const short what = BEV_EVENT_WRITING | BEV_EVENT_ERROR;
957
958            tr_net_strerror( errstr, sizeof( errstr ), e );
959            dbgmsg( io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", n, what, e, errstr );
960
961            if( io->gotError != NULL )
962                io->gotError( io, what, io->userData );
963        }
964    }
965
966    return n;
967}
968
969int
970tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
971{
972    int bytesUsed = 0;
973
974    assert( tr_isPeerIo( io ) );
975    assert( tr_isDirection( dir ) );
976
977    if( dir == TR_DOWN )
978        bytesUsed = tr_peerIoTryRead( io, limit );
979    else
980        bytesUsed = tr_peerIoTryWrite( io, limit );
981
982    dbgmsg( io, "flushing peer-io, direction %d, limit %zu, bytesUsed %d", (int)dir, limit, bytesUsed );
983    return bytesUsed;
984}
985
986int
987tr_peerIoFlushOutgoingProtocolMsgs( tr_peerIo * io )
988{
989    size_t byteCount = 0;
990    tr_list * it;
991
992    /* count up how many bytes are used by non-piece-data messages
993       at the front of our outbound queue */
994    for( it=io->outbuf_datatypes; it!=NULL; it=it->next )
995    {
996        struct tr_datatype * d = it->data;
997
998        if( d->isPieceData )
999            break;
1000
1001        byteCount += d->length;
1002    }
1003
1004    return tr_peerIoFlush( io, TR_UP, byteCount );
1005}
Note: See TracBrowser for help on using the repository browser.