source: trunk/libtransmission/peer-io.c @ 11398

Last change on this file since 11398 was 11398, checked in by charles, 11 years ago

(trunk libT) add some new bugs to the code so that it will crash when vraa tries to use it

  • Property svn:keywords set to Date Rev Author Id
File size: 26.9 KB
Line 
1/*
2 * This file Copyright (C) 2007-2010 Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 11398 2010-11-11 15:31:11Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <string.h>
17#include <stdio.h>
18#include <unistd.h>
19
20#ifdef WIN32
21 #include <winsock2.h>
22#else
23 #include <arpa/inet.h> /* inet_ntoa */
24#endif
25
26#include <event.h>
27
28#include "transmission.h"
29#include "session.h"
30#include "bandwidth.h"
31#include "crypto.h"
32#include "list.h"
33#include "net.h"
34#include "peer-common.h" /* MAX_BLOCK_SIZE */
35#include "peer-io.h"
36#include "trevent.h" /* tr_runInEventThread() */
37#include "utils.h"
38
39#define MAGIC_NUMBER 206745
40
41#ifdef WIN32
42 #define EAGAIN       WSAEWOULDBLOCK
43 #define EINTR        WSAEINTR
44 #define EINPROGRESS  WSAEINPROGRESS
45 #define EPIPE        WSAECONNRESET
46#endif
47
48static size_t
49guessPacketOverhead( size_t d )
50{
51    /**
52     * http://sd.wareonearth.com/~phil/net/overhead/
53     *
54     * TCP over Ethernet:
55     * Assuming no header compression (e.g. not PPP)
56     * Add 20 IPv4 header or 40 IPv6 header (no options)
57     * Add 20 TCP header
58     * Add 12 bytes optional TCP timestamps
59     * Max TCP Payload data rates over ethernet are thus:
60     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
61     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
62     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
63     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
64     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
65     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
66     */
67    const double assumed_payload_data_rate = 94.0;
68
69    return (unsigned int)( d * ( 100.0 / assumed_payload_data_rate ) - d );
70}
71
72/**
73***
74**/
75
76#define dbgmsg( io, ... ) \
77    do { \
78        if( tr_deepLoggingIsActive( ) ) \
79            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
80    } while( 0 )
81
82struct tr_datatype
83{
84    tr_bool  isPieceData;
85    size_t   length;
86};
87
88/***
89****
90***/
91
92static void
93didWriteWrapper( tr_peerIo * io, unsigned int bytes_transferred )
94{
95     while( bytes_transferred && tr_isPeerIo( io ) )
96     {
97        struct tr_datatype * next = io->outbuf_datatypes->data;
98
99        const unsigned int payload = MIN( next->length, bytes_transferred );
100        const unsigned int overhead = guessPacketOverhead( payload );
101        const uint64_t now = tr_time_msec( );
102
103        tr_bandwidthUsed( &io->bandwidth, TR_UP, payload, next->isPieceData, now );
104
105        if( overhead > 0 )
106            tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, FALSE, now );
107
108        if( io->didWrite )
109            io->didWrite( io, payload, next->isPieceData, io->userData );
110
111        if( tr_isPeerIo( io ) )
112        {
113            bytes_transferred -= payload;
114            next->length -= payload;
115            if( !next->length ) {
116                tr_list_pop_front( &io->outbuf_datatypes );
117                tr_free( next );
118            }
119        }
120    }
121}
122
123static void
124canReadWrapper( tr_peerIo * io )
125{
126    tr_bool err = 0;
127    tr_bool done = 0;
128    tr_session * session;
129
130    dbgmsg( io, "canRead" );
131
132    assert( tr_isPeerIo( io ) );
133    assert( tr_isSession( io->session ) );
134    tr_peerIoRef( io );
135
136    session = io->session;
137
138    /* try to consume the input buffer */
139    if( io->canRead )
140    {
141        tr_sessionLock( session );
142
143        while( !done && !err )
144        {
145            size_t piece = 0;
146            const size_t oldLen = EVBUFFER_LENGTH( io->inbuf );
147            const int ret = io->canRead( io, io->userData, &piece );
148
149            const size_t used = oldLen - EVBUFFER_LENGTH( io->inbuf );
150
151            assert( tr_isPeerIo( io ) );
152
153            if( piece || (piece!=used) )
154            {
155                const uint64_t now = tr_time_msec( );
156
157                if( piece )
158                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, piece, TRUE, now );
159
160                if( used != piece )
161                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, used - piece, FALSE, now );
162            }
163
164            switch( ret )
165            {
166                case READ_NOW:
167                    if( EVBUFFER_LENGTH( io->inbuf ) )
168                        continue;
169                    done = 1;
170                    break;
171
172                case READ_LATER:
173                    done = 1;
174                    break;
175
176                case READ_ERR:
177                    err = 1;
178                    break;
179            }
180
181            assert( tr_isPeerIo( io ) );
182        }
183
184        tr_sessionUnlock( session );
185    }
186
187    /* keep the iobuf's excess capacity from growing too large */
188    if( EVBUFFER_LENGTH( io->inbuf ) == 0 ) {
189        evbuffer_free( io->inbuf );
190        io->inbuf = evbuffer_new( );
191    }
192
193    assert( tr_isPeerIo( io ) );
194    tr_peerIoUnref( io );
195}
196
197tr_bool
198tr_isPeerIo( const tr_peerIo * io )
199{
200    return ( io != NULL )
201        && ( io->magicNumber == MAGIC_NUMBER )
202        && ( io->refCount >= 0 )
203        && ( tr_isBandwidth( &io->bandwidth ) )
204        && ( tr_isAddress( &io->addr ) );
205}
206
207static void
208event_read_cb( int fd, short event UNUSED, void * vio )
209{
210    int res;
211    int e;
212    tr_peerIo * io = vio;
213
214    /* Limit the input buffer to 256K, so it doesn't grow too large */
215    unsigned int howmuch;
216    unsigned int curlen;
217    const tr_direction dir = TR_DOWN;
218    const unsigned int max = 256 * 1024;
219
220    assert( tr_isPeerIo( io ) );
221
222    io->hasFinishedConnecting = TRUE;
223    io->pendingEvents &= ~EV_READ;
224
225    curlen = EVBUFFER_LENGTH( io->inbuf );
226    howmuch = curlen >= max ? 0 : max - curlen;
227    howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch );
228
229    dbgmsg( io, "libevent says this peer is ready to read" );
230
231    /* if we don't have any bandwidth left, stop reading */
232    if( howmuch < 1 ) {
233        tr_peerIoSetEnabled( io, dir, FALSE );
234        return;
235    }
236
237    EVUTIL_SET_SOCKET_ERROR( 0 );
238    res = evbuffer_read( io->inbuf, fd, (int)howmuch );
239    e = EVUTIL_SOCKET_ERROR( );
240
241    if( res > 0 )
242    {
243        tr_peerIoSetEnabled( io, dir, TRUE );
244
245        /* Invoke the user callback - must always be called last */
246        canReadWrapper( io );
247    }
248    else
249    {
250        char errstr[512];
251        short what = EVBUFFER_READ;
252
253        if( res == 0 ) /* EOF */
254            what |= EVBUFFER_EOF;
255        else if( res == -1 ) {
256            if( e == EAGAIN || e == EINTR ) {
257                tr_peerIoSetEnabled( io, dir, TRUE );
258                return;
259            }
260            what |= EVBUFFER_ERROR;
261        }
262
263        tr_net_strerror( errstr, sizeof( errstr ), e );
264        dbgmsg( io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
265
266        if( io->gotError != NULL )
267            io->gotError( io, what, io->userData );
268    }
269}
270
271static int
272tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
273{
274    int e;
275    int n;
276    char errstr[256];
277    struct evbuffer * buffer = io->outbuf;
278
279    howmuch = MIN( EVBUFFER_LENGTH( buffer ), howmuch );
280
281    EVUTIL_SET_SOCKET_ERROR( 0 );
282#ifdef WIN32
283    n = (int) send(fd, buffer->buffer, howmuch,  0 );
284#else
285    n = (int) write(fd, buffer->buffer, howmuch );
286#endif
287    e = EVUTIL_SOCKET_ERROR( );
288    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?tr_net_strerror(errstr,sizeof(errstr),e):"") );
289
290    if( n > 0 )
291        evbuffer_drain( buffer, (size_t)n );
292
293    /* keep the iobuf's excess capacity from growing too large */
294    if( EVBUFFER_LENGTH( io->outbuf ) == 0 ) {
295        evbuffer_free( io->outbuf );
296        io->outbuf = evbuffer_new( );
297    }
298
299    return n;
300}
301
302static void
303event_write_cb( int fd, short event UNUSED, void * vio )
304{
305    int res = 0;
306    int e;
307    short what = EVBUFFER_WRITE;
308    tr_peerIo * io = vio;
309    size_t howmuch;
310    const tr_direction dir = TR_UP;
311
312    assert( tr_isPeerIo( io ) );
313
314    io->hasFinishedConnecting = TRUE;
315    io->pendingEvents &= ~EV_WRITE;
316
317    dbgmsg( io, "libevent says this peer is ready to write" );
318
319    /* Write as much as possible, since the socket is non-blocking, write() will
320     * return if it can't write any more data without blocking */
321    howmuch = tr_bandwidthClamp( &io->bandwidth, dir, EVBUFFER_LENGTH( io->outbuf ) );
322
323    /* if we don't have any bandwidth left, stop writing */
324    if( howmuch < 1 ) {
325        tr_peerIoSetEnabled( io, dir, FALSE );
326        return;
327    }
328
329    EVUTIL_SET_SOCKET_ERROR( 0 );
330    res = tr_evbuffer_write( io, fd, howmuch );
331    e = EVUTIL_SOCKET_ERROR( );
332
333    if (res == -1) {
334        if (e == EAGAIN || e == EINTR || e == EINPROGRESS)
335            goto reschedule;
336        /* error case */
337        what |= EVBUFFER_ERROR;
338    } else if (res == 0) {
339        /* eof case */
340        what |= EVBUFFER_EOF;
341    }
342    if (res <= 0)
343        goto error;
344
345    if( EVBUFFER_LENGTH( io->outbuf ) )
346        tr_peerIoSetEnabled( io, dir, TRUE );
347
348    didWriteWrapper( io, res );
349    return;
350
351 reschedule:
352    if( EVBUFFER_LENGTH( io->outbuf ) )
353        tr_peerIoSetEnabled( io, dir, TRUE );
354    return;
355
356 error:
357
358    dbgmsg( io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
359
360    if( io->gotError != NULL )
361        io->gotError( io, what, io->userData );
362}
363
364/**
365***
366**/
367
368static void
369maybeSetCongestionAlgorithm( int socket, const char * algorithm )
370{
371    if( algorithm && *algorithm )
372    {
373        const int rc = tr_netSetCongestionControl( socket, algorithm );
374
375        if( rc < 0 )
376            tr_ninf( "Net", "Can't set congestion control algorithm '%s': %s",
377                     algorithm, tr_strerror( errno ));
378    }
379}
380
381static tr_peerIo*
382tr_peerIoNew( tr_session       * session,
383              tr_bandwidth     * parent,
384              const tr_address * addr,
385              tr_port            port,
386              const uint8_t    * torrentHash,
387              tr_bool            isIncoming,
388              tr_bool            isSeed,
389              int                socket )
390{
391    tr_peerIo * io;
392
393    assert( session != NULL );
394    assert( session->events != NULL );
395    assert( tr_isBool( isIncoming ) );
396    assert( tr_isBool( isSeed ) );
397    assert( tr_amInEventThread( session ) );
398
399    if( socket >= 0 ) {
400        tr_netSetTOS( socket, session->peerSocketTOS );
401        maybeSetCongestionAlgorithm( socket, session->peer_congestion_algorithm );
402    }
403
404    io = tr_new0( tr_peerIo, 1 );
405    io->magicNumber = MAGIC_NUMBER;
406    io->refCount = 1;
407    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
408    io->session = session;
409    io->addr = *addr;
410    io->isSeed = isSeed;
411    io->port = port;
412    io->socket = socket;
413    io->isIncoming = isIncoming != 0;
414    io->hasFinishedConnecting = FALSE;
415    io->timeCreated = tr_time( );
416    io->inbuf = evbuffer_new( );
417    io->outbuf = evbuffer_new( );
418    tr_bandwidthConstruct( &io->bandwidth, session, parent );
419    tr_bandwidthSetPeer( &io->bandwidth, io );
420    dbgmsg( io, "bandwidth is %p; its parent is %p", &io->bandwidth, parent );
421
422    event_set( &io->event_read, io->socket, EV_READ, event_read_cb, io );
423    event_set( &io->event_write, io->socket, EV_WRITE, event_write_cb, io );
424
425    return io;
426}
427
428tr_peerIo*
429tr_peerIoNewIncoming( tr_session        * session,
430                      tr_bandwidth      * parent,
431                      const tr_address  * addr,
432                      tr_port             port,
433                      int                 fd )
434{
435    assert( session );
436    assert( tr_isAddress( addr ) );
437    assert( fd >= 0 );
438
439    return tr_peerIoNew( session, parent, addr, port, NULL, TRUE, FALSE, fd );
440}
441
442tr_peerIo*
443tr_peerIoNewOutgoing( tr_session        * session,
444                      tr_bandwidth      * parent,
445                      const tr_address  * addr,
446                      tr_port             port,
447                      const uint8_t     * torrentHash,
448                      tr_bool             isSeed )
449{
450    int fd;
451
452    assert( session );
453    assert( tr_isAddress( addr ) );
454    assert( torrentHash );
455
456    fd = tr_netOpenPeerSocket( session, addr, port, isSeed );
457    dbgmsg( NULL, "tr_netOpenPeerSocket returned fd %d", fd );
458
459    return fd < 0 ? NULL
460                  : tr_peerIoNew( session, parent, addr, port, torrentHash, FALSE, isSeed, fd );
461}
462
463/***
464****
465***/
466
467static void
468event_enable( tr_peerIo * io, short event )
469{
470    assert( tr_amInEventThread( io->session ) );
471    assert( io->session != NULL );
472    assert( io->session->events != NULL );
473    assert( event_initialized( &io->event_read ) );
474    assert( event_initialized( &io->event_write ) );
475
476    if( io->socket < 0 )
477        return;
478
479    if( ( event & EV_READ ) && ! ( io->pendingEvents & EV_READ ) )
480    {
481        dbgmsg( io, "enabling libevent ready-to-read polling" );
482        event_add( &io->event_read, NULL );
483        io->pendingEvents |= EV_READ;
484    }
485
486    if( ( event & EV_WRITE ) && ! ( io->pendingEvents & EV_WRITE ) )
487    {
488        dbgmsg( io, "enabling libevent ready-to-write polling" );
489        event_add( &io->event_write, NULL );
490        io->pendingEvents |= EV_WRITE;
491    }
492}
493
494static void
495event_disable( struct tr_peerIo * io, short event )
496{
497    assert( tr_amInEventThread( io->session ) );
498    assert( io->session != NULL );
499    assert( io->session->events != NULL );
500    assert( event_initialized( &io->event_read ) );
501    assert( event_initialized( &io->event_write ) );
502
503    if( ( event & EV_READ ) && ( io->pendingEvents & EV_READ ) )
504    {
505        dbgmsg( io, "disabling libevent ready-to-read polling" );
506        event_del( &io->event_read );
507        io->pendingEvents &= ~EV_READ;
508    }
509
510    if( ( event & EV_WRITE ) && ( io->pendingEvents & EV_WRITE ) )
511    {
512        dbgmsg( io, "disabling libevent ready-to-write polling" );
513        event_del( &io->event_write );
514        io->pendingEvents &= ~EV_WRITE;
515    }
516}
517
518void
519tr_peerIoSetEnabled( tr_peerIo    * io,
520                     tr_direction   dir,
521                     tr_bool        isEnabled )
522{
523    const short event = dir == TR_UP ? EV_WRITE : EV_READ;
524
525    assert( tr_isPeerIo( io ) );
526    assert( tr_isDirection( dir ) );
527    assert( tr_amInEventThread( io->session ) );
528    assert( io->session->events != NULL );
529
530    if( isEnabled )
531        event_enable( io, event );
532    else
533        event_disable( io, event );
534}
535
536/***
537****
538***/
539
540static void
541io_dtor( void * vio )
542{
543    tr_peerIo * io = vio;
544
545    assert( tr_isPeerIo( io ) );
546    assert( tr_amInEventThread( io->session ) );
547    assert( io->session->events != NULL );
548
549    dbgmsg( io, "in tr_peerIo destructor" );
550    event_disable( io, EV_READ | EV_WRITE );
551    tr_bandwidthDestruct( &io->bandwidth );
552    evbuffer_free( io->outbuf );
553    evbuffer_free( io->inbuf );
554    tr_netClose( io->session, io->socket );
555    tr_cryptoFree( io->crypto );
556    tr_list_free( &io->outbuf_datatypes, tr_free );
557
558    memset( io, ~0, sizeof( tr_peerIo ) );
559    tr_free( io );
560}
561
562static void
563tr_peerIoFree( tr_peerIo * io )
564{
565    if( io )
566    {
567        dbgmsg( io, "in tr_peerIoFree" );
568        io->canRead = NULL;
569        io->didWrite = NULL;
570        io->gotError = NULL;
571        tr_runInEventThread( io->session, io_dtor, io );
572    }
573}
574
575void
576tr_peerIoRefImpl( const char * file, int line, tr_peerIo * io )
577{
578    assert( tr_isPeerIo( io ) );
579
580    dbgmsg( io, "%s:%d is incrementing the IO's refcount from %d to %d",
581                file, line, io->refCount, io->refCount+1 );
582
583    ++io->refCount;
584}
585
586void
587tr_peerIoUnrefImpl( const char * file, int line, tr_peerIo * io )
588{
589    assert( tr_isPeerIo( io ) );
590
591    dbgmsg( io, "%s:%d is decrementing the IO's refcount from %d to %d",
592                file, line, io->refCount, io->refCount-1 );
593
594    if( !--io->refCount )
595        tr_peerIoFree( io );
596}
597
598const tr_address*
599tr_peerIoGetAddress( const tr_peerIo * io, tr_port   * port )
600{
601    assert( tr_isPeerIo( io ) );
602
603    if( port )
604        *port = io->port;
605
606    return &io->addr;
607}
608
609const char*
610tr_peerIoAddrStr( const tr_address * addr, tr_port port )
611{
612    static char buf[512];
613
614    if( addr->type == TR_AF_INET )
615        tr_snprintf( buf, sizeof( buf ), "%s:%u", tr_ntop_non_ts( addr ), ntohs( port ) );
616    else
617        tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_ntop_non_ts( addr ), ntohs( port ) );
618    return buf;
619}
620
621const char* tr_peerIoGetAddrStr( const tr_peerIo * io )
622{
623    return tr_isPeerIo( io ) ? tr_peerIoAddrStr( &io->addr, io->port ) : "error";
624}
625
626void
627tr_peerIoSetIOFuncs( tr_peerIo        * io,
628                     tr_can_read_cb     readcb,
629                     tr_did_write_cb    writecb,
630                     tr_net_error_cb    errcb,
631                     void             * userData )
632{
633    io->canRead = readcb;
634    io->didWrite = writecb;
635    io->gotError = errcb;
636    io->userData = userData;
637}
638
639void
640tr_peerIoClear( tr_peerIo * io )
641{
642    tr_peerIoSetIOFuncs( io, NULL, NULL, NULL, NULL );
643    tr_peerIoSetEnabled( io, TR_UP, FALSE );
644    tr_peerIoSetEnabled( io, TR_DOWN, FALSE );
645}
646
647int
648tr_peerIoReconnect( tr_peerIo * io )
649{
650    short int pendingEvents;
651    tr_session * session;
652
653    assert( tr_isPeerIo( io ) );
654    assert( !tr_peerIoIsIncoming( io ) );
655
656    session = tr_peerIoGetSession( io );
657
658    pendingEvents = io->pendingEvents;
659    event_disable( io, EV_READ | EV_WRITE );
660
661    if( io->socket >= 0 )
662        tr_netClose( session, io->socket );
663
664    io->socket = tr_netOpenPeerSocket( session, &io->addr, io->port, io->isSeed );
665    event_set( &io->event_read, io->socket, EV_READ, event_read_cb, io );
666    event_set( &io->event_write, io->socket, EV_WRITE, event_write_cb, io );
667
668    if( io->socket >= 0 )
669    {
670        event_enable( io, pendingEvents );
671        tr_netSetTOS( io->socket, session->peerSocketTOS );
672        maybeSetCongestionAlgorithm( io->socket, session->peer_congestion_algorithm );
673        return 0;
674    }
675
676    return -1;
677}
678
679/**
680***
681**/
682
683void
684tr_peerIoSetTorrentHash( tr_peerIo *     io,
685                         const uint8_t * hash )
686{
687    assert( tr_isPeerIo( io ) );
688
689    tr_cryptoSetTorrentHash( io->crypto, hash );
690}
691
692const uint8_t*
693tr_peerIoGetTorrentHash( tr_peerIo * io )
694{
695    assert( tr_isPeerIo( io ) );
696    assert( io->crypto );
697
698    return tr_cryptoGetTorrentHash( io->crypto );
699}
700
701int
702tr_peerIoHasTorrentHash( const tr_peerIo * io )
703{
704    assert( tr_isPeerIo( io ) );
705    assert( io->crypto );
706
707    return tr_cryptoHasTorrentHash( io->crypto );
708}
709
710/**
711***
712**/
713
714void
715tr_peerIoSetPeersId( tr_peerIo *     io,
716                     const uint8_t * peer_id )
717{
718    assert( tr_isPeerIo( io ) );
719
720    if( ( io->peerIdIsSet = peer_id != NULL ) )
721        memcpy( io->peerId, peer_id, 20 );
722    else
723        memset( io->peerId, 0, 20 );
724}
725
726/**
727***
728**/
729
730static unsigned int
731getDesiredOutputBufferSize( const tr_peerIo * io, uint64_t now )
732{
733    /* this is all kind of arbitrary, but what seems to work well is
734     * being large enough to hold the next 20 seconds' worth of input,
735     * or a few blocks, whichever is bigger.
736     * It's okay to tweak this as needed */
737    const unsigned int currentSpeed_Bps = tr_bandwidthGetPieceSpeed_Bps( &io->bandwidth, now, TR_UP );
738    const unsigned int period = 15u; /* arbitrary */
739    /* the 3 is arbitrary; the .5 is to leave room for messages */
740    static const unsigned int ceiling =  (unsigned int)( MAX_BLOCK_SIZE * 3.5 );
741    return MAX( ceiling, currentSpeed_Bps*period );
742}
743
744size_t
745tr_peerIoGetWriteBufferSpace( const tr_peerIo * io, uint64_t now )
746{
747    const size_t desiredLen = getDesiredOutputBufferSize( io, now );
748    const size_t currentLen = EVBUFFER_LENGTH( io->outbuf );
749    size_t freeSpace = 0;
750
751    if( desiredLen > currentLen )
752        freeSpace = desiredLen - currentLen;
753
754    return freeSpace;
755}
756
757/**
758***
759**/
760
761void
762tr_peerIoSetEncryption( tr_peerIo * io, uint32_t encryptionMode )
763{
764    assert( tr_isPeerIo( io ) );
765    assert( encryptionMode == PEER_ENCRYPTION_NONE
766         || encryptionMode == PEER_ENCRYPTION_RC4 );
767
768    io->encryptionMode = encryptionMode;
769}
770
771/**
772***
773**/
774
775void
776tr_peerIoWrite( tr_peerIo   * io,
777                const void  * bytes,
778                size_t        byteCount,
779                tr_bool       isPieceData )
780{
781    /* FIXME(libevent2): this implementation snould be moved to tr_peerIoWriteBuf.   This function should be implemented as evbuffer_new() + evbuffer_add_reference() + a call to tr_peerIoWriteBuf() + evbuffer_free() */
782    struct tr_datatype * datatype;
783
784    assert( tr_amInEventThread( io->session ) );
785    dbgmsg( io, "adding %zu bytes into io->output", byteCount );
786
787    datatype = tr_new( struct tr_datatype, 1 );
788    datatype->isPieceData = isPieceData != 0;
789    datatype->length = byteCount;
790    tr_list_append( &io->outbuf_datatypes, datatype );
791
792    switch( io->encryptionMode )
793    {
794        case PEER_ENCRYPTION_RC4:
795        {
796            /* FIXME(libevent2): use evbuffer_reserve_space() and evbuffer_commit_space() instead of tmp */
797            void * tmp = tr_sessionGetBuffer( io->session );
798            const size_t tmplen = SESSION_BUFFER_SIZE;
799            const uint8_t * walk = bytes;
800            evbuffer_expand( io->outbuf, byteCount );
801            while( byteCount > 0 )
802            {
803                const size_t thisPass = MIN( byteCount, tmplen );
804                tr_cryptoEncrypt( io->crypto, thisPass, walk, tmp );
805                evbuffer_add( io->outbuf, tmp, thisPass );
806                walk += thisPass;
807                byteCount -= thisPass;
808            }
809            tr_sessionReleaseBuffer( io->session );
810            break;
811        }
812
813        case PEER_ENCRYPTION_NONE:
814            evbuffer_add( io->outbuf, bytes, byteCount );
815            break;
816
817        default:
818            assert( 0 );
819            break;
820    }
821}
822
823void
824tr_peerIoWriteBuf( tr_peerIo         * io,
825                   struct evbuffer   * buf,
826                   tr_bool             isPieceData )
827{
828    /* FIXME(libevent2): loop through calls to evbuffer_get_contiguous_space() + evbuffer_drain() */
829    const size_t n = EVBUFFER_LENGTH( buf );
830    tr_peerIoWrite( io, EVBUFFER_DATA( buf ), n, isPieceData );
831    evbuffer_drain( buf, n );
832}
833
834void
835tr_peerIoWriteUint16( tr_peerIo        * io,
836                      struct evbuffer  * outbuf,
837                      uint16_t           writeme )
838{
839    const uint16_t tmp = htons( writeme );
840    tr_peerIoWriteBytes( io, outbuf, &tmp, sizeof( uint16_t ) );
841}
842
843void
844tr_peerIoWriteUint32( tr_peerIo        * io,
845                      struct evbuffer  * outbuf,
846                      uint32_t           writeme )
847{
848    const uint32_t tmp = htonl( writeme );
849    tr_peerIoWriteBytes( io, outbuf, &tmp, sizeof( uint32_t ) );
850}
851
852/***
853****
854***/
855
856void
857tr_peerIoReadBytes( tr_peerIo       * io,
858                    struct evbuffer * inbuf,
859                    void            * bytes,
860                    size_t            byteCount )
861{
862    assert( tr_isPeerIo( io ) );
863    /* FIXME(libevent2): use evbuffer_get_length() */
864    assert( EVBUFFER_LENGTH( inbuf ) >= byteCount );
865
866    switch( io->encryptionMode )
867    {
868        case PEER_ENCRYPTION_NONE:
869            evbuffer_remove( inbuf, bytes, byteCount );
870            break;
871
872        case PEER_ENCRYPTION_RC4:
873            /* FIXME(libevent2): loop through calls to evbuffer_get_contiguous_space() + evbuffer_drain() */
874            tr_cryptoDecrypt( io->crypto, byteCount, EVBUFFER_DATA(inbuf), bytes );
875            evbuffer_drain(inbuf, byteCount );
876            break;
877
878        default:
879            assert( 0 );
880    }
881}
882
883void
884tr_peerIoReadUint16( tr_peerIo        * io,
885                     struct evbuffer  * inbuf,
886                     uint16_t         * setme )
887{
888    uint16_t tmp;
889    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
890    *setme = ntohs( tmp );
891}
892
893void tr_peerIoReadUint32( tr_peerIo        * io,
894                          struct evbuffer  * inbuf,
895                          uint32_t         * setme )
896{
897    uint32_t tmp;
898    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
899    *setme = ntohl( tmp );
900}
901
902void
903tr_peerIoDrain( tr_peerIo       * io,
904                struct evbuffer * inbuf,
905                size_t            byteCount )
906{
907    void * buf = tr_sessionGetBuffer( io->session );
908    const size_t buflen = SESSION_BUFFER_SIZE;
909
910    while( byteCount > 0 )
911    {
912        const size_t thisPass = MIN( byteCount, buflen );
913        tr_peerIoReadBytes( io, inbuf, buf, thisPass );
914        byteCount -= thisPass;
915    }
916
917    tr_sessionReleaseBuffer( io->session );
918}
919
920/***
921****
922***/
923
924static int
925tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
926{
927    int res = 0;
928
929    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch )))
930    {
931        int e;
932
933        EVUTIL_SET_SOCKET_ERROR( 0 );
934        res = evbuffer_read( io->inbuf, io->socket, (int)howmuch );
935        e = EVUTIL_SOCKET_ERROR( );
936
937        dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(e):"") );
938
939        if( EVBUFFER_LENGTH( io->inbuf ) )
940            canReadWrapper( io );
941
942        if( ( res <= 0 ) && ( io->gotError ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
943        {
944            char errstr[512];
945            short what = EVBUFFER_READ | EVBUFFER_ERROR;
946            if( res == 0 )
947                what |= EVBUFFER_EOF;
948            tr_net_strerror( errstr, sizeof( errstr ), e );
949            dbgmsg( io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
950            io->gotError( io, what, io->userData );
951        }
952    }
953
954    return res;
955}
956
957static int
958tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
959{
960    int n = 0;
961
962    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_UP, howmuch )))
963    {
964        int e;
965        EVUTIL_SET_SOCKET_ERROR( 0 );
966        n = tr_evbuffer_write( io, io->socket, howmuch );
967        e = EVUTIL_SOCKET_ERROR( );
968
969        if( n > 0 )
970            didWriteWrapper( io, n );
971
972        if( ( n < 0 ) && ( io->gotError ) && ( e != EPIPE ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
973        {
974            char errstr[512];
975            const short what = EVBUFFER_WRITE | EVBUFFER_ERROR;
976
977            tr_net_strerror( errstr, sizeof( errstr ), e );
978            dbgmsg( io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", n, what, e, errstr );
979
980            if( io->gotError != NULL )
981                io->gotError( io, what, io->userData );
982        }
983    }
984
985    return n;
986}
987
988int
989tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
990{
991    int bytesUsed = 0;
992
993    assert( tr_isPeerIo( io ) );
994    assert( tr_isDirection( dir ) );
995
996    if( io->hasFinishedConnecting )
997    {
998        if( dir == TR_DOWN )
999            bytesUsed = tr_peerIoTryRead( io, limit );
1000        else
1001            bytesUsed = tr_peerIoTryWrite( io, limit );
1002    }
1003
1004    dbgmsg( io, "flushing peer-io, hasFinishedConnecting %d, direction %d, limit %zu, bytesUsed %d", (int)io->hasFinishedConnecting, (int)dir, limit, bytesUsed );
1005    return bytesUsed;
1006}
1007
1008int
1009tr_peerIoFlushOutgoingProtocolMsgs( tr_peerIo * io )
1010{
1011    size_t byteCount = 0;
1012    tr_list * it;
1013
1014    /* count up how many bytes are used by non-piece-data messages
1015       at the front of our outbound queue */
1016    for( it=io->outbuf_datatypes; it!=NULL; it=it->next )
1017    {
1018        struct tr_datatype * d = it->data;
1019
1020        if( d->isPieceData )
1021            break;
1022
1023        byteCount += d->length;
1024    }
1025
1026    return tr_peerIoFlush( io, TR_UP, byteCount );
1027}
Note: See TracBrowser for help on using the repository browser.