source: trunk/libtransmission/peer-io.c @ 11908

Last change on this file since 11908 was 11908, checked in by jch, 11 years ago

First pass at uTP callbacks.

  • Property svn:keywords set to Date Rev Author Id
File size: 28.5 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 11908 2011-02-18 00:24:13Z jch $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <string.h>
17#include <stdio.h>
18#include <unistd.h>
19
20#ifdef WIN32
21 #include <winsock2.h>
22#else
23 #include <arpa/inet.h> /* inet_ntoa */
24#endif
25
26#include <event2/event.h>
27#include <event2/bufferevent.h>
28#include "utp.h"
29
30#include "transmission.h"
31#include "session.h"
32#include "bandwidth.h"
33#include "crypto.h"
34#include "list.h"
35#include "net.h"
36#include "peer-common.h" /* MAX_BLOCK_SIZE */
37#include "peer-io.h"
38#include "trevent.h" /* tr_runInEventThread() */
39#include "utils.h"
40
41#define MAGIC_NUMBER 206745
42
43#ifdef WIN32
44 #define EAGAIN       WSAEWOULDBLOCK
45 #define EINTR        WSAEINTR
46 #define EINPROGRESS  WSAEINPROGRESS
47 #define EPIPE        WSAECONNRESET
48#endif
49
50static size_t
51guessPacketOverhead( size_t d )
52{
53    /**
54     * http://sd.wareonearth.com/~phil/net/overhead/
55     *
56     * TCP over Ethernet:
57     * Assuming no header compression (e.g. not PPP)
58     * Add 20 IPv4 header or 40 IPv6 header (no options)
59     * Add 20 TCP header
60     * Add 12 bytes optional TCP timestamps
61     * Max TCP Payload data rates over ethernet are thus:
62     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
63     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
64     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
65     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
66     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
67     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
68     */
69    const double assumed_payload_data_rate = 94.0;
70
71    return (unsigned int)( d * ( 100.0 / assumed_payload_data_rate ) - d );
72}
73
74/**
75***
76**/
77
78#define dbgmsg( io, ... ) \
79    do { \
80        if( tr_deepLoggingIsActive( ) ) \
81            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
82    } while( 0 )
83
84struct tr_datatype
85{
86    tr_bool  isPieceData;
87    size_t   length;
88};
89
90
91/***
92****
93***/
94
95static void
96didWriteWrapper( tr_peerIo * io, unsigned int bytes_transferred )
97{
98     while( bytes_transferred && tr_isPeerIo( io ) )
99     {
100        struct tr_datatype * next = io->outbuf_datatypes->data;
101
102        const unsigned int payload = MIN( next->length, bytes_transferred );
103        const unsigned int overhead = guessPacketOverhead( payload );
104        const uint64_t now = tr_sessionGetTimeMsec( io->session );
105
106        tr_bandwidthUsed( &io->bandwidth, TR_UP, payload, next->isPieceData, now );
107
108        if( overhead > 0 )
109            tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, FALSE, now );
110
111        if( io->didWrite )
112            io->didWrite( io, payload, next->isPieceData, io->userData );
113
114        if( tr_isPeerIo( io ) )
115        {
116            bytes_transferred -= payload;
117            next->length -= payload;
118            if( !next->length ) {
119                tr_list_pop_front( &io->outbuf_datatypes );
120                tr_free( next );
121            }
122        }
123    }
124}
125
126static void
127canReadWrapper( tr_peerIo * io )
128{
129    tr_bool err = 0;
130    tr_bool done = 0;
131    tr_session * session;
132
133    dbgmsg( io, "canRead" );
134
135    assert( tr_isPeerIo( io ) );
136    assert( tr_isSession( io->session ) );
137    tr_peerIoRef( io );
138
139    session = io->session;
140
141    /* try to consume the input buffer */
142    if( io->canRead )
143    {
144        const uint64_t now = tr_sessionGetTimeMsec( io->session );
145
146        tr_sessionLock( session );
147
148        while( !done && !err )
149        {
150            size_t piece = 0;
151            const size_t oldLen = evbuffer_get_length( io->inbuf );
152            const int ret = io->canRead( io, io->userData, &piece );
153            const size_t used = oldLen - evbuffer_get_length( io->inbuf );
154            const unsigned int overhead = guessPacketOverhead( used );
155
156            assert( tr_isPeerIo( io ) );
157
158            if( piece || (piece!=used) )
159            {
160                if( piece )
161                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, piece, TRUE, now );
162
163                if( used != piece )
164                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, used - piece, FALSE, now );
165            }
166
167            if( overhead > 0 )
168                tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, FALSE, now );
169
170            switch( ret )
171            {
172                case READ_NOW:
173                    if( evbuffer_get_length( io->inbuf ) )
174                        continue;
175                    done = 1;
176                    break;
177
178                case READ_LATER:
179                    done = 1;
180                    break;
181
182                case READ_ERR:
183                    err = 1;
184                    break;
185            }
186
187            assert( tr_isPeerIo( io ) );
188        }
189
190        tr_sessionUnlock( session );
191    }
192
193    assert( tr_isPeerIo( io ) );
194    tr_peerIoUnref( io );
195}
196
197tr_bool
198tr_isPeerIo( const tr_peerIo * io )
199{
200    return ( io != NULL )
201        && ( io->magicNumber == MAGIC_NUMBER )
202        && ( io->refCount >= 0 )
203        && ( tr_isBandwidth( &io->bandwidth ) )
204        && ( tr_isAddress( &io->addr ) );
205}
206
207static void
208event_read_cb( int fd, short event UNUSED, void * vio )
209{
210    int res;
211    int e;
212    tr_peerIo * io = vio;
213
214    /* Limit the input buffer to 256K, so it doesn't grow too large */
215    unsigned int howmuch;
216    unsigned int curlen;
217    const tr_direction dir = TR_DOWN;
218    const unsigned int max = 256 * 1024;
219
220    assert( tr_isPeerIo( io ) );
221
222    io->pendingEvents &= ~EV_READ;
223
224    curlen = evbuffer_get_length( io->inbuf );
225    howmuch = curlen >= max ? 0 : max - curlen;
226    howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch );
227
228    dbgmsg( io, "libevent says this peer is ready to read" );
229
230    /* if we don't have any bandwidth left, stop reading */
231    if( howmuch < 1 ) {
232        tr_peerIoSetEnabled( io, dir, FALSE );
233        return;
234    }
235
236    EVUTIL_SET_SOCKET_ERROR( 0 );
237    res = evbuffer_read( io->inbuf, fd, (int)howmuch );
238    e = EVUTIL_SOCKET_ERROR( );
239
240    if( res > 0 )
241    {
242        tr_peerIoSetEnabled( io, dir, TRUE );
243
244        /* Invoke the user callback - must always be called last */
245        canReadWrapper( io );
246    }
247    else
248    {
249        char errstr[512];
250        short what = BEV_EVENT_READING;
251
252        if( res == 0 ) /* EOF */
253            what |= BEV_EVENT_EOF;
254        else if( res == -1 ) {
255            if( e == EAGAIN || e == EINTR ) {
256                tr_peerIoSetEnabled( io, dir, TRUE );
257                return;
258            }
259            what |= BEV_EVENT_ERROR;
260        }
261
262        tr_net_strerror( errstr, sizeof( errstr ), e );
263        dbgmsg( io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
264
265        if( io->gotError != NULL )
266            io->gotError( io, what, io->userData );
267    }
268}
269
270static int
271tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
272{
273    int e;
274    int n;
275    char errstr[256];
276
277    EVUTIL_SET_SOCKET_ERROR( 0 );
278    n = evbuffer_write_atmost( io->outbuf, fd, howmuch );
279    e = EVUTIL_SOCKET_ERROR( );
280    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?tr_net_strerror(errstr,sizeof(errstr),e):"") );
281
282    return n;
283}
284
285static void
286event_write_cb( int fd, short event UNUSED, void * vio )
287{
288    int res = 0;
289    int e;
290    short what = BEV_EVENT_WRITING;
291    tr_peerIo * io = vio;
292    size_t howmuch;
293    const tr_direction dir = TR_UP;
294
295    assert( tr_isPeerIo( io ) );
296
297    io->pendingEvents &= ~EV_WRITE;
298
299    dbgmsg( io, "libevent says this peer is ready to write" );
300
301    /* Write as much as possible, since the socket is non-blocking, write() will
302     * return if it can't write any more data without blocking */
303    howmuch = tr_bandwidthClamp( &io->bandwidth, dir, evbuffer_get_length( io->outbuf ) );
304
305    /* if we don't have any bandwidth left, stop writing */
306    if( howmuch < 1 ) {
307        tr_peerIoSetEnabled( io, dir, FALSE );
308        return;
309    }
310
311    EVUTIL_SET_SOCKET_ERROR( 0 );
312    res = tr_evbuffer_write( io, fd, howmuch );
313    e = EVUTIL_SOCKET_ERROR( );
314
315    if (res == -1) {
316        if (!e || e == EAGAIN || e == EINTR || e == EINPROGRESS)
317            goto reschedule;
318        /* error case */
319        what |= BEV_EVENT_ERROR;
320    } else if (res == 0) {
321        /* eof case */
322        what |= BEV_EVENT_EOF;
323    }
324    if (res <= 0)
325        goto error;
326
327    if( evbuffer_get_length( io->outbuf ) )
328        tr_peerIoSetEnabled( io, dir, TRUE );
329
330    didWriteWrapper( io, res );
331    return;
332
333 reschedule:
334    if( evbuffer_get_length( io->outbuf ) )
335        tr_peerIoSetEnabled( io, dir, TRUE );
336    return;
337
338 error:
339
340    dbgmsg( io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
341
342    if( io->gotError != NULL )
343        io->gotError( io, what, io->userData );
344}
345
346/**
347***
348**/
349
350static void
351maybeSetCongestionAlgorithm( int socket, const char * algorithm )
352{
353    if( algorithm && *algorithm )
354    {
355        const int rc = tr_netSetCongestionControl( socket, algorithm );
356
357        if( rc < 0 )
358            tr_ninf( "Net", "Can't set congestion control algorithm '%s': %s",
359                     algorithm, tr_strerror( errno ));
360    }
361}
362
363/* UTP callbacks */
364
365static void
366utp_on_read(void *closure, const unsigned char *buf, size_t buflen)
367{
368    tr_peerIo *io = (tr_peerIo *)closure;
369    assert( tr_isPeerIo( io ) );
370    tr_ndbg( "UTP", "On read: %ld", (long)buflen );
371}
372
373static void
374utp_on_write(void *closure, unsigned char *buf, size_t buflen)
375{
376    tr_peerIo *io = (tr_peerIo *)closure;
377    assert( tr_isPeerIo( io ) );
378    tr_ndbg( "UTP", "On write: %ld", (long)buflen );
379}
380
381static size_t
382utp_get_rb_size(void *closure)
383{
384    tr_peerIo *io = (tr_peerIo *)closure;
385    assert( tr_isPeerIo( io ) );
386
387    tr_ndbg( "UTP", "Get RB size" );
388    return 0;
389}
390
391static void
392utp_on_state_change(void *closure, int state)
393{
394    tr_peerIo *io;
395    /* This can be called after UTP_Close, in which case closure can point
396       to an already-destroyed peerIo. */
397    if( state == UTP_STATE_DESTROYING ) {
398        tr_ndbg( "UTP", "Connection destroyed" );
399        return;
400    }
401
402    io = (tr_peerIo *)closure;
403    assert( tr_isPeerIo( io ) );
404
405    tr_ndbg( "UTP", "On state change: %d", state );
406}
407
408static void
409utp_on_error(void *closure, int errcode)
410{
411    tr_peerIo *io = (tr_peerIo *)closure;
412    assert( tr_isPeerIo( io ) );
413
414    tr_ndbg( "UTP", "Error callback: %s", tr_strerror( errcode ) );
415}
416
417static void
418utp_on_overhead(void *closure, bool send, size_t count, int type)
419{
420    tr_peerIo *io = (tr_peerIo *)closure;
421    assert( tr_isPeerIo( io ) );
422
423    tr_ndbg( "UTP", "On overhead: %d %ld %d", (int)send, (long)count, type );
424}
425
426static struct UTPFunctionTable utp_function_table = {
427    .on_read = utp_on_read,
428    .on_write = utp_on_write,
429    .get_rb_size = utp_get_rb_size,
430    .on_state = utp_on_state_change,
431    .on_error = utp_on_error,
432    .on_overhead = utp_on_overhead
433};
434
435static tr_peerIo*
436tr_peerIoNew( tr_session       * session,
437              tr_bandwidth     * parent,
438              const tr_address * addr,
439              tr_port            port,
440              const uint8_t    * torrentHash,
441              tr_bool            isIncoming,
442              tr_bool            isSeed,
443              int                socket,
444              struct UTPSocket * utp_socket)
445{
446    tr_peerIo * io;
447
448    assert( session != NULL );
449    assert( session->events != NULL );
450    assert( tr_isBool( isIncoming ) );
451    assert( tr_isBool( isSeed ) );
452    assert( tr_amInEventThread( session ) );
453    assert( (socket < 0) == (utp_socket != NULL) );
454
455    if( socket >= 0 ) {
456        tr_netSetTOS( socket, session->peerSocketTOS );
457        maybeSetCongestionAlgorithm( socket, session->peer_congestion_algorithm );
458    }
459
460    io = tr_new0( tr_peerIo, 1 );
461    io->magicNumber = MAGIC_NUMBER;
462    io->refCount = 1;
463    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
464    io->session = session;
465    io->addr = *addr;
466    io->isSeed = isSeed;
467    io->port = port;
468    io->socket = socket;
469    io->utp_socket = utp_socket;
470    io->isIncoming = isIncoming != 0;
471    io->timeCreated = tr_time( );
472    io->inbuf = evbuffer_new( );
473    io->outbuf = evbuffer_new( );
474    tr_bandwidthConstruct( &io->bandwidth, session, parent );
475    tr_bandwidthSetPeer( &io->bandwidth, io );
476    dbgmsg( io, "bandwidth is %p; its parent is %p", &io->bandwidth, parent );
477
478    if( io->socket >= 0 ) {
479        io->event_read = event_new( session->event_base,
480                                    io->socket, EV_READ, event_read_cb, io );
481        io->event_write = event_new( session->event_base,
482                                     io->socket, EV_WRITE, event_write_cb, io );
483    } else {
484        tr_ndbg( "UTP", "New %s connection",
485                 isIncoming ? "incoming" : "outgoing" );
486        UTP_SetCallbacks( utp_socket,
487                          &utp_function_table,
488                          io );
489    }
490
491    return io;
492}
493
494tr_peerIo*
495tr_peerIoNewIncoming( tr_session        * session,
496                      tr_bandwidth      * parent,
497                      const tr_address  * addr,
498                      tr_port             port,
499                      int                 fd,
500                      struct UTPSocket  * utp_socket )
501{
502    assert( session );
503    assert( tr_isAddress( addr ) );
504
505    return tr_peerIoNew( session, parent, addr, port, NULL, TRUE, FALSE, 
506                         fd, utp_socket );
507}
508
509tr_peerIo*
510tr_peerIoNewOutgoing( tr_session        * session,
511                      tr_bandwidth      * parent,
512                      const tr_address  * addr,
513                      tr_port             port,
514                      const uint8_t     * torrentHash,
515                      tr_bool             isSeed )
516{
517    int fd;
518
519    assert( session );
520    assert( tr_isAddress( addr ) );
521    assert( torrentHash );
522
523    fd = tr_netOpenPeerSocket( session, addr, port, isSeed );
524    dbgmsg( NULL, "tr_netOpenPeerSocket returned fd %d", fd );
525
526    return fd < 0 ? NULL
527                  : tr_peerIoNew( session, parent, addr, port,
528                                  torrentHash, FALSE, isSeed, fd, NULL );
529}
530
531/***
532****
533***/
534
535static void
536event_enable( tr_peerIo * io, short event )
537{
538    assert( tr_amInEventThread( io->session ) );
539    assert( io->session != NULL );
540    assert( io->session->events != NULL );
541
542    if( io->socket < 0 )
543        return;
544
545    assert( io->session->events != NULL );
546    assert( event_initialized( io->event_read ) );
547    assert( event_initialized( io->event_write ) );
548
549    if( ( event & EV_READ ) && ! ( io->pendingEvents & EV_READ ) )
550    {
551        dbgmsg( io, "enabling libevent ready-to-read polling" );
552        event_add( io->event_read, NULL );
553        io->pendingEvents |= EV_READ;
554    }
555
556    if( ( event & EV_WRITE ) && ! ( io->pendingEvents & EV_WRITE ) )
557    {
558        dbgmsg( io, "enabling libevent ready-to-write polling" );
559        event_add( io->event_write, NULL );
560        io->pendingEvents |= EV_WRITE;
561    }
562}
563
564static void
565event_disable( struct tr_peerIo * io, short event )
566{
567    assert( tr_amInEventThread( io->session ) );
568    assert( io->session != NULL );
569
570    if( io->socket < 0 )
571        return;
572
573    assert( io->session->events != NULL );
574    assert( event_initialized( io->event_read ) );
575    assert( event_initialized( io->event_write ) );
576
577    if( ( event & EV_READ ) && ( io->pendingEvents & EV_READ ) )
578    {
579        dbgmsg( io, "disabling libevent ready-to-read polling" );
580        event_del( io->event_read );
581        io->pendingEvents &= ~EV_READ;
582    }
583
584    if( ( event & EV_WRITE ) && ( io->pendingEvents & EV_WRITE ) )
585    {
586        dbgmsg( io, "disabling libevent ready-to-write polling" );
587        event_del( io->event_write );
588        io->pendingEvents &= ~EV_WRITE;
589    }
590}
591
592void
593tr_peerIoSetEnabled( tr_peerIo    * io,
594                     tr_direction   dir,
595                     tr_bool        isEnabled )
596{
597    const short event = dir == TR_UP ? EV_WRITE : EV_READ;
598
599    assert( tr_isPeerIo( io ) );
600    assert( tr_isDirection( dir ) );
601    assert( tr_amInEventThread( io->session ) );
602    assert( io->session->events != NULL );
603
604    if( isEnabled )
605        event_enable( io, event );
606    else
607        event_disable( io, event );
608}
609
610/***
611****
612***/
613
614static void
615io_dtor( void * vio )
616{
617    tr_peerIo * io = vio;
618
619    assert( tr_isPeerIo( io ) );
620    assert( tr_amInEventThread( io->session ) );
621    assert( io->session->events != NULL );
622
623    dbgmsg( io, "in tr_peerIo destructor" );
624    event_disable( io, EV_READ | EV_WRITE );
625    tr_bandwidthDestruct( &io->bandwidth );
626    evbuffer_free( io->outbuf );
627    evbuffer_free( io->inbuf );
628    if( io->socket >= 0 ) {
629        event_free( io->event_read );
630        event_free( io->event_write );
631        tr_netClose( io->session, io->socket );
632    }
633    if( io->utp_socket != NULL ) {
634        tr_ndbg( "UTP", "Destroying connection");
635        UTP_Close( io->utp_socket );
636    }
637    tr_cryptoFree( io->crypto );
638    tr_list_free( &io->outbuf_datatypes, tr_free );
639
640    memset( io, ~0, sizeof( tr_peerIo ) );
641    tr_free( io );
642}
643
644static void
645tr_peerIoFree( tr_peerIo * io )
646{
647    if( io )
648    {
649        dbgmsg( io, "in tr_peerIoFree" );
650        io->canRead = NULL;
651        io->didWrite = NULL;
652        io->gotError = NULL;
653        tr_runInEventThread( io->session, io_dtor, io );
654    }
655}
656
657void
658tr_peerIoRefImpl( const char * file, int line, tr_peerIo * io )
659{
660    assert( tr_isPeerIo( io ) );
661
662    dbgmsg( io, "%s:%d is incrementing the IO's refcount from %d to %d",
663                file, line, io->refCount, io->refCount+1 );
664
665    ++io->refCount;
666}
667
668void
669tr_peerIoUnrefImpl( const char * file, int line, tr_peerIo * io )
670{
671    assert( tr_isPeerIo( io ) );
672
673    dbgmsg( io, "%s:%d is decrementing the IO's refcount from %d to %d",
674                file, line, io->refCount, io->refCount-1 );
675
676    if( !--io->refCount )
677        tr_peerIoFree( io );
678}
679
680const tr_address*
681tr_peerIoGetAddress( const tr_peerIo * io, tr_port   * port )
682{
683    assert( tr_isPeerIo( io ) );
684
685    if( port )
686        *port = io->port;
687
688    return &io->addr;
689}
690
691const char*
692tr_peerIoAddrStr( const tr_address * addr, tr_port port )
693{
694    static char buf[512];
695
696    if( addr->type == TR_AF_INET )
697        tr_snprintf( buf, sizeof( buf ), "%s:%u", tr_ntop_non_ts( addr ), ntohs( port ) );
698    else
699        tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_ntop_non_ts( addr ), ntohs( port ) );
700    return buf;
701}
702
703const char* tr_peerIoGetAddrStr( const tr_peerIo * io )
704{
705    return tr_isPeerIo( io ) ? tr_peerIoAddrStr( &io->addr, io->port ) : "error";
706}
707
708void
709tr_peerIoSetIOFuncs( tr_peerIo        * io,
710                     tr_can_read_cb     readcb,
711                     tr_did_write_cb    writecb,
712                     tr_net_error_cb    errcb,
713                     void             * userData )
714{
715    io->canRead = readcb;
716    io->didWrite = writecb;
717    io->gotError = errcb;
718    io->userData = userData;
719}
720
721void
722tr_peerIoClear( tr_peerIo * io )
723{
724    tr_peerIoSetIOFuncs( io, NULL, NULL, NULL, NULL );
725    tr_peerIoSetEnabled( io, TR_UP, FALSE );
726    tr_peerIoSetEnabled( io, TR_DOWN, FALSE );
727}
728
729int
730tr_peerIoReconnect( tr_peerIo * io )
731{
732    short int pendingEvents;
733    tr_session * session;
734
735    assert( tr_isPeerIo( io ) );
736    assert( !tr_peerIoIsIncoming( io ) );
737
738    session = tr_peerIoGetSession( io );
739
740    pendingEvents = io->pendingEvents;
741    event_disable( io, EV_READ | EV_WRITE );
742
743    if( io->socket >= 0 ) {
744        tr_netClose( session, io->socket );
745        io->socket = -1;
746    }
747    if( io->utp_socket != NULL ) {
748        UTP_Close(io->utp_socket);
749        io->utp_socket = NULL;
750    }
751
752    event_free( io->event_read );
753    event_free( io->event_write );
754    io->socket = tr_netOpenPeerSocket( session, &io->addr, io->port, io->isSeed );
755    io->event_read = event_new( session->event_base, io->socket, EV_READ, event_read_cb, io );
756    io->event_write = event_new( session->event_base, io->socket, EV_WRITE, event_write_cb, io );
757
758    if( io->socket >= 0 )
759    {
760        event_enable( io, pendingEvents );
761        tr_netSetTOS( io->socket, session->peerSocketTOS );
762        maybeSetCongestionAlgorithm( io->socket, session->peer_congestion_algorithm );
763        return 0;
764    }
765
766    return -1;
767}
768
769/**
770***
771**/
772
773void
774tr_peerIoSetTorrentHash( tr_peerIo *     io,
775                         const uint8_t * hash )
776{
777    assert( tr_isPeerIo( io ) );
778
779    tr_cryptoSetTorrentHash( io->crypto, hash );
780}
781
782const uint8_t*
783tr_peerIoGetTorrentHash( tr_peerIo * io )
784{
785    assert( tr_isPeerIo( io ) );
786    assert( io->crypto );
787
788    return tr_cryptoGetTorrentHash( io->crypto );
789}
790
791int
792tr_peerIoHasTorrentHash( const tr_peerIo * io )
793{
794    assert( tr_isPeerIo( io ) );
795    assert( io->crypto );
796
797    return tr_cryptoHasTorrentHash( io->crypto );
798}
799
800/**
801***
802**/
803
804void
805tr_peerIoSetPeersId( tr_peerIo *     io,
806                     const uint8_t * peer_id )
807{
808    assert( tr_isPeerIo( io ) );
809
810    if( ( io->peerIdIsSet = peer_id != NULL ) )
811        memcpy( io->peerId, peer_id, 20 );
812    else
813        memset( io->peerId, 0, 20 );
814}
815
816/**
817***
818**/
819
820static unsigned int
821getDesiredOutputBufferSize( const tr_peerIo * io, uint64_t now )
822{
823    /* this is all kind of arbitrary, but what seems to work well is
824     * being large enough to hold the next 20 seconds' worth of input,
825     * or a few blocks, whichever is bigger.
826     * It's okay to tweak this as needed */
827    const unsigned int currentSpeed_Bps = tr_bandwidthGetPieceSpeed_Bps( &io->bandwidth, now, TR_UP );
828    const unsigned int period = 15u; /* arbitrary */
829    /* the 3 is arbitrary; the .5 is to leave room for messages */
830    static const unsigned int ceiling =  (unsigned int)( MAX_BLOCK_SIZE * 3.5 );
831    return MAX( ceiling, currentSpeed_Bps*period );
832}
833
834size_t
835tr_peerIoGetWriteBufferSpace( const tr_peerIo * io, uint64_t now )
836{
837    const size_t desiredLen = getDesiredOutputBufferSize( io, now );
838    const size_t currentLen = evbuffer_get_length( io->outbuf );
839    size_t freeSpace = 0;
840
841    if( desiredLen > currentLen )
842        freeSpace = desiredLen - currentLen;
843
844    return freeSpace;
845}
846
847/**
848***
849**/
850
851void
852tr_peerIoSetEncryption( tr_peerIo * io, uint32_t encryptionMode )
853{
854    assert( tr_isPeerIo( io ) );
855    assert( encryptionMode == PEER_ENCRYPTION_NONE
856         || encryptionMode == PEER_ENCRYPTION_RC4 );
857
858    io->encryptionMode = encryptionMode;
859}
860
861/**
862***
863**/
864
865static void
866addDatatype( tr_peerIo * io, size_t byteCount, tr_bool isPieceData )
867{
868    struct tr_datatype * d;
869
870    d = tr_new( struct tr_datatype, 1 );
871    d->isPieceData = isPieceData != 0;
872    d->length = byteCount;
873    tr_list_append( &io->outbuf_datatypes, d );
874}
875
876static struct evbuffer_iovec *
877evbuffer_peek_all( struct evbuffer * buf, size_t * setme_vecCount )
878{
879    const size_t byteCount = evbuffer_get_length( buf );
880    const int vecCount = evbuffer_peek( buf, byteCount, NULL, NULL, 0 );
881    struct evbuffer_iovec * iovec = tr_new0( struct evbuffer_iovec, vecCount );
882    const int n = evbuffer_peek( buf, byteCount, NULL, iovec, vecCount );
883    assert( n == vecCount );
884    *setme_vecCount = n;
885    return iovec;
886}
887
888static void
889maybeEncryptBuffer( tr_peerIo * io, struct evbuffer * buf )
890{
891    if( io->encryptionMode == PEER_ENCRYPTION_RC4 )
892    {
893        size_t i, n;
894        struct evbuffer_iovec * iovec = evbuffer_peek_all( buf, &n );
895
896        for( i=0; i<n; ++i )
897            tr_cryptoEncrypt( io->crypto, iovec[i].iov_len, iovec[i].iov_base, iovec[i].iov_base );
898
899        tr_free( iovec );
900    }
901}
902
903void
904tr_peerIoWriteBuf( tr_peerIo * io, struct evbuffer * buf, tr_bool isPieceData )
905{
906    const size_t byteCount = evbuffer_get_length( buf );
907    maybeEncryptBuffer( io, buf );
908    evbuffer_add_buffer( io->outbuf, buf );
909    addDatatype( io, byteCount, isPieceData );
910}
911
912void
913tr_peerIoWriteBytes( tr_peerIo * io, const void * bytes, size_t byteCount, tr_bool isPieceData )
914{
915    struct evbuffer * buf = evbuffer_new( );
916    evbuffer_add( buf, bytes, byteCount );
917    tr_peerIoWriteBuf( io, buf, isPieceData );
918    evbuffer_free( buf );
919}
920
921/***
922****
923***/
924
925void
926evbuffer_add_uint8( struct evbuffer * outbuf, uint8_t byte )
927{
928    evbuffer_add( outbuf, &byte, 1 );
929}
930
931void
932evbuffer_add_uint16( struct evbuffer * outbuf, uint16_t addme_hs )
933{
934    const uint16_t ns = htons( addme_hs );
935    evbuffer_add( outbuf, &ns, sizeof( ns ) );
936}
937
938void
939evbuffer_add_uint32( struct evbuffer * outbuf, uint32_t addme_hl )
940{
941    const uint32_t nl = htonl( addme_hl );
942    evbuffer_add( outbuf, &nl, sizeof( nl ) );
943}
944
945/***
946****
947***/
948
949void
950tr_peerIoReadBytes( tr_peerIo * io, struct evbuffer * inbuf, void * bytes, size_t byteCount )
951{
952    assert( tr_isPeerIo( io ) );
953    assert( evbuffer_get_length( inbuf )  >= byteCount );
954
955    switch( io->encryptionMode )
956    {
957        case PEER_ENCRYPTION_NONE:
958            evbuffer_remove( inbuf, bytes, byteCount );
959            break;
960
961        case PEER_ENCRYPTION_RC4:
962            evbuffer_remove( inbuf, bytes, byteCount );
963            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
964            break;
965
966        default:
967            assert( 0 );
968    }
969}
970
971void
972tr_peerIoReadUint16( tr_peerIo        * io,
973                     struct evbuffer  * inbuf,
974                     uint16_t         * setme )
975{
976    uint16_t tmp;
977    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
978    *setme = ntohs( tmp );
979}
980
981void tr_peerIoReadUint32( tr_peerIo        * io,
982                          struct evbuffer  * inbuf,
983                          uint32_t         * setme )
984{
985    uint32_t tmp;
986    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
987    *setme = ntohl( tmp );
988}
989
990void
991tr_peerIoDrain( tr_peerIo       * io,
992                struct evbuffer * inbuf,
993                size_t            byteCount )
994{
995    void * buf = tr_sessionGetBuffer( io->session );
996    const size_t buflen = SESSION_BUFFER_SIZE;
997
998    while( byteCount > 0 )
999    {
1000        const size_t thisPass = MIN( byteCount, buflen );
1001        tr_peerIoReadBytes( io, inbuf, buf, thisPass );
1002        byteCount -= thisPass;
1003    }
1004
1005    tr_sessionReleaseBuffer( io->session );
1006}
1007
1008/***
1009****
1010***/
1011
1012static int
1013tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
1014{
1015    int res = 0;
1016
1017    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch )))
1018    {
1019        int e;
1020
1021        EVUTIL_SET_SOCKET_ERROR( 0 );
1022        res = evbuffer_read( io->inbuf, io->socket, (int)howmuch );
1023        e = EVUTIL_SOCKET_ERROR( );
1024
1025        dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(e):"") );
1026
1027        if( evbuffer_get_length( io->inbuf ) )
1028            canReadWrapper( io );
1029
1030        if( ( res <= 0 ) && ( io->gotError ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1031        {
1032            char errstr[512];
1033            short what = BEV_EVENT_READING | BEV_EVENT_ERROR;
1034            if( res == 0 )
1035                what |= BEV_EVENT_EOF;
1036            tr_net_strerror( errstr, sizeof( errstr ), e );
1037            dbgmsg( io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
1038            io->gotError( io, what, io->userData );
1039        }
1040    }
1041
1042    return res;
1043}
1044
1045static int
1046tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
1047{
1048    int n = 0;
1049
1050    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_UP, howmuch )))
1051    {
1052        int e;
1053        EVUTIL_SET_SOCKET_ERROR( 0 );
1054        n = tr_evbuffer_write( io, io->socket, howmuch );
1055        e = EVUTIL_SOCKET_ERROR( );
1056
1057        if( n > 0 )
1058            didWriteWrapper( io, n );
1059
1060        if( ( n < 0 ) && ( io->gotError ) && e && ( e != EPIPE ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1061        {
1062            char errstr[512];
1063            const short what = BEV_EVENT_WRITING | BEV_EVENT_ERROR;
1064
1065            tr_net_strerror( errstr, sizeof( errstr ), e );
1066            dbgmsg( io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", n, what, e, errstr );
1067
1068            if( io->gotError != NULL )
1069                io->gotError( io, what, io->userData );
1070        }
1071    }
1072
1073    return n;
1074}
1075
1076int
1077tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
1078{
1079    int bytesUsed = 0;
1080
1081    assert( tr_isPeerIo( io ) );
1082    assert( tr_isDirection( dir ) );
1083
1084    if( dir == TR_DOWN )
1085        bytesUsed = tr_peerIoTryRead( io, limit );
1086    else
1087        bytesUsed = tr_peerIoTryWrite( io, limit );
1088
1089    dbgmsg( io, "flushing peer-io, direction %d, limit %zu, bytesUsed %d", (int)dir, limit, bytesUsed );
1090    return bytesUsed;
1091}
1092
1093int
1094tr_peerIoFlushOutgoingProtocolMsgs( tr_peerIo * io )
1095{
1096    size_t byteCount = 0;
1097    tr_list * it;
1098
1099    /* count up how many bytes are used by non-piece-data messages
1100       at the front of our outbound queue */
1101    for( it=io->outbuf_datatypes; it!=NULL; it=it->next )
1102    {
1103        struct tr_datatype * d = it->data;
1104
1105        if( d->isPieceData )
1106            break;
1107
1108        byteCount += d->length;
1109    }
1110
1111    return tr_peerIoFlush( io, TR_UP, byteCount );
1112}
Note: See TracBrowser for help on using the repository browser.