source: trunk/libtransmission/peer-io.c @ 11669

Last change on this file since 11669 was 11669, checked in by jordan, 11 years ago

(trunk libT) #3894 "tr_peerIo.hasFinishedConnecting should be removed" -- committed.

  • Property svn:keywords set to Date Rev Author Id
File size: 25.5 KB
Line 
1/*
2 * This file Copyright (C) 2007-2010 Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 11669 2011-01-13 01:58:57Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <string.h>
17#include <stdio.h>
18#include <unistd.h>
19
20#ifdef WIN32
21 #include <winsock2.h>
22#else
23 #include <arpa/inet.h> /* inet_ntoa */
24#endif
25
26#include <event2/event.h>
27#include <event2/bufferevent.h>
28
29#include "transmission.h"
30#include "session.h"
31#include "bandwidth.h"
32#include "crypto.h"
33#include "list.h"
34#include "net.h"
35#include "peer-common.h" /* MAX_BLOCK_SIZE */
36#include "peer-io.h"
37#include "trevent.h" /* tr_runInEventThread() */
38#include "utils.h"
39
40#define MAGIC_NUMBER 206745
41
42#ifdef WIN32
43 #define EAGAIN       WSAEWOULDBLOCK
44 #define EINTR        WSAEINTR
45 #define EINPROGRESS  WSAEINPROGRESS
46 #define EPIPE        WSAECONNRESET
47#endif
48
49static size_t
50guessPacketOverhead( size_t d )
51{
52    /**
53     * http://sd.wareonearth.com/~phil/net/overhead/
54     *
55     * TCP over Ethernet:
56     * Assuming no header compression (e.g. not PPP)
57     * Add 20 IPv4 header or 40 IPv6 header (no options)
58     * Add 20 TCP header
59     * Add 12 bytes optional TCP timestamps
60     * Max TCP Payload data rates over ethernet are thus:
61     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
62     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
63     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
64     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
65     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
66     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
67     */
68    const double assumed_payload_data_rate = 94.0;
69
70    return (unsigned int)( d * ( 100.0 / assumed_payload_data_rate ) - d );
71}
72
73/**
74***
75**/
76
77#define dbgmsg( io, ... ) \
78    do { \
79        if( tr_deepLoggingIsActive( ) ) \
80            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
81    } while( 0 )
82
83struct tr_datatype
84{
85    tr_bool  isPieceData;
86    size_t   length;
87};
88
89/***
90****
91***/
92
93static void
94didWriteWrapper( tr_peerIo * io, unsigned int bytes_transferred )
95{
96     while( bytes_transferred && tr_isPeerIo( io ) )
97     {
98        struct tr_datatype * next = io->outbuf_datatypes->data;
99
100        const unsigned int payload = MIN( next->length, bytes_transferred );
101        const unsigned int overhead = guessPacketOverhead( payload );
102        const uint64_t now = tr_time_msec( );
103
104        tr_bandwidthUsed( &io->bandwidth, TR_UP, payload, next->isPieceData, now );
105
106        if( overhead > 0 )
107            tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, FALSE, now );
108
109        if( io->didWrite )
110            io->didWrite( io, payload, next->isPieceData, io->userData );
111
112        if( tr_isPeerIo( io ) )
113        {
114            bytes_transferred -= payload;
115            next->length -= payload;
116            if( !next->length ) {
117                tr_list_pop_front( &io->outbuf_datatypes );
118                tr_free( next );
119            }
120        }
121    }
122}
123
124static void
125canReadWrapper( tr_peerIo * io )
126{
127    tr_bool err = 0;
128    tr_bool done = 0;
129    tr_session * session;
130
131    dbgmsg( io, "canRead" );
132
133    assert( tr_isPeerIo( io ) );
134    assert( tr_isSession( io->session ) );
135    tr_peerIoRef( io );
136
137    session = io->session;
138
139    /* try to consume the input buffer */
140    if( io->canRead )
141    {
142        tr_sessionLock( session );
143
144        while( !done && !err )
145        {
146            size_t piece = 0;
147            const size_t oldLen = evbuffer_get_length( io->inbuf );
148            const int ret = io->canRead( io, io->userData, &piece );
149
150            const size_t used = oldLen - evbuffer_get_length( io->inbuf );
151
152            assert( tr_isPeerIo( io ) );
153
154            if( piece || (piece!=used) )
155            {
156                const uint64_t now = tr_time_msec( );
157
158                if( piece )
159                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, piece, TRUE, now );
160
161                if( used != piece )
162                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, used - piece, FALSE, now );
163            }
164
165            switch( ret )
166            {
167                case READ_NOW:
168                    if( evbuffer_get_length( io->inbuf ) )
169                        continue;
170                    done = 1;
171                    break;
172
173                case READ_LATER:
174                    done = 1;
175                    break;
176
177                case READ_ERR:
178                    err = 1;
179                    break;
180            }
181
182            assert( tr_isPeerIo( io ) );
183        }
184
185        tr_sessionUnlock( session );
186    }
187
188    assert( tr_isPeerIo( io ) );
189    tr_peerIoUnref( io );
190}
191
192tr_bool
193tr_isPeerIo( const tr_peerIo * io )
194{
195    return ( io != NULL )
196        && ( io->magicNumber == MAGIC_NUMBER )
197        && ( io->refCount >= 0 )
198        && ( tr_isBandwidth( &io->bandwidth ) )
199        && ( tr_isAddress( &io->addr ) );
200}
201
202static void
203event_read_cb( int fd, short event UNUSED, void * vio )
204{
205    int res;
206    int e;
207    tr_peerIo * io = vio;
208
209    /* Limit the input buffer to 256K, so it doesn't grow too large */
210    unsigned int howmuch;
211    unsigned int curlen;
212    const tr_direction dir = TR_DOWN;
213    const unsigned int max = 256 * 1024;
214
215    assert( tr_isPeerIo( io ) );
216
217    io->pendingEvents &= ~EV_READ;
218
219    curlen = evbuffer_get_length( io->inbuf );
220    howmuch = curlen >= max ? 0 : max - curlen;
221    howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch );
222
223    dbgmsg( io, "libevent says this peer is ready to read" );
224
225    /* if we don't have any bandwidth left, stop reading */
226    if( howmuch < 1 ) {
227        tr_peerIoSetEnabled( io, dir, FALSE );
228        return;
229    }
230
231    EVUTIL_SET_SOCKET_ERROR( 0 );
232    res = evbuffer_read( io->inbuf, fd, (int)howmuch );
233    e = EVUTIL_SOCKET_ERROR( );
234
235    if( res > 0 )
236    {
237        tr_peerIoSetEnabled( io, dir, TRUE );
238
239        /* Invoke the user callback - must always be called last */
240        canReadWrapper( io );
241    }
242    else
243    {
244        char errstr[512];
245        short what = BEV_EVENT_READING;
246
247        if( res == 0 ) /* EOF */
248            what |= BEV_EVENT_EOF;
249        else if( res == -1 ) {
250            if( e == EAGAIN || e == EINTR ) {
251                tr_peerIoSetEnabled( io, dir, TRUE );
252                return;
253            }
254            what |= BEV_EVENT_ERROR;
255        }
256
257        tr_net_strerror( errstr, sizeof( errstr ), e );
258        dbgmsg( io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
259
260        if( io->gotError != NULL )
261            io->gotError( io, what, io->userData );
262    }
263}
264
265static int
266tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
267{
268    int e;
269    int n;
270    char errstr[256];
271
272    EVUTIL_SET_SOCKET_ERROR( 0 );
273    n = evbuffer_write_atmost( io->outbuf, fd, howmuch );
274    e = EVUTIL_SOCKET_ERROR( );
275    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?tr_net_strerror(errstr,sizeof(errstr),e):"") );
276
277    return n;
278}
279
280static void
281event_write_cb( int fd, short event UNUSED, void * vio )
282{
283    int res = 0;
284    int e;
285    short what = BEV_EVENT_WRITING;
286    tr_peerIo * io = vio;
287    size_t howmuch;
288    const tr_direction dir = TR_UP;
289
290    assert( tr_isPeerIo( io ) );
291
292    io->pendingEvents &= ~EV_WRITE;
293
294    dbgmsg( io, "libevent says this peer is ready to write" );
295
296    /* Write as much as possible, since the socket is non-blocking, write() will
297     * return if it can't write any more data without blocking */
298    howmuch = tr_bandwidthClamp( &io->bandwidth, dir, evbuffer_get_length( io->outbuf ) );
299
300    /* if we don't have any bandwidth left, stop writing */
301    if( howmuch < 1 ) {
302        tr_peerIoSetEnabled( io, dir, FALSE );
303        return;
304    }
305
306    EVUTIL_SET_SOCKET_ERROR( 0 );
307    res = tr_evbuffer_write( io, fd, howmuch );
308    e = EVUTIL_SOCKET_ERROR( );
309
310    if (res == -1) {
311        if (!e || e == EAGAIN || e == EINTR || e == EINPROGRESS)
312            goto reschedule;
313        /* error case */
314        what |= BEV_EVENT_ERROR;
315    } else if (res == 0) {
316        /* eof case */
317        what |= BEV_EVENT_EOF;
318    }
319    if (res <= 0)
320        goto error;
321
322    if( evbuffer_get_length( io->outbuf ) )
323        tr_peerIoSetEnabled( io, dir, TRUE );
324
325    didWriteWrapper( io, res );
326    return;
327
328 reschedule:
329    if( evbuffer_get_length( io->outbuf ) )
330        tr_peerIoSetEnabled( io, dir, TRUE );
331    return;
332
333 error:
334
335    dbgmsg( io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
336
337    if( io->gotError != NULL )
338        io->gotError( io, what, io->userData );
339}
340
341/**
342***
343**/
344
345static void
346maybeSetCongestionAlgorithm( int socket, const char * algorithm )
347{
348    if( algorithm && *algorithm )
349    {
350        const int rc = tr_netSetCongestionControl( socket, algorithm );
351
352        if( rc < 0 )
353            tr_ninf( "Net", "Can't set congestion control algorithm '%s': %s",
354                     algorithm, tr_strerror( errno ));
355    }
356}
357
358static tr_peerIo*
359tr_peerIoNew( tr_session       * session,
360              tr_bandwidth     * parent,
361              const tr_address * addr,
362              tr_port            port,
363              const uint8_t    * torrentHash,
364              tr_bool            isIncoming,
365              tr_bool            isSeed,
366              int                socket )
367{
368    tr_peerIo * io;
369
370    assert( session != NULL );
371    assert( session->events != NULL );
372    assert( tr_isBool( isIncoming ) );
373    assert( tr_isBool( isSeed ) );
374    assert( tr_amInEventThread( session ) );
375
376    if( socket >= 0 ) {
377        tr_netSetTOS( socket, session->peerSocketTOS );
378        maybeSetCongestionAlgorithm( socket, session->peer_congestion_algorithm );
379    }
380
381    io = tr_new0( tr_peerIo, 1 );
382    io->magicNumber = MAGIC_NUMBER;
383    io->refCount = 1;
384    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
385    io->session = session;
386    io->addr = *addr;
387    io->isSeed = isSeed;
388    io->port = port;
389    io->socket = socket;
390    io->isIncoming = isIncoming != 0;
391    io->timeCreated = tr_time( );
392    io->inbuf = evbuffer_new( );
393    io->outbuf = evbuffer_new( );
394    io->event_read = event_new( session->event_base, io->socket, EV_READ, event_read_cb, io );
395    io->event_write = event_new( session->event_base, io->socket, EV_WRITE, event_write_cb, io );
396    tr_bandwidthConstruct( &io->bandwidth, session, parent );
397    tr_bandwidthSetPeer( &io->bandwidth, io );
398    dbgmsg( io, "bandwidth is %p; its parent is %p", &io->bandwidth, parent );
399
400    return io;
401}
402
403tr_peerIo*
404tr_peerIoNewIncoming( tr_session        * session,
405                      tr_bandwidth      * parent,
406                      const tr_address  * addr,
407                      tr_port             port,
408                      int                 fd )
409{
410    assert( session );
411    assert( tr_isAddress( addr ) );
412    assert( fd >= 0 );
413
414    return tr_peerIoNew( session, parent, addr, port, NULL, TRUE, FALSE, fd );
415}
416
417tr_peerIo*
418tr_peerIoNewOutgoing( tr_session        * session,
419                      tr_bandwidth      * parent,
420                      const tr_address  * addr,
421                      tr_port             port,
422                      const uint8_t     * torrentHash,
423                      tr_bool             isSeed )
424{
425    int fd;
426
427    assert( session );
428    assert( tr_isAddress( addr ) );
429    assert( torrentHash );
430
431    fd = tr_netOpenPeerSocket( session, addr, port, isSeed );
432    dbgmsg( NULL, "tr_netOpenPeerSocket returned fd %d", fd );
433
434    return fd < 0 ? NULL
435                  : tr_peerIoNew( session, parent, addr, port, torrentHash, FALSE, isSeed, fd );
436}
437
438/***
439****
440***/
441
442static void
443event_enable( tr_peerIo * io, short event )
444{
445    assert( tr_amInEventThread( io->session ) );
446    assert( io->session != NULL );
447    assert( io->session->events != NULL );
448    assert( event_initialized( io->event_read ) );
449    assert( event_initialized( io->event_write ) );
450
451    if( io->socket < 0 )
452        return;
453
454    if( ( event & EV_READ ) && ! ( io->pendingEvents & EV_READ ) )
455    {
456        dbgmsg( io, "enabling libevent ready-to-read polling" );
457        event_add( io->event_read, NULL );
458        io->pendingEvents |= EV_READ;
459    }
460
461    if( ( event & EV_WRITE ) && ! ( io->pendingEvents & EV_WRITE ) )
462    {
463        dbgmsg( io, "enabling libevent ready-to-write polling" );
464        event_add( io->event_write, NULL );
465        io->pendingEvents |= EV_WRITE;
466    }
467}
468
469static void
470event_disable( struct tr_peerIo * io, short event )
471{
472    assert( tr_amInEventThread( io->session ) );
473    assert( io->session != NULL );
474    assert( io->session->events != NULL );
475    assert( event_initialized( io->event_read ) );
476    assert( event_initialized( io->event_write ) );
477
478    if( ( event & EV_READ ) && ( io->pendingEvents & EV_READ ) )
479    {
480        dbgmsg( io, "disabling libevent ready-to-read polling" );
481        event_del( io->event_read );
482        io->pendingEvents &= ~EV_READ;
483    }
484
485    if( ( event & EV_WRITE ) && ( io->pendingEvents & EV_WRITE ) )
486    {
487        dbgmsg( io, "disabling libevent ready-to-write polling" );
488        event_del( io->event_write );
489        io->pendingEvents &= ~EV_WRITE;
490    }
491}
492
493void
494tr_peerIoSetEnabled( tr_peerIo    * io,
495                     tr_direction   dir,
496                     tr_bool        isEnabled )
497{
498    const short event = dir == TR_UP ? EV_WRITE : EV_READ;
499
500    assert( tr_isPeerIo( io ) );
501    assert( tr_isDirection( dir ) );
502    assert( tr_amInEventThread( io->session ) );
503    assert( io->session->events != NULL );
504
505    if( isEnabled )
506        event_enable( io, event );
507    else
508        event_disable( io, event );
509}
510
511/***
512****
513***/
514
515static void
516io_dtor( void * vio )
517{
518    tr_peerIo * io = vio;
519
520    assert( tr_isPeerIo( io ) );
521    assert( tr_amInEventThread( io->session ) );
522    assert( io->session->events != NULL );
523
524    dbgmsg( io, "in tr_peerIo destructor" );
525    event_disable( io, EV_READ | EV_WRITE );
526    event_free( io->event_read );
527    event_free( io->event_write );
528    tr_bandwidthDestruct( &io->bandwidth );
529    evbuffer_free( io->outbuf );
530    evbuffer_free( io->inbuf );
531    tr_netClose( io->session, io->socket );
532    tr_cryptoFree( io->crypto );
533    tr_list_free( &io->outbuf_datatypes, tr_free );
534
535    memset( io, ~0, sizeof( tr_peerIo ) );
536    tr_free( io );
537}
538
539static void
540tr_peerIoFree( tr_peerIo * io )
541{
542    if( io )
543    {
544        dbgmsg( io, "in tr_peerIoFree" );
545        io->canRead = NULL;
546        io->didWrite = NULL;
547        io->gotError = NULL;
548        tr_runInEventThread( io->session, io_dtor, io );
549    }
550}
551
552void
553tr_peerIoRefImpl( const char * file, int line, tr_peerIo * io )
554{
555    assert( tr_isPeerIo( io ) );
556
557    dbgmsg( io, "%s:%d is incrementing the IO's refcount from %d to %d",
558                file, line, io->refCount, io->refCount+1 );
559
560    ++io->refCount;
561}
562
563void
564tr_peerIoUnrefImpl( const char * file, int line, tr_peerIo * io )
565{
566    assert( tr_isPeerIo( io ) );
567
568    dbgmsg( io, "%s:%d is decrementing the IO's refcount from %d to %d",
569                file, line, io->refCount, io->refCount-1 );
570
571    if( !--io->refCount )
572        tr_peerIoFree( io );
573}
574
575const tr_address*
576tr_peerIoGetAddress( const tr_peerIo * io, tr_port   * port )
577{
578    assert( tr_isPeerIo( io ) );
579
580    if( port )
581        *port = io->port;
582
583    return &io->addr;
584}
585
586const char*
587tr_peerIoAddrStr( const tr_address * addr, tr_port port )
588{
589    static char buf[512];
590
591    if( addr->type == TR_AF_INET )
592        tr_snprintf( buf, sizeof( buf ), "%s:%u", tr_ntop_non_ts( addr ), ntohs( port ) );
593    else
594        tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_ntop_non_ts( addr ), ntohs( port ) );
595    return buf;
596}
597
598const char* tr_peerIoGetAddrStr( const tr_peerIo * io )
599{
600    return tr_isPeerIo( io ) ? tr_peerIoAddrStr( &io->addr, io->port ) : "error";
601}
602
603void
604tr_peerIoSetIOFuncs( tr_peerIo        * io,
605                     tr_can_read_cb     readcb,
606                     tr_did_write_cb    writecb,
607                     tr_net_error_cb    errcb,
608                     void             * userData )
609{
610    io->canRead = readcb;
611    io->didWrite = writecb;
612    io->gotError = errcb;
613    io->userData = userData;
614}
615
616void
617tr_peerIoClear( tr_peerIo * io )
618{
619    tr_peerIoSetIOFuncs( io, NULL, NULL, NULL, NULL );
620    tr_peerIoSetEnabled( io, TR_UP, FALSE );
621    tr_peerIoSetEnabled( io, TR_DOWN, FALSE );
622}
623
624int
625tr_peerIoReconnect( tr_peerIo * io )
626{
627    short int pendingEvents;
628    tr_session * session;
629
630    assert( tr_isPeerIo( io ) );
631    assert( !tr_peerIoIsIncoming( io ) );
632
633    session = tr_peerIoGetSession( io );
634
635    pendingEvents = io->pendingEvents;
636    event_disable( io, EV_READ | EV_WRITE );
637
638    if( io->socket >= 0 )
639        tr_netClose( session, io->socket );
640
641    event_del( io->event_read );
642    event_del( io->event_write );
643    io->socket = tr_netOpenPeerSocket( session, &io->addr, io->port, io->isSeed );
644    io->event_read = event_new( session->event_base, io->socket, EV_READ, event_read_cb, io );
645    io->event_write = event_new( session->event_base, io->socket, EV_WRITE, event_write_cb, io );
646
647    if( io->socket >= 0 )
648    {
649        event_enable( io, pendingEvents );
650        tr_netSetTOS( io->socket, session->peerSocketTOS );
651        maybeSetCongestionAlgorithm( io->socket, session->peer_congestion_algorithm );
652        return 0;
653    }
654
655    return -1;
656}
657
658/**
659***
660**/
661
662void
663tr_peerIoSetTorrentHash( tr_peerIo *     io,
664                         const uint8_t * hash )
665{
666    assert( tr_isPeerIo( io ) );
667
668    tr_cryptoSetTorrentHash( io->crypto, hash );
669}
670
671const uint8_t*
672tr_peerIoGetTorrentHash( tr_peerIo * io )
673{
674    assert( tr_isPeerIo( io ) );
675    assert( io->crypto );
676
677    return tr_cryptoGetTorrentHash( io->crypto );
678}
679
680int
681tr_peerIoHasTorrentHash( const tr_peerIo * io )
682{
683    assert( tr_isPeerIo( io ) );
684    assert( io->crypto );
685
686    return tr_cryptoHasTorrentHash( io->crypto );
687}
688
689/**
690***
691**/
692
693void
694tr_peerIoSetPeersId( tr_peerIo *     io,
695                     const uint8_t * peer_id )
696{
697    assert( tr_isPeerIo( io ) );
698
699    if( ( io->peerIdIsSet = peer_id != NULL ) )
700        memcpy( io->peerId, peer_id, 20 );
701    else
702        memset( io->peerId, 0, 20 );
703}
704
705/**
706***
707**/
708
709static unsigned int
710getDesiredOutputBufferSize( const tr_peerIo * io, uint64_t now )
711{
712    /* this is all kind of arbitrary, but what seems to work well is
713     * being large enough to hold the next 20 seconds' worth of input,
714     * or a few blocks, whichever is bigger.
715     * It's okay to tweak this as needed */
716    const unsigned int currentSpeed_Bps = tr_bandwidthGetPieceSpeed_Bps( &io->bandwidth, now, TR_UP );
717    const unsigned int period = 15u; /* arbitrary */
718    /* the 3 is arbitrary; the .5 is to leave room for messages */
719    static const unsigned int ceiling =  (unsigned int)( MAX_BLOCK_SIZE * 3.5 );
720    return MAX( ceiling, currentSpeed_Bps*period );
721}
722
723size_t
724tr_peerIoGetWriteBufferSpace( const tr_peerIo * io, uint64_t now )
725{
726    const size_t desiredLen = getDesiredOutputBufferSize( io, now );
727    const size_t currentLen = evbuffer_get_length( io->outbuf );
728    size_t freeSpace = 0;
729
730    if( desiredLen > currentLen )
731        freeSpace = desiredLen - currentLen;
732
733    return freeSpace;
734}
735
736/**
737***
738**/
739
740void
741tr_peerIoSetEncryption( tr_peerIo * io, uint32_t encryptionMode )
742{
743    assert( tr_isPeerIo( io ) );
744    assert( encryptionMode == PEER_ENCRYPTION_NONE
745         || encryptionMode == PEER_ENCRYPTION_RC4 );
746
747    io->encryptionMode = encryptionMode;
748}
749
750/**
751***
752**/
753
754static void
755addDatatype( tr_peerIo * io, size_t byteCount, tr_bool isPieceData )
756{
757    struct tr_datatype * d;
758
759    d = tr_new( struct tr_datatype, 1 );
760    d->isPieceData = isPieceData != 0;
761    d->length = byteCount;
762    tr_list_append( &io->outbuf_datatypes, d );
763}
764
765static struct evbuffer_iovec *
766evbuffer_peek_all( struct evbuffer * buf, size_t * setme_vecCount )
767{
768    const size_t byteCount = evbuffer_get_length( buf );
769    const int vecCount = evbuffer_peek( buf, byteCount, NULL, NULL, 0 );
770    struct evbuffer_iovec * iovec = tr_new0( struct evbuffer_iovec, vecCount );
771    const int n = evbuffer_peek( buf, byteCount, NULL, iovec, vecCount );
772    assert( vecCount == n );
773    *setme_vecCount = vecCount;
774    return iovec;
775}
776
777static void
778maybeEncryptBuffer( tr_peerIo * io, struct evbuffer * buf )
779{
780    if( io->encryptionMode == PEER_ENCRYPTION_RC4 )
781    {
782        size_t i, n;
783        struct evbuffer_iovec * iovec = evbuffer_peek_all( buf, &n );
784
785        for( i=0; i<n; ++i )
786            tr_cryptoEncrypt( io->crypto, iovec[i].iov_len, iovec[i].iov_base, iovec[i].iov_base );
787
788        tr_free( iovec );
789    }
790}
791
792void
793tr_peerIoWriteBuf( tr_peerIo * io, struct evbuffer * buf, tr_bool isPieceData )
794{
795    const size_t byteCount = evbuffer_get_length( buf );
796    maybeEncryptBuffer( io, buf );
797    evbuffer_add_buffer( io->outbuf, buf );
798    addDatatype( io, byteCount, isPieceData );
799}
800
801void
802tr_peerIoWriteBytes( tr_peerIo * io, const void * bytes, size_t byteCount, tr_bool isPieceData )
803{
804    struct evbuffer * buf = evbuffer_new( );
805    evbuffer_add( buf, bytes, byteCount );
806    tr_peerIoWriteBuf( io, buf, isPieceData );
807    evbuffer_free( buf );
808}
809
810/***
811****
812***/
813
814void
815evbuffer_add_uint16( struct evbuffer * outbuf, uint16_t addme_hs )
816{
817    const uint16_t ns = htons( addme_hs );
818    evbuffer_add( outbuf, &ns, sizeof( ns ) );
819}
820
821void
822evbuffer_add_uint32( struct evbuffer * outbuf, uint32_t addme_hl )
823{
824    const uint32_t nl = htonl( addme_hl );
825    evbuffer_add( outbuf, &nl, sizeof( nl ) );
826}
827
828/***
829****
830***/
831
832void
833tr_peerIoReadBytes( tr_peerIo * io, struct evbuffer * inbuf, void * bytes, size_t byteCount )
834{
835    assert( tr_isPeerIo( io ) );
836    assert( evbuffer_get_length( inbuf )  >= byteCount );
837
838    switch( io->encryptionMode )
839    {
840        case PEER_ENCRYPTION_NONE:
841            evbuffer_remove( inbuf, bytes, byteCount );
842            break;
843
844        case PEER_ENCRYPTION_RC4:
845            evbuffer_remove( inbuf, bytes, byteCount );
846            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
847            break;
848
849        default:
850            assert( 0 );
851    }
852}
853
854void
855tr_peerIoReadUint16( tr_peerIo        * io,
856                     struct evbuffer  * inbuf,
857                     uint16_t         * setme )
858{
859    uint16_t tmp;
860    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
861    *setme = ntohs( tmp );
862}
863
864void tr_peerIoReadUint32( tr_peerIo        * io,
865                          struct evbuffer  * inbuf,
866                          uint32_t         * setme )
867{
868    uint32_t tmp;
869    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
870    *setme = ntohl( tmp );
871}
872
873void
874tr_peerIoDrain( tr_peerIo       * io,
875                struct evbuffer * inbuf,
876                size_t            byteCount )
877{
878    void * buf = tr_sessionGetBuffer( io->session );
879    const size_t buflen = SESSION_BUFFER_SIZE;
880
881    while( byteCount > 0 )
882    {
883        const size_t thisPass = MIN( byteCount, buflen );
884        tr_peerIoReadBytes( io, inbuf, buf, thisPass );
885        byteCount -= thisPass;
886    }
887
888    tr_sessionReleaseBuffer( io->session );
889}
890
891/***
892****
893***/
894
895static int
896tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
897{
898    int res = 0;
899
900    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch )))
901    {
902        int e;
903
904        EVUTIL_SET_SOCKET_ERROR( 0 );
905        res = evbuffer_read( io->inbuf, io->socket, (int)howmuch );
906        e = EVUTIL_SOCKET_ERROR( );
907
908        dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(e):"") );
909
910        if( evbuffer_get_length( io->inbuf ) )
911            canReadWrapper( io );
912
913        if( ( res <= 0 ) && ( io->gotError ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
914        {
915            char errstr[512];
916            short what = BEV_EVENT_READING | BEV_EVENT_ERROR;
917            if( res == 0 )
918                what |= BEV_EVENT_EOF;
919            tr_net_strerror( errstr, sizeof( errstr ), e );
920            dbgmsg( io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
921            io->gotError( io, what, io->userData );
922        }
923    }
924
925    return res;
926}
927
928static int
929tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
930{
931    int n = 0;
932
933    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_UP, howmuch )))
934    {
935        int e;
936        EVUTIL_SET_SOCKET_ERROR( 0 );
937        n = tr_evbuffer_write( io, io->socket, howmuch );
938        e = EVUTIL_SOCKET_ERROR( );
939
940        if( n > 0 )
941            didWriteWrapper( io, n );
942
943        if( ( n < 0 ) && ( io->gotError ) && e && ( e != EPIPE ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
944        {
945            char errstr[512];
946            const short what = BEV_EVENT_WRITING | BEV_EVENT_ERROR;
947
948            tr_net_strerror( errstr, sizeof( errstr ), e );
949            dbgmsg( io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", n, what, e, errstr );
950
951            if( io->gotError != NULL )
952                io->gotError( io, what, io->userData );
953        }
954    }
955
956    return n;
957}
958
959int
960tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
961{
962    int bytesUsed = 0;
963
964    assert( tr_isPeerIo( io ) );
965    assert( tr_isDirection( dir ) );
966
967    if( dir == TR_DOWN )
968        bytesUsed = tr_peerIoTryRead( io, limit );
969    else
970        bytesUsed = tr_peerIoTryWrite( io, limit );
971
972    dbgmsg( io, "flushing peer-io, direction %d, limit %zu, bytesUsed %d", (int)dir, limit, bytesUsed );
973    return bytesUsed;
974}
975
976int
977tr_peerIoFlushOutgoingProtocolMsgs( tr_peerIo * io )
978{
979    size_t byteCount = 0;
980    tr_list * it;
981
982    /* count up how many bytes are used by non-piece-data messages
983       at the front of our outbound queue */
984    for( it=io->outbuf_datatypes; it!=NULL; it=it->next )
985    {
986        struct tr_datatype * d = it->data;
987
988        if( d->isPieceData )
989            break;
990
991        byteCount += d->length;
992    }
993
994    return tr_peerIoFlush( io, TR_UP, byteCount );
995}
Note: See TracBrowser for help on using the repository browser.