source: branches/1.5x/libtransmission/peer-msgs.c @ 8204

Last change on this file since 8204 was 8204, checked in by charles, 13 years ago

(1.5x libT) various backports for 1.52:
(1) recognize Aria2 as a client
(2) remove jhujhiti's tr_suspectAddress(), since he removed it from trunka
(3) on Mac, better detection of where the Web UI files are located
(4) reintroduce the web task queue
(5) various minor formatting changes to reduce the diffs between 1.52 and trunk

  • Property svn:keywords set to Date Rev Author Id
File size: 62.6 KB
Line 
1/*
2 * This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 8204 2009-04-10 17:34:25Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "session.h"
24#include "bencode.h"
25#include "completion.h"
26#include "crypto.h"
27#include "inout.h"
28#ifdef WIN32
29#include "net.h" /* for ECONN */
30#endif
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-msgs.h"
34#include "platform.h" /* MAX_STACK_ARRAY_SIZE */
35#include "ratecontrol.h"
36#include "request-list.h"
37#include "stats.h"
38#include "torrent.h"
39#include "trevent.h"
40#include "utils.h"
41
42/**
43***
44**/
45
46enum
47{
48    BT_CHOKE                = 0,
49    BT_UNCHOKE              = 1,
50    BT_INTERESTED           = 2,
51    BT_NOT_INTERESTED       = 3,
52    BT_HAVE                 = 4,
53    BT_BITFIELD             = 5,
54    BT_REQUEST              = 6,
55    BT_PIECE                = 7,
56    BT_CANCEL               = 8,
57    BT_PORT                 = 9,
58
59    BT_FEXT_SUGGEST         = 13,
60    BT_FEXT_HAVE_ALL        = 14,
61    BT_FEXT_HAVE_NONE       = 15,
62    BT_FEXT_REJECT          = 16,
63    BT_FEXT_ALLOWED_FAST    = 17,
64
65    BT_LTEP                 = 20,
66
67    LTEP_HANDSHAKE          = 0,
68
69    TR_LTEP_PEX             = 1,
70
71
72
73    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
74
75    /* idle seconds before we send a keepalive */
76    KEEPALIVE_INTERVAL_SECS = 100,
77
78    PEX_INTERVAL            = ( 90 * 1000 ), /* msec between sendPex() calls */
79
80
81    MAX_BLOCK_SIZE          = ( 1024 * 16 ),
82
83
84    /* how long an unsent request can stay queued before it's returned
85       back to the peer-mgr's pool of requests */
86    QUEUED_REQUEST_TTL_SECS = 20,
87
88    /* how long a sent request can stay queued before it's returned
89       back to the peer-mgr's pool of requests */
90    SENT_REQUEST_TTL_SECS = 240,
91
92    /* used in lowering the outMessages queue period */
93    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
94    HIGH_PRIORITY_INTERVAL_SECS = 2,
95    LOW_PRIORITY_INTERVAL_SECS = 20,
96
97    /* number of pieces to remove from the bitfield when
98     * lazy bitfields are turned on */
99    LAZY_PIECE_COUNT = 26,
100
101    /* number of pieces we'll allow in our fast set */
102    MAX_FAST_SET_SIZE = 3
103};
104
105enum
106{
107    AWAITING_BT_LENGTH,
108    AWAITING_BT_ID,
109    AWAITING_BT_MESSAGE,
110    AWAITING_BT_PIECE
111};
112
113/**
114***
115**/
116
117/* this is raw, unchanged data from the peer regarding
118 * the current message that it's sending us. */
119struct tr_incoming
120{
121    uint8_t                id;
122    uint32_t               length; /* includes the +1 for id length */
123    struct peer_request    blockReq; /* metadata for incoming blocks */
124    struct evbuffer *      block; /* piece data for incoming blocks */
125};
126
127/**
128 * Low-level communication state information about a connected peer.
129 *
130 * This structure remembers the low-level protocol states that we're
131 * in with this peer, such as active requests, pex messages, and so on.
132 * Its fields are all private to peer-msgs.c.
133 *
134 * Data not directly involved with sending & receiving messages is
135 * stored in tr_peer, where it can be accessed by both peermsgs and
136 * the peer manager.
137 *
138 * @see struct peer_atom
139 * @see tr_peer
140 */
141struct tr_peermsgs
142{
143    tr_bool         peerSupportsPex;
144    tr_bool         clientSentLtepHandshake;
145    tr_bool         peerSentLtepHandshake;
146    tr_bool         haveFastSet;
147
148    uint8_t         state;
149    uint8_t         ut_pex_id;
150    uint16_t        pexCount;
151    uint16_t        pexCount6;
152    uint16_t        maxActiveRequests;
153
154    size_t                 fastsetSize;
155    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
156
157    /* how long the outMessages batch should be allowed to grow before
158     * it's flushed -- some messages (like requests >:) should be sent
159     * very quickly; others aren't as urgent. */
160    int                    outMessagesBatchPeriod;
161
162    tr_peer *              peer;
163
164    tr_session *           session;
165    tr_torrent *           torrent;
166
167    tr_publisher           publisher;
168
169    struct evbuffer *      outMessages; /* all the non-piece messages */
170
171    struct request_list    peerAskedFor;
172    struct request_list    clientAskedFor;
173    struct request_list    clientWillAskFor;
174
175    tr_timer             * pexTimer;
176    tr_pex               * pex;
177    tr_pex               * pex6;
178
179    time_t                 clientSentPexAt;
180    time_t                 clientSentAnythingAt;
181
182    /* when we started batching the outMessages */
183    time_t                outMessagesBatchedAt;
184
185    struct tr_incoming    incoming;
186
187    /* if the peer supports the Extension Protocol in BEP 10 and
188       supplied a reqq argument, it's stored here.  otherwise the
189       value is zero and should be ignored. */
190    int64_t               reqq;
191};
192
193/**
194***
195**/
196
197static void
198myDebug( const char * file, int line,
199         const struct tr_peermsgs * msgs,
200         const char * fmt, ... )
201{
202    FILE * fp = tr_getLog( );
203
204    if( fp )
205    {
206        va_list           args;
207        char              timestr[64];
208        struct evbuffer * buf = tr_getBuffer( );
209        char *            base = tr_basename( file );
210
211        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
212                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
213                             msgs->torrent->info.name,
214                             tr_peerIoGetAddrStr( msgs->peer->io ),
215                             msgs->peer->client );
216        va_start( args, fmt );
217        evbuffer_add_vprintf( buf, fmt, args );
218        va_end( args );
219        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
220        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
221
222        tr_free( base );
223        tr_releaseBuffer( buf );
224    }
225}
226
227#define dbgmsg( msgs, ... ) \
228    do { \
229        if( tr_deepLoggingIsActive( ) ) \
230            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
231    } while( 0 )
232
233/**
234***
235**/
236
237static void
238pokeBatchPeriod( tr_peermsgs * msgs,
239                 int           interval )
240{
241    if( msgs->outMessagesBatchPeriod > interval )
242    {
243        msgs->outMessagesBatchPeriod = interval;
244        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
245    }
246}
247
248static TR_INLINE void
249dbgOutMessageLen( tr_peermsgs * msgs )
250{
251    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
252}
253
254static void
255protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
256{
257    tr_peerIo       * io  = msgs->peer->io;
258    struct evbuffer * out = msgs->outMessages;
259
260    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
261
262    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
263    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
264    tr_peerIoWriteUint32( io, out, req->index );
265    tr_peerIoWriteUint32( io, out, req->offset );
266    tr_peerIoWriteUint32( io, out, req->length );
267
268    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
269    dbgOutMessageLen( msgs );
270}
271
272static void
273protocolSendRequest( tr_peermsgs               * msgs,
274                     const struct peer_request * req )
275{
276    tr_peerIo       * io  = msgs->peer->io;
277    struct evbuffer * out = msgs->outMessages;
278
279    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
280    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
281    tr_peerIoWriteUint32( io, out, req->index );
282    tr_peerIoWriteUint32( io, out, req->offset );
283    tr_peerIoWriteUint32( io, out, req->length );
284
285    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
286    dbgOutMessageLen( msgs );
287    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
288}
289
290static void
291protocolSendCancel( tr_peermsgs               * msgs,
292                    const struct peer_request * req )
293{
294    tr_peerIo       * io  = msgs->peer->io;
295    struct evbuffer * out = msgs->outMessages;
296
297    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
298    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
299    tr_peerIoWriteUint32( io, out, req->index );
300    tr_peerIoWriteUint32( io, out, req->offset );
301    tr_peerIoWriteUint32( io, out, req->length );
302
303    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
304    dbgOutMessageLen( msgs );
305    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
306}
307
308static void
309protocolSendHave( tr_peermsgs * msgs,
310                  uint32_t      index )
311{
312    tr_peerIo       * io  = msgs->peer->io;
313    struct evbuffer * out = msgs->outMessages;
314
315    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
316    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
317    tr_peerIoWriteUint32( io, out, index );
318
319    dbgmsg( msgs, "sending Have %u", index );
320    dbgOutMessageLen( msgs );
321    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
322}
323
324#if 0
325static void
326protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
327{
328    tr_peerIo       * io  = msgs->peer->io;
329    struct evbuffer * out = msgs->outMessages;
330
331    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
332
333    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
334    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
335    tr_peerIoWriteUint32( io, out, pieceIndex );
336
337    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
338    dbgOutMessageLen( msgs );
339}
340#endif
341
342static void
343protocolSendChoke( tr_peermsgs * msgs,
344                   int           choke )
345{
346    tr_peerIo       * io  = msgs->peer->io;
347    struct evbuffer * out = msgs->outMessages;
348
349    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
350    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
351
352    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
353    dbgOutMessageLen( msgs );
354    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
355}
356
357static void
358protocolSendHaveAll( tr_peermsgs * msgs )
359{
360    tr_peerIo       * io  = msgs->peer->io;
361    struct evbuffer * out = msgs->outMessages;
362
363    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
364
365    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
366    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
367
368    dbgmsg( msgs, "sending HAVE_ALL..." );
369    dbgOutMessageLen( msgs );
370    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
371}
372
373static void
374protocolSendHaveNone( tr_peermsgs * msgs )
375{
376    tr_peerIo       * io  = msgs->peer->io;
377    struct evbuffer * out = msgs->outMessages;
378
379    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
380
381    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
382    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
383
384    dbgmsg( msgs, "sending HAVE_NONE..." );
385    dbgOutMessageLen( msgs );
386    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
387}
388
389/**
390***  EVENTS
391**/
392
393static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0 };
394
395static void
396publish( tr_peermsgs * msgs, tr_peer_event * e )
397{
398    assert( msgs->peer );
399    assert( msgs->peer->msgs == msgs );
400
401    tr_publisherPublish( &msgs->publisher, msgs->peer, e );
402}
403
404static void
405fireError( tr_peermsgs * msgs, int err )
406{
407    tr_peer_event e = blankEvent;
408    e.eventType = TR_PEER_ERROR;
409    e.err = err;
410    publish( msgs, &e );
411}
412
413static void
414fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly )
415{
416    tr_peer_event e = blankEvent;
417    e.eventType = TR_PEER_UPLOAD_ONLY;
418    e.uploadOnly = uploadOnly;
419    publish( msgs, &e );
420}
421
422static void
423fireNeedReq( tr_peermsgs * msgs )
424{
425    tr_peer_event e = blankEvent;
426    e.eventType = TR_PEER_NEED_REQ;
427    publish( msgs, &e );
428}
429
430static void
431firePeerProgress( tr_peermsgs * msgs )
432{
433    tr_peer_event e = blankEvent;
434    e.eventType = TR_PEER_PEER_PROGRESS;
435    e.progress = msgs->peer->progress;
436    publish( msgs, &e );
437}
438
439static void
440fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
441{
442    tr_peer_event e = blankEvent;
443    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
444    e.pieceIndex = req->index;
445    e.offset = req->offset;
446    e.length = req->length;
447    publish( msgs, &e );
448}
449
450static void
451fireClientGotData( tr_peermsgs * msgs,
452                   uint32_t      length,
453                   int           wasPieceData )
454{
455    tr_peer_event e = blankEvent;
456
457    e.length = length;
458    e.eventType = TR_PEER_CLIENT_GOT_DATA;
459    e.wasPieceData = wasPieceData;
460    publish( msgs, &e );
461}
462
463static void
464fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
465{
466    tr_peer_event e = blankEvent;
467    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
468    e.pieceIndex = pieceIndex;
469    publish( msgs, &e );
470}
471
472static void
473fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
474{
475    tr_peer_event e = blankEvent;
476    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
477    e.pieceIndex = pieceIndex;
478    publish( msgs, &e );
479}
480
481static void
482firePeerGotData( tr_peermsgs  * msgs,
483                 uint32_t       length,
484                 int            wasPieceData )
485{
486    tr_peer_event e = blankEvent;
487
488    e.length = length;
489    e.eventType = TR_PEER_PEER_GOT_DATA;
490    e.wasPieceData = wasPieceData;
491
492    publish( msgs, &e );
493}
494
495static void
496fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
497{
498    tr_peer_event e = blankEvent;
499    e.eventType = TR_PEER_CANCEL;
500    e.pieceIndex = req->index;
501    e.offset = req->offset;
502    e.length = req->length;
503    publish( msgs, &e );
504}
505
506/**
507***  ALLOWED FAST SET
508***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
509**/
510
511size_t
512tr_generateAllowedSet( tr_piece_index_t * setmePieces,
513                       size_t             desiredSetSize,
514                       size_t             pieceCount,
515                       const uint8_t    * infohash,
516                       const tr_address * addr )
517{
518    size_t setSize = 0;
519
520    assert( setmePieces );
521    assert( desiredSetSize <= pieceCount );
522    assert( desiredSetSize );
523    assert( pieceCount );
524    assert( infohash );
525    assert( addr );
526
527    if( addr->type == TR_AF_INET )
528    {
529        uint8_t w[SHA_DIGEST_LENGTH + 4];
530        uint8_t x[SHA_DIGEST_LENGTH];
531
532        *(uint32_t*)w = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
533        memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );                /* (2) */
534        tr_sha1( x, w, sizeof( w ), NULL );                          /* (3) */
535
536        while( setSize<desiredSetSize )
537        {
538            int i;
539            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
540            {
541                size_t k;
542                uint32_t j = i * 4;                                  /* (5) */
543                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
544                uint32_t index = y % pieceCount;                     /* (7) */
545
546                for( k=0; k<setSize; ++k )                           /* (8) */
547                    if( setmePieces[k] == index )
548                        break;
549
550                if( k == setSize )
551                    setmePieces[setSize++] = index;                  /* (9) */
552            }
553
554            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
555        }
556    }
557
558    return setSize;
559}
560
561static void
562updateFastSet( tr_peermsgs * msgs UNUSED )
563{
564#if 0
565    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
566    const int peerIsNeedy = msgs->peer->progress < 0.10;
567
568    if( fext && peerIsNeedy && !msgs->haveFastSet )
569    {
570        size_t i;
571        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
572        const tr_info * inf = &msgs->torrent->info;
573        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
574
575        /* build the fast set */
576        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
577        msgs->haveFastSet = 1;
578
579        /* send it to the peer */
580        for( i=0; i<msgs->fastsetSize; ++i )
581            protocolSendAllowedFast( msgs, msgs->fastset[i] );
582    }
583#endif
584}
585
586/**
587***  INTEREST
588**/
589
590static tr_bool
591isPieceInteresting( const tr_peermsgs * msgs,
592                    tr_piece_index_t    piece )
593{
594    const tr_torrent * torrent = msgs->torrent;
595
596    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
597          && ( !tr_cpPieceIsComplete( &torrent->completion, piece ) ) /* !have */
598          && ( tr_bitfieldHas( msgs->peer->have, piece ) );    /* peer has it */
599}
600
601/* "interested" means we'll ask for piece data if they unchoke us */
602static tr_bool
603isPeerInteresting( const tr_peermsgs * msgs )
604{
605    tr_piece_index_t    i;
606    const tr_torrent *  torrent;
607    const tr_bitfield * bitfield;
608    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
609
610    if( clientIsSeed )
611        return FALSE;
612
613    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
614        return FALSE;
615
616    torrent = msgs->torrent;
617    bitfield = tr_cpPieceBitfield( &torrent->completion );
618
619    if( !msgs->peer->have )
620        return TRUE;
621
622    assert( bitfield->byteCount == msgs->peer->have->byteCount );
623
624    for( i = 0; i < torrent->info.pieceCount; ++i )
625        if( isPieceInteresting( msgs, i ) )
626            return TRUE;
627
628    return FALSE;
629}
630
631static void
632sendInterest( tr_peermsgs * msgs,
633              int           weAreInterested )
634{
635    struct evbuffer * out = msgs->outMessages;
636
637    assert( msgs );
638    assert( weAreInterested == 0 || weAreInterested == 1 );
639
640    msgs->peer->clientIsInterested = weAreInterested;
641    dbgmsg( msgs, "Sending %s", weAreInterested ? "Interested" : "Not Interested" );
642    tr_peerIoWriteUint32( msgs->peer->io, out, sizeof( uint8_t ) );
643    tr_peerIoWriteUint8 ( msgs->peer->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
644
645    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
646    dbgOutMessageLen( msgs );
647}
648
649static void
650updateInterest( tr_peermsgs * msgs )
651{
652    const int i = isPeerInteresting( msgs );
653
654    if( i != msgs->peer->clientIsInterested )
655        sendInterest( msgs, i );
656    if( i )
657        fireNeedReq( msgs );
658}
659
660static tr_bool
661popNextRequest( tr_peermsgs *         msgs,
662                struct peer_request * setme )
663{
664    return reqListPop( &msgs->peerAskedFor, setme );
665}
666
667static void
668cancelAllRequestsToClient( tr_peermsgs * msgs )
669{
670    struct peer_request req;
671    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
672
673    while( popNextRequest( msgs, &req ) )
674        if( mustSendCancel )
675            protocolSendReject( msgs, &req );
676}
677
678void
679tr_peerMsgsSetChoke( tr_peermsgs * msgs,
680                     int           choke )
681{
682    const time_t now = time( NULL );
683    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
684
685    assert( msgs );
686    assert( msgs->peer );
687    assert( choke == 0 || choke == 1 );
688
689    if( msgs->peer->chokeChangedAt > fibrillationTime )
690    {
691        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
692    }
693    else if( msgs->peer->peerIsChoked != choke )
694    {
695        msgs->peer->peerIsChoked = choke;
696        if( choke )
697            cancelAllRequestsToClient( msgs );
698        protocolSendChoke( msgs, choke );
699        msgs->peer->chokeChangedAt = now;
700    }
701}
702
703/**
704***
705**/
706
707void
708tr_peerMsgsHave( tr_peermsgs * msgs,
709                 uint32_t      index )
710{
711    protocolSendHave( msgs, index );
712
713    /* since we have more pieces now, we might not be interested in this peer */
714    updateInterest( msgs );
715}
716
717/**
718***
719**/
720
721static tr_bool
722reqIsValid( const tr_peermsgs * peer,
723            uint32_t            index,
724            uint32_t            offset,
725            uint32_t            length )
726{
727    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
728}
729
730static tr_bool
731requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
732{
733    return reqIsValid( msgs, req->index, req->offset, req->length );
734}
735
736static void
737expireFromList( tr_peermsgs          * msgs,
738                struct request_list  * list,
739                const time_t           oldestAllowed )
740{
741    size_t i;
742    struct request_list tmp = REQUEST_LIST_INIT;
743
744    /* since the fifo list is sorted by time, the oldest will be first */
745    if( !list->len || ( list->fifo[0].time_requested >= oldestAllowed ) )
746        return;
747
748    /* if we found one too old, start pruning them */
749    reqListCopy( &tmp, list );
750    for( i=0; i<tmp.len; ++i ) {
751        const struct peer_request * req = &tmp.fifo[i];
752        if( req->time_requested >= oldestAllowed )
753            break;
754        tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
755    }
756    reqListClear( &tmp );
757}
758
759static void
760expireOldRequests( tr_peermsgs * msgs, const time_t now  )
761{
762    time_t oldestAllowed;
763    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
764    dbgmsg( msgs, "entering `expire old requests' block" );
765
766    /* cancel requests that have been queued for too long */
767    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
768    expireFromList( msgs, &msgs->clientWillAskFor, oldestAllowed );
769
770    /* if the peer doesn't support "Reject Request",
771     * cancel requests that were sent too long ago. */
772    if( !fext ) {
773        oldestAllowed = now - SENT_REQUEST_TTL_SECS;
774        expireFromList( msgs, &msgs->clientAskedFor, oldestAllowed );
775    }
776
777    dbgmsg( msgs, "leaving `expire old requests' block" );
778}
779
780static void
781pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
782{
783    const int           max = msgs->maxActiveRequests;
784    int                 sent = 0;
785    int                 len = msgs->clientAskedFor.len;
786    struct peer_request req;
787
788    dbgmsg( msgs, "clientIsChoked %d, download allowed %d, len %d, max %d, msgs->clientWillAskFor.len %d",
789            (int)msgs->peer->clientIsChoked,
790            (int)tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ),
791            len, max, msgs->clientWillAskFor.len );
792
793    if( msgs->peer->clientIsChoked )
794        return;
795    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
796        return;
797
798    while( ( len < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
799    {
800        const tr_block_index_t block = _tr_block( msgs->torrent, req.index, req.offset );
801
802        assert( requestIsValid( msgs, &req ) );
803        assert( tr_bitfieldHas( msgs->peer->have, req.index ) );
804
805        /* don't ask for it if we've already got it... this block may have
806         * come in from a different peer after we cancelled a request for it */
807        if( !tr_cpBlockIsComplete( &msgs->torrent->completion, block ) )
808        {
809            protocolSendRequest( msgs, &req );
810            req.time_requested = now;
811            reqListAppend( &msgs->clientAskedFor, &req );
812
813            ++len;
814            ++sent;
815        }
816        else dbgmsg( msgs, "not asking for it because we've already got it..." );
817    }
818
819    if( sent )
820        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
821                sent, msgs->clientAskedFor.len, msgs->clientWillAskFor.len );
822
823    if( len < max )
824        fireNeedReq( msgs );
825}
826
827static TR_INLINE tr_bool
828requestQueueIsFull( const tr_peermsgs * msgs )
829{
830    const int req_max = msgs->maxActiveRequests;
831    return msgs->clientWillAskFor.len >= (size_t)req_max;
832}
833
834tr_addreq_t
835tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
836                       uint32_t         index,
837                       uint32_t         offset,
838                       uint32_t         length )
839{
840    struct peer_request req;
841
842    assert( msgs );
843    assert( msgs->torrent );
844
845    /**
846    ***  Reasons to decline the request
847    **/
848
849    /* don't send requests to choked clients */
850    if( msgs->peer->clientIsChoked ) {
851        dbgmsg( msgs, "declining request because they're choking us" );
852        return TR_ADDREQ_CLIENT_CHOKED;
853    }
854
855    /* peer's queue is full */
856    if( requestQueueIsFull( msgs ) ) {
857        dbgmsg( msgs, "declining request because we're full" );
858        return TR_ADDREQ_FULL;
859    }
860
861    /* peer doesn't have this piece */
862    if( !tr_bitfieldHas( msgs->peer->have, index ) )
863        return TR_ADDREQ_MISSING;
864
865    /* have we already asked for this piece? */
866    req.index = index;
867    req.offset = offset;
868    req.length = length;
869    if( reqListHas( &msgs->clientAskedFor, &req ) ) {
870        dbgmsg( msgs, "declining because it's a duplicate" );
871        return TR_ADDREQ_DUPLICATE;
872    }
873    if( reqListHas( &msgs->clientWillAskFor, &req ) ) {
874        dbgmsg( msgs, "declining because it's a duplicate" );
875        return TR_ADDREQ_DUPLICATE;
876    }
877
878    /**
879    ***  Accept this request
880    **/
881
882    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
883            index, offset, length );
884    req.time_requested = time( NULL );
885    reqListAppend( &msgs->clientWillAskFor, &req );
886    return TR_ADDREQ_OK;
887}
888
889static void
890cancelAllRequestsToPeer( tr_peermsgs * msgs, tr_bool sendCancel )
891{
892    size_t i;
893    struct request_list a = msgs->clientWillAskFor;
894    struct request_list b = msgs->clientAskedFor;
895    dbgmsg( msgs, "cancelling all requests to peer" );
896
897    msgs->clientAskedFor = REQUEST_LIST_INIT;
898    msgs->clientWillAskFor = REQUEST_LIST_INIT;
899
900    for( i=0; i<a.len; ++i )
901        fireCancelledReq( msgs, &a.fifo[i] );
902
903    for( i = 0; i < b.len; ++i ) {
904        fireCancelledReq( msgs, &b.fifo[i] );
905        if( sendCancel )
906            protocolSendCancel( msgs, &b.fifo[i] );
907    }
908
909    reqListClear( &a );
910    reqListClear( &b );
911}
912
913void
914tr_peerMsgsCancel( tr_peermsgs * msgs,
915                   uint32_t      pieceIndex,
916                   uint32_t      offset,
917                   uint32_t      length )
918{
919    struct peer_request req;
920
921    assert( msgs != NULL );
922    assert( length > 0 );
923
924
925    /* have we asked the peer for this piece? */
926    req.index = pieceIndex;
927    req.offset = offset;
928    req.length = length;
929
930    /* if it's only in the queue and hasn't been sent yet, free it */
931    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
932        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
933        fireCancelledReq( msgs, &req );
934    }
935
936    /* if it's already been sent, send a cancel message too */
937    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
938        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
939        protocolSendCancel( msgs, &req );
940        fireCancelledReq( msgs, &req );
941    }
942}
943
944
945/**
946***
947**/
948
949static void
950sendLtepHandshake( tr_peermsgs * msgs )
951{
952    tr_benc val, *m;
953    char * buf;
954    int len;
955    int pex;
956    struct evbuffer * out = msgs->outMessages;
957
958    if( msgs->clientSentLtepHandshake )
959        return;
960
961    dbgmsg( msgs, "sending an ltep handshake" );
962    msgs->clientSentLtepHandshake = 1;
963
964    /* decide if we want to advertise pex support */
965    if( !tr_torrentAllowsPex( msgs->torrent ) )
966        pex = 0;
967    else if( msgs->peerSentLtepHandshake )
968        pex = msgs->peerSupportsPex ? 1 : 0;
969    else
970        pex = 1;
971
972    tr_bencInitDict( &val, 5 );
973    tr_bencDictAddInt( &val, "e", msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
974    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
975    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
976    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
977    m  = tr_bencDictAddDict( &val, "m", 1 );
978    if( pex )
979        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
980    buf = tr_bencSave( &val, &len );
981
982    tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + len );
983    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
984    tr_peerIoWriteUint8 ( msgs->peer->io, out, LTEP_HANDSHAKE );
985    tr_peerIoWriteBytes ( msgs->peer->io, out, buf, len );
986    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
987    dbgOutMessageLen( msgs );
988
989    /* cleanup */
990    tr_bencFree( &val );
991    tr_free( buf );
992}
993
994static void
995parseLtepHandshake( tr_peermsgs *     msgs,
996                    int               len,
997                    struct evbuffer * inbuf )
998{
999    int64_t   i;
1000    tr_benc   val, * sub;
1001    uint8_t * tmp = tr_new( uint8_t, len );
1002
1003    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
1004    msgs->peerSentLtepHandshake = 1;
1005
1006    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
1007    {
1008        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1009        tr_free( tmp );
1010        return;
1011    }
1012
1013    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
1014
1015    /* does the peer prefer encrypted connections? */
1016    if( tr_bencDictFindInt( &val, "e", &i ) )
1017        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1018                                              : ENCRYPTION_PREFERENCE_NO;
1019
1020    /* check supported messages for utorrent pex */
1021    msgs->peerSupportsPex = 0;
1022    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
1023        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
1024            msgs->ut_pex_id = (uint8_t) i;
1025            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1026            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1027        }
1028    }
1029
1030    /* look for upload_only (BEP 21) */
1031    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
1032        fireUploadOnly( msgs, i!=0 );
1033
1034    /* get peer's listening port */
1035    if( tr_bencDictFindInt( &val, "p", &i ) ) {
1036        msgs->peer->port = htons( (uint16_t)i );
1037        dbgmsg( msgs, "msgs->port is now %hu", msgs->peer->port );
1038    }
1039
1040    /* get peer's maximum request queue size */
1041    if( tr_bencDictFindInt( &val, "reqq", &i ) )
1042        msgs->reqq = i;
1043
1044    tr_bencFree( &val );
1045    tr_free( tmp );
1046}
1047
1048static void
1049parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1050{
1051    int loaded = 0;
1052    uint8_t * tmp = tr_new( uint8_t, msglen );
1053    tr_benc val;
1054    tr_torrent * tor = msgs->torrent;
1055    const uint8_t * added;
1056    size_t added_len;
1057
1058    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1059
1060    if( tr_torrentAllowsPex( tor )
1061      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1062    {
1063        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1064        {
1065            const uint8_t * added_f = NULL;
1066            tr_pex *        pex;
1067            size_t          i, n;
1068            size_t          added_f_len = 0;
1069            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1070            pex =
1071                tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len,
1072                                        &n );
1073            for( i = 0; i < n; ++i )
1074                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex + i );
1075            tr_free( pex );
1076        }
1077       
1078        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1079        {
1080            const uint8_t * added_f = NULL;
1081            tr_pex *        pex;
1082            size_t          i, n;
1083            size_t          added_f_len = 0;
1084            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1085            pex =
1086                tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len,
1087                                         &n );
1088            for( i = 0; i < n; ++i )
1089                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex + i );
1090            tr_free( pex );
1091        }
1092       
1093    }
1094
1095    if( loaded )
1096        tr_bencFree( &val );
1097    tr_free( tmp );
1098}
1099
1100static void sendPex( tr_peermsgs * msgs );
1101
1102static void
1103parseLtep( tr_peermsgs *     msgs,
1104           int               msglen,
1105           struct evbuffer * inbuf )
1106{
1107    uint8_t ltep_msgid;
1108
1109    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1110    msglen--;
1111
1112    if( ltep_msgid == LTEP_HANDSHAKE )
1113    {
1114        dbgmsg( msgs, "got ltep handshake" );
1115        parseLtepHandshake( msgs, msglen, inbuf );
1116        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1117        {
1118            sendLtepHandshake( msgs );
1119            sendPex( msgs );
1120        }
1121    }
1122    else if( ltep_msgid == TR_LTEP_PEX )
1123    {
1124        dbgmsg( msgs, "got ut pex" );
1125        msgs->peerSupportsPex = 1;
1126        parseUtPex( msgs, msglen, inbuf );
1127    }
1128    else
1129    {
1130        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1131        evbuffer_drain( inbuf, msglen );
1132    }
1133}
1134
1135static int
1136readBtLength( tr_peermsgs *     msgs,
1137              struct evbuffer * inbuf,
1138              size_t            inlen )
1139{
1140    uint32_t len;
1141
1142    if( inlen < sizeof( len ) )
1143        return READ_LATER;
1144
1145    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1146
1147    if( len == 0 ) /* peer sent us a keepalive message */
1148        dbgmsg( msgs, "got KeepAlive" );
1149    else
1150    {
1151        msgs->incoming.length = len;
1152        msgs->state = AWAITING_BT_ID;
1153    }
1154
1155    return READ_NOW;
1156}
1157
1158static int readBtMessage( tr_peermsgs *     msgs,
1159                          struct evbuffer * inbuf,
1160                          size_t            inlen );
1161
1162static int
1163readBtId( tr_peermsgs *     msgs,
1164          struct evbuffer * inbuf,
1165          size_t            inlen )
1166{
1167    uint8_t id;
1168
1169    if( inlen < sizeof( uint8_t ) )
1170        return READ_LATER;
1171
1172    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1173    msgs->incoming.id = id;
1174    dbgmsg( msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length );
1175
1176    if( id == BT_PIECE )
1177    {
1178        msgs->state = AWAITING_BT_PIECE;
1179        return READ_NOW;
1180    }
1181    else if( msgs->incoming.length != 1 )
1182    {
1183        msgs->state = AWAITING_BT_MESSAGE;
1184        return READ_NOW;
1185    }
1186    else return readBtMessage( msgs, inbuf, inlen - 1 );
1187}
1188
1189static void
1190updatePeerProgress( tr_peermsgs * msgs )
1191{
1192    msgs->peer->progress = tr_bitfieldCountTrueBits( msgs->peer->have ) / (float)msgs->torrent->info.pieceCount;
1193    dbgmsg( msgs, "peer progress is %f", msgs->peer->progress );
1194    updateFastSet( msgs );
1195    updateInterest( msgs );
1196    firePeerProgress( msgs );
1197}
1198
1199static void
1200peerMadeRequest( tr_peermsgs *               msgs,
1201                 const struct peer_request * req )
1202{
1203    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1204    const int reqIsValid = requestIsValid( msgs, req );
1205    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1206    const int peerIsChoked = msgs->peer->peerIsChoked;
1207
1208    int allow = FALSE;
1209
1210    if( !reqIsValid )
1211        dbgmsg( msgs, "rejecting an invalid request." );
1212    else if( !clientHasPiece )
1213        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1214    else if( peerIsChoked )
1215        dbgmsg( msgs, "rejecting request from choked peer" );
1216    else
1217        allow = TRUE;
1218
1219    if( allow )
1220        reqListAppend( &msgs->peerAskedFor, req );
1221    else if( fext )
1222        protocolSendReject( msgs, req );
1223}
1224
1225static tr_bool
1226messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1227{
1228    switch( id )
1229    {
1230        case BT_CHOKE:
1231        case BT_UNCHOKE:
1232        case BT_INTERESTED:
1233        case BT_NOT_INTERESTED:
1234        case BT_FEXT_HAVE_ALL:
1235        case BT_FEXT_HAVE_NONE:
1236            return len == 1;
1237
1238        case BT_HAVE:
1239        case BT_FEXT_SUGGEST:
1240        case BT_FEXT_ALLOWED_FAST:
1241            return len == 5;
1242
1243        case BT_BITFIELD:
1244            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1245
1246        case BT_REQUEST:
1247        case BT_CANCEL:
1248        case BT_FEXT_REJECT:
1249            return len == 13;
1250
1251        case BT_PIECE:
1252            return len > 9 && len <= 16393;
1253
1254        case BT_PORT:
1255            return len == 3;
1256
1257        case BT_LTEP:
1258            return len >= 2;
1259
1260        default:
1261            return FALSE;
1262    }
1263}
1264
1265static int clientGotBlock( tr_peermsgs *               msgs,
1266                           const uint8_t *             block,
1267                           const struct peer_request * req );
1268
1269static int
1270readBtPiece( tr_peermsgs      * msgs,
1271             struct evbuffer  * inbuf,
1272             size_t             inlen,
1273             size_t           * setme_piece_bytes_read )
1274{
1275    struct peer_request * req = &msgs->incoming.blockReq;
1276
1277    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1278    dbgmsg( msgs, "In readBtPiece" );
1279
1280    if( !req->length )
1281    {
1282        if( inlen < 8 )
1283            return READ_LATER;
1284
1285        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1286        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1287        req->length = msgs->incoming.length - 9;
1288        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1289        return READ_NOW;
1290    }
1291    else
1292    {
1293        int err;
1294
1295        /* read in another chunk of data */
1296        const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
1297        size_t n = MIN( nLeft, inlen );
1298        size_t i = n;
1299
1300        while( i > 0 )
1301        {
1302            uint8_t buf[MAX_STACK_ARRAY_SIZE];
1303            const size_t thisPass = MIN( i, sizeof( buf ) );
1304            tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass );
1305            evbuffer_add( msgs->incoming.block, buf, thisPass );
1306            i -= thisPass;
1307        }
1308
1309        fireClientGotData( msgs, n, TRUE );
1310        *setme_piece_bytes_read += n;
1311        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1312               n, req->index, req->offset, req->length,
1313               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1314        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1315            return READ_LATER;
1316
1317        /* we've got the whole block ... process it */
1318        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );
1319
1320        /* cleanup */
1321        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH( msgs->incoming.block ) );
1322        req->length = 0;
1323        msgs->state = AWAITING_BT_LENGTH;
1324        if( !err )
1325            return READ_NOW;
1326        else {
1327            fireError( msgs, err );
1328            return READ_ERR;
1329        }
1330    }
1331}
1332
1333static int
1334readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1335{
1336    uint32_t      ui32;
1337    uint32_t      msglen = msgs->incoming.length;
1338    const uint8_t id = msgs->incoming.id;
1339    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1340    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1341
1342    --msglen; /* id length */
1343
1344    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1345
1346    if( inlen < msglen )
1347        return READ_LATER;
1348
1349    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1350    {
1351        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1352        fireError( msgs, EMSGSIZE );
1353        return READ_ERR;
1354    }
1355
1356    switch( id )
1357    {
1358        case BT_CHOKE:
1359            dbgmsg( msgs, "got Choke" );
1360            msgs->peer->clientIsChoked = 1;
1361            if( !fext )
1362                cancelAllRequestsToPeer( msgs, FALSE );
1363            break;
1364
1365        case BT_UNCHOKE:
1366            dbgmsg( msgs, "got Unchoke" );
1367            msgs->peer->clientIsChoked = 0;
1368            fireNeedReq( msgs );
1369            break;
1370
1371        case BT_INTERESTED:
1372            dbgmsg( msgs, "got Interested" );
1373            msgs->peer->peerIsInterested = 1;
1374            break;
1375
1376        case BT_NOT_INTERESTED:
1377            dbgmsg( msgs, "got Not Interested" );
1378            msgs->peer->peerIsInterested = 0;
1379            break;
1380
1381        case BT_HAVE:
1382            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1383            dbgmsg( msgs, "got Have: %u", ui32 );
1384            if( tr_bitfieldAdd( msgs->peer->have, ui32 ) ) {
1385                fireError( msgs, ERANGE );
1386                return READ_ERR;
1387            }
1388            updatePeerProgress( msgs );
1389            tr_rcTransferred( &msgs->torrent->swarmSpeed,
1390                              msgs->torrent->info.pieceSize );
1391            break;
1392
1393        case BT_BITFIELD:
1394        {
1395            dbgmsg( msgs, "got a bitfield" );
1396            tr_peerIoReadBytes( msgs->peer->io, inbuf, msgs->peer->have->bits, msglen );
1397            updatePeerProgress( msgs );
1398            fireNeedReq( msgs );
1399            break;
1400        }
1401
1402        case BT_REQUEST:
1403        {
1404            struct peer_request r;
1405            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1406            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1407            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1408            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1409            peerMadeRequest( msgs, &r );
1410            break;
1411        }
1412
1413        case BT_CANCEL:
1414        {
1415            struct peer_request r;
1416            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1417            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1418            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1419            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1420            if( reqListRemove( &msgs->peerAskedFor, &r ) && fext )
1421                protocolSendReject( msgs, &r );
1422            break;
1423        }
1424
1425        case BT_PIECE:
1426            assert( 0 ); /* handled elsewhere! */
1427            break;
1428
1429        case BT_PORT:
1430            dbgmsg( msgs, "Got a BT_PORT" );
1431            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->port );
1432            break;
1433
1434        case BT_FEXT_SUGGEST:
1435            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1436            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1437            if( fext )
1438                fireClientGotSuggest( msgs, ui32 );
1439            else {
1440                fireError( msgs, EMSGSIZE );
1441                return READ_ERR;
1442            }
1443            break;
1444
1445        case BT_FEXT_ALLOWED_FAST:
1446            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1447            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1448            if( fext )
1449                fireClientGotAllowedFast( msgs, ui32 );
1450            else {
1451                fireError( msgs, EMSGSIZE );
1452                return READ_ERR;
1453            }
1454            break;
1455
1456        case BT_FEXT_HAVE_ALL:
1457            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1458            if( fext ) {
1459                tr_bitfieldAddRange( msgs->peer->have, 0, msgs->torrent->info.pieceCount );
1460                updatePeerProgress( msgs );
1461            } else {
1462                fireError( msgs, EMSGSIZE );
1463                return READ_ERR;
1464            }
1465            break;
1466
1467        case BT_FEXT_HAVE_NONE:
1468            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1469            if( fext ) {
1470                tr_bitfieldClear( msgs->peer->have );
1471                updatePeerProgress( msgs );
1472            } else {
1473                fireError( msgs, EMSGSIZE );
1474                return READ_ERR;
1475            }
1476            break;
1477
1478        case BT_FEXT_REJECT:
1479        {
1480            struct peer_request r;
1481            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1482            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1483            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1484            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1485            if( fext )
1486                reqListRemove( &msgs->clientAskedFor, &r );
1487            else {
1488                fireError( msgs, EMSGSIZE );
1489                return READ_ERR;
1490            }
1491            break;
1492        }
1493
1494        case BT_LTEP:
1495            dbgmsg( msgs, "Got a BT_LTEP" );
1496            parseLtep( msgs, msglen, inbuf );
1497            break;
1498
1499        default:
1500            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1501            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1502            break;
1503    }
1504
1505    assert( msglen + 1 == msgs->incoming.length );
1506    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1507
1508    msgs->state = AWAITING_BT_LENGTH;
1509    return READ_NOW;
1510}
1511
1512static TR_INLINE void
1513decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1514{
1515    tr_torrent * tor = msgs->torrent;
1516
1517    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1518}
1519
1520static TR_INLINE void
1521clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1522{
1523    decrementDownloadedCount( msgs, req->length );
1524}
1525
1526static void
1527addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1528{
1529    if( !msgs->peer->blame )
1530         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1531    tr_bitfieldAdd( msgs->peer->blame, index );
1532}
1533
1534/* returns 0 on success, or an errno on failure */
1535static int
1536clientGotBlock( tr_peermsgs *               msgs,
1537                const uint8_t *             data,
1538                const struct peer_request * req )
1539{
1540    int err;
1541    tr_torrent * tor = msgs->torrent;
1542    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1543
1544    assert( msgs );
1545    assert( req );
1546
1547    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1548        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1549                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1550        return EMSGSIZE;
1551    }
1552
1553    /* save the block */
1554    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1555
1556    /**
1557    *** Remove the block from our `we asked for this' list
1558    **/
1559
1560    if( !reqListRemove( &msgs->clientAskedFor, req ) ) {
1561        clientGotUnwantedBlock( msgs, req );
1562        dbgmsg( msgs, "we didn't ask for this message..." );
1563        return 0;
1564    }
1565
1566    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1567            msgs->clientAskedFor.len );
1568
1569    /**
1570    *** Error checks
1571    **/
1572
1573    if( tr_cpBlockIsComplete( &tor->completion, block ) ) {
1574        dbgmsg( msgs, "we have this block already..." );
1575        clientGotUnwantedBlock( msgs, req );
1576        return 0;
1577    }
1578
1579    /**
1580    ***  Save the block
1581    **/
1582
1583    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1584        return err;
1585
1586    addPeerToBlamefield( msgs, req->index );
1587    fireGotBlock( msgs, req );
1588    return 0;
1589}
1590
1591static int peerPulse( void * vmsgs );
1592
1593static void
1594didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1595{
1596    tr_peermsgs * msgs = vmsgs;
1597    firePeerGotData( msgs, bytesWritten, wasPieceData );
1598
1599    if ( tr_isPeerIo( io ) && io->userData )
1600        peerPulse( msgs );
1601}
1602
1603static ReadState
1604canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1605{
1606    ReadState         ret;
1607    tr_peermsgs *     msgs = vmsgs;
1608    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1609    const size_t      inlen = EVBUFFER_LENGTH( in );
1610
1611    dbgmsg( msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state );
1612
1613    if( !inlen )
1614    {
1615        ret = READ_LATER;
1616    }
1617    else if( msgs->state == AWAITING_BT_PIECE )
1618    {
1619        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1620    }
1621    else switch( msgs->state )
1622    {
1623        case AWAITING_BT_LENGTH:
1624            ret = readBtLength ( msgs, in, inlen ); break;
1625
1626        case AWAITING_BT_ID:
1627            ret = readBtId     ( msgs, in, inlen ); break;
1628
1629        case AWAITING_BT_MESSAGE:
1630            ret = readBtMessage( msgs, in, inlen ); break;
1631
1632        default:
1633            ret = READ_ERR;
1634            assert( 0 );
1635    }
1636
1637    dbgmsg( msgs, "canRead: ret is %d", (int)ret );
1638
1639    /* log the raw data that was read */
1640    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1641        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1642
1643    return ret;
1644}
1645
1646/**
1647***
1648**/
1649
1650static int
1651ratePulse( tr_peermsgs * msgs, uint64_t now )
1652{
1653    const double rateToClient = tr_peerGetPieceSpeed( msgs->peer, now, TR_PEER_TO_CLIENT );
1654    const int seconds = 10;
1655    const int floor = 8;
1656    const int estimatedBlocksInPeriod = ( rateToClient * seconds * 1024 ) / msgs->torrent->blockSize;
1657
1658    msgs->maxActiveRequests = floor + estimatedBlocksInPeriod;
1659
1660    if( msgs->reqq > 0 )
1661        msgs->maxActiveRequests = MIN( msgs->maxActiveRequests, msgs->reqq );
1662
1663    return TRUE;
1664}
1665
1666static size_t
1667fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1668{
1669    size_t bytesWritten = 0;
1670    struct peer_request req;
1671    const tr_bool haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1672    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1673
1674    /**
1675    ***  Protocol messages
1676    **/
1677
1678    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1679    {
1680        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1681        msgs->outMessagesBatchedAt = now;
1682    }
1683    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1684    {
1685        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1686        /* flush the protocol messages */
1687        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1688        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1689        msgs->clientSentAnythingAt = now;
1690        msgs->outMessagesBatchedAt = 0;
1691        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1692        bytesWritten +=  len;
1693    }
1694
1695    /**
1696    ***  Blocks
1697    **/
1698
1699    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= msgs->torrent->blockSize )
1700        && popNextRequest( msgs, &req ) )
1701    {
1702        if( requestIsValid( msgs, &req )
1703            && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1704        {
1705            int err;
1706            static uint8_t buf[MAX_BLOCK_SIZE];
1707
1708            /* send a block */
1709            if(( err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf ))) {
1710                fireError( msgs, err );
1711                bytesWritten = 0;
1712                msgs = NULL;
1713            } else {
1714                tr_peerIo * io = msgs->peer->io;
1715                struct evbuffer * out = tr_getBuffer( );
1716                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1717                tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1718                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1719                tr_peerIoWriteUint32( io, out, req.index );
1720                tr_peerIoWriteUint32( io, out, req.offset );
1721                tr_peerIoWriteBytes ( io, out, buf, req.length );
1722                tr_peerIoWriteBuf( io, out, TRUE );
1723                bytesWritten += EVBUFFER_LENGTH( out );
1724                msgs->clientSentAnythingAt = now;
1725                tr_releaseBuffer( out );
1726            }
1727        }
1728        else if( fext ) /* peer needs a reject message */
1729        {
1730            protocolSendReject( msgs, &req );
1731        }
1732    }
1733
1734    /**
1735    ***  Keepalive
1736    **/
1737
1738    if( ( msgs != NULL )
1739        && ( msgs->clientSentAnythingAt != 0 )
1740        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1741    {
1742        dbgmsg( msgs, "sending a keepalive message" );
1743        tr_peerIoWriteUint32( msgs->peer->io, msgs->outMessages, 0 );
1744        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1745    }
1746
1747    return bytesWritten;
1748}
1749
1750static int
1751peerPulse( void * vmsgs )
1752{
1753    tr_peermsgs * msgs = vmsgs;
1754    const time_t  now = time( NULL );
1755
1756    ratePulse( msgs, now );
1757
1758    pumpRequestQueue( msgs, now );
1759    expireOldRequests( msgs, now );
1760
1761    for( ;; )
1762        if( fillOutputBuffer( msgs, now ) < 1 )
1763            break;
1764
1765    return TRUE; /* loop forever */
1766}
1767
1768void
1769tr_peerMsgsPulse( tr_peermsgs * msgs )
1770{
1771    if( msgs != NULL )
1772        peerPulse( msgs );
1773}
1774
1775static void
1776gotError( tr_peerIo  * io UNUSED,
1777          short        what,
1778          void       * vmsgs )
1779{
1780    if( what & EVBUFFER_TIMEOUT )
1781        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1782    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1783        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1784               what, errno, tr_strerror( errno ) );
1785    fireError( vmsgs, ENOTCONN );
1786}
1787
1788static void
1789sendBitfield( tr_peermsgs * msgs )
1790{
1791    struct evbuffer * out = msgs->outMessages;
1792    tr_bitfield *     field;
1793    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1794    size_t            i;
1795    size_t            lazyCount = 0;
1796
1797    field = tr_bitfieldDup( tr_cpPieceBitfield( &msgs->torrent->completion ) );
1798
1799    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1800    {
1801        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1802            speed over a truly random sample -- let's limit the pool size to
1803            the first 1000 pieces so large torrents don't bog things down */
1804        size_t poolSize;
1805        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1806        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1807
1808        /* build the pool */
1809        for( i=poolSize=0; i<maxPoolSize; ++i )
1810            if( tr_bitfieldHas( field, i ) )
1811                pool[poolSize++] = i;
1812
1813        /* pull random piece indices from the pool */
1814        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1815        {
1816            const int pos = tr_cryptoWeakRandInt( poolSize );
1817            const tr_piece_index_t piece = pool[pos];
1818            tr_bitfieldRem( field, piece );
1819            lazyPieces[lazyCount++] = piece;
1820            pool[pos] = pool[--poolSize];
1821        }
1822
1823        /* cleanup */
1824        tr_free( pool );
1825    }
1826
1827    tr_peerIoWriteUint32( msgs->peer->io, out,
1828                          sizeof( uint8_t ) + field->byteCount );
1829    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_BITFIELD );
1830    tr_peerIoWriteBytes ( msgs->peer->io, out, field->bits, field->byteCount );
1831    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
1832            EVBUFFER_LENGTH( out ) );
1833    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1834
1835    for( i = 0; i < lazyCount; ++i )
1836        protocolSendHave( msgs, lazyPieces[i] );
1837
1838    tr_bitfieldFree( field );
1839}
1840
1841static void
1842tellPeerWhatWeHave( tr_peermsgs * msgs )
1843{
1844    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1845
1846    if( fext && ( tr_cpGetStatus( &msgs->torrent->completion ) == TR_SEED ) )
1847    {
1848        protocolSendHaveAll( msgs );
1849    }
1850    else if( fext && ( tr_cpHaveValid( &msgs->torrent->completion ) == 0 ) )
1851    {
1852        protocolSendHaveNone( msgs );
1853    }
1854    else
1855    {
1856        sendBitfield( msgs );
1857    }
1858}
1859
1860/**
1861***
1862**/
1863
1864/* some peers give us error messages if we send
1865   more than this many peers in a single pex message
1866   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1867#define MAX_PEX_ADDED 50
1868#define MAX_PEX_DROPPED 50
1869
1870typedef struct
1871{
1872    tr_pex *  added;
1873    tr_pex *  dropped;
1874    tr_pex *  elements;
1875    int       addedCount;
1876    int       droppedCount;
1877    int       elementCount;
1878}
1879PexDiffs;
1880
1881static void
1882pexAddedCb( void * vpex,
1883            void * userData )
1884{
1885    PexDiffs * diffs = userData;
1886    tr_pex *   pex = vpex;
1887
1888    if( diffs->addedCount < MAX_PEX_ADDED )
1889    {
1890        diffs->added[diffs->addedCount++] = *pex;
1891        diffs->elements[diffs->elementCount++] = *pex;
1892    }
1893}
1894
1895static TR_INLINE void
1896pexDroppedCb( void * vpex,
1897              void * userData )
1898{
1899    PexDiffs * diffs = userData;
1900    tr_pex *   pex = vpex;
1901
1902    if( diffs->droppedCount < MAX_PEX_DROPPED )
1903    {
1904        diffs->dropped[diffs->droppedCount++] = *pex;
1905    }
1906}
1907
1908static TR_INLINE void
1909pexElementCb( void * vpex,
1910              void * userData )
1911{
1912    PexDiffs * diffs = userData;
1913    tr_pex * pex = vpex;
1914
1915    diffs->elements[diffs->elementCount++] = *pex;
1916}
1917
1918static void
1919sendPex( tr_peermsgs * msgs )
1920{
1921    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1922    {
1923        PexDiffs diffs;
1924        PexDiffs diffs6;
1925        tr_pex * newPex = NULL;
1926        tr_pex * newPex6 = NULL;
1927        const int newCount = tr_peerMgrGetPeers( msgs->torrent, &newPex, TR_AF_INET );
1928        const int newCount6 = tr_peerMgrGetPeers( msgs->torrent, &newPex6, TR_AF_INET6 );
1929
1930        /* build the diffs */
1931        diffs.added = tr_new( tr_pex, newCount );
1932        diffs.addedCount = 0;
1933        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1934        diffs.droppedCount = 0;
1935        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1936        diffs.elementCount = 0;
1937        tr_set_compare( msgs->pex, msgs->pexCount,
1938                        newPex, newCount,
1939                        tr_pexCompare, sizeof( tr_pex ),
1940                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
1941        diffs6.added = tr_new( tr_pex, newCount6 );
1942        diffs6.addedCount = 0;
1943        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
1944        diffs6.droppedCount = 0;
1945        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
1946        diffs6.elementCount = 0;
1947        tr_set_compare( msgs->pex6, msgs->pexCount6,
1948                        newPex6, newCount6,
1949                        tr_pexCompare, sizeof( tr_pex ),
1950                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
1951        dbgmsg(
1952            msgs,
1953            "pex: old peer count %d, new peer count %d, added %d, removed %d",
1954            msgs->pexCount, newCount + newCount6,
1955            diffs.addedCount + diffs6.addedCount,
1956            diffs.droppedCount + diffs6.droppedCount );
1957
1958        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
1959            !diffs6.droppedCount )
1960        {
1961            tr_free( diffs.elements );
1962            tr_free( diffs6.elements );
1963        }
1964        else
1965        {
1966            int  i;
1967            tr_benc val;
1968            char * benc;
1969            int bencLen;
1970            uint8_t * tmp, *walk;
1971            tr_peerIo       * io  = msgs->peer->io;
1972            struct evbuffer * out = msgs->outMessages;
1973
1974            /* update peer */
1975            tr_free( msgs->pex );
1976            msgs->pex = diffs.elements;
1977            msgs->pexCount = diffs.elementCount;
1978            tr_free( msgs->pex6 );
1979            msgs->pex6 = diffs6.elements;
1980            msgs->pexCount6 = diffs6.elementCount;
1981
1982            /* build the pex payload */
1983            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
1984                                         * speed vs. likelihood? */
1985
1986            /* "added" */
1987            tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1988            for( i = 0; i < diffs.addedCount; ++i )
1989            {
1990                memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
1991                memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1992            }
1993            assert( ( walk - tmp ) == diffs.addedCount * 6 );
1994            tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
1995            tr_free( tmp );
1996
1997            /* "added.f" */
1998            tmp = walk = tr_new( uint8_t, diffs.addedCount );
1999            for( i = 0; i < diffs.addedCount; ++i )
2000                *walk++ = diffs.added[i].flags;
2001            assert( ( walk - tmp ) == diffs.addedCount );
2002            tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2003            tr_free( tmp );
2004
2005            /* "dropped" */
2006            tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2007            for( i = 0; i < diffs.droppedCount; ++i )
2008            {
2009                memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2010                memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2011            }
2012            assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2013            tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2014            tr_free( tmp );
2015           
2016            /* "added6" */
2017            tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2018            for( i = 0; i < diffs6.addedCount; ++i )
2019            {
2020                memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2021                walk += 16;
2022                memcpy( walk, &diffs6.added[i].port, 2 );
2023                walk += 2;
2024            }
2025            assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2026            tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2027            tr_free( tmp );
2028           
2029            /* "added6.f" */
2030            tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2031            for( i = 0; i < diffs6.addedCount; ++i )
2032                *walk++ = diffs6.added[i].flags;
2033            assert( ( walk - tmp ) == diffs6.addedCount );
2034            tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2035            tr_free( tmp );
2036           
2037            /* "dropped6" */
2038            tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2039            for( i = 0; i < diffs6.droppedCount; ++i )
2040            {
2041                memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2042                walk += 16;
2043                memcpy( walk, &diffs6.dropped[i].port, 2 );
2044                walk += 2;
2045            }
2046            assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2047            tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2048            tr_free( tmp );
2049
2050            /* write the pex message */
2051            benc = tr_bencSave( &val, &bencLen );
2052            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + bencLen );
2053            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
2054            tr_peerIoWriteUint8 ( io, out, msgs->ut_pex_id );
2055            tr_peerIoWriteBytes ( io, out, benc, bencLen );
2056            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2057            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2058            dbgOutMessageLen( msgs );
2059
2060            tr_free( benc );
2061            tr_bencFree( &val );
2062        }
2063
2064        /* cleanup */
2065        tr_free( diffs.added );
2066        tr_free( diffs.dropped );
2067        tr_free( newPex );
2068        tr_free( diffs6.added );
2069        tr_free( diffs6.dropped );
2070        tr_free( newPex6 );
2071
2072        msgs->clientSentPexAt = time( NULL );
2073    }
2074}
2075
2076static TR_INLINE int
2077pexPulse( void * vpeer )
2078{
2079    sendPex( vpeer );
2080    return TRUE;
2081}
2082
2083/**
2084***
2085**/
2086
2087tr_peermsgs*
2088tr_peerMsgsNew( struct tr_torrent * torrent,
2089                struct tr_peer    * peer,
2090                tr_delivery_func    func,
2091                void              * userData,
2092                tr_publisher_tag  * setme )
2093{
2094    tr_peermsgs * m;
2095
2096    assert( peer );
2097    assert( peer->io );
2098
2099    m = tr_new0( tr_peermsgs, 1 );
2100    m->publisher = TR_PUBLISHER_INIT;
2101    m->peer = peer;
2102    m->session = torrent->session;
2103    m->torrent = torrent;
2104    m->peer->clientIsChoked = 1;
2105    m->peer->peerIsChoked = 1;
2106    m->peer->clientIsInterested = 0;
2107    m->peer->peerIsInterested = 0;
2108    m->peer->have = tr_bitfieldNew( torrent->info.pieceCount );
2109    m->state = AWAITING_BT_LENGTH;
2110    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
2111    m->outMessages = evbuffer_new( );
2112    m->outMessagesBatchedAt = 0;
2113    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2114    m->incoming.block = evbuffer_new( );
2115    m->peerAskedFor = REQUEST_LIST_INIT;
2116    m->clientAskedFor = REQUEST_LIST_INIT;
2117    m->clientWillAskFor = REQUEST_LIST_INIT;
2118    peer->msgs = m;
2119
2120    *setme = tr_publisherSubscribe( &m->publisher, func, userData );
2121
2122    if( tr_peerIoSupportsLTEP( peer->io ) )
2123        sendLtepHandshake( m );
2124
2125    tellPeerWhatWeHave( m );
2126
2127    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2128    ratePulse( m, tr_date() );
2129
2130    return m;
2131}
2132
2133void
2134tr_peerMsgsFree( tr_peermsgs* msgs )
2135{
2136    if( msgs )
2137    {
2138        tr_timerFree( &msgs->pexTimer );
2139        tr_publisherDestruct( &msgs->publisher );
2140        reqListClear( &msgs->clientWillAskFor );
2141        reqListClear( &msgs->clientAskedFor );
2142        reqListClear( &msgs->peerAskedFor );
2143
2144        evbuffer_free( msgs->incoming.block );
2145        evbuffer_free( msgs->outMessages );
2146        tr_free( msgs->pex6 );
2147        tr_free( msgs->pex );
2148
2149        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2150        tr_free( msgs );
2151    }
2152}
2153
2154void
2155tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2156                        tr_publisher_tag tag )
2157{
2158    tr_publisherUnsubscribe( &peer->publisher, tag );
2159}
2160
Note: See TracBrowser for help on using the repository browser.