source: trunk/libtransmission/peer-io.c @ 11917

Last change on this file since 11917 was 11917, checked in by jch, 11 years ago

Use dummy callbacks in peerIoReconnect.

  • Property svn:keywords set to Date Rev Author Id
File size: 30.9 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 11917 2011-02-18 00:24:45Z jch $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <string.h>
17#include <stdio.h>
18#include <unistd.h>
19
20#ifdef WIN32
21 #include <winsock2.h>
22#else
23 #include <arpa/inet.h> /* inet_ntoa */
24#endif
25
26#include <event2/event.h>
27#include <event2/bufferevent.h>
28#include "utp.h"
29
30#include "transmission.h"
31#include "session.h"
32#include "bandwidth.h"
33#include "crypto.h"
34#include "list.h"
35#include "net.h"
36#include "peer-common.h" /* MAX_BLOCK_SIZE */
37#include "peer-io.h"
38#include "trevent.h" /* tr_runInEventThread() */
39#include "utils.h"
40
41#define MAGIC_NUMBER 206745
42
43#ifdef WIN32
44 #define EAGAIN       WSAEWOULDBLOCK
45 #define EINTR        WSAEINTR
46 #define EINPROGRESS  WSAEINPROGRESS
47 #define EPIPE        WSAECONNRESET
48#endif
49
50static size_t
51guessPacketOverhead( size_t d )
52{
53    /**
54     * http://sd.wareonearth.com/~phil/net/overhead/
55     *
56     * TCP over Ethernet:
57     * Assuming no header compression (e.g. not PPP)
58     * Add 20 IPv4 header or 40 IPv6 header (no options)
59     * Add 20 TCP header
60     * Add 12 bytes optional TCP timestamps
61     * Max TCP Payload data rates over ethernet are thus:
62     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
63     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
64     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
65     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
66     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
67     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
68     */
69    const double assumed_payload_data_rate = 94.0;
70
71    return (unsigned int)( d * ( 100.0 / assumed_payload_data_rate ) - d );
72}
73
74/**
75***
76**/
77
78#define dbgmsg( io, ... ) \
79    do { \
80        if( tr_deepLoggingIsActive( ) ) \
81            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
82    } while( 0 )
83
84struct tr_datatype
85{
86    tr_bool  isPieceData;
87    size_t   length;
88};
89
90
91/***
92****
93***/
94
95static void
96didWriteWrapper( tr_peerIo * io, unsigned int bytes_transferred )
97{
98     while( bytes_transferred && tr_isPeerIo( io ) )
99     {
100        struct tr_datatype * next = io->outbuf_datatypes->data;
101
102        const unsigned int payload = MIN( next->length, bytes_transferred );
103        const unsigned int overhead = guessPacketOverhead( payload );
104        const uint64_t now = tr_sessionGetTimeMsec( io->session );
105
106        tr_bandwidthUsed( &io->bandwidth, TR_UP, payload, next->isPieceData, now );
107
108        if( overhead > 0 )
109            tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, FALSE, now );
110
111        if( io->didWrite )
112            io->didWrite( io, payload, next->isPieceData, io->userData );
113
114        if( tr_isPeerIo( io ) )
115        {
116            bytes_transferred -= payload;
117            next->length -= payload;
118            if( !next->length ) {
119                tr_list_pop_front( &io->outbuf_datatypes );
120                tr_free( next );
121            }
122        }
123    }
124}
125
126static void
127canReadWrapper( tr_peerIo * io )
128{
129    tr_bool err = 0;
130    tr_bool done = 0;
131    tr_session * session;
132
133    dbgmsg( io, "canRead" );
134
135    assert( tr_isPeerIo( io ) );
136    assert( tr_isSession( io->session ) );
137    tr_peerIoRef( io );
138
139    session = io->session;
140
141    /* try to consume the input buffer */
142    if( io->canRead )
143    {
144        const uint64_t now = tr_sessionGetTimeMsec( io->session );
145
146        tr_sessionLock( session );
147
148        while( !done && !err )
149        {
150            size_t piece = 0;
151            const size_t oldLen = evbuffer_get_length( io->inbuf );
152            const int ret = io->canRead( io, io->userData, &piece );
153            const size_t used = oldLen - evbuffer_get_length( io->inbuf );
154            const unsigned int overhead = guessPacketOverhead( used );
155
156            assert( tr_isPeerIo( io ) );
157
158            if( piece || (piece!=used) )
159            {
160                if( piece )
161                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, piece, TRUE, now );
162
163                if( used != piece )
164                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, used - piece, FALSE, now );
165            }
166
167            if( overhead > 0 )
168                tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, FALSE, now );
169
170            switch( ret )
171            {
172                case READ_NOW:
173                    if( evbuffer_get_length( io->inbuf ) )
174                        continue;
175                    done = 1;
176                    break;
177
178                case READ_LATER:
179                    done = 1;
180                    break;
181
182                case READ_ERR:
183                    err = 1;
184                    break;
185            }
186
187            assert( tr_isPeerIo( io ) );
188        }
189
190        tr_sessionUnlock( session );
191    }
192
193    assert( tr_isPeerIo( io ) );
194    tr_peerIoUnref( io );
195}
196
197tr_bool
198tr_isPeerIo( const tr_peerIo * io )
199{
200    return ( io != NULL )
201        && ( io->magicNumber == MAGIC_NUMBER )
202        && ( io->refCount >= 0 )
203        && ( tr_isBandwidth( &io->bandwidth ) )
204        && ( tr_isAddress( &io->addr ) );
205}
206
207static void
208event_read_cb( int fd, short event UNUSED, void * vio )
209{
210    int res;
211    int e;
212    tr_peerIo * io = vio;
213
214    /* Limit the input buffer to 256K, so it doesn't grow too large */
215    unsigned int howmuch;
216    unsigned int curlen;
217    const tr_direction dir = TR_DOWN;
218    const unsigned int max = 256 * 1024;
219
220    assert( tr_isPeerIo( io ) );
221    assert( io->socket >= 0 );
222
223    io->pendingEvents &= ~EV_READ;
224
225    curlen = evbuffer_get_length( io->inbuf );
226    howmuch = curlen >= max ? 0 : max - curlen;
227    howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch );
228
229    dbgmsg( io, "libevent says this peer is ready to read" );
230
231    /* if we don't have any bandwidth left, stop reading */
232    if( howmuch < 1 ) {
233        tr_peerIoSetEnabled( io, dir, FALSE );
234        return;
235    }
236
237    EVUTIL_SET_SOCKET_ERROR( 0 );
238    res = evbuffer_read( io->inbuf, fd, (int)howmuch );
239    e = EVUTIL_SOCKET_ERROR( );
240
241    if( res > 0 )
242    {
243        tr_peerIoSetEnabled( io, dir, TRUE );
244
245        /* Invoke the user callback - must always be called last */
246        canReadWrapper( io );
247    }
248    else
249    {
250        char errstr[512];
251        short what = BEV_EVENT_READING;
252
253        if( res == 0 ) /* EOF */
254            what |= BEV_EVENT_EOF;
255        else if( res == -1 ) {
256            if( e == EAGAIN || e == EINTR ) {
257                tr_peerIoSetEnabled( io, dir, TRUE );
258                return;
259            }
260            what |= BEV_EVENT_ERROR;
261        }
262
263        tr_net_strerror( errstr, sizeof( errstr ), e );
264        dbgmsg( io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
265
266        if( io->gotError != NULL )
267            io->gotError( io, what, io->userData );
268    }
269}
270
271static int
272tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
273{
274    int e;
275    int n;
276    char errstr[256];
277
278    EVUTIL_SET_SOCKET_ERROR( 0 );
279    n = evbuffer_write_atmost( io->outbuf, fd, howmuch );
280    e = EVUTIL_SOCKET_ERROR( );
281    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?tr_net_strerror(errstr,sizeof(errstr),e):"") );
282
283    return n;
284}
285
286static void
287event_write_cb( int fd, short event UNUSED, void * vio )
288{
289    int res = 0;
290    int e;
291    short what = BEV_EVENT_WRITING;
292    tr_peerIo * io = vio;
293    size_t howmuch;
294    const tr_direction dir = TR_UP;
295
296    assert( tr_isPeerIo( io ) );
297    assert( io->socket >= 0 );
298
299    io->pendingEvents &= ~EV_WRITE;
300
301    dbgmsg( io, "libevent says this peer is ready to write" );
302
303    /* Write as much as possible, since the socket is non-blocking, write() will
304     * return if it can't write any more data without blocking */
305    howmuch = tr_bandwidthClamp( &io->bandwidth, dir, evbuffer_get_length( io->outbuf ) );
306
307    /* if we don't have any bandwidth left, stop writing */
308    if( howmuch < 1 ) {
309        tr_peerIoSetEnabled( io, dir, FALSE );
310        return;
311    }
312
313    EVUTIL_SET_SOCKET_ERROR( 0 );
314    res = tr_evbuffer_write( io, fd, howmuch );
315    e = EVUTIL_SOCKET_ERROR( );
316
317    if (res == -1) {
318        if (!e || e == EAGAIN || e == EINTR || e == EINPROGRESS)
319            goto reschedule;
320        /* error case */
321        what |= BEV_EVENT_ERROR;
322    } else if (res == 0) {
323        /* eof case */
324        what |= BEV_EVENT_EOF;
325    }
326    if (res <= 0)
327        goto error;
328
329    if( evbuffer_get_length( io->outbuf ) )
330        tr_peerIoSetEnabled( io, dir, TRUE );
331
332    didWriteWrapper( io, res );
333    return;
334
335 reschedule:
336    if( evbuffer_get_length( io->outbuf ) )
337        tr_peerIoSetEnabled( io, dir, TRUE );
338    return;
339
340 error:
341
342    dbgmsg( io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
343
344    if( io->gotError != NULL )
345        io->gotError( io, what, io->userData );
346}
347
348/**
349***
350**/
351
352static void
353maybeSetCongestionAlgorithm( int socket, const char * algorithm )
354{
355    if( algorithm && *algorithm )
356    {
357        const int rc = tr_netSetCongestionControl( socket, algorithm );
358
359        if( rc < 0 )
360            tr_ninf( "Net", "Can't set congestion control algorithm '%s': %s",
361                     algorithm, tr_strerror( errno ));
362    }
363}
364
365/* UTP callbacks */
366
367static void
368utp_on_read(void *closure, const unsigned char *buf, size_t buflen)
369{
370    int rc;
371    tr_peerIo *io = (tr_peerIo *)closure;
372    assert( tr_isPeerIo( io ) );
373
374    tr_ndbg( "UTP", "On read: %ld", (long)buflen );
375
376    rc = evbuffer_add( io->inbuf, buf, buflen );
377    if( rc < 0 ) {
378        tr_nerr( "UTP", "On read evbuffer_add" );
379        return;
380    }
381
382    tr_peerIoSetEnabled( io, TR_DOWN, TRUE );
383    canReadWrapper( io );
384}
385
386static void
387utp_on_write(void *closure, unsigned char *buf, size_t buflen)
388{
389    tr_peerIo *io = (tr_peerIo *)closure;
390    int rc;
391
392    assert( tr_isPeerIo( io ) );
393    tr_ndbg( "UTP", "On write: %ld", (long)buflen );
394
395    rc = evbuffer_remove( io->outbuf, buf, buflen );
396    if( rc < (long)buflen ) {
397        tr_nerr( "UTP", "Short write: %d < %ld", rc, (long)buflen);
398    }
399}
400
401static size_t
402utp_get_rb_size(void *closure)
403{
404    tr_peerIo *io = (tr_peerIo *)closure;
405    assert( tr_isPeerIo( io ) );
406
407    tr_ndbg( "UTP", "Get RB size" );
408    return 0;
409}
410
411static void
412utp_on_state_change(void *closure, int state)
413{
414    tr_peerIo *io = (tr_peerIo *)closure;
415    assert( tr_isPeerIo( io ) );
416
417    tr_ndbg( "UTP", "On state change: %d", state );
418
419    if( state == UTP_STATE_CONNECT || state == UTP_STATE_WRITABLE ) {
420        size_t count = evbuffer_get_length( io->outbuf );
421        if( count > 0 )
422            UTP_Write( io->utp_socket, count );
423    } else if( state == UTP_STATE_EOF ) {
424        if( io->gotError )
425            io->gotError( io, BEV_EVENT_EOF, io->userData );
426    } else if( state == UTP_STATE_DESTROYING ) {
427        tr_nerr( "UTP", "Impossible state UTP_STATE_DESTROYING" );
428        return;
429    } else {
430        tr_nerr( "UTP", "Unknown state %d", state );
431    }
432}
433
434static void
435utp_on_error(void *closure, int errcode)
436{
437    tr_peerIo *io = (tr_peerIo *)closure;
438    assert( tr_isPeerIo( io ) );
439
440    tr_ndbg( "UTP", "Error callback: %s", tr_strerror( errcode ) );
441
442    if( io->gotError ) {
443        errno = errcode;
444        io->gotError( io, BEV_EVENT_ERROR, io->userData );
445    }
446}
447
448static void
449utp_on_overhead(void *closure, bool send, size_t count, int type)
450{
451    tr_peerIo *io = (tr_peerIo *)closure;
452    assert( tr_isPeerIo( io ) );
453
454    tr_ndbg( "UTP", "On overhead: %d %ld %d", (int)send, (long)count, type );
455}
456
457static struct UTPFunctionTable utp_function_table = {
458    .on_read = utp_on_read,
459    .on_write = utp_on_write,
460    .get_rb_size = utp_get_rb_size,
461    .on_state = utp_on_state_change,
462    .on_error = utp_on_error,
463    .on_overhead = utp_on_overhead
464};
465
466/* Dummy UTP callbacks. */
467/* We switch a UTP socket to use these after the associated peerIo has been
468   destroyed -- see io_dtor. */
469
470static void
471dummy_read(void *closure, const unsigned char *buf, size_t buflen)
472{
473    /* This cannot happen, as far as I'm aware. */
474    tr_nerr("UTP", "On_read called on closed socket");
475
476}
477
478static void
479dummy_write(void *closure, unsigned char *buf, size_t buflen)
480{
481    /* This can very well happen if we've shut down a peer connection that
482       had unflushed buffers.  Complain and send zeroes. */
483    tr_ndbg("UTP", "On_write called on closed socket");
484    memset(buf, 0, buflen);
485}
486
487static size_t
488dummy_get_rb_size(void *closure)
489{
490    return 0;
491}
492
493static void
494dummy_on_state_change(void *closure, int state)
495{
496    return;
497}
498
499static void
500dummy_on_error(void *closure, int errcode)
501{
502    return;
503}
504
505static void
506dummy_on_overhead(void *closure, bool send, size_t count, int type)
507{
508    return;
509}
510
511static struct UTPFunctionTable dummy_utp_function_table = {
512    .on_read = dummy_read,
513    .on_write = dummy_write,
514    .get_rb_size = dummy_get_rb_size,
515    .on_state = dummy_on_state_change,
516    .on_error = dummy_on_error,
517    .on_overhead = dummy_on_overhead
518};
519
520static tr_peerIo*
521tr_peerIoNew( tr_session       * session,
522              tr_bandwidth     * parent,
523              const tr_address * addr,
524              tr_port            port,
525              const uint8_t    * torrentHash,
526              tr_bool            isIncoming,
527              tr_bool            isSeed,
528              int                socket,
529              struct UTPSocket * utp_socket)
530{
531    tr_peerIo * io;
532
533    assert( session != NULL );
534    assert( session->events != NULL );
535    assert( tr_isBool( isIncoming ) );
536    assert( tr_isBool( isSeed ) );
537    assert( tr_amInEventThread( session ) );
538    assert( (socket < 0) == (utp_socket != NULL) );
539
540    if( socket >= 0 ) {
541        tr_netSetTOS( socket, session->peerSocketTOS );
542        maybeSetCongestionAlgorithm( socket, session->peer_congestion_algorithm );
543    }
544
545    io = tr_new0( tr_peerIo, 1 );
546    io->magicNumber = MAGIC_NUMBER;
547    io->refCount = 1;
548    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
549    io->session = session;
550    io->addr = *addr;
551    io->isSeed = isSeed;
552    io->port = port;
553    io->socket = socket;
554    io->utp_socket = utp_socket;
555    io->isIncoming = isIncoming != 0;
556    io->timeCreated = tr_time( );
557    io->inbuf = evbuffer_new( );
558    io->outbuf = evbuffer_new( );
559    tr_bandwidthConstruct( &io->bandwidth, session, parent );
560    tr_bandwidthSetPeer( &io->bandwidth, io );
561    dbgmsg( io, "bandwidth is %p; its parent is %p", &io->bandwidth, parent );
562
563    if( io->socket >= 0 ) {
564        io->event_read = event_new( session->event_base,
565                                    io->socket, EV_READ, event_read_cb, io );
566        io->event_write = event_new( session->event_base,
567                                     io->socket, EV_WRITE, event_write_cb, io );
568    } else {
569        tr_ndbg( "UTP", "New %s connection",
570                 isIncoming ? "incoming" : "outgoing" );
571        UTP_SetCallbacks( utp_socket,
572                          &utp_function_table,
573                          io );
574    }
575
576    return io;
577}
578
579tr_peerIo*
580tr_peerIoNewIncoming( tr_session        * session,
581                      tr_bandwidth      * parent,
582                      const tr_address  * addr,
583                      tr_port             port,
584                      int                 fd,
585                      struct UTPSocket  * utp_socket )
586{
587    assert( session );
588    assert( tr_isAddress( addr ) );
589
590    return tr_peerIoNew( session, parent, addr, port, NULL, TRUE, FALSE, 
591                         fd, utp_socket );
592}
593
594tr_peerIo*
595tr_peerIoNewOutgoing( tr_session        * session,
596                      tr_bandwidth      * parent,
597                      const tr_address  * addr,
598                      tr_port             port,
599                      const uint8_t     * torrentHash,
600                      tr_bool             isSeed )
601{
602    int fd;
603
604    assert( session );
605    assert( tr_isAddress( addr ) );
606    assert( torrentHash );
607
608    fd = tr_netOpenPeerSocket( session, addr, port, isSeed );
609    dbgmsg( NULL, "tr_netOpenPeerSocket returned fd %d", fd );
610
611    return fd < 0 ? NULL
612                  : tr_peerIoNew( session, parent, addr, port,
613                                  torrentHash, FALSE, isSeed, fd, NULL );
614}
615
616/***
617****
618***/
619
620static void
621event_enable( tr_peerIo * io, short event )
622{
623    assert( tr_amInEventThread( io->session ) );
624    assert( io->session != NULL );
625    assert( io->session->events != NULL );
626
627    if( io->socket < 0 )
628        return;
629
630    assert( io->session->events != NULL );
631    assert( event_initialized( io->event_read ) );
632    assert( event_initialized( io->event_write ) );
633
634    if( ( event & EV_READ ) && ! ( io->pendingEvents & EV_READ ) )
635    {
636        dbgmsg( io, "enabling libevent ready-to-read polling" );
637        event_add( io->event_read, NULL );
638        io->pendingEvents |= EV_READ;
639    }
640
641    if( ( event & EV_WRITE ) && ! ( io->pendingEvents & EV_WRITE ) )
642    {
643        dbgmsg( io, "enabling libevent ready-to-write polling" );
644        event_add( io->event_write, NULL );
645        io->pendingEvents |= EV_WRITE;
646    }
647}
648
649static void
650event_disable( struct tr_peerIo * io, short event )
651{
652    assert( tr_amInEventThread( io->session ) );
653    assert( io->session != NULL );
654
655    if( io->socket < 0 )
656        return;
657
658    assert( io->session->events != NULL );
659    assert( event_initialized( io->event_read ) );
660    assert( event_initialized( io->event_write ) );
661
662    if( ( event & EV_READ ) && ( io->pendingEvents & EV_READ ) )
663    {
664        dbgmsg( io, "disabling libevent ready-to-read polling" );
665        event_del( io->event_read );
666        io->pendingEvents &= ~EV_READ;
667    }
668
669    if( ( event & EV_WRITE ) && ( io->pendingEvents & EV_WRITE ) )
670    {
671        dbgmsg( io, "disabling libevent ready-to-write polling" );
672        event_del( io->event_write );
673        io->pendingEvents &= ~EV_WRITE;
674    }
675}
676
677void
678tr_peerIoSetEnabled( tr_peerIo    * io,
679                     tr_direction   dir,
680                     tr_bool        isEnabled )
681{
682    const short event = dir == TR_UP ? EV_WRITE : EV_READ;
683
684    assert( tr_isPeerIo( io ) );
685    assert( tr_isDirection( dir ) );
686    assert( tr_amInEventThread( io->session ) );
687    assert( io->session->events != NULL );
688
689    if( isEnabled )
690        event_enable( io, event );
691    else
692        event_disable( io, event );
693}
694
695/***
696****
697***/
698
699static void
700io_dtor( void * vio )
701{
702    tr_peerIo * io = vio;
703
704    assert( tr_isPeerIo( io ) );
705    assert( tr_amInEventThread( io->session ) );
706    assert( io->session->events != NULL );
707
708    dbgmsg( io, "in tr_peerIo destructor" );
709    event_disable( io, EV_READ | EV_WRITE );
710    tr_bandwidthDestruct( &io->bandwidth );
711    evbuffer_free( io->outbuf );
712    evbuffer_free( io->inbuf );
713    if( io->socket >= 0 ) {
714        event_free( io->event_read );
715        event_free( io->event_write );
716        tr_netClose( io->session, io->socket );
717    }
718    if( io->utp_socket != NULL ) {
719        tr_ndbg( "UTP", "Destroying connection");
720        UTP_SetCallbacks( io->utp_socket,
721                          &dummy_utp_function_table,
722                          NULL );
723        UTP_Close( io->utp_socket );
724    }
725    tr_cryptoFree( io->crypto );
726    tr_list_free( &io->outbuf_datatypes, tr_free );
727
728    memset( io, ~0, sizeof( tr_peerIo ) );
729    tr_free( io );
730}
731
732static void
733tr_peerIoFree( tr_peerIo * io )
734{
735    if( io )
736    {
737        dbgmsg( io, "in tr_peerIoFree" );
738        io->canRead = NULL;
739        io->didWrite = NULL;
740        io->gotError = NULL;
741        tr_runInEventThread( io->session, io_dtor, io );
742    }
743}
744
745void
746tr_peerIoRefImpl( const char * file, int line, tr_peerIo * io )
747{
748    assert( tr_isPeerIo( io ) );
749
750    dbgmsg( io, "%s:%d is incrementing the IO's refcount from %d to %d",
751                file, line, io->refCount, io->refCount+1 );
752
753    ++io->refCount;
754}
755
756void
757tr_peerIoUnrefImpl( const char * file, int line, tr_peerIo * io )
758{
759    assert( tr_isPeerIo( io ) );
760
761    dbgmsg( io, "%s:%d is decrementing the IO's refcount from %d to %d",
762                file, line, io->refCount, io->refCount-1 );
763
764    if( !--io->refCount )
765        tr_peerIoFree( io );
766}
767
768const tr_address*
769tr_peerIoGetAddress( const tr_peerIo * io, tr_port   * port )
770{
771    assert( tr_isPeerIo( io ) );
772
773    if( port )
774        *port = io->port;
775
776    return &io->addr;
777}
778
779const char*
780tr_peerIoAddrStr( const tr_address * addr, tr_port port )
781{
782    static char buf[512];
783
784    if( addr->type == TR_AF_INET )
785        tr_snprintf( buf, sizeof( buf ), "%s:%u", tr_ntop_non_ts( addr ), ntohs( port ) );
786    else
787        tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_ntop_non_ts( addr ), ntohs( port ) );
788    return buf;
789}
790
791const char* tr_peerIoGetAddrStr( const tr_peerIo * io )
792{
793    return tr_isPeerIo( io ) ? tr_peerIoAddrStr( &io->addr, io->port ) : "error";
794}
795
796void
797tr_peerIoSetIOFuncs( tr_peerIo        * io,
798                     tr_can_read_cb     readcb,
799                     tr_did_write_cb    writecb,
800                     tr_net_error_cb    errcb,
801                     void             * userData )
802{
803    io->canRead = readcb;
804    io->didWrite = writecb;
805    io->gotError = errcb;
806    io->userData = userData;
807}
808
809void
810tr_peerIoClear( tr_peerIo * io )
811{
812    tr_peerIoSetIOFuncs( io, NULL, NULL, NULL, NULL );
813    tr_peerIoSetEnabled( io, TR_UP, FALSE );
814    tr_peerIoSetEnabled( io, TR_DOWN, FALSE );
815}
816
817int
818tr_peerIoReconnect( tr_peerIo * io )
819{
820    short int pendingEvents;
821    tr_session * session;
822
823    assert( tr_isPeerIo( io ) );
824    assert( !tr_peerIoIsIncoming( io ) );
825
826    session = tr_peerIoGetSession( io );
827
828    pendingEvents = io->pendingEvents;
829    event_disable( io, EV_READ | EV_WRITE );
830
831    if( io->socket >= 0 ) {
832        tr_netClose( session, io->socket );
833        io->socket = -1;
834    }
835    if( io->utp_socket != NULL ) {
836        UTP_SetCallbacks( io->utp_socket,
837                          &dummy_utp_function_table,
838                          NULL );
839        UTP_Close(io->utp_socket);
840        io->utp_socket = NULL;
841    }
842
843    event_free( io->event_read );
844    event_free( io->event_write );
845    io->socket = tr_netOpenPeerSocket( session, &io->addr, io->port, io->isSeed );
846    io->event_read = event_new( session->event_base, io->socket, EV_READ, event_read_cb, io );
847    io->event_write = event_new( session->event_base, io->socket, EV_WRITE, event_write_cb, io );
848
849    if( io->socket >= 0 )
850    {
851        event_enable( io, pendingEvents );
852        tr_netSetTOS( io->socket, session->peerSocketTOS );
853        maybeSetCongestionAlgorithm( io->socket, session->peer_congestion_algorithm );
854        return 0;
855    }
856
857    return -1;
858}
859
860/**
861***
862**/
863
864void
865tr_peerIoSetTorrentHash( tr_peerIo *     io,
866                         const uint8_t * hash )
867{
868    assert( tr_isPeerIo( io ) );
869
870    tr_cryptoSetTorrentHash( io->crypto, hash );
871}
872
873const uint8_t*
874tr_peerIoGetTorrentHash( tr_peerIo * io )
875{
876    assert( tr_isPeerIo( io ) );
877    assert( io->crypto );
878
879    return tr_cryptoGetTorrentHash( io->crypto );
880}
881
882int
883tr_peerIoHasTorrentHash( const tr_peerIo * io )
884{
885    assert( tr_isPeerIo( io ) );
886    assert( io->crypto );
887
888    return tr_cryptoHasTorrentHash( io->crypto );
889}
890
891/**
892***
893**/
894
895void
896tr_peerIoSetPeersId( tr_peerIo *     io,
897                     const uint8_t * peer_id )
898{
899    assert( tr_isPeerIo( io ) );
900
901    if( ( io->peerIdIsSet = peer_id != NULL ) )
902        memcpy( io->peerId, peer_id, 20 );
903    else
904        memset( io->peerId, 0, 20 );
905}
906
907/**
908***
909**/
910
911static unsigned int
912getDesiredOutputBufferSize( const tr_peerIo * io, uint64_t now )
913{
914    /* this is all kind of arbitrary, but what seems to work well is
915     * being large enough to hold the next 20 seconds' worth of input,
916     * or a few blocks, whichever is bigger.
917     * It's okay to tweak this as needed */
918    const unsigned int currentSpeed_Bps = tr_bandwidthGetPieceSpeed_Bps( &io->bandwidth, now, TR_UP );
919    const unsigned int period = 15u; /* arbitrary */
920    /* the 3 is arbitrary; the .5 is to leave room for messages */
921    static const unsigned int ceiling =  (unsigned int)( MAX_BLOCK_SIZE * 3.5 );
922    return MAX( ceiling, currentSpeed_Bps*period );
923}
924
925size_t
926tr_peerIoGetWriteBufferSpace( const tr_peerIo * io, uint64_t now )
927{
928    const size_t desiredLen = getDesiredOutputBufferSize( io, now );
929    const size_t currentLen = evbuffer_get_length( io->outbuf );
930    size_t freeSpace = 0;
931
932    if( desiredLen > currentLen )
933        freeSpace = desiredLen - currentLen;
934
935    return freeSpace;
936}
937
938/**
939***
940**/
941
942void
943tr_peerIoSetEncryption( tr_peerIo * io, uint32_t encryptionMode )
944{
945    assert( tr_isPeerIo( io ) );
946    assert( encryptionMode == PEER_ENCRYPTION_NONE
947         || encryptionMode == PEER_ENCRYPTION_RC4 );
948
949    io->encryptionMode = encryptionMode;
950}
951
952/**
953***
954**/
955
956static void
957addDatatype( tr_peerIo * io, size_t byteCount, tr_bool isPieceData )
958{
959    struct tr_datatype * d;
960
961    d = tr_new( struct tr_datatype, 1 );
962    d->isPieceData = isPieceData != 0;
963    d->length = byteCount;
964    tr_list_append( &io->outbuf_datatypes, d );
965}
966
967static struct evbuffer_iovec *
968evbuffer_peek_all( struct evbuffer * buf, size_t * setme_vecCount )
969{
970    const size_t byteCount = evbuffer_get_length( buf );
971    const int vecCount = evbuffer_peek( buf, byteCount, NULL, NULL, 0 );
972    struct evbuffer_iovec * iovec = tr_new0( struct evbuffer_iovec, vecCount );
973    const int n = evbuffer_peek( buf, byteCount, NULL, iovec, vecCount );
974    assert( n == vecCount );
975    *setme_vecCount = n;
976    return iovec;
977}
978
979static void
980maybeEncryptBuffer( tr_peerIo * io, struct evbuffer * buf )
981{
982    if( io->encryptionMode == PEER_ENCRYPTION_RC4 )
983    {
984        size_t i, n;
985        struct evbuffer_iovec * iovec = evbuffer_peek_all( buf, &n );
986
987        for( i=0; i<n; ++i )
988            tr_cryptoEncrypt( io->crypto, iovec[i].iov_len, iovec[i].iov_base, iovec[i].iov_base );
989
990        tr_free( iovec );
991    }
992}
993
994void
995tr_peerIoWriteBuf( tr_peerIo * io, struct evbuffer * buf, tr_bool isPieceData )
996{
997    const size_t byteCount = evbuffer_get_length( buf );
998    maybeEncryptBuffer( io, buf );
999    evbuffer_add_buffer( io->outbuf, buf );
1000    addDatatype( io, byteCount, isPieceData );
1001    if( io->utp_socket )
1002        UTP_Write( io->utp_socket, byteCount );
1003}
1004
1005void
1006tr_peerIoWriteBytes( tr_peerIo * io, const void * bytes, size_t byteCount, tr_bool isPieceData )
1007{
1008    struct evbuffer * buf = evbuffer_new( );
1009    evbuffer_add( buf, bytes, byteCount );
1010    tr_peerIoWriteBuf( io, buf, isPieceData );
1011    evbuffer_free( buf );
1012}
1013
1014/***
1015****
1016***/
1017
1018void
1019evbuffer_add_uint8( struct evbuffer * outbuf, uint8_t byte )
1020{
1021    evbuffer_add( outbuf, &byte, 1 );
1022}
1023
1024void
1025evbuffer_add_uint16( struct evbuffer * outbuf, uint16_t addme_hs )
1026{
1027    const uint16_t ns = htons( addme_hs );
1028    evbuffer_add( outbuf, &ns, sizeof( ns ) );
1029}
1030
1031void
1032evbuffer_add_uint32( struct evbuffer * outbuf, uint32_t addme_hl )
1033{
1034    const uint32_t nl = htonl( addme_hl );
1035    evbuffer_add( outbuf, &nl, sizeof( nl ) );
1036}
1037
1038/***
1039****
1040***/
1041
1042void
1043tr_peerIoReadBytes( tr_peerIo * io, struct evbuffer * inbuf, void * bytes, size_t byteCount )
1044{
1045    assert( tr_isPeerIo( io ) );
1046    assert( evbuffer_get_length( inbuf )  >= byteCount );
1047
1048    switch( io->encryptionMode )
1049    {
1050        case PEER_ENCRYPTION_NONE:
1051            evbuffer_remove( inbuf, bytes, byteCount );
1052            break;
1053
1054        case PEER_ENCRYPTION_RC4:
1055            evbuffer_remove( inbuf, bytes, byteCount );
1056            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
1057            break;
1058
1059        default:
1060            assert( 0 );
1061    }
1062}
1063
1064void
1065tr_peerIoReadUint16( tr_peerIo        * io,
1066                     struct evbuffer  * inbuf,
1067                     uint16_t         * setme )
1068{
1069    uint16_t tmp;
1070    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
1071    *setme = ntohs( tmp );
1072}
1073
1074void tr_peerIoReadUint32( tr_peerIo        * io,
1075                          struct evbuffer  * inbuf,
1076                          uint32_t         * setme )
1077{
1078    uint32_t tmp;
1079    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
1080    *setme = ntohl( tmp );
1081}
1082
1083void
1084tr_peerIoDrain( tr_peerIo       * io,
1085                struct evbuffer * inbuf,
1086                size_t            byteCount )
1087{
1088    void * buf = tr_sessionGetBuffer( io->session );
1089    const size_t buflen = SESSION_BUFFER_SIZE;
1090
1091    while( byteCount > 0 )
1092    {
1093        const size_t thisPass = MIN( byteCount, buflen );
1094        tr_peerIoReadBytes( io, inbuf, buf, thisPass );
1095        byteCount -= thisPass;
1096    }
1097
1098    tr_sessionReleaseBuffer( io->session );
1099}
1100
1101/***
1102****
1103***/
1104
1105static int
1106tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
1107{
1108    int res = 0;
1109
1110    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch )))
1111    {
1112        int e;
1113
1114        EVUTIL_SET_SOCKET_ERROR( 0 );
1115        res = evbuffer_read( io->inbuf, io->socket, (int)howmuch );
1116        e = EVUTIL_SOCKET_ERROR( );
1117
1118        dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(e):"") );
1119
1120        if( evbuffer_get_length( io->inbuf ) )
1121            canReadWrapper( io );
1122
1123        if( ( res <= 0 ) && ( io->gotError ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1124        {
1125            char errstr[512];
1126            short what = BEV_EVENT_READING | BEV_EVENT_ERROR;
1127            if( res == 0 )
1128                what |= BEV_EVENT_EOF;
1129            tr_net_strerror( errstr, sizeof( errstr ), e );
1130            dbgmsg( io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
1131            io->gotError( io, what, io->userData );
1132        }
1133    }
1134
1135    return res;
1136}
1137
1138static int
1139tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
1140{
1141    int n = 0;
1142
1143    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_UP, howmuch )))
1144    {
1145        int e;
1146        EVUTIL_SET_SOCKET_ERROR( 0 );
1147        n = tr_evbuffer_write( io, io->socket, howmuch );
1148        e = EVUTIL_SOCKET_ERROR( );
1149
1150        if( n > 0 )
1151            didWriteWrapper( io, n );
1152
1153        if( ( n < 0 ) && ( io->gotError ) && e && ( e != EPIPE ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1154        {
1155            char errstr[512];
1156            const short what = BEV_EVENT_WRITING | BEV_EVENT_ERROR;
1157
1158            tr_net_strerror( errstr, sizeof( errstr ), e );
1159            dbgmsg( io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", n, what, e, errstr );
1160
1161            if( io->gotError != NULL )
1162                io->gotError( io, what, io->userData );
1163        }
1164    }
1165
1166    return n;
1167}
1168
1169int
1170tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
1171{
1172    int bytesUsed = 0;
1173
1174    assert( tr_isPeerIo( io ) );
1175    assert( tr_isDirection( dir ) );
1176
1177    if( dir == TR_DOWN )
1178        bytesUsed = tr_peerIoTryRead( io, limit );
1179    else
1180        bytesUsed = tr_peerIoTryWrite( io, limit );
1181
1182    dbgmsg( io, "flushing peer-io, direction %d, limit %zu, bytesUsed %d", (int)dir, limit, bytesUsed );
1183    return bytesUsed;
1184}
1185
1186int
1187tr_peerIoFlushOutgoingProtocolMsgs( tr_peerIo * io )
1188{
1189    size_t byteCount = 0;
1190    tr_list * it;
1191
1192    /* count up how many bytes are used by non-piece-data messages
1193       at the front of our outbound queue */
1194    for( it=io->outbuf_datatypes; it!=NULL; it=it->next )
1195    {
1196        struct tr_datatype * d = it->data;
1197
1198        if( d->isPieceData )
1199            break;
1200
1201        byteCount += d->length;
1202    }
1203
1204    return tr_peerIoFlush( io, TR_UP, byteCount );
1205}
Note: See TracBrowser for help on using the repository browser.