source: trunk/libtransmission/peer-io.c @ 11709

Last change on this file since 11709 was 11709, checked in by jordan, 11 years ago

Update the copyright year in the source code comments.

The Berne Convention says that the copyright year is moot, so instead of adding another year to each file as in previous years, I've removed the year altogether from the source code comments in libtransmission, gtk, qt, utils, daemon, and cli.

Juliusz's copyright notice in tr-dht and Johannes' copyright notice in tr-lpd have been left alone; it didn't seem appropriate to modify them.

  • Property svn:keywords set to Date Rev Author Id
File size: 25.5 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 11709 2011-01-19 13:48:47Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <string.h>
17#include <stdio.h>
18#include <unistd.h>
19
20#ifdef WIN32
21 #include <winsock2.h>
22#else
23 #include <arpa/inet.h> /* inet_ntoa */
24#endif
25
26#include <event2/event.h>
27#include <event2/bufferevent.h>
28
29#include "transmission.h"
30#include "session.h"
31#include "bandwidth.h"
32#include "crypto.h"
33#include "list.h"
34#include "net.h"
35#include "peer-common.h" /* MAX_BLOCK_SIZE */
36#include "peer-io.h"
37#include "trevent.h" /* tr_runInEventThread() */
38#include "utils.h"
39
40#define MAGIC_NUMBER 206745
41
42#ifdef WIN32
43 #define EAGAIN       WSAEWOULDBLOCK
44 #define EINTR        WSAEINTR
45 #define EINPROGRESS  WSAEINPROGRESS
46 #define EPIPE        WSAECONNRESET
47#endif
48
49static size_t
50guessPacketOverhead( size_t d )
51{
52    /**
53     * http://sd.wareonearth.com/~phil/net/overhead/
54     *
55     * TCP over Ethernet:
56     * Assuming no header compression (e.g. not PPP)
57     * Add 20 IPv4 header or 40 IPv6 header (no options)
58     * Add 20 TCP header
59     * Add 12 bytes optional TCP timestamps
60     * Max TCP Payload data rates over ethernet are thus:
61     *  (1500-40)/(38+1500) = 94.9285 %  IPv4, minimal headers
62     *  (1500-52)/(38+1500) = 94.1482 %  IPv4, TCP timestamps
63     *  (1500-52)/(42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
64     *  (1500-60)/(38+1500) = 93.6281 %  IPv6, minimal headers
65     *  (1500-72)/(38+1500) = 92.8479 %  IPv6, TCP timestamps
66     *  (1500-72)/(42+1500) = 92.6070 %  802.1q, IPv6, ICP timestamps
67     */
68    const double assumed_payload_data_rate = 94.0;
69
70    return (unsigned int)( d * ( 100.0 / assumed_payload_data_rate ) - d );
71}
72
73/**
74***
75**/
76
77#define dbgmsg( io, ... ) \
78    do { \
79        if( tr_deepLoggingIsActive( ) ) \
80            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
81    } while( 0 )
82
83struct tr_datatype
84{
85    tr_bool  isPieceData;
86    size_t   length;
87};
88
89/***
90****
91***/
92
93static void
94didWriteWrapper( tr_peerIo * io, unsigned int bytes_transferred )
95{
96     while( bytes_transferred && tr_isPeerIo( io ) )
97     {
98        struct tr_datatype * next = io->outbuf_datatypes->data;
99
100        const unsigned int payload = MIN( next->length, bytes_transferred );
101        const unsigned int overhead = guessPacketOverhead( payload );
102        const uint64_t now = tr_time_msec( );
103
104        tr_bandwidthUsed( &io->bandwidth, TR_UP, payload, next->isPieceData, now );
105
106        if( overhead > 0 )
107            tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, FALSE, now );
108
109        if( io->didWrite )
110            io->didWrite( io, payload, next->isPieceData, io->userData );
111
112        if( tr_isPeerIo( io ) )
113        {
114            bytes_transferred -= payload;
115            next->length -= payload;
116            if( !next->length ) {
117                tr_list_pop_front( &io->outbuf_datatypes );
118                tr_free( next );
119            }
120        }
121    }
122}
123
124static void
125canReadWrapper( tr_peerIo * io )
126{
127    tr_bool err = 0;
128    tr_bool done = 0;
129    tr_session * session;
130
131    dbgmsg( io, "canRead" );
132
133    assert( tr_isPeerIo( io ) );
134    assert( tr_isSession( io->session ) );
135    tr_peerIoRef( io );
136
137    session = io->session;
138
139    /* try to consume the input buffer */
140    if( io->canRead )
141    {
142        tr_sessionLock( session );
143
144        while( !done && !err )
145        {
146            size_t piece = 0;
147            const size_t oldLen = evbuffer_get_length( io->inbuf );
148            const int ret = io->canRead( io, io->userData, &piece );
149
150            const size_t used = oldLen - evbuffer_get_length( io->inbuf );
151
152            assert( tr_isPeerIo( io ) );
153
154            if( piece || (piece!=used) )
155            {
156                const uint64_t now = tr_time_msec( );
157
158                if( piece )
159                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, piece, TRUE, now );
160
161                if( used != piece )
162                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, used - piece, FALSE, now );
163            }
164
165            switch( ret )
166            {
167                case READ_NOW:
168                    if( evbuffer_get_length( io->inbuf ) )
169                        continue;
170                    done = 1;
171                    break;
172
173                case READ_LATER:
174                    done = 1;
175                    break;
176
177                case READ_ERR:
178                    err = 1;
179                    break;
180            }
181
182            assert( tr_isPeerIo( io ) );
183        }
184
185        tr_sessionUnlock( session );
186    }
187
188    assert( tr_isPeerIo( io ) );
189    tr_peerIoUnref( io );
190}
191
192tr_bool
193tr_isPeerIo( const tr_peerIo * io )
194{
195    return ( io != NULL )
196        && ( io->magicNumber == MAGIC_NUMBER )
197        && ( io->refCount >= 0 )
198        && ( tr_isBandwidth( &io->bandwidth ) )
199        && ( tr_isAddress( &io->addr ) );
200}
201
202static void
203event_read_cb( int fd, short event UNUSED, void * vio )
204{
205    int res;
206    int e;
207    tr_peerIo * io = vio;
208
209    /* Limit the input buffer to 256K, so it doesn't grow too large */
210    unsigned int howmuch;
211    unsigned int curlen;
212    const tr_direction dir = TR_DOWN;
213    const unsigned int max = 256 * 1024;
214
215    assert( tr_isPeerIo( io ) );
216
217    io->pendingEvents &= ~EV_READ;
218
219    curlen = evbuffer_get_length( io->inbuf );
220    howmuch = curlen >= max ? 0 : max - curlen;
221    howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch );
222
223    dbgmsg( io, "libevent says this peer is ready to read" );
224
225    /* if we don't have any bandwidth left, stop reading */
226    if( howmuch < 1 ) {
227        tr_peerIoSetEnabled( io, dir, FALSE );
228        return;
229    }
230
231    EVUTIL_SET_SOCKET_ERROR( 0 );
232    res = evbuffer_read( io->inbuf, fd, (int)howmuch );
233    e = EVUTIL_SOCKET_ERROR( );
234
235    if( res > 0 )
236    {
237        tr_peerIoSetEnabled( io, dir, TRUE );
238
239        /* Invoke the user callback - must always be called last */
240        canReadWrapper( io );
241    }
242    else
243    {
244        char errstr[512];
245        short what = BEV_EVENT_READING;
246
247        if( res == 0 ) /* EOF */
248            what |= BEV_EVENT_EOF;
249        else if( res == -1 ) {
250            if( e == EAGAIN || e == EINTR ) {
251                tr_peerIoSetEnabled( io, dir, TRUE );
252                return;
253            }
254            what |= BEV_EVENT_ERROR;
255        }
256
257        tr_net_strerror( errstr, sizeof( errstr ), e );
258        dbgmsg( io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
259
260        if( io->gotError != NULL )
261            io->gotError( io, what, io->userData );
262    }
263}
264
265static int
266tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
267{
268    int e;
269    int n;
270    char errstr[256];
271
272    EVUTIL_SET_SOCKET_ERROR( 0 );
273    n = evbuffer_write_atmost( io->outbuf, fd, howmuch );
274    e = EVUTIL_SOCKET_ERROR( );
275    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?tr_net_strerror(errstr,sizeof(errstr),e):"") );
276
277    return n;
278}
279
280static void
281event_write_cb( int fd, short event UNUSED, void * vio )
282{
283    int res = 0;
284    int e;
285    short what = BEV_EVENT_WRITING;
286    tr_peerIo * io = vio;
287    size_t howmuch;
288    const tr_direction dir = TR_UP;
289
290    assert( tr_isPeerIo( io ) );
291
292    io->pendingEvents &= ~EV_WRITE;
293
294    dbgmsg( io, "libevent says this peer is ready to write" );
295
296    /* Write as much as possible, since the socket is non-blocking, write() will
297     * return if it can't write any more data without blocking */
298    howmuch = tr_bandwidthClamp( &io->bandwidth, dir, evbuffer_get_length( io->outbuf ) );
299
300    /* if we don't have any bandwidth left, stop writing */
301    if( howmuch < 1 ) {
302        tr_peerIoSetEnabled( io, dir, FALSE );
303        return;
304    }
305
306    EVUTIL_SET_SOCKET_ERROR( 0 );
307    res = tr_evbuffer_write( io, fd, howmuch );
308    e = EVUTIL_SOCKET_ERROR( );
309
310    if (res == -1) {
311        if (!e || e == EAGAIN || e == EINTR || e == EINPROGRESS)
312            goto reschedule;
313        /* error case */
314        what |= BEV_EVENT_ERROR;
315    } else if (res == 0) {
316        /* eof case */
317        what |= BEV_EVENT_EOF;
318    }
319    if (res <= 0)
320        goto error;
321
322    if( evbuffer_get_length( io->outbuf ) )
323        tr_peerIoSetEnabled( io, dir, TRUE );
324
325    didWriteWrapper( io, res );
326    return;
327
328 reschedule:
329    if( evbuffer_get_length( io->outbuf ) )
330        tr_peerIoSetEnabled( io, dir, TRUE );
331    return;
332
333 error:
334
335    dbgmsg( io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, strerror( e ) );
336
337    if( io->gotError != NULL )
338        io->gotError( io, what, io->userData );
339}
340
341/**
342***
343**/
344
345static void
346maybeSetCongestionAlgorithm( int socket, const char * algorithm )
347{
348    if( algorithm && *algorithm )
349    {
350        const int rc = tr_netSetCongestionControl( socket, algorithm );
351
352        if( rc < 0 )
353            tr_ninf( "Net", "Can't set congestion control algorithm '%s': %s",
354                     algorithm, tr_strerror( errno ));
355    }
356}
357
358static tr_peerIo*
359tr_peerIoNew( tr_session       * session,
360              tr_bandwidth     * parent,
361              const tr_address * addr,
362              tr_port            port,
363              const uint8_t    * torrentHash,
364              tr_bool            isIncoming,
365              tr_bool            isSeed,
366              int                socket )
367{
368    tr_peerIo * io;
369
370    assert( session != NULL );
371    assert( session->events != NULL );
372    assert( tr_isBool( isIncoming ) );
373    assert( tr_isBool( isSeed ) );
374    assert( tr_amInEventThread( session ) );
375
376    if( socket >= 0 ) {
377        tr_netSetTOS( socket, session->peerSocketTOS );
378        maybeSetCongestionAlgorithm( socket, session->peer_congestion_algorithm );
379    }
380
381    io = tr_new0( tr_peerIo, 1 );
382    io->magicNumber = MAGIC_NUMBER;
383    io->refCount = 1;
384    io->crypto = tr_cryptoNew( torrentHash, isIncoming );
385    io->session = session;
386    io->addr = *addr;
387    io->isSeed = isSeed;
388    io->port = port;
389    io->socket = socket;
390    io->isIncoming = isIncoming != 0;
391    io->timeCreated = tr_time( );
392    io->inbuf = evbuffer_new( );
393    io->outbuf = evbuffer_new( );
394    io->event_read = event_new( session->event_base, io->socket, EV_READ, event_read_cb, io );
395    io->event_write = event_new( session->event_base, io->socket, EV_WRITE, event_write_cb, io );
396    tr_bandwidthConstruct( &io->bandwidth, session, parent );
397    tr_bandwidthSetPeer( &io->bandwidth, io );
398    dbgmsg( io, "bandwidth is %p; its parent is %p", &io->bandwidth, parent );
399
400    return io;
401}
402
403tr_peerIo*
404tr_peerIoNewIncoming( tr_session        * session,
405                      tr_bandwidth      * parent,
406                      const tr_address  * addr,
407                      tr_port             port,
408                      int                 fd )
409{
410    assert( session );
411    assert( tr_isAddress( addr ) );
412    assert( fd >= 0 );
413
414    return tr_peerIoNew( session, parent, addr, port, NULL, TRUE, FALSE, fd );
415}
416
417tr_peerIo*
418tr_peerIoNewOutgoing( tr_session        * session,
419                      tr_bandwidth      * parent,
420                      const tr_address  * addr,
421                      tr_port             port,
422                      const uint8_t     * torrentHash,
423                      tr_bool             isSeed )
424{
425    int fd;
426
427    assert( session );
428    assert( tr_isAddress( addr ) );
429    assert( torrentHash );
430
431    fd = tr_netOpenPeerSocket( session, addr, port, isSeed );
432    dbgmsg( NULL, "tr_netOpenPeerSocket returned fd %d", fd );
433
434    return fd < 0 ? NULL
435                  : tr_peerIoNew( session, parent, addr, port, torrentHash, FALSE, isSeed, fd );
436}
437
438/***
439****
440***/
441
442static void
443event_enable( tr_peerIo * io, short event )
444{
445    assert( tr_amInEventThread( io->session ) );
446    assert( io->session != NULL );
447    assert( io->session->events != NULL );
448    assert( event_initialized( io->event_read ) );
449    assert( event_initialized( io->event_write ) );
450
451    if( io->socket < 0 )
452        return;
453
454    if( ( event & EV_READ ) && ! ( io->pendingEvents & EV_READ ) )
455    {
456        dbgmsg( io, "enabling libevent ready-to-read polling" );
457        event_add( io->event_read, NULL );
458        io->pendingEvents |= EV_READ;
459    }
460
461    if( ( event & EV_WRITE ) && ! ( io->pendingEvents & EV_WRITE ) )
462    {
463        dbgmsg( io, "enabling libevent ready-to-write polling" );
464        event_add( io->event_write, NULL );
465        io->pendingEvents |= EV_WRITE;
466    }
467}
468
469static void
470event_disable( struct tr_peerIo * io, short event )
471{
472    assert( tr_amInEventThread( io->session ) );
473    assert( io->session != NULL );
474    assert( io->session->events != NULL );
475    assert( event_initialized( io->event_read ) );
476    assert( event_initialized( io->event_write ) );
477
478    if( ( event & EV_READ ) && ( io->pendingEvents & EV_READ ) )
479    {
480        dbgmsg( io, "disabling libevent ready-to-read polling" );
481        event_del( io->event_read );
482        io->pendingEvents &= ~EV_READ;
483    }
484
485    if( ( event & EV_WRITE ) && ( io->pendingEvents & EV_WRITE ) )
486    {
487        dbgmsg( io, "disabling libevent ready-to-write polling" );
488        event_del( io->event_write );
489        io->pendingEvents &= ~EV_WRITE;
490    }
491}
492
493void
494tr_peerIoSetEnabled( tr_peerIo    * io,
495                     tr_direction   dir,
496                     tr_bool        isEnabled )
497{
498    const short event = dir == TR_UP ? EV_WRITE : EV_READ;
499
500    assert( tr_isPeerIo( io ) );
501    assert( tr_isDirection( dir ) );
502    assert( tr_amInEventThread( io->session ) );
503    assert( io->session->events != NULL );
504
505    if( isEnabled )
506        event_enable( io, event );
507    else
508        event_disable( io, event );
509}
510
511/***
512****
513***/
514
515static void
516io_dtor( void * vio )
517{
518    tr_peerIo * io = vio;
519
520    assert( tr_isPeerIo( io ) );
521    assert( tr_amInEventThread( io->session ) );
522    assert( io->session->events != NULL );
523
524    dbgmsg( io, "in tr_peerIo destructor" );
525    event_disable( io, EV_READ | EV_WRITE );
526    event_free( io->event_read );
527    event_free( io->event_write );
528    tr_bandwidthDestruct( &io->bandwidth );
529    evbuffer_free( io->outbuf );
530    evbuffer_free( io->inbuf );
531    tr_netClose( io->session, io->socket );
532    tr_cryptoFree( io->crypto );
533    tr_list_free( &io->outbuf_datatypes, tr_free );
534
535    memset( io, ~0, sizeof( tr_peerIo ) );
536    tr_free( io );
537}
538
539static void
540tr_peerIoFree( tr_peerIo * io )
541{
542    if( io )
543    {
544        dbgmsg( io, "in tr_peerIoFree" );
545        io->canRead = NULL;
546        io->didWrite = NULL;
547        io->gotError = NULL;
548        tr_runInEventThread( io->session, io_dtor, io );
549    }
550}
551
552void
553tr_peerIoRefImpl( const char * file, int line, tr_peerIo * io )
554{
555    assert( tr_isPeerIo( io ) );
556
557    dbgmsg( io, "%s:%d is incrementing the IO's refcount from %d to %d",
558                file, line, io->refCount, io->refCount+1 );
559
560    ++io->refCount;
561}
562
563void
564tr_peerIoUnrefImpl( const char * file, int line, tr_peerIo * io )
565{
566    assert( tr_isPeerIo( io ) );
567
568    dbgmsg( io, "%s:%d is decrementing the IO's refcount from %d to %d",
569                file, line, io->refCount, io->refCount-1 );
570
571    if( !--io->refCount )
572        tr_peerIoFree( io );
573}
574
575const tr_address*
576tr_peerIoGetAddress( const tr_peerIo * io, tr_port   * port )
577{
578    assert( tr_isPeerIo( io ) );
579
580    if( port )
581        *port = io->port;
582
583    return &io->addr;
584}
585
586const char*
587tr_peerIoAddrStr( const tr_address * addr, tr_port port )
588{
589    static char buf[512];
590
591    if( addr->type == TR_AF_INET )
592        tr_snprintf( buf, sizeof( buf ), "%s:%u", tr_ntop_non_ts( addr ), ntohs( port ) );
593    else
594        tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_ntop_non_ts( addr ), ntohs( port ) );
595    return buf;
596}
597
598const char* tr_peerIoGetAddrStr( const tr_peerIo * io )
599{
600    return tr_isPeerIo( io ) ? tr_peerIoAddrStr( &io->addr, io->port ) : "error";
601}
602
603void
604tr_peerIoSetIOFuncs( tr_peerIo        * io,
605                     tr_can_read_cb     readcb,
606                     tr_did_write_cb    writecb,
607                     tr_net_error_cb    errcb,
608                     void             * userData )
609{
610    io->canRead = readcb;
611    io->didWrite = writecb;
612    io->gotError = errcb;
613    io->userData = userData;
614}
615
616void
617tr_peerIoClear( tr_peerIo * io )
618{
619    tr_peerIoSetIOFuncs( io, NULL, NULL, NULL, NULL );
620    tr_peerIoSetEnabled( io, TR_UP, FALSE );
621    tr_peerIoSetEnabled( io, TR_DOWN, FALSE );
622}
623
624int
625tr_peerIoReconnect( tr_peerIo * io )
626{
627    short int pendingEvents;
628    tr_session * session;
629
630    assert( tr_isPeerIo( io ) );
631    assert( !tr_peerIoIsIncoming( io ) );
632
633    session = tr_peerIoGetSession( io );
634
635    pendingEvents = io->pendingEvents;
636    event_disable( io, EV_READ | EV_WRITE );
637
638    if( io->socket >= 0 )
639        tr_netClose( session, io->socket );
640
641    event_del( io->event_read );
642    event_del( io->event_write );
643    io->socket = tr_netOpenPeerSocket( session, &io->addr, io->port, io->isSeed );
644    io->event_read = event_new( session->event_base, io->socket, EV_READ, event_read_cb, io );
645    io->event_write = event_new( session->event_base, io->socket, EV_WRITE, event_write_cb, io );
646
647    if( io->socket >= 0 )
648    {
649        event_enable( io, pendingEvents );
650        tr_netSetTOS( io->socket, session->peerSocketTOS );
651        maybeSetCongestionAlgorithm( io->socket, session->peer_congestion_algorithm );
652        return 0;
653    }
654
655    return -1;
656}
657
658/**
659***
660**/
661
662void
663tr_peerIoSetTorrentHash( tr_peerIo *     io,
664                         const uint8_t * hash )
665{
666    assert( tr_isPeerIo( io ) );
667
668    tr_cryptoSetTorrentHash( io->crypto, hash );
669}
670
671const uint8_t*
672tr_peerIoGetTorrentHash( tr_peerIo * io )
673{
674    assert( tr_isPeerIo( io ) );
675    assert( io->crypto );
676
677    return tr_cryptoGetTorrentHash( io->crypto );
678}
679
680int
681tr_peerIoHasTorrentHash( const tr_peerIo * io )
682{
683    assert( tr_isPeerIo( io ) );
684    assert( io->crypto );
685
686    return tr_cryptoHasTorrentHash( io->crypto );
687}
688
689/**
690***
691**/
692
693void
694tr_peerIoSetPeersId( tr_peerIo *     io,
695                     const uint8_t * peer_id )
696{
697    assert( tr_isPeerIo( io ) );
698
699    if( ( io->peerIdIsSet = peer_id != NULL ) )
700        memcpy( io->peerId, peer_id, 20 );
701    else
702        memset( io->peerId, 0, 20 );
703}
704
705/**
706***
707**/
708
709static unsigned int
710getDesiredOutputBufferSize( const tr_peerIo * io, uint64_t now )
711{
712    /* this is all kind of arbitrary, but what seems to work well is
713     * being large enough to hold the next 20 seconds' worth of input,
714     * or a few blocks, whichever is bigger.
715     * It's okay to tweak this as needed */
716    const unsigned int currentSpeed_Bps = tr_bandwidthGetPieceSpeed_Bps( &io->bandwidth, now, TR_UP );
717    const unsigned int period = 15u; /* arbitrary */
718    /* the 3 is arbitrary; the .5 is to leave room for messages */
719    static const unsigned int ceiling =  (unsigned int)( MAX_BLOCK_SIZE * 3.5 );
720    return MAX( ceiling, currentSpeed_Bps*period );
721}
722
723size_t
724tr_peerIoGetWriteBufferSpace( const tr_peerIo * io, uint64_t now )
725{
726    const size_t desiredLen = getDesiredOutputBufferSize( io, now );
727    const size_t currentLen = evbuffer_get_length( io->outbuf );
728    size_t freeSpace = 0;
729
730    if( desiredLen > currentLen )
731        freeSpace = desiredLen - currentLen;
732
733    return freeSpace;
734}
735
736/**
737***
738**/
739
740void
741tr_peerIoSetEncryption( tr_peerIo * io, uint32_t encryptionMode )
742{
743    assert( tr_isPeerIo( io ) );
744    assert( encryptionMode == PEER_ENCRYPTION_NONE
745         || encryptionMode == PEER_ENCRYPTION_RC4 );
746
747    io->encryptionMode = encryptionMode;
748}
749
750/**
751***
752**/
753
754static void
755addDatatype( tr_peerIo * io, size_t byteCount, tr_bool isPieceData )
756{
757    struct tr_datatype * d;
758
759    d = tr_new( struct tr_datatype, 1 );
760    d->isPieceData = isPieceData != 0;
761    d->length = byteCount;
762    tr_list_append( &io->outbuf_datatypes, d );
763}
764
765static struct evbuffer_iovec *
766evbuffer_peek_all( struct evbuffer * buf, size_t * setme_vecCount )
767{
768    const size_t byteCount = evbuffer_get_length( buf );
769    const int vecCount = evbuffer_peek( buf, byteCount, NULL, NULL, 0 );
770    struct evbuffer_iovec * iovec = tr_new0( struct evbuffer_iovec, vecCount );
771    const int n = evbuffer_peek( buf, byteCount, NULL, iovec, vecCount );
772    assert( vecCount == n );
773    *setme_vecCount = vecCount;
774    return iovec;
775}
776
777static void
778maybeEncryptBuffer( tr_peerIo * io, struct evbuffer * buf )
779{
780    if( io->encryptionMode == PEER_ENCRYPTION_RC4 )
781    {
782        size_t i, n;
783        struct evbuffer_iovec * iovec = evbuffer_peek_all( buf, &n );
784
785        for( i=0; i<n; ++i )
786            tr_cryptoEncrypt( io->crypto, iovec[i].iov_len, iovec[i].iov_base, iovec[i].iov_base );
787
788        tr_free( iovec );
789    }
790}
791
792void
793tr_peerIoWriteBuf( tr_peerIo * io, struct evbuffer * buf, tr_bool isPieceData )
794{
795    const size_t byteCount = evbuffer_get_length( buf );
796    maybeEncryptBuffer( io, buf );
797    evbuffer_add_buffer( io->outbuf, buf );
798    addDatatype( io, byteCount, isPieceData );
799}
800
801void
802tr_peerIoWriteBytes( tr_peerIo * io, const void * bytes, size_t byteCount, tr_bool isPieceData )
803{
804    struct evbuffer * buf = evbuffer_new( );
805    evbuffer_add( buf, bytes, byteCount );
806    tr_peerIoWriteBuf( io, buf, isPieceData );
807    evbuffer_free( buf );
808}
809
810/***
811****
812***/
813
814void
815evbuffer_add_uint16( struct evbuffer * outbuf, uint16_t addme_hs )
816{
817    const uint16_t ns = htons( addme_hs );
818    evbuffer_add( outbuf, &ns, sizeof( ns ) );
819}
820
821void
822evbuffer_add_uint32( struct evbuffer * outbuf, uint32_t addme_hl )
823{
824    const uint32_t nl = htonl( addme_hl );
825    evbuffer_add( outbuf, &nl, sizeof( nl ) );
826}
827
828/***
829****
830***/
831
832void
833tr_peerIoReadBytes( tr_peerIo * io, struct evbuffer * inbuf, void * bytes, size_t byteCount )
834{
835    assert( tr_isPeerIo( io ) );
836    assert( evbuffer_get_length( inbuf )  >= byteCount );
837
838    switch( io->encryptionMode )
839    {
840        case PEER_ENCRYPTION_NONE:
841            evbuffer_remove( inbuf, bytes, byteCount );
842            break;
843
844        case PEER_ENCRYPTION_RC4:
845            evbuffer_remove( inbuf, bytes, byteCount );
846            tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
847            break;
848
849        default:
850            assert( 0 );
851    }
852}
853
854void
855tr_peerIoReadUint16( tr_peerIo        * io,
856                     struct evbuffer  * inbuf,
857                     uint16_t         * setme )
858{
859    uint16_t tmp;
860    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
861    *setme = ntohs( tmp );
862}
863
864void tr_peerIoReadUint32( tr_peerIo        * io,
865                          struct evbuffer  * inbuf,
866                          uint32_t         * setme )
867{
868    uint32_t tmp;
869    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
870    *setme = ntohl( tmp );
871}
872
873void
874tr_peerIoDrain( tr_peerIo       * io,
875                struct evbuffer * inbuf,
876                size_t            byteCount )
877{
878    void * buf = tr_sessionGetBuffer( io->session );
879    const size_t buflen = SESSION_BUFFER_SIZE;
880
881    while( byteCount > 0 )
882    {
883        const size_t thisPass = MIN( byteCount, buflen );
884        tr_peerIoReadBytes( io, inbuf, buf, thisPass );
885        byteCount -= thisPass;
886    }
887
888    tr_sessionReleaseBuffer( io->session );
889}
890
891/***
892****
893***/
894
895static int
896tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
897{
898    int res = 0;
899
900    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch )))
901    {
902        int e;
903
904        EVUTIL_SET_SOCKET_ERROR( 0 );
905        res = evbuffer_read( io->inbuf, io->socket, (int)howmuch );
906        e = EVUTIL_SOCKET_ERROR( );
907
908        dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(e):"") );
909
910        if( evbuffer_get_length( io->inbuf ) )
911            canReadWrapper( io );
912
913        if( ( res <= 0 ) && ( io->gotError ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
914        {
915            char errstr[512];
916            short what = BEV_EVENT_READING | BEV_EVENT_ERROR;
917            if( res == 0 )
918                what |= BEV_EVENT_EOF;
919            tr_net_strerror( errstr, sizeof( errstr ), e );
920            dbgmsg( io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );
921            io->gotError( io, what, io->userData );
922        }
923    }
924
925    return res;
926}
927
928static int
929tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
930{
931    int n = 0;
932
933    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_UP, howmuch )))
934    {
935        int e;
936        EVUTIL_SET_SOCKET_ERROR( 0 );
937        n = tr_evbuffer_write( io, io->socket, howmuch );
938        e = EVUTIL_SOCKET_ERROR( );
939
940        if( n > 0 )
941            didWriteWrapper( io, n );
942
943        if( ( n < 0 ) && ( io->gotError ) && e && ( e != EPIPE ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
944        {
945            char errstr[512];
946            const short what = BEV_EVENT_WRITING | BEV_EVENT_ERROR;
947
948            tr_net_strerror( errstr, sizeof( errstr ), e );
949            dbgmsg( io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", n, what, e, errstr );
950
951            if( io->gotError != NULL )
952                io->gotError( io, what, io->userData );
953        }
954    }
955
956    return n;
957}
958
959int
960tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
961{
962    int bytesUsed = 0;
963
964    assert( tr_isPeerIo( io ) );
965    assert( tr_isDirection( dir ) );
966
967    if( dir == TR_DOWN )
968        bytesUsed = tr_peerIoTryRead( io, limit );
969    else
970        bytesUsed = tr_peerIoTryWrite( io, limit );
971
972    dbgmsg( io, "flushing peer-io, direction %d, limit %zu, bytesUsed %d", (int)dir, limit, bytesUsed );
973    return bytesUsed;
974}
975
976int
977tr_peerIoFlushOutgoingProtocolMsgs( tr_peerIo * io )
978{
979    size_t byteCount = 0;
980    tr_list * it;
981
982    /* count up how many bytes are used by non-piece-data messages
983       at the front of our outbound queue */
984    for( it=io->outbuf_datatypes; it!=NULL; it=it->next )
985    {
986        struct tr_datatype * d = it->data;
987
988        if( d->isPieceData )
989            break;
990
991        byteCount += d->length;
992    }
993
994    return tr_peerIoFlush( io, TR_UP, byteCount );
995}
Note: See TracBrowser for help on using the repository browser.