source: trunk/libtransmission/peer-io.c @ 11599

Last change on this file since 11599 was 11599, checked in by charles, 11 years ago

(trunk) Join the 21st century and use only 1 space at the end sentences. This commit is nearly as important as the semi-annual ones that remove trailing spaces from the ends of lines of code... :)

  • Property svn:keywords set to Date Rev Author Id
File size: 25.8 KB
Line 
1/*
2 * This file Copyright (C) 2007-2010 Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 11599 2010-12-27 19:18:17Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <string.h>
17#include <stdio.h>
18#include <unistd.h>
19
20#ifdef WIN32
21 #include <winsock2.h>
22#else
23 #include <arpa/inet.h> /* inet_ntoa */
24#endif
25
26#include <event2/event.h>
27#include <event2/bufferevent.h>
28
29#include "transmission.h"
30#include "session.h"
31#include "bandwidth.h"
32#include "crypto.h"
33#include "list.h"
34#include "net.h"
35#include "peer-common.h" /* MAX_BLOCK_SIZE */
36#include "peer-io.h"
37#include "trevent.h" /* tr_runInEventThread() */
38#include "utils.h"
39
40#define MAGIC_NUMBER 206745
41
42#ifdef WIN32
43 #define EAGAIN       WSAEWOULDBLOCK
44 #define EINTR        WSAEINTR
45 #define EINPROGRESS  WSAEINPROGRESS
46 #define EPIPE        WSAECONNRESET
47#endif
48
49static size_t
50guessPacketOverhead( size_t d )
51{
52    /**
53     * http://sd.wareonearth.com/~phil/net/overhead/
54     *
55     * TCP over Ethernet:
56     * Assuming no header compression (e.g. not PPP)
57     * Add 20 IPv4 header or 40 IPv6 header (no options)
58     * Add 20 TCP header
59     * Add 12 bytes optional TCP timestamps
60     * Max TCP Payload data rates over ethernet are thus:
61     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
62     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
63     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
64     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
65     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
66     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
67     */
68    const double assumed_payload_data_rate = 94.0;
69
70    return (unsigned int)( d * ( 100.0 / assumed_payload_data_rate ) - d );
71}
72
73/**
74***
75**/
76
77#define dbgmsg( io, ... ) \
78    do { \
79        if( tr_deepLoggingIsActive( ) ) \
80            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
81    } while( 0 )
82
83struct tr_datatype
84{
85    tr_bool  isPieceData;
86    size_t   length;
87};
88
89/***
90****
91***/
92
93static void
94didWriteWrapper( tr_peerIo * io, unsigned int bytes_transferred )
95{
96     while( bytes_transferred && tr_isPeerIo( io ) )
97     {
98        struct tr_datatype * next = io->outbuf_datatypes->data;
99
100        const unsigned int payload = MIN( next->length, bytes_transferred );
101        const unsigned int overhead = guessPacketOverhead( payload );
102        const uint64_t now = tr_time_msec( );
103
104        tr_bandwidthUsed( &io->bandwidth, TR_UP, payload, next->isPieceData, now );
105
106        if( overhead > 0 )
107            tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, FALSE, now );
108
109        if( io->didWrite )
110            io->didWrite( io, payload, next->isPieceData, io->userData );
111
112        if( tr_isPeerIo( io ) )
113        {
114            bytes_transferred -= payload;
115            next->length -= payload;
116            if( !next->length ) {
117                tr_list_pop_front( &io->outbuf_datatypes );
118                tr_free( next );
119            }
120        }
121    }
122}
123
124static void
125canReadWrapper( tr_peerIo * io )
126{
127    tr_bool err = 0;
128    tr_bool done = 0;
129    tr_session * session;
130
131    dbgmsg( io, "canRead" );
132
133    assert( tr_isPeerIo( io ) );
134    assert( tr_isSession( io->session ) );
135    tr_peerIoRef( io );
136
137    session = io->session;
138
139    /* try to consume the input buffer */
140    if( io->canRead )
141    {
142        tr_sessionLock( session );
143
144        while( !done && !err )
145        {
146            size_t piece = 0;
147            const size_t oldLen = evbuffer_get_length( io->inbuf );
148            const int ret = io->canRead( io, io->userData, &piece );
149
150            const size_t used = oldLen - evbuffer_get_length( io->inbuf );
151
152            assert( tr_isPeerIo( io ) );
153
154            if( piece || (piece!=used) )
155            {
156                const uint64_t now = tr_time_msec( );
157
158                if( piece )
159                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, piece, TRUE, now );
160
161                if( used != piece )
162                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, used - piece, FALSE, now );
163            }
164
165            switch( ret )
166            {
167                case READ_NOW:
168                    if( evbuffer_get_length( io->inbuf ) )
169                        continue;
170                    done = 1;
171                    break;
172
173                case READ_LATER:
174                    done = 1;
175                    break;
176
177                case READ_ERR:
178                    err = 1;
179                    break;
180            }
181
182            assert( tr_isPeerIo( io ) );
183        }
184
185        tr_sessionUnlock( session );
186    }
187
188    assert( tr_isPeerIo( io ) );
189    tr_peerIoUnref( io );
190}
191
192tr_bool
193tr_isPeerIo( const tr_peerIo * io )
194{
195    return ( io != NULL )
196        && ( io->magicNumber == MAGIC_NUMBER )
197        && ( io->refCount >= 0 )
198        && ( tr_isBandwidth( &io->bandwidth ) )
199        && ( tr_isAddress( &io->addr ) );
200}
201
202static void
203event_read_cb( int fd, short event UNUSED, void * vio )
204{
205    int res;
206    int e;
207    tr_peerIo * io = vio;
208
209    /* Limit the input buffer to 256K, so it doesn't grow too large */
210    unsigned int howmuch;
211    unsigned int curlen;
212    const tr_direction dir = TR_DOWN;
213    const unsigned int max = 256 * 1024;
214
215    assert( tr_isPeerIo( io ) );
216
217    io->hasFinishedConnecting = TRUE;
218    io->pendingEvents &= ~EV_READ;
219
220    curlen = evbuffer_get_length( io->inbuf );
221    howmuch = curlen >= max ? 0 : max - curlen;
222    howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch );
223
224    dbgmsg( io, "libevent says this peer is ready to read" );
225
226    /* if we don't have any bandwidth left, stop reading */
227    if( howmuch < 1 ) {
228        tr_peerIoSetEnabled( io, dir, FALSE );
229        return;
230    }
231
232    EVUTIL_SET_SOCKET_ERROR( 0 );
233    res = evbuffer_read( io->inbuf, fd, (int)howmuch );
234    e = EVUTIL_SOCKET_ERROR( );
235
236    if( res > 0 )
237    {
238        tr_peerIoSetEnabled( io, dir, TRUE );
239
240        /* Invoke the user callback - must always be called last */
241        canReadWrapper( io );
242    }
243    else
244    {
245        char errstr[512];
246        short what = BEV_EVENT_READING;
247
248        if( res == 0 ) /* EOF */
249            what |= BEV_EVENT_EOF;
250        else if( res == -1 ) {
251            if( e == EAGAIN || e == EINTR ) {
252                tr_peerIoSetEnabled( io, dir, TRUE );
253                return;
254            }
255            what |= BEV_EVENT_ERROR;
256        }
257
258        tr_net_strerror( errstr, sizeof( errstr ), e );
259        dbgmsg( io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
260
261        if( io->gotError != NULL )
262            io->gotError( io, what, io->userData );
263    }
264}
265
266static int
267tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
268{
269    int e;
270    int n;
271    char errstr[256];
272
273    EVUTIL_SET_SOCKET_ERROR( 0 );
274    n = evbuffer_write_atmost( io->outbuf, fd, howmuch );
275    e = EVUTIL_SOCKET_ERROR( );
276    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?tr_net_strerror(errstr,sizeof(errstr),e):"") );
277
278    return n;
279}
280
281static void
282event_write_cb( int fd, short event UNUSED, void * vio )
283{
284    int res = 0;
285    int e;
286    short what = BEV_EVENT_WRITING;
287    tr_peerIo * io = vio;
288    size_t howmuch;
289    const tr_direction dir = TR_UP;
290
291    assert( tr_isPeerIo( io ) );
292
293    io->hasFinishedConnecting = TRUE;
294    io->pendingEvents &= ~EV_WRITE;
295
296    dbgmsg( io, "libevent says this peer is ready to write" );
297
298    /* Write as much as possible, since the socket is non-blocking, write() will
299     * return if it can't write any more data without blocking */
300    howmuch = tr_bandwidthClamp( &io->bandwidth, dir, evbuffer_get_length( io->outbuf ) );
301
302    /* if we don't have any bandwidth left, stop writing */
303    if( howmuch < 1 ) {
304        tr_peerIoSetEnabled( io, dir, FALSE );
305        return;
306    }
307
308    EVUTIL_SET_SOCKET_ERROR( 0 );
309    res = tr_evbuffer_write( io, fd, howmuch );
310    e = EVUTIL_SOCKET_ERROR( );
311
312    if (res == -1) {
313        if (!e || e == EAGAIN || e == EINTR || e == EINPROGRESS)
314            goto reschedule;
315        /* error case */
316        what |= BEV_EVENT_ERROR;
317    } else if (res == 0) {
318        /* eof case */
319        what |= BEV_EVENT_EOF;
320    }
321    if (res <= 0)
322        goto error;
323
324    if( evbuffer_get_length( io->outbuf ) )
325        tr_peerIoSetEnabled( io, dir, TRUE );
326
327    didWriteWrapper( io, res );
328    return;
329
330 reschedule:
331    if( evbuffer_get_length( io->outbuf ) )
332        tr_peerIoSetEnabled( io, dir, TRUE );
333    return;
334
335 error:
336
337    dbgmsg( io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
338
339    if( io->gotError != NULL )
340        io->gotError( io, what, io->userData );
341}
342
343/**
344***
345**/
346
347static void
348maybeSetCongestionAlgorithm( int socket, const char * algorithm )
349{
350    if( algorithm && *algorithm )
351    {
352        const int rc = tr_netSetCongestionControl( socket, algorithm );
353
354        if( rc < 0 )
355            tr_ninf( "Net", "Can't set congestion control algorithm '%s': %s",
356                     algorithm, tr_strerror( errno ));
357    }
358}
359
360static tr_peerIo*
361tr_peerIoNew( tr_session       * session,
362              tr_bandwidth     * parent,
363              const tr_address * addr,
364              tr_port            port,
365              const uint8_t    * torrentHash,
366              tr_bool            isIncoming,
367              tr_bool            isSeed,
368              int                socket )
369{
370    tr_peerIo * io;
371
372    assert( session != NULL );
373    assert( session->events != NULL );
374    assert( tr_isBool( isIncoming ) );
375    assert( tr_isBool( isSeed ) );
376    assert( tr_amInEventThread( session ) );
377
378    if( socket >= 0 ) {
379        tr_netSetTOS( socket, session->peerSocketTOS );
380        maybeSetCongestionAlgorithm( socket, session->peer_congestion_algorithm );
381    }
382
383    io = tr_new0( tr_peerIo, 1 );
384    io->magicNumber = MAGIC_NUMBER;
385    io->refCount = 1;
386    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
387    io->session = session;
388    io->addr = *addr;
389    io->isSeed = isSeed;
390    io->port = port;
391    io->socket = socket;
392    io->isIncoming = isIncoming != 0;
393    io->hasFinishedConnecting = FALSE;
394    io->timeCreated = tr_time( );
395    io->inbuf = evbuffer_new( );
396    io->outbuf = evbuffer_new( );
397    io->event_read = event_new( session->event_base, io->socket, EV_READ, event_read_cb, io );
398    io->event_write = event_new( session->event_base, io->socket, EV_WRITE, event_write_cb, io );
399    tr_bandwidthConstruct( &io->bandwidth, session, parent );
400    tr_bandwidthSetPeer( &io->bandwidth, io );
401    dbgmsg( io, "bandwidth is %p; its parent is %p", &io->bandwidth, parent );
402
403    return io;
404}
405
406tr_peerIo*
407tr_peerIoNewIncoming( tr_session        * session,
408                      tr_bandwidth      * parent,
409                      const tr_address  * addr,
410                      tr_port             port,
411                      int                 fd )
412{
413    assert( session );
414    assert( tr_isAddress( addr ) );
415    assert( fd >= 0 );
416
417    return tr_peerIoNew( session, parent, addr, port, NULL, TRUE, FALSE, fd );
418}
419
420tr_peerIo*
421tr_peerIoNewOutgoing( tr_session        * session,
422                      tr_bandwidth      * parent,
423                      const tr_address  * addr,
424                      tr_port             port,
425                      const uint8_t     * torrentHash,
426                      tr_bool             isSeed )
427{
428    int fd;
429
430    assert( session );
431    assert( tr_isAddress( addr ) );
432    assert( torrentHash );
433
434    fd = tr_netOpenPeerSocket( session, addr, port, isSeed );
435    dbgmsg( NULL, "tr_netOpenPeerSocket returned fd %d", fd );
436
437    return fd < 0 ? NULL
438                  : tr_peerIoNew( session, parent, addr, port, torrentHash, FALSE, isSeed, fd );
439}
440
441/***
442****
443***/
444
445static void
446event_enable( tr_peerIo * io, short event )
447{
448    assert( tr_amInEventThread( io->session ) );
449    assert( io->session != NULL );
450    assert( io->session->events != NULL );
451    assert( event_initialized( io->event_read ) );
452    assert( event_initialized( io->event_write ) );
453
454    if( io->socket < 0 )
455        return;
456
457    if( ( event & EV_READ ) && ! ( io->pendingEvents & EV_READ ) )
458    {
459        dbgmsg( io, "enabling libevent ready-to-read polling" );
460        event_add( io->event_read, NULL );
461        io->pendingEvents |= EV_READ;
462    }
463
464    if( ( event & EV_WRITE ) && ! ( io->pendingEvents & EV_WRITE ) )
465    {
466        dbgmsg( io, "enabling libevent ready-to-write polling" );
467        event_add( io->event_write, NULL );
468        io->pendingEvents |= EV_WRITE;
469    }
470}
471
472static void
473event_disable( struct tr_peerIo * io, short event )
474{
475    assert( tr_amInEventThread( io->session ) );
476    assert( io->session != NULL );
477    assert( io->session->events != NULL );
478    assert( event_initialized( io->event_read ) );
479    assert( event_initialized( io->event_write ) );
480
481    if( ( event & EV_READ ) && ( io->pendingEvents & EV_READ ) )
482    {
483        dbgmsg( io, "disabling libevent ready-to-read polling" );
484        event_del( io->event_read );
485        io->pendingEvents &= ~EV_READ;
486    }
487
488    if( ( event & EV_WRITE ) && ( io->pendingEvents & EV_WRITE ) )
489    {
490        dbgmsg( io, "disabling libevent ready-to-write polling" );
491        event_del( io->event_write );
492        io->pendingEvents &= ~EV_WRITE;
493    }
494}
495
496void
497tr_peerIoSetEnabled( tr_peerIo    * io,
498                     tr_direction   dir,
499                     tr_bool        isEnabled )
500{
501    const short event = dir == TR_UP ? EV_WRITE : EV_READ;
502
503    assert( tr_isPeerIo( io ) );
504    assert( tr_isDirection( dir ) );
505    assert( tr_amInEventThread( io->session ) );
506    assert( io->session->events != NULL );
507
508    if( isEnabled )
509        event_enable( io, event );
510    else
511        event_disable( io, event );
512}
513
514/***
515****
516***/
517
518static void
519io_dtor( void * vio )
520{
521    tr_peerIo * io = vio;
522
523    assert( tr_isPeerIo( io ) );
524    assert( tr_amInEventThread( io->session ) );
525    assert( io->session->events != NULL );
526
527    dbgmsg( io, "in tr_peerIo destructor" );
528    event_disable( io, EV_READ | EV_WRITE );
529    event_free( io->event_read );
530    event_free( io->event_write );
531    tr_bandwidthDestruct( &io->bandwidth );
532    evbuffer_free( io->outbuf );
533    evbuffer_free( io->inbuf );
534    tr_netClose( io->session, io->socket );
535    tr_cryptoFree( io->crypto );
536    tr_list_free( &io->outbuf_datatypes, tr_free );
537
538    memset( io, ~0, sizeof( tr_peerIo ) );
539    tr_free( io );
540}
541
542static void
543tr_peerIoFree( tr_peerIo * io )
544{
545    if( io )
546    {
547        dbgmsg( io, "in tr_peerIoFree" );
548        io->canRead = NULL;
549        io->didWrite = NULL;
550        io->gotError = NULL;
551        tr_runInEventThread( io->session, io_dtor, io );
552    }
553}
554
555void
556tr_peerIoRefImpl( const char * file, int line, tr_peerIo * io )
557{
558    assert( tr_isPeerIo( io ) );
559
560    dbgmsg( io, "%s:%d is incrementing the IO's refcount from %d to %d",
561                file, line, io->refCount, io->refCount+1 );
562
563    ++io->refCount;
564}
565
566void
567tr_peerIoUnrefImpl( const char * file, int line, tr_peerIo * io )
568{
569    assert( tr_isPeerIo( io ) );
570
571    dbgmsg( io, "%s:%d is decrementing the IO's refcount from %d to %d",
572                file, line, io->refCount, io->refCount-1 );
573
574    if( !--io->refCount )
575        tr_peerIoFree( io );
576}
577
578const tr_address*
579tr_peerIoGetAddress( const tr_peerIo * io, tr_port   * port )
580{
581    assert( tr_isPeerIo( io ) );
582
583    if( port )
584        *port = io->port;
585
586    return &io->addr;
587}
588
589const char*
590tr_peerIoAddrStr( const tr_address * addr, tr_port port )
591{
592    static char buf[512];
593
594    if( addr->type == TR_AF_INET )
595        tr_snprintf( buf, sizeof( buf ), "%s:%u", tr_ntop_non_ts( addr ), ntohs( port ) );
596    else
597        tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_ntop_non_ts( addr ), ntohs( port ) );
598    return buf;
599}
600
601const char* tr_peerIoGetAddrStr( const tr_peerIo * io )
602{
603    return tr_isPeerIo( io ) ? tr_peerIoAddrStr( &io->addr, io->port ) : "error";
604}
605
606void
607tr_peerIoSetIOFuncs( tr_peerIo        * io,
608                     tr_can_read_cb     readcb,
609                     tr_did_write_cb    writecb,
610                     tr_net_error_cb    errcb,
611                     void             * userData )
612{
613    io->canRead = readcb;
614    io->didWrite = writecb;
615    io->gotError = errcb;
616    io->userData = userData;
617}
618
619void
620tr_peerIoClear( tr_peerIo * io )
621{
622    tr_peerIoSetIOFuncs( io, NULL, NULL, NULL, NULL );
623    tr_peerIoSetEnabled( io, TR_UP, FALSE );
624    tr_peerIoSetEnabled( io, TR_DOWN, FALSE );
625}
626
627int
628tr_peerIoReconnect( tr_peerIo * io )
629{
630    short int pendingEvents;
631    tr_session * session;
632
633    assert( tr_isPeerIo( io ) );
634    assert( !tr_peerIoIsIncoming( io ) );
635
636    session = tr_peerIoGetSession( io );
637
638    pendingEvents = io->pendingEvents;
639    event_disable( io, EV_READ | EV_WRITE );
640
641    if( io->socket >= 0 )
642        tr_netClose( session, io->socket );
643
644    event_del( io->event_read );
645    event_del( io->event_write );
646    io->socket = tr_netOpenPeerSocket( session, &io->addr, io->port, io->isSeed );
647    io->event_read = event_new( session->event_base, io->socket, EV_READ, event_read_cb, io );
648    io->event_write = event_new( session->event_base, io->socket, EV_WRITE, event_write_cb, io );
649
650    if( io->socket >= 0 )
651    {
652        event_enable( io, pendingEvents );
653        tr_netSetTOS( io->socket, session->peerSocketTOS );
654        maybeSetCongestionAlgorithm( io->socket, session->peer_congestion_algorithm );
655        return 0;
656    }
657
658    return -1;
659}
660
661/**
662***
663**/
664
665void
666tr_peerIoSetTorrentHash( tr_peerIo *     io,
667                         const uint8_t * hash )
668{
669    assert( tr_isPeerIo( io ) );
670
671    tr_cryptoSetTorrentHash( io->crypto, hash );
672}
673
674const uint8_t*
675tr_peerIoGetTorrentHash( tr_peerIo * io )
676{
677    assert( tr_isPeerIo( io ) );
678    assert( io->crypto );
679
680    return tr_cryptoGetTorrentHash( io->crypto );
681}
682
683int
684tr_peerIoHasTorrentHash( const tr_peerIo * io )
685{
686    assert( tr_isPeerIo( io ) );
687    assert( io->crypto );
688
689    return tr_cryptoHasTorrentHash( io->crypto );
690}
691
692/**
693***
694**/
695
696void
697tr_peerIoSetPeersId( tr_peerIo *     io,
698                     const uint8_t * peer_id )
699{
700    assert( tr_isPeerIo( io ) );
701
702    if( ( io->peerIdIsSet = peer_id != NULL ) )
703        memcpy( io->peerId, peer_id, 20 );
704    else
705        memset( io->peerId, 0, 20 );
706}
707
708/**
709***
710**/
711
712static unsigned int
713getDesiredOutputBufferSize( const tr_peerIo * io, uint64_t now )
714{
715    /* this is all kind of arbitrary, but what seems to work well is
716     * being large enough to hold the next 20 seconds' worth of input,
717     * or a few blocks, whichever is bigger.
718     * It's okay to tweak this as needed */
719    const unsigned int currentSpeed_Bps = tr_bandwidthGetPieceSpeed_Bps( &io->bandwidth, now, TR_UP );
720    const unsigned int period = 15u; /* arbitrary */
721    /* the 3 is arbitrary; the .5 is to leave room for messages */
722    static const unsigned int ceiling =  (unsigned int)( MAX_BLOCK_SIZE * 3.5 );
723    return MAX( ceiling, currentSpeed_Bps*period );
724}
725
726size_t
727tr_peerIoGetWriteBufferSpace( const tr_peerIo * io, uint64_t now )
728{
729    const size_t desiredLen = getDesiredOutputBufferSize( io, now );
730    const size_t currentLen = evbuffer_get_length( io->outbuf );
731    size_t freeSpace = 0;
732
733    if( desiredLen > currentLen )
734        freeSpace = desiredLen - currentLen;
735
736    return freeSpace;
737}
738
739/**
740***
741**/
742
743void
744tr_peerIoSetEncryption( tr_peerIo * io, uint32_t encryptionMode )
745{
746    assert( tr_isPeerIo( io ) );
747    assert( encryptionMode == PEER_ENCRYPTION_NONE
748         || encryptionMode == PEER_ENCRYPTION_RC4 );
749
750    io->encryptionMode = encryptionMode;
751}
752
753/**
754***
755**/
756
757static void
758addDatatype( tr_peerIo * io, size_t byteCount, tr_bool isPieceData )
759{
760    struct tr_datatype * d;
761
762    d = tr_new( struct tr_datatype, 1 );
763    d->isPieceData = isPieceData != 0;
764    d->length = byteCount;
765    tr_list_append( &io->outbuf_datatypes, d );
766}
767
768static struct evbuffer_iovec *
769evbuffer_peek_all( struct evbuffer * buf, size_t * setme_vecCount )
770{
771    const size_t byteCount = evbuffer_get_length( buf );
772    const int vecCount = evbuffer_peek( buf, byteCount, NULL, NULL, 0 );
773    struct evbuffer_iovec * iovec = tr_new0( struct evbuffer_iovec, vecCount );
774    const int n = evbuffer_peek( buf, byteCount, NULL, iovec, vecCount );
775    assert( vecCount == n );
776    *setme_vecCount = vecCount;
777    return iovec;
778}
779
780static void
781maybeEncryptBuffer( tr_peerIo * io, struct evbuffer * buf )
782{
783    if( io->encryptionMode == PEER_ENCRYPTION_RC4 )
784    {
785        size_t i, n;
786        struct evbuffer_iovec * iovec = evbuffer_peek_all( buf, &n );
787
788        for( i=0; i<n; ++i )
789            tr_cryptoEncrypt( io->crypto, iovec[i].iov_len, iovec[i].iov_base, iovec[i].iov_base );
790
791        tr_free( iovec );
792    }
793}
794
795void
796tr_peerIoWriteBuf( tr_peerIo * io, struct evbuffer * buf, tr_bool isPieceData )
797{
798    const size_t byteCount = evbuffer_get_length( buf );
799    maybeEncryptBuffer( io, buf );
800    evbuffer_add_buffer( io->outbuf, buf );
801    addDatatype( io, byteCount, isPieceData );
802}
803
804void
805tr_peerIoWriteBytes( tr_peerIo * io, const void * bytes, size_t byteCount, tr_bool isPieceData )
806{
807    struct evbuffer * buf = evbuffer_new( );
808    evbuffer_add( buf, bytes, byteCount );
809    tr_peerIoWriteBuf( io, buf, isPieceData );
810    evbuffer_free( buf );
811}
812
813/***
814****
815***/
816
817void
818evbuffer_add_uint16( struct evbuffer * outbuf, uint16_t addme_hs )
819{
820    const uint16_t ns = htons( addme_hs );
821    evbuffer_add( outbuf, &ns, sizeof( ns ) );
822}
823
824void
825evbuffer_add_uint32( struct evbuffer * outbuf, uint32_t addme_hl )
826{
827    const uint32_t nl = htonl( addme_hl );
828    evbuffer_add( outbuf, &nl, sizeof( nl ) );
829}
830
831/***
832****
833***/
834
835void
836tr_peerIoReadBytes( tr_peerIo * io, struct evbuffer * inbuf, void * bytes, size_t byteCount )
837{
838    assert( tr_isPeerIo( io ) );
839    assert( evbuffer_get_length( inbuf )  >= byteCount );
840
841    switch( io->encryptionMode )
842    {
843        case PEER_ENCRYPTION_NONE:
844            evbuffer_remove( inbuf, bytes, byteCount );
845            break;
846
847        case PEER_ENCRYPTION_RC4:
848            evbuffer_remove( inbuf, bytes, byteCount );
849            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
850            break;
851
852        default:
853            assert( 0 );
854    }
855}
856
857void
858tr_peerIoReadUint16( tr_peerIo        * io,
859                     struct evbuffer  * inbuf,
860                     uint16_t         * setme )
861{
862    uint16_t tmp;
863    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
864    *setme = ntohs( tmp );
865}
866
867void tr_peerIoReadUint32( tr_peerIo        * io,
868                          struct evbuffer  * inbuf,
869                          uint32_t         * setme )
870{
871    uint32_t tmp;
872    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
873    *setme = ntohl( tmp );
874}
875
876void
877tr_peerIoDrain( tr_peerIo       * io,
878                struct evbuffer * inbuf,
879                size_t            byteCount )
880{
881    void * buf = tr_sessionGetBuffer( io->session );
882    const size_t buflen = SESSION_BUFFER_SIZE;
883
884    while( byteCount > 0 )
885    {
886        const size_t thisPass = MIN( byteCount, buflen );
887        tr_peerIoReadBytes( io, inbuf, buf, thisPass );
888        byteCount -= thisPass;
889    }
890
891    tr_sessionReleaseBuffer( io->session );
892}
893
894/***
895****
896***/
897
898static int
899tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
900{
901    int res = 0;
902
903    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch )))
904    {
905        int e;
906
907        EVUTIL_SET_SOCKET_ERROR( 0 );
908        res = evbuffer_read( io->inbuf, io->socket, (int)howmuch );
909        e = EVUTIL_SOCKET_ERROR( );
910
911        dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(e):"") );
912
913        if( evbuffer_get_length( io->inbuf ) )
914            canReadWrapper( io );
915
916        if( ( res <= 0 ) && ( io->gotError ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
917        {
918            char errstr[512];
919            short what = BEV_EVENT_READING | BEV_EVENT_ERROR;
920            if( res == 0 )
921                what |= BEV_EVENT_EOF;
922            tr_net_strerror( errstr, sizeof( errstr ), e );
923            dbgmsg( io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
924            io->gotError( io, what, io->userData );
925        }
926    }
927
928    return res;
929}
930
931static int
932tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
933{
934    int n = 0;
935
936    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_UP, howmuch )))
937    {
938        int e;
939        EVUTIL_SET_SOCKET_ERROR( 0 );
940        n = tr_evbuffer_write( io, io->socket, howmuch );
941        e = EVUTIL_SOCKET_ERROR( );
942
943        if( n > 0 )
944            didWriteWrapper( io, n );
945
946        if( ( n < 0 ) && ( io->gotError ) && e && ( e != EPIPE ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
947        {
948            char errstr[512];
949            const short what = BEV_EVENT_WRITING | BEV_EVENT_ERROR;
950
951            tr_net_strerror( errstr, sizeof( errstr ), e );
952            dbgmsg( io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", n, what, e, errstr );
953
954            if( io->gotError != NULL )
955                io->gotError( io, what, io->userData );
956        }
957    }
958
959    return n;
960}
961
962int
963tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
964{
965    int bytesUsed = 0;
966
967    assert( tr_isPeerIo( io ) );
968    assert( tr_isDirection( dir ) );
969
970    if( io->hasFinishedConnecting )
971    {
972        if( dir == TR_DOWN )
973            bytesUsed = tr_peerIoTryRead( io, limit );
974        else
975            bytesUsed = tr_peerIoTryWrite( io, limit );
976    }
977
978    dbgmsg( io, "flushing peer-io, hasFinishedConnecting %d, direction %d, limit %zu, bytesUsed %d", (int)io->hasFinishedConnecting, (int)dir, limit, bytesUsed );
979    return bytesUsed;
980}
981
982int
983tr_peerIoFlushOutgoingProtocolMsgs( tr_peerIo * io )
984{
985    size_t byteCount = 0;
986    tr_list * it;
987
988    /* count up how many bytes are used by non-piece-data messages
989       at the front of our outbound queue */
990    for( it=io->outbuf_datatypes; it!=NULL; it=it->next )
991    {
992        struct tr_datatype * d = it->data;
993
994        if( d->isPieceData )
995            break;
996
997        byteCount += d->length;
998    }
999
1000    return tr_peerIoFlush( io, TR_UP, byteCount );
1001}
Note: See TracBrowser for help on using the repository browser.