source: trunk/libtransmission/peer-io.c @ 11910

Last change on this file since 11910 was 11910, checked in by jch, 11 years ago

Add dummy UTP callbacks.

Libutp will sometimes call our callbacks after we called UTP_Close,
notably to notify us of the UTP_STATE_DESTROYING state change, but
also, for some reason, to ask us about our read buffer. The simplest
way to avoid issues with that is to switch to a second set of callbacks.

  • Property svn:keywords set to Date Rev Author Id
File size: 29.6 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 11910 2011-02-18 00:24:18Z jch $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <string.h>
17#include <stdio.h>
18#include <unistd.h>
19
20#ifdef WIN32
21 #include <winsock2.h>
22#else
23 #include <arpa/inet.h> /* inet_ntoa */
24#endif
25
26#include <event2/event.h>
27#include <event2/bufferevent.h>
28#include "utp.h"
29
30#include "transmission.h"
31#include "session.h"
32#include "bandwidth.h"
33#include "crypto.h"
34#include "list.h"
35#include "net.h"
36#include "peer-common.h" /* MAX_BLOCK_SIZE */
37#include "peer-io.h"
38#include "trevent.h" /* tr_runInEventThread() */
39#include "utils.h"
40
41#define MAGIC_NUMBER 206745
42
43#ifdef WIN32
44 #define EAGAIN       WSAEWOULDBLOCK
45 #define EINTR        WSAEINTR
46 #define EINPROGRESS  WSAEINPROGRESS
47 #define EPIPE        WSAECONNRESET
48#endif
49
50static size_t
51guessPacketOverhead( size_t d )
52{
53    /**
54     * http://sd.wareonearth.com/~phil/net/overhead/
55     *
56     * TCP over Ethernet:
57     * Assuming no header compression (e.g. not PPP)
58     * Add 20 IPv4 header or 40 IPv6 header (no options)
59     * Add 20 TCP header
60     * Add 12 bytes optional TCP timestamps
61     * Max TCP Payload data rates over ethernet are thus:
62     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
63     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
64     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
65     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
66     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
67     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
68     */
69    const double assumed_payload_data_rate = 94.0;
70
71    return (unsigned int)( d * ( 100.0 / assumed_payload_data_rate ) - d );
72}
73
74/**
75***
76**/
77
78#define dbgmsg( io, ... ) \
79    do { \
80        if( tr_deepLoggingIsActive( ) ) \
81            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
82    } while( 0 )
83
84struct tr_datatype
85{
86    tr_bool  isPieceData;
87    size_t   length;
88};
89
90
91/***
92****
93***/
94
95static void
96didWriteWrapper( tr_peerIo * io, unsigned int bytes_transferred )
97{
98     while( bytes_transferred && tr_isPeerIo( io ) )
99     {
100        struct tr_datatype * next = io->outbuf_datatypes->data;
101
102        const unsigned int payload = MIN( next->length, bytes_transferred );
103        const unsigned int overhead = guessPacketOverhead( payload );
104        const uint64_t now = tr_sessionGetTimeMsec( io->session );
105
106        tr_bandwidthUsed( &io->bandwidth, TR_UP, payload, next->isPieceData, now );
107
108        if( overhead > 0 )
109            tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, FALSE, now );
110
111        if( io->didWrite )
112            io->didWrite( io, payload, next->isPieceData, io->userData );
113
114        if( tr_isPeerIo( io ) )
115        {
116            bytes_transferred -= payload;
117            next->length -= payload;
118            if( !next->length ) {
119                tr_list_pop_front( &io->outbuf_datatypes );
120                tr_free( next );
121            }
122        }
123    }
124}
125
126static void
127canReadWrapper( tr_peerIo * io )
128{
129    tr_bool err = 0;
130    tr_bool done = 0;
131    tr_session * session;
132
133    dbgmsg( io, "canRead" );
134
135    assert( tr_isPeerIo( io ) );
136    assert( tr_isSession( io->session ) );
137    tr_peerIoRef( io );
138
139    session = io->session;
140
141    /* try to consume the input buffer */
142    if( io->canRead )
143    {
144        const uint64_t now = tr_sessionGetTimeMsec( io->session );
145
146        tr_sessionLock( session );
147
148        while( !done && !err )
149        {
150            size_t piece = 0;
151            const size_t oldLen = evbuffer_get_length( io->inbuf );
152            const int ret = io->canRead( io, io->userData, &piece );
153            const size_t used = oldLen - evbuffer_get_length( io->inbuf );
154            const unsigned int overhead = guessPacketOverhead( used );
155
156            assert( tr_isPeerIo( io ) );
157
158            if( piece || (piece!=used) )
159            {
160                if( piece )
161                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, piece, TRUE, now );
162
163                if( used != piece )
164                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, used - piece, FALSE, now );
165            }
166
167            if( overhead > 0 )
168                tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, FALSE, now );
169
170            switch( ret )
171            {
172                case READ_NOW:
173                    if( evbuffer_get_length( io->inbuf ) )
174                        continue;
175                    done = 1;
176                    break;
177
178                case READ_LATER:
179                    done = 1;
180                    break;
181
182                case READ_ERR:
183                    err = 1;
184                    break;
185            }
186
187            assert( tr_isPeerIo( io ) );
188        }
189
190        tr_sessionUnlock( session );
191    }
192
193    assert( tr_isPeerIo( io ) );
194    tr_peerIoUnref( io );
195}
196
197tr_bool
198tr_isPeerIo( const tr_peerIo * io )
199{
200    return ( io != NULL )
201        && ( io->magicNumber == MAGIC_NUMBER )
202        && ( io->refCount >= 0 )
203        && ( tr_isBandwidth( &io->bandwidth ) )
204        && ( tr_isAddress( &io->addr ) );
205}
206
207static void
208event_read_cb( int fd, short event UNUSED, void * vio )
209{
210    int res;
211    int e;
212    tr_peerIo * io = vio;
213
214    /* Limit the input buffer to 256K, so it doesn't grow too large */
215    unsigned int howmuch;
216    unsigned int curlen;
217    const tr_direction dir = TR_DOWN;
218    const unsigned int max = 256 * 1024;
219
220    assert( tr_isPeerIo( io ) );
221    assert( io->socket >= 0 );
222
223    io->pendingEvents &= ~EV_READ;
224
225    curlen = evbuffer_get_length( io->inbuf );
226    howmuch = curlen >= max ? 0 : max - curlen;
227    howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch );
228
229    dbgmsg( io, "libevent says this peer is ready to read" );
230
231    /* if we don't have any bandwidth left, stop reading */
232    if( howmuch < 1 ) {
233        tr_peerIoSetEnabled( io, dir, FALSE );
234        return;
235    }
236
237    EVUTIL_SET_SOCKET_ERROR( 0 );
238    res = evbuffer_read( io->inbuf, fd, (int)howmuch );
239    e = EVUTIL_SOCKET_ERROR( );
240
241    if( res > 0 )
242    {
243        tr_peerIoSetEnabled( io, dir, TRUE );
244
245        /* Invoke the user callback - must always be called last */
246        canReadWrapper( io );
247    }
248    else
249    {
250        char errstr[512];
251        short what = BEV_EVENT_READING;
252
253        if( res == 0 ) /* EOF */
254            what |= BEV_EVENT_EOF;
255        else if( res == -1 ) {
256            if( e == EAGAIN || e == EINTR ) {
257                tr_peerIoSetEnabled( io, dir, TRUE );
258                return;
259            }
260            what |= BEV_EVENT_ERROR;
261        }
262
263        tr_net_strerror( errstr, sizeof( errstr ), e );
264        dbgmsg( io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
265
266        if( io->gotError != NULL )
267            io->gotError( io, what, io->userData );
268    }
269}
270
271static int
272tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
273{
274    int e;
275    int n;
276    char errstr[256];
277
278    EVUTIL_SET_SOCKET_ERROR( 0 );
279    n = evbuffer_write_atmost( io->outbuf, fd, howmuch );
280    e = EVUTIL_SOCKET_ERROR( );
281    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?tr_net_strerror(errstr,sizeof(errstr),e):"") );
282
283    return n;
284}
285
286static void
287event_write_cb( int fd, short event UNUSED, void * vio )
288{
289    int res = 0;
290    int e;
291    short what = BEV_EVENT_WRITING;
292    tr_peerIo * io = vio;
293    size_t howmuch;
294    const tr_direction dir = TR_UP;
295
296    assert( tr_isPeerIo( io ) );
297    assert( io->socket >= 0 );
298
299    io->pendingEvents &= ~EV_WRITE;
300
301    dbgmsg( io, "libevent says this peer is ready to write" );
302
303    /* Write as much as possible, since the socket is non-blocking, write() will
304     * return if it can't write any more data without blocking */
305    howmuch = tr_bandwidthClamp( &io->bandwidth, dir, evbuffer_get_length( io->outbuf ) );
306
307    /* if we don't have any bandwidth left, stop writing */
308    if( howmuch < 1 ) {
309        tr_peerIoSetEnabled( io, dir, FALSE );
310        return;
311    }
312
313    EVUTIL_SET_SOCKET_ERROR( 0 );
314    res = tr_evbuffer_write( io, fd, howmuch );
315    e = EVUTIL_SOCKET_ERROR( );
316
317    if (res == -1) {
318        if (!e || e == EAGAIN || e == EINTR || e == EINPROGRESS)
319            goto reschedule;
320        /* error case */
321        what |= BEV_EVENT_ERROR;
322    } else if (res == 0) {
323        /* eof case */
324        what |= BEV_EVENT_EOF;
325    }
326    if (res <= 0)
327        goto error;
328
329    if( evbuffer_get_length( io->outbuf ) )
330        tr_peerIoSetEnabled( io, dir, TRUE );
331
332    didWriteWrapper( io, res );
333    return;
334
335 reschedule:
336    if( evbuffer_get_length( io->outbuf ) )
337        tr_peerIoSetEnabled( io, dir, TRUE );
338    return;
339
340 error:
341
342    dbgmsg( io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
343
344    if( io->gotError != NULL )
345        io->gotError( io, what, io->userData );
346}
347
348/**
349***
350**/
351
352static void
353maybeSetCongestionAlgorithm( int socket, const char * algorithm )
354{
355    if( algorithm && *algorithm )
356    {
357        const int rc = tr_netSetCongestionControl( socket, algorithm );
358
359        if( rc < 0 )
360            tr_ninf( "Net", "Can't set congestion control algorithm '%s': %s",
361                     algorithm, tr_strerror( errno ));
362    }
363}
364
365/* UTP callbacks */
366
367static void
368utp_on_read(void *closure, const unsigned char *buf, size_t buflen)
369{
370    tr_peerIo *io = (tr_peerIo *)closure;
371    assert( tr_isPeerIo( io ) );
372    tr_ndbg( "UTP", "On read: %ld", (long)buflen );
373}
374
375static void
376utp_on_write(void *closure, unsigned char *buf, size_t buflen)
377{
378    tr_peerIo *io = (tr_peerIo *)closure;
379    assert( tr_isPeerIo( io ) );
380    tr_ndbg( "UTP", "On write: %ld", (long)buflen );
381}
382
383static size_t
384utp_get_rb_size(void *closure)
385{
386    tr_peerIo *io = (tr_peerIo *)closure;
387    assert( tr_isPeerIo( io ) );
388
389    tr_ndbg( "UTP", "Get RB size" );
390    return 0;
391}
392
393static void
394utp_on_state_change(void *closure, int state)
395{
396    tr_peerIo *io;
397    /* This can be called after UTP_Close, in which case closure can point
398       to an already-destroyed peerIo. */
399    if( state == UTP_STATE_DESTROYING ) {
400        tr_ndbg( "UTP", "Connection destroyed" );
401        return;
402    }
403
404    io = (tr_peerIo *)closure;
405    assert( tr_isPeerIo( io ) );
406
407    tr_ndbg( "UTP", "On state change: %d", state );
408}
409
410static void
411utp_on_error(void *closure, int errcode)
412{
413    tr_peerIo *io = (tr_peerIo *)closure;
414    assert( tr_isPeerIo( io ) );
415
416    tr_ndbg( "UTP", "Error callback: %s", tr_strerror( errcode ) );
417}
418
419static void
420utp_on_overhead(void *closure, bool send, size_t count, int type)
421{
422    tr_peerIo *io = (tr_peerIo *)closure;
423    assert( tr_isPeerIo( io ) );
424
425    tr_ndbg( "UTP", "On overhead: %d %ld %d", (int)send, (long)count, type );
426}
427
428static struct UTPFunctionTable utp_function_table = {
429    .on_read = utp_on_read,
430    .on_write = utp_on_write,
431    .get_rb_size = utp_get_rb_size,
432    .on_state = utp_on_state_change,
433    .on_error = utp_on_error,
434    .on_overhead = utp_on_overhead
435};
436
437/* Dummy UTP callbacks. */
438/* We switch a UTP socket to use these after the associated peerIo has been
439   destroyed -- see io_dtor. */
440
441static void
442dummy_read(void *closure, const unsigned char *buf, size_t buflen)
443{
444    abort();
445}
446
447static void
448dummy_write(void *closure, unsigned char *buf, size_t buflen)
449{
450    abort();
451}
452
453static size_t
454dummy_get_rb_size(void *closure)
455{
456    return 0;
457}
458
459static void
460dummy_on_state_change(void *closure, int state)
461{
462    return;
463}
464
465static void
466dummy_on_error(void *closure, int errcode)
467{
468    return;
469}
470
471static void
472dummy_on_overhead(void *closure, bool send, size_t count, int type)
473{
474}
475
476static struct UTPFunctionTable dummy_utp_function_table = {
477    .on_read = dummy_read,
478    .on_write = dummy_write,
479    .get_rb_size = dummy_get_rb_size,
480    .on_state = dummy_on_state_change,
481    .on_error = dummy_on_error,
482    .on_overhead = dummy_on_overhead
483};
484
485static tr_peerIo*
486tr_peerIoNew( tr_session       * session,
487              tr_bandwidth     * parent,
488              const tr_address * addr,
489              tr_port            port,
490              const uint8_t    * torrentHash,
491              tr_bool            isIncoming,
492              tr_bool            isSeed,
493              int                socket,
494              struct UTPSocket * utp_socket)
495{
496    tr_peerIo * io;
497
498    assert( session != NULL );
499    assert( session->events != NULL );
500    assert( tr_isBool( isIncoming ) );
501    assert( tr_isBool( isSeed ) );
502    assert( tr_amInEventThread( session ) );
503    assert( (socket < 0) == (utp_socket != NULL) );
504
505    if( socket >= 0 ) {
506        tr_netSetTOS( socket, session->peerSocketTOS );
507        maybeSetCongestionAlgorithm( socket, session->peer_congestion_algorithm );
508    }
509
510    io = tr_new0( tr_peerIo, 1 );
511    io->magicNumber = MAGIC_NUMBER;
512    io->refCount = 1;
513    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
514    io->session = session;
515    io->addr = *addr;
516    io->isSeed = isSeed;
517    io->port = port;
518    io->socket = socket;
519    io->utp_socket = utp_socket;
520    io->isIncoming = isIncoming != 0;
521    io->timeCreated = tr_time( );
522    io->inbuf = evbuffer_new( );
523    io->outbuf = evbuffer_new( );
524    tr_bandwidthConstruct( &io->bandwidth, session, parent );
525    tr_bandwidthSetPeer( &io->bandwidth, io );
526    dbgmsg( io, "bandwidth is %p; its parent is %p", &io->bandwidth, parent );
527
528    if( io->socket >= 0 ) {
529        io->event_read = event_new( session->event_base,
530                                    io->socket, EV_READ, event_read_cb, io );
531        io->event_write = event_new( session->event_base,
532                                     io->socket, EV_WRITE, event_write_cb, io );
533    } else {
534        tr_ndbg( "UTP", "New %s connection",
535                 isIncoming ? "incoming" : "outgoing" );
536        UTP_SetCallbacks( utp_socket,
537                          &utp_function_table,
538                          io );
539    }
540
541    return io;
542}
543
544tr_peerIo*
545tr_peerIoNewIncoming( tr_session        * session,
546                      tr_bandwidth      * parent,
547                      const tr_address  * addr,
548                      tr_port             port,
549                      int                 fd,
550                      struct UTPSocket  * utp_socket )
551{
552    assert( session );
553    assert( tr_isAddress( addr ) );
554
555    return tr_peerIoNew( session, parent, addr, port, NULL, TRUE, FALSE, 
556                         fd, utp_socket );
557}
558
559tr_peerIo*
560tr_peerIoNewOutgoing( tr_session        * session,
561                      tr_bandwidth      * parent,
562                      const tr_address  * addr,
563                      tr_port             port,
564                      const uint8_t     * torrentHash,
565                      tr_bool             isSeed )
566{
567    int fd;
568
569    assert( session );
570    assert( tr_isAddress( addr ) );
571    assert( torrentHash );
572
573    fd = tr_netOpenPeerSocket( session, addr, port, isSeed );
574    dbgmsg( NULL, "tr_netOpenPeerSocket returned fd %d", fd );
575
576    return fd < 0 ? NULL
577                  : tr_peerIoNew( session, parent, addr, port,
578                                  torrentHash, FALSE, isSeed, fd, NULL );
579}
580
581/***
582****
583***/
584
585static void
586event_enable( tr_peerIo * io, short event )
587{
588    assert( tr_amInEventThread( io->session ) );
589    assert( io->session != NULL );
590    assert( io->session->events != NULL );
591
592    if( io->socket < 0 )
593        return;
594
595    assert( io->session->events != NULL );
596    assert( event_initialized( io->event_read ) );
597    assert( event_initialized( io->event_write ) );
598
599    if( ( event & EV_READ ) && ! ( io->pendingEvents & EV_READ ) )
600    {
601        dbgmsg( io, "enabling libevent ready-to-read polling" );
602        event_add( io->event_read, NULL );
603        io->pendingEvents |= EV_READ;
604    }
605
606    if( ( event & EV_WRITE ) && ! ( io->pendingEvents & EV_WRITE ) )
607    {
608        dbgmsg( io, "enabling libevent ready-to-write polling" );
609        event_add( io->event_write, NULL );
610        io->pendingEvents |= EV_WRITE;
611    }
612}
613
614static void
615event_disable( struct tr_peerIo * io, short event )
616{
617    assert( tr_amInEventThread( io->session ) );
618    assert( io->session != NULL );
619
620    if( io->socket < 0 )
621        return;
622
623    assert( io->session->events != NULL );
624    assert( event_initialized( io->event_read ) );
625    assert( event_initialized( io->event_write ) );
626
627    if( ( event & EV_READ ) && ( io->pendingEvents & EV_READ ) )
628    {
629        dbgmsg( io, "disabling libevent ready-to-read polling" );
630        event_del( io->event_read );
631        io->pendingEvents &= ~EV_READ;
632    }
633
634    if( ( event & EV_WRITE ) && ( io->pendingEvents & EV_WRITE ) )
635    {
636        dbgmsg( io, "disabling libevent ready-to-write polling" );
637        event_del( io->event_write );
638        io->pendingEvents &= ~EV_WRITE;
639    }
640}
641
642void
643tr_peerIoSetEnabled( tr_peerIo    * io,
644                     tr_direction   dir,
645                     tr_bool        isEnabled )
646{
647    const short event = dir == TR_UP ? EV_WRITE : EV_READ;
648
649    assert( tr_isPeerIo( io ) );
650    assert( tr_isDirection( dir ) );
651    assert( tr_amInEventThread( io->session ) );
652    assert( io->session->events != NULL );
653
654    if( isEnabled )
655        event_enable( io, event );
656    else
657        event_disable( io, event );
658}
659
660/***
661****
662***/
663
664static void
665io_dtor( void * vio )
666{
667    tr_peerIo * io = vio;
668
669    assert( tr_isPeerIo( io ) );
670    assert( tr_amInEventThread( io->session ) );
671    assert( io->session->events != NULL );
672
673    dbgmsg( io, "in tr_peerIo destructor" );
674    event_disable( io, EV_READ | EV_WRITE );
675    tr_bandwidthDestruct( &io->bandwidth );
676    evbuffer_free( io->outbuf );
677    evbuffer_free( io->inbuf );
678    if( io->socket >= 0 ) {
679        event_free( io->event_read );
680        event_free( io->event_write );
681        tr_netClose( io->session, io->socket );
682    }
683    if( io->utp_socket != NULL ) {
684        tr_ndbg( "UTP", "Destroying connection");
685        UTP_SetCallbacks( io->utp_socket,
686                          &dummy_utp_function_table,
687                          NULL );
688        UTP_Close( io->utp_socket );
689    }
690    tr_cryptoFree( io->crypto );
691    tr_list_free( &io->outbuf_datatypes, tr_free );
692
693    memset( io, ~0, sizeof( tr_peerIo ) );
694    tr_free( io );
695}
696
697static void
698tr_peerIoFree( tr_peerIo * io )
699{
700    if( io )
701    {
702        dbgmsg( io, "in tr_peerIoFree" );
703        io->canRead = NULL;
704        io->didWrite = NULL;
705        io->gotError = NULL;
706        tr_runInEventThread( io->session, io_dtor, io );
707    }
708}
709
710void
711tr_peerIoRefImpl( const char * file, int line, tr_peerIo * io )
712{
713    assert( tr_isPeerIo( io ) );
714
715    dbgmsg( io, "%s:%d is incrementing the IO's refcount from %d to %d",
716                file, line, io->refCount, io->refCount+1 );
717
718    ++io->refCount;
719}
720
721void
722tr_peerIoUnrefImpl( const char * file, int line, tr_peerIo * io )
723{
724    assert( tr_isPeerIo( io ) );
725
726    dbgmsg( io, "%s:%d is decrementing the IO's refcount from %d to %d",
727                file, line, io->refCount, io->refCount-1 );
728
729    if( !--io->refCount )
730        tr_peerIoFree( io );
731}
732
733const tr_address*
734tr_peerIoGetAddress( const tr_peerIo * io, tr_port   * port )
735{
736    assert( tr_isPeerIo( io ) );
737
738    if( port )
739        *port = io->port;
740
741    return &io->addr;
742}
743
744const char*
745tr_peerIoAddrStr( const tr_address * addr, tr_port port )
746{
747    static char buf[512];
748
749    if( addr->type == TR_AF_INET )
750        tr_snprintf( buf, sizeof( buf ), "%s:%u", tr_ntop_non_ts( addr ), ntohs( port ) );
751    else
752        tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_ntop_non_ts( addr ), ntohs( port ) );
753    return buf;
754}
755
756const char* tr_peerIoGetAddrStr( const tr_peerIo * io )
757{
758    return tr_isPeerIo( io ) ? tr_peerIoAddrStr( &io->addr, io->port ) : "error";
759}
760
761void
762tr_peerIoSetIOFuncs( tr_peerIo        * io,
763                     tr_can_read_cb     readcb,
764                     tr_did_write_cb    writecb,
765                     tr_net_error_cb    errcb,
766                     void             * userData )
767{
768    io->canRead = readcb;
769    io->didWrite = writecb;
770    io->gotError = errcb;
771    io->userData = userData;
772}
773
774void
775tr_peerIoClear( tr_peerIo * io )
776{
777    tr_peerIoSetIOFuncs( io, NULL, NULL, NULL, NULL );
778    tr_peerIoSetEnabled( io, TR_UP, FALSE );
779    tr_peerIoSetEnabled( io, TR_DOWN, FALSE );
780}
781
782int
783tr_peerIoReconnect( tr_peerIo * io )
784{
785    short int pendingEvents;
786    tr_session * session;
787
788    assert( tr_isPeerIo( io ) );
789    assert( !tr_peerIoIsIncoming( io ) );
790
791    session = tr_peerIoGetSession( io );
792
793    pendingEvents = io->pendingEvents;
794    event_disable( io, EV_READ | EV_WRITE );
795
796    if( io->socket >= 0 ) {
797        tr_netClose( session, io->socket );
798        io->socket = -1;
799    }
800    if( io->utp_socket != NULL ) {
801        UTP_Close(io->utp_socket);
802        io->utp_socket = NULL;
803    }
804
805    event_free( io->event_read );
806    event_free( io->event_write );
807    io->socket = tr_netOpenPeerSocket( session, &io->addr, io->port, io->isSeed );
808    io->event_read = event_new( session->event_base, io->socket, EV_READ, event_read_cb, io );
809    io->event_write = event_new( session->event_base, io->socket, EV_WRITE, event_write_cb, io );
810
811    if( io->socket >= 0 )
812    {
813        event_enable( io, pendingEvents );
814        tr_netSetTOS( io->socket, session->peerSocketTOS );
815        maybeSetCongestionAlgorithm( io->socket, session->peer_congestion_algorithm );
816        return 0;
817    }
818
819    return -1;
820}
821
822/**
823***
824**/
825
826void
827tr_peerIoSetTorrentHash( tr_peerIo *     io,
828                         const uint8_t * hash )
829{
830    assert( tr_isPeerIo( io ) );
831
832    tr_cryptoSetTorrentHash( io->crypto, hash );
833}
834
835const uint8_t*
836tr_peerIoGetTorrentHash( tr_peerIo * io )
837{
838    assert( tr_isPeerIo( io ) );
839    assert( io->crypto );
840
841    return tr_cryptoGetTorrentHash( io->crypto );
842}
843
844int
845tr_peerIoHasTorrentHash( const tr_peerIo * io )
846{
847    assert( tr_isPeerIo( io ) );
848    assert( io->crypto );
849
850    return tr_cryptoHasTorrentHash( io->crypto );
851}
852
853/**
854***
855**/
856
857void
858tr_peerIoSetPeersId( tr_peerIo *     io,
859                     const uint8_t * peer_id )
860{
861    assert( tr_isPeerIo( io ) );
862
863    if( ( io->peerIdIsSet = peer_id != NULL ) )
864        memcpy( io->peerId, peer_id, 20 );
865    else
866        memset( io->peerId, 0, 20 );
867}
868
869/**
870***
871**/
872
873static unsigned int
874getDesiredOutputBufferSize( const tr_peerIo * io, uint64_t now )
875{
876    /* this is all kind of arbitrary, but what seems to work well is
877     * being large enough to hold the next 20 seconds' worth of input,
878     * or a few blocks, whichever is bigger.
879     * It's okay to tweak this as needed */
880    const unsigned int currentSpeed_Bps = tr_bandwidthGetPieceSpeed_Bps( &io->bandwidth, now, TR_UP );
881    const unsigned int period = 15u; /* arbitrary */
882    /* the 3 is arbitrary; the .5 is to leave room for messages */
883    static const unsigned int ceiling =  (unsigned int)( MAX_BLOCK_SIZE * 3.5 );
884    return MAX( ceiling, currentSpeed_Bps*period );
885}
886
887size_t
888tr_peerIoGetWriteBufferSpace( const tr_peerIo * io, uint64_t now )
889{
890    const size_t desiredLen = getDesiredOutputBufferSize( io, now );
891    const size_t currentLen = evbuffer_get_length( io->outbuf );
892    size_t freeSpace = 0;
893
894    if( desiredLen > currentLen )
895        freeSpace = desiredLen - currentLen;
896
897    return freeSpace;
898}
899
900/**
901***
902**/
903
904void
905tr_peerIoSetEncryption( tr_peerIo * io, uint32_t encryptionMode )
906{
907    assert( tr_isPeerIo( io ) );
908    assert( encryptionMode == PEER_ENCRYPTION_NONE
909         || encryptionMode == PEER_ENCRYPTION_RC4 );
910
911    io->encryptionMode = encryptionMode;
912}
913
914/**
915***
916**/
917
918static void
919addDatatype( tr_peerIo * io, size_t byteCount, tr_bool isPieceData )
920{
921    struct tr_datatype * d;
922
923    d = tr_new( struct tr_datatype, 1 );
924    d->isPieceData = isPieceData != 0;
925    d->length = byteCount;
926    tr_list_append( &io->outbuf_datatypes, d );
927}
928
929static struct evbuffer_iovec *
930evbuffer_peek_all( struct evbuffer * buf, size_t * setme_vecCount )
931{
932    const size_t byteCount = evbuffer_get_length( buf );
933    const int vecCount = evbuffer_peek( buf, byteCount, NULL, NULL, 0 );
934    struct evbuffer_iovec * iovec = tr_new0( struct evbuffer_iovec, vecCount );
935    const int n = evbuffer_peek( buf, byteCount, NULL, iovec, vecCount );
936    assert( n == vecCount );
937    *setme_vecCount = n;
938    return iovec;
939}
940
941static void
942maybeEncryptBuffer( tr_peerIo * io, struct evbuffer * buf )
943{
944    if( io->encryptionMode == PEER_ENCRYPTION_RC4 )
945    {
946        size_t i, n;
947        struct evbuffer_iovec * iovec = evbuffer_peek_all( buf, &n );
948
949        for( i=0; i<n; ++i )
950            tr_cryptoEncrypt( io->crypto, iovec[i].iov_len, iovec[i].iov_base, iovec[i].iov_base );
951
952        tr_free( iovec );
953    }
954}
955
956void
957tr_peerIoWriteBuf( tr_peerIo * io, struct evbuffer * buf, tr_bool isPieceData )
958{
959    const size_t byteCount = evbuffer_get_length( buf );
960    maybeEncryptBuffer( io, buf );
961    evbuffer_add_buffer( io->outbuf, buf );
962    addDatatype( io, byteCount, isPieceData );
963}
964
965void
966tr_peerIoWriteBytes( tr_peerIo * io, const void * bytes, size_t byteCount, tr_bool isPieceData )
967{
968    struct evbuffer * buf = evbuffer_new( );
969    evbuffer_add( buf, bytes, byteCount );
970    tr_peerIoWriteBuf( io, buf, isPieceData );
971    evbuffer_free( buf );
972}
973
974/***
975****
976***/
977
978void
979evbuffer_add_uint8( struct evbuffer * outbuf, uint8_t byte )
980{
981    evbuffer_add( outbuf, &byte, 1 );
982}
983
984void
985evbuffer_add_uint16( struct evbuffer * outbuf, uint16_t addme_hs )
986{
987    const uint16_t ns = htons( addme_hs );
988    evbuffer_add( outbuf, &ns, sizeof( ns ) );
989}
990
991void
992evbuffer_add_uint32( struct evbuffer * outbuf, uint32_t addme_hl )
993{
994    const uint32_t nl = htonl( addme_hl );
995    evbuffer_add( outbuf, &nl, sizeof( nl ) );
996}
997
998/***
999****
1000***/
1001
1002void
1003tr_peerIoReadBytes( tr_peerIo * io, struct evbuffer * inbuf, void * bytes, size_t byteCount )
1004{
1005    assert( tr_isPeerIo( io ) );
1006    assert( evbuffer_get_length( inbuf )  >= byteCount );
1007
1008    switch( io->encryptionMode )
1009    {
1010        case PEER_ENCRYPTION_NONE:
1011            evbuffer_remove( inbuf, bytes, byteCount );
1012            break;
1013
1014        case PEER_ENCRYPTION_RC4:
1015            evbuffer_remove( inbuf, bytes, byteCount );
1016            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
1017            break;
1018
1019        default:
1020            assert( 0 );
1021    }
1022}
1023
1024void
1025tr_peerIoReadUint16( tr_peerIo        * io,
1026                     struct evbuffer  * inbuf,
1027                     uint16_t         * setme )
1028{
1029    uint16_t tmp;
1030    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
1031    *setme = ntohs( tmp );
1032}
1033
1034void tr_peerIoReadUint32( tr_peerIo        * io,
1035                          struct evbuffer  * inbuf,
1036                          uint32_t         * setme )
1037{
1038    uint32_t tmp;
1039    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
1040    *setme = ntohl( tmp );
1041}
1042
1043void
1044tr_peerIoDrain( tr_peerIo       * io,
1045                struct evbuffer * inbuf,
1046                size_t            byteCount )
1047{
1048    void * buf = tr_sessionGetBuffer( io->session );
1049    const size_t buflen = SESSION_BUFFER_SIZE;
1050
1051    while( byteCount > 0 )
1052    {
1053        const size_t thisPass = MIN( byteCount, buflen );
1054        tr_peerIoReadBytes( io, inbuf, buf, thisPass );
1055        byteCount -= thisPass;
1056    }
1057
1058    tr_sessionReleaseBuffer( io->session );
1059}
1060
1061/***
1062****
1063***/
1064
1065static int
1066tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
1067{
1068    int res = 0;
1069
1070    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch )))
1071    {
1072        int e;
1073
1074        EVUTIL_SET_SOCKET_ERROR( 0 );
1075        res = evbuffer_read( io->inbuf, io->socket, (int)howmuch );
1076        e = EVUTIL_SOCKET_ERROR( );
1077
1078        dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(e):"") );
1079
1080        if( evbuffer_get_length( io->inbuf ) )
1081            canReadWrapper( io );
1082
1083        if( ( res <= 0 ) && ( io->gotError ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1084        {
1085            char errstr[512];
1086            short what = BEV_EVENT_READING | BEV_EVENT_ERROR;
1087            if( res == 0 )
1088                what |= BEV_EVENT_EOF;
1089            tr_net_strerror( errstr, sizeof( errstr ), e );
1090            dbgmsg( io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
1091            io->gotError( io, what, io->userData );
1092        }
1093    }
1094
1095    return res;
1096}
1097
1098static int
1099tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
1100{
1101    int n = 0;
1102
1103    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_UP, howmuch )))
1104    {
1105        int e;
1106        EVUTIL_SET_SOCKET_ERROR( 0 );
1107        n = tr_evbuffer_write( io, io->socket, howmuch );
1108        e = EVUTIL_SOCKET_ERROR( );
1109
1110        if( n > 0 )
1111            didWriteWrapper( io, n );
1112
1113        if( ( n < 0 ) && ( io->gotError ) && e && ( e != EPIPE ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1114        {
1115            char errstr[512];
1116            const short what = BEV_EVENT_WRITING | BEV_EVENT_ERROR;
1117
1118            tr_net_strerror( errstr, sizeof( errstr ), e );
1119            dbgmsg( io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", n, what, e, errstr );
1120
1121            if( io->gotError != NULL )
1122                io->gotError( io, what, io->userData );
1123        }
1124    }
1125
1126    return n;
1127}
1128
1129int
1130tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
1131{
1132    int bytesUsed = 0;
1133
1134    assert( tr_isPeerIo( io ) );
1135    assert( tr_isDirection( dir ) );
1136
1137    if( dir == TR_DOWN )
1138        bytesUsed = tr_peerIoTryRead( io, limit );
1139    else
1140        bytesUsed = tr_peerIoTryWrite( io, limit );
1141
1142    dbgmsg( io, "flushing peer-io, direction %d, limit %zu, bytesUsed %d", (int)dir, limit, bytesUsed );
1143    return bytesUsed;
1144}
1145
1146int
1147tr_peerIoFlushOutgoingProtocolMsgs( tr_peerIo * io )
1148{
1149    size_t byteCount = 0;
1150    tr_list * it;
1151
1152    /* count up how many bytes are used by non-piece-data messages
1153       at the front of our outbound queue */
1154    for( it=io->outbuf_datatypes; it!=NULL; it=it->next )
1155    {
1156        struct tr_datatype * d = it->data;
1157
1158        if( d->isPieceData )
1159            break;
1160
1161        byteCount += d->length;
1162    }
1163
1164    return tr_peerIoFlush( io, TR_UP, byteCount );
1165}
Note: See TracBrowser for help on using the repository browser.