source: trunk/libtransmission/peer-msgs.c @ 7576

Last change on this file since 7576 was 7576, checked in by charles, 12 years ago

(trunk libT) add "inline" hint to several one-liner functions

  • Property svn:keywords set to Date Rev Author Id
File size: 63.6 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 7576 2009-01-02 06:28:22Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "session.h"
24#include "bencode.h"
25#include "completion.h"
26#include "crypto.h"
27#include "inout.h"
28#ifdef WIN32
29#include "net.h" /* for ECONN */
30#endif
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "platform.h" /* MAX_STACK_ARRAY_SIZE */
36#include "ratecontrol.h"
37#include "stats.h"
38#include "torrent.h"
39#include "trevent.h"
40#include "utils.h"
41
42/**
43***
44**/
45
46enum
47{
48    BT_CHOKE                = 0,
49    BT_UNCHOKE              = 1,
50    BT_INTERESTED           = 2,
51    BT_NOT_INTERESTED       = 3,
52    BT_HAVE                 = 4,
53    BT_BITFIELD             = 5,
54    BT_REQUEST              = 6,
55    BT_PIECE                = 7,
56    BT_CANCEL               = 8,
57    BT_PORT                 = 9,
58
59    BT_FEXT_SUGGEST         = 13,
60    BT_FEXT_HAVE_ALL        = 14,
61    BT_FEXT_HAVE_NONE       = 15,
62    BT_FEXT_REJECT          = 16,
63    BT_FEXT_ALLOWED_FAST    = 17,
64
65    BT_LTEP                 = 20,
66
67    LTEP_HANDSHAKE          = 0,
68
69    TR_LTEP_PEX             = 1,
70
71
72
73    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
74
75    /* idle seconds before we send a keepalive */
76    KEEPALIVE_INTERVAL_SECS = 100,
77
78    PEX_INTERVAL            = ( 90 * 1000 ), /* msec between sendPex() calls */
79
80
81    MAX_BLOCK_SIZE          = ( 1024 * 16 ),
82
83
84    /* how long an unsent request can stay queued before it's returned
85       back to the peer-mgr's pool of requests */
86    QUEUED_REQUEST_TTL_SECS = 20,
87
88    /* how long a sent request can stay queued before it's returned
89       back to the peer-mgr's pool of requests */
90    SENT_REQUEST_TTL_SECS = 240,
91
92    /* used in lowering the outMessages queue period */
93    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
94    HIGH_PRIORITY_INTERVAL_SECS = 2,
95    LOW_PRIORITY_INTERVAL_SECS = 20,
96
97    /* number of pieces to remove from the bitfield when
98     * lazy bitfields are turned on */
99    LAZY_PIECE_COUNT = 26,
100
101    /* number of pieces we'll allow in our fast set */
102    MAX_FAST_SET_SIZE = 3
103};
104
105/**
106***  REQUEST MANAGEMENT
107**/
108
109struct peer_request
110{
111    uint32_t    index;
112    uint32_t    offset;
113    uint32_t    length;
114    time_t      time_requested;
115};
116
117static int
118compareRequest( const void * va, const void * vb )
119{
120    const struct peer_request * a = va;
121    const struct peer_request * b = vb;
122
123    if( a->index != b->index )
124        return a->index < b->index ? -1 : 1;
125
126    if( a->offset != b->offset )
127        return a->offset < b->offset ? -1 : 1;
128
129    if( a->length != b->length )
130        return a->length < b->length ? -1 : 1;
131
132    return 0;
133}
134
135struct request_list
136{
137    uint16_t               count;
138    uint16_t               max;
139    struct peer_request *  requests;
140};
141
142static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
143
144static void
145reqListReserve( struct request_list * list,
146                uint16_t              max )
147{
148    if( list->max < max )
149    {
150        list->max = max;
151        list->requests = tr_renew( struct peer_request,
152                                   list->requests,
153                                   list->max );
154    }
155}
156
157static void
158reqListClear( struct request_list * list )
159{
160    tr_free( list->requests );
161    *list = REQUEST_LIST_INIT;
162}
163
164static void
165reqListCopy( struct request_list * dest, const struct request_list * src )
166{
167    dest->count = dest->max = src->count;
168    dest->requests = tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
169}
170
171static void
172reqListRemoveOne( struct request_list * list,
173                  int                   i )
174{
175    assert( 0 <= i && i < list->count );
176
177    memmove( &list->requests[i],
178             &list->requests[i + 1],
179             sizeof( struct peer_request ) * ( --list->count - i ) );
180}
181
182static void
183reqListAppend( struct request_list *       list,
184               const struct peer_request * req )
185{
186    if( ++list->count >= list->max )
187        reqListReserve( list, list->max + 8 );
188
189    list->requests[list->count - 1] = *req;
190}
191
192static int
193reqListPop( struct request_list * list,
194            struct peer_request * setme )
195{
196    int success;
197
198    if( !list->count )
199        success = FALSE;
200    else {
201        *setme = list->requests[0];
202        reqListRemoveOne( list, 0 );
203        success = TRUE;
204    }
205
206    return success;
207}
208
209static int
210reqListFind( struct request_list *       list,
211             const struct peer_request * key )
212{
213    uint16_t i;
214
215    for( i = 0; i < list->count; ++i )
216        if( !compareRequest( key, list->requests + i ) )
217            return i;
218
219    return -1;
220}
221
222static int
223reqListRemove( struct request_list *       list,
224               const struct peer_request * key )
225{
226    int success;
227    const int i = reqListFind( list, key );
228
229    if( i < 0 )
230        success = FALSE;
231    else {
232        reqListRemoveOne( list, i );
233        success = TRUE;
234    }
235
236    return success;
237}
238
239/**
240***
241**/
242
243/* this is raw, unchanged data from the peer regarding
244 * the current message that it's sending us. */
245struct tr_incoming
246{
247    uint32_t          length; /* includes the +1 for id length */
248    struct evbuffer * block; /* piece data for incoming blocks */
249};
250
251/**
252 * Low-level communication state information about a connected peer.
253 *
254 * This structure remembers the low-level protocol states that we're
255 * in with this peer, such as active requests, pex messages, and so on.
256 * Its fields are all private to peer-msgs.c.
257 *
258 * Data not directly involved with sending & receiving messages is
259 * stored in tr_peer, where it can be accessed by both peermsgs and
260 * the peer manager.
261 *
262 * @see struct peer_atom
263 * @see tr_peer
264 */
265struct tr_peermsgs
266{
267    tr_bool         peerSupportsPex;
268    tr_bool         clientSentLtepHandshake;
269    tr_bool         peerSentLtepHandshake;
270    tr_bool         haveFastSet;
271
272    uint8_t         ut_pex_id;
273    uint16_t        pexCount;
274    uint16_t        pexCount6;
275    uint16_t        maxActiveRequests;
276
277    size_t                 fastsetSize;
278    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
279
280    /* how long the outMessages batch should be allowed to grow before
281     * it's flushed -- some messages (like requests >:) should be sent
282     * very quickly; others aren't as urgent. */
283    int                    outMessagesBatchPeriod;
284
285    tr_peer *              peer;
286
287    tr_session *           session;
288    tr_torrent *           torrent;
289
290    tr_publisher           publisher;
291
292    struct evbuffer *      outMessages; /* all the non-piece messages */
293
294    struct request_list    peerAskedFor;
295    struct request_list    clientAskedFor;
296    struct request_list    clientWillAskFor;
297
298    tr_timer             * pexTimer;
299    tr_pex               * pex;
300    tr_pex               * pex6;
301
302    time_t                 clientSentPexAt;
303    time_t                 clientSentAnythingAt;
304
305    /* when we started batching the outMessages */
306    time_t                outMessagesBatchedAt;
307
308    struct tr_incoming    incoming;
309
310    /* if the peer supports the Extension Protocol in BEP 10 and
311       supplied a reqq argument, it's stored here.  otherwise the
312       value is zero and should be ignored. */
313    int64_t               reqq;
314};
315
316/**
317***
318**/
319
320static void
321myDebug( const char * file, int line,
322         const struct tr_peermsgs * msgs,
323         const char * fmt, ... )
324{
325    FILE * fp = tr_getLog( );
326
327    if( fp )
328    {
329        va_list           args;
330        char              timestr[64];
331        struct evbuffer * buf = tr_getBuffer( );
332        char *            base = tr_basename( file );
333
334        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
335                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
336                             msgs->torrent->info.name,
337                             tr_peerIoGetAddrStr( msgs->peer->io ),
338                             msgs->peer->client );
339        va_start( args, fmt );
340        evbuffer_add_vprintf( buf, fmt, args );
341        va_end( args );
342        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
343        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
344
345        tr_free( base );
346        tr_releaseBuffer( buf );
347    }
348}
349
350#define dbgmsg( msgs, ... ) \
351    do { \
352        if( tr_deepLoggingIsActive( ) ) \
353            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
354    } while( 0 )
355
356/**
357***
358**/
359
360static void
361pokeBatchPeriod( tr_peermsgs * msgs,
362                 int           interval )
363{
364    if( msgs->outMessagesBatchPeriod > interval )
365    {
366        msgs->outMessagesBatchPeriod = interval;
367        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
368    }
369}
370
371static inline void
372dbgOutMessageLen( tr_peermsgs * msgs )
373{
374    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
375}
376
377static void
378protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
379{
380    tr_peerIo       * io  = msgs->peer->io;
381    struct evbuffer * out = msgs->outMessages;
382
383    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
384
385    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
386    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
387    tr_peerIoWriteUint32( io, out, req->index );
388    tr_peerIoWriteUint32( io, out, req->offset );
389    tr_peerIoWriteUint32( io, out, req->length );
390
391    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
392    dbgOutMessageLen( msgs );
393}
394
395static void
396protocolSendRequest( tr_peermsgs               * msgs,
397                     const struct peer_request * req )
398{
399    tr_peerIo       * io  = msgs->peer->io;
400    struct evbuffer * out = msgs->outMessages;
401
402    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
403    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
404    tr_peerIoWriteUint32( io, out, req->index );
405    tr_peerIoWriteUint32( io, out, req->offset );
406    tr_peerIoWriteUint32( io, out, req->length );
407
408    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
409    dbgOutMessageLen( msgs );
410    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
411}
412
413static void
414protocolSendCancel( tr_peermsgs               * msgs,
415                    const struct peer_request * req )
416{
417    tr_peerIo       * io  = msgs->peer->io;
418    struct evbuffer * out = msgs->outMessages;
419
420    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
421    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
422    tr_peerIoWriteUint32( io, out, req->index );
423    tr_peerIoWriteUint32( io, out, req->offset );
424    tr_peerIoWriteUint32( io, out, req->length );
425
426    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
427    dbgOutMessageLen( msgs );
428    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
429}
430
431static void
432protocolSendHave( tr_peermsgs * msgs,
433                  uint32_t      index )
434{
435    tr_peerIo       * io  = msgs->peer->io;
436    struct evbuffer * out = msgs->outMessages;
437
438    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
439    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
440    tr_peerIoWriteUint32( io, out, index );
441
442    dbgmsg( msgs, "sending Have %u", index );
443    dbgOutMessageLen( msgs );
444    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
445}
446
447#if 0
448static void
449protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
450{
451    tr_peerIo       * io  = msgs->peer->io;
452    struct evbuffer * out = msgs->outMessages;
453
454    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
455
456    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
457    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
458    tr_peerIoWriteUint32( io, out, pieceIndex );
459
460    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
461    dbgOutMessageLen( msgs );
462}
463#endif
464
465static void
466protocolSendChoke( tr_peermsgs * msgs,
467                   int           choke )
468{
469    tr_peerIo       * io  = msgs->peer->io;
470    struct evbuffer * out = msgs->outMessages;
471
472    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
473    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
474
475    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
476    dbgOutMessageLen( msgs );
477    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
478}
479
480static void
481protocolSendHaveAll( tr_peermsgs * msgs )
482{
483    tr_peerIo       * io  = msgs->peer->io;
484    struct evbuffer * out = msgs->outMessages;
485
486    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
487
488    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
489    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
490
491    dbgmsg( msgs, "sending HAVE_ALL..." );
492    dbgOutMessageLen( msgs );
493    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
494}
495
496static void
497protocolSendHaveNone( tr_peermsgs * msgs )
498{
499    tr_peerIo       * io  = msgs->peer->io;
500    struct evbuffer * out = msgs->outMessages;
501
502    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
503
504    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
505    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
506
507    dbgmsg( msgs, "sending HAVE_NONE..." );
508    dbgOutMessageLen( msgs );
509    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
510}
511
512/**
513***  EVENTS
514**/
515
516static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0 };
517
518static void
519publish( tr_peermsgs * msgs, tr_peer_event * e )
520{
521    assert( msgs->peer );
522    assert( msgs->peer->msgs == msgs );
523
524    tr_publisherPublish( &msgs->publisher, msgs->peer, e );
525}
526
527static void
528fireError( tr_peermsgs * msgs, int err )
529{
530    tr_peer_event e = blankEvent;
531    e.eventType = TR_PEER_ERROR;
532    e.err = err;
533    publish( msgs, &e );
534}
535
536static void
537fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly )
538{
539    tr_peer_event e = blankEvent;
540    e.eventType = TR_PEER_UPLOAD_ONLY;
541    e.uploadOnly = uploadOnly;
542    publish( msgs, &e );
543}
544
545static void
546fireNeedReq( tr_peermsgs * msgs )
547{
548    tr_peer_event e = blankEvent;
549    e.eventType = TR_PEER_NEED_REQ;
550    publish( msgs, &e );
551}
552
553static void
554firePeerProgress( tr_peermsgs * msgs )
555{
556    tr_peer_event e = blankEvent;
557    e.eventType = TR_PEER_PEER_PROGRESS;
558    e.progress = msgs->peer->progress;
559    publish( msgs, &e );
560}
561
562static void
563fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
564{
565    tr_peer_event e = blankEvent;
566    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
567    e.pieceIndex = req->index;
568    e.offset = req->offset;
569    e.length = req->length;
570    publish( msgs, &e );
571}
572
573static void
574fireClientGotData( tr_peermsgs * msgs,
575                   uint32_t      length,
576                   int           wasPieceData )
577{
578    tr_peer_event e = blankEvent;
579
580    e.length = length;
581    e.eventType = TR_PEER_CLIENT_GOT_DATA;
582    e.wasPieceData = wasPieceData;
583    publish( msgs, &e );
584}
585
586static void
587fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
588{
589    tr_peer_event e = blankEvent;
590    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
591    e.pieceIndex = pieceIndex;
592    publish( msgs, &e );
593}
594
595static void
596fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
597{
598    tr_peer_event e = blankEvent;
599    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
600    e.pieceIndex = pieceIndex;
601    publish( msgs, &e );
602}
603
604static void
605firePeerGotData( tr_peermsgs  * msgs,
606                 uint32_t       length,
607                 int            wasPieceData )
608{
609    tr_peer_event e = blankEvent;
610
611    e.length = length;
612    e.eventType = TR_PEER_PEER_GOT_DATA;
613    e.wasPieceData = wasPieceData;
614
615    publish( msgs, &e );
616}
617
618static void
619fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
620{
621    tr_peer_event e = blankEvent;
622    e.eventType = TR_PEER_CANCEL;
623    e.pieceIndex = req->index;
624    e.offset = req->offset;
625    e.length = req->length;
626    publish( msgs, &e );
627}
628
629/**
630***  ALLOWED FAST SET
631***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
632**/
633
634size_t
635tr_generateAllowedSet( tr_piece_index_t * setmePieces,
636                       size_t             desiredSetSize,
637                       size_t             pieceCount,
638                       const uint8_t    * infohash,
639                       const tr_address * addr )
640{
641    size_t setSize = 0;
642
643    assert( setmePieces );
644    assert( desiredSetSize <= pieceCount );
645    assert( desiredSetSize );
646    assert( pieceCount );
647    assert( infohash );
648    assert( addr );
649
650    if( addr->type == TR_AF_INET )
651    {
652        uint8_t w[SHA_DIGEST_LENGTH + 4];
653        uint8_t x[SHA_DIGEST_LENGTH];
654
655        *(uint32_t*)w = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
656        memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );                /* (2) */
657        tr_sha1( x, w, sizeof( w ), NULL );                          /* (3) */
658
659        while( setSize<desiredSetSize )
660        {
661            int i;
662            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
663            {
664                size_t k;
665                uint32_t j = i * 4;                                  /* (5) */
666                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
667                uint32_t index = y % pieceCount;                     /* (7) */
668
669                for( k=0; k<setSize; ++k )                           /* (8) */
670                    if( setmePieces[k] == index )
671                        break;
672
673                if( k == setSize )
674                    setmePieces[setSize++] = index;                  /* (9) */
675            }
676
677            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
678        }
679    }
680
681    return setSize;
682}
683
684static void
685updateFastSet( tr_peermsgs * msgs UNUSED )
686{
687#if 0
688    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
689    const int peerIsNeedy = msgs->peer->progress < 0.10;
690
691    if( fext && peerIsNeedy && !msgs->haveFastSet )
692    {
693        size_t i;
694        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
695        const tr_info * inf = &msgs->torrent->info;
696        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
697
698        /* build the fast set */
699        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
700        msgs->haveFastSet = 1;
701
702        /* send it to the peer */
703        for( i=0; i<msgs->fastsetSize; ++i )
704            protocolSendAllowedFast( msgs, msgs->fastset[i] );
705    }
706#endif
707}
708
709/**
710***  INTEREST
711**/
712
713static int
714isPieceInteresting( const tr_peermsgs * msgs,
715                    tr_piece_index_t    piece )
716{
717    const tr_torrent * torrent = msgs->torrent;
718
719    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
720          && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have */
721          && ( tr_bitfieldHas( msgs->peer->have, piece ) );    /* peer has it */
722}
723
724/* "interested" means we'll ask for piece data if they unchoke us */
725static int
726isPeerInteresting( const tr_peermsgs * msgs )
727{
728    tr_piece_index_t    i;
729    const tr_torrent *  torrent;
730    const tr_bitfield * bitfield;
731    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
732
733    if( clientIsSeed )
734        return FALSE;
735
736    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
737        return FALSE;
738
739    torrent = msgs->torrent;
740    bitfield = tr_cpPieceBitfield( torrent->completion );
741
742    if( !msgs->peer->have )
743        return TRUE;
744
745    assert( bitfield->byteCount == msgs->peer->have->byteCount );
746
747    for( i = 0; i < torrent->info.pieceCount; ++i )
748        if( isPieceInteresting( msgs, i ) )
749            return TRUE;
750
751    return FALSE;
752}
753
754static void
755sendInterest( tr_peermsgs * msgs,
756              int           weAreInterested )
757{
758    struct evbuffer * out = msgs->outMessages;
759
760    assert( msgs );
761    assert( weAreInterested == 0 || weAreInterested == 1 );
762
763    msgs->peer->clientIsInterested = weAreInterested;
764    dbgmsg( msgs, "Sending %s", weAreInterested ? "Interested" : "Not Interested" );
765    tr_peerIoWriteUint32( msgs->peer->io, out, sizeof( uint8_t ) );
766    tr_peerIoWriteUint8 ( msgs->peer->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
767
768    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
769    dbgOutMessageLen( msgs );
770}
771
772static void
773updateInterest( tr_peermsgs * msgs )
774{
775    const int i = isPeerInteresting( msgs );
776
777    if( i != msgs->peer->clientIsInterested )
778        sendInterest( msgs, i );
779    if( i )
780        fireNeedReq( msgs );
781}
782
783static int
784popNextRequest( tr_peermsgs *         msgs,
785                struct peer_request * setme )
786{
787    return reqListPop( &msgs->peerAskedFor, setme );
788}
789
790static void
791cancelAllRequestsToClient( tr_peermsgs * msgs )
792{
793    struct peer_request req;
794    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
795
796    while( popNextRequest( msgs, &req ) )
797        if( mustSendCancel )
798            protocolSendReject( msgs, &req );
799}
800
801void
802tr_peerMsgsSetChoke( tr_peermsgs * msgs,
803                     int           choke )
804{
805    const time_t now = time( NULL );
806    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
807
808    assert( msgs );
809    assert( msgs->peer );
810    assert( choke == 0 || choke == 1 );
811
812    if( msgs->peer->chokeChangedAt > fibrillationTime )
813    {
814        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
815    }
816    else if( msgs->peer->peerIsChoked != choke )
817    {
818        msgs->peer->peerIsChoked = choke;
819        if( choke )
820            cancelAllRequestsToClient( msgs );
821        protocolSendChoke( msgs, choke );
822        msgs->peer->chokeChangedAt = now;
823    }
824}
825
826/**
827***
828**/
829
830void
831tr_peerMsgsHave( tr_peermsgs * msgs,
832                 uint32_t      index )
833{
834    protocolSendHave( msgs, index );
835
836    /* since we have more pieces now, we might not be interested in this peer */
837    updateInterest( msgs );
838}
839
840/**
841***
842**/
843
844static int
845reqIsValid( const tr_peermsgs * peer,
846            uint32_t            index,
847            uint32_t            offset,
848            uint32_t            length )
849{
850    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
851}
852
853static int
854requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
855{
856    return reqIsValid( msgs, req->index, req->offset, req->length );
857}
858
859static void
860expireFromList( tr_peermsgs          * msgs,
861                struct request_list  * list,
862                const time_t           oldestAllowed )
863{
864    int i;
865
866    /* walk through the list, looking for the first req that's too old */
867    for( i=0; i<list->count; ++i ) {
868        const struct peer_request * req = &list->requests[i];
869        if( req->time_requested < oldestAllowed )
870            break;
871    }
872
873    /* if we found one too old, start pruning them */
874    if( i < list->count ) {
875        struct request_list tmp = REQUEST_LIST_INIT;
876        reqListCopy( &tmp, list );
877        for( ; i<tmp.count; ++i ) {
878            const struct peer_request * req = &tmp.requests[i];
879            if( req->time_requested < oldestAllowed )
880                tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
881        }
882        reqListClear( &tmp );
883    }
884}
885
886static void
887expireOldRequests( tr_peermsgs * msgs, const time_t now  )
888{
889    time_t oldestAllowed;
890    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
891    dbgmsg( msgs, "entering `expire old requests' block" );
892
893    /* cancel requests that have been queued for too long */
894    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
895    expireFromList( msgs, &msgs->clientWillAskFor, oldestAllowed );
896
897    /* if the peer doesn't support "Reject Request",
898     * cancel requests that were sent too long ago. */
899    if( !fext ) {
900        oldestAllowed = now - SENT_REQUEST_TTL_SECS;
901        expireFromList( msgs, &msgs->clientAskedFor, oldestAllowed );
902    }
903
904    dbgmsg( msgs, "leaving `expire old requests' block" );
905}
906
907static void
908pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
909{
910    const int           max = msgs->maxActiveRequests;
911    int                 sent = 0;
912    int                 count = msgs->clientAskedFor.count;
913    struct peer_request req;
914
915    if( msgs->peer->clientIsChoked )
916        return;
917    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
918        return;
919
920    while( ( count < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
921    {
922        const tr_block_index_t block = _tr_block( msgs->torrent, req.index, req.offset );
923
924        assert( requestIsValid( msgs, &req ) );
925        assert( tr_bitfieldHas( msgs->peer->have, req.index ) );
926
927        /* don't ask for it if we've already got it... this block may have
928         * come in from a different peer after we cancelled a request for it */
929        if( !tr_cpBlockIsComplete( msgs->torrent->completion, block ) )
930        {
931            protocolSendRequest( msgs, &req );
932            req.time_requested = now;
933            reqListAppend( &msgs->clientAskedFor, &req );
934
935            ++count;
936            ++sent;
937        }
938    }
939
940    if( sent )
941        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
942                sent, msgs->clientAskedFor.count, msgs->clientWillAskFor.count );
943
944    if( count < max )
945        fireNeedReq( msgs );
946}
947
948static inline int
949requestQueueIsFull( const tr_peermsgs * msgs )
950{
951    const int req_max = msgs->maxActiveRequests;
952    return msgs->clientWillAskFor.count >= req_max;
953}
954
955tr_addreq_t
956tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
957                       uint32_t         index,
958                       uint32_t         offset,
959                       uint32_t         length )
960{
961    struct peer_request req;
962
963    assert( msgs );
964    assert( msgs->torrent );
965    assert( reqIsValid( msgs, index, offset, length ) );
966
967    /**
968    ***  Reasons to decline the request
969    **/
970
971    /* don't send requests to choked clients */
972    if( msgs->peer->clientIsChoked ) {
973        dbgmsg( msgs, "declining request because they're choking us" );
974        return TR_ADDREQ_CLIENT_CHOKED;
975    }
976
977    /* peer doesn't have this piece */
978    if( !tr_bitfieldHas( msgs->peer->have, index ) )
979        return TR_ADDREQ_MISSING;
980
981    /* peer's queue is full */
982    if( requestQueueIsFull( msgs ) ) {
983        dbgmsg( msgs, "declining request because we're full" );
984        return TR_ADDREQ_FULL;
985    }
986
987    /* have we already asked for this piece? */
988    req.index = index;
989    req.offset = offset;
990    req.length = length;
991    if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
992        dbgmsg( msgs, "declining because it's a duplicate" );
993        return TR_ADDREQ_DUPLICATE;
994    }
995    if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
996        dbgmsg( msgs, "declining because it's a duplicate" );
997        return TR_ADDREQ_DUPLICATE;
998    }
999
1000    /**
1001    ***  Accept this request
1002    **/
1003
1004    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
1005            index, offset, length );
1006    req.time_requested = time( NULL );
1007    reqListAppend( &msgs->clientWillAskFor, &req );
1008    return TR_ADDREQ_OK;
1009}
1010
1011static void
1012cancelAllRequestsToPeer( tr_peermsgs * msgs, tr_bool sendCancel )
1013{
1014    int i;
1015    struct request_list a = msgs->clientWillAskFor;
1016    struct request_list b = msgs->clientAskedFor;
1017    dbgmsg( msgs, "cancelling all requests to peer" );
1018
1019    msgs->clientAskedFor = REQUEST_LIST_INIT;
1020    msgs->clientWillAskFor = REQUEST_LIST_INIT;
1021
1022    for( i=0; i<a.count; ++i )
1023        fireCancelledReq( msgs, &a.requests[i] );
1024
1025    for( i = 0; i < b.count; ++i ) {
1026        fireCancelledReq( msgs, &b.requests[i] );
1027        if( sendCancel )
1028            protocolSendCancel( msgs, &b.requests[i] );
1029    }
1030
1031    reqListClear( &a );
1032    reqListClear( &b );
1033}
1034
1035void
1036tr_peerMsgsCancel( tr_peermsgs * msgs,
1037                   uint32_t      pieceIndex,
1038                   uint32_t      offset,
1039                   uint32_t      length )
1040{
1041    struct peer_request req;
1042
1043    assert( msgs != NULL );
1044    assert( length > 0 );
1045
1046
1047    /* have we asked the peer for this piece? */
1048    req.index = pieceIndex;
1049    req.offset = offset;
1050    req.length = length;
1051
1052    /* if it's only in the queue and hasn't been sent yet, free it */
1053    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
1054        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
1055        fireCancelledReq( msgs, &req );
1056    }
1057
1058    /* if it's already been sent, send a cancel message too */
1059    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
1060        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
1061        protocolSendCancel( msgs, &req );
1062        fireCancelledReq( msgs, &req );
1063    }
1064}
1065
1066
1067/**
1068***
1069**/
1070
1071static void
1072sendLtepHandshake( tr_peermsgs * msgs )
1073{
1074    tr_benc val, *m;
1075    char * buf;
1076    int len;
1077    int pex;
1078    struct evbuffer * out = msgs->outMessages;
1079
1080    if( msgs->clientSentLtepHandshake )
1081        return;
1082
1083    dbgmsg( msgs, "sending an ltep handshake" );
1084    msgs->clientSentLtepHandshake = 1;
1085
1086    /* decide if we want to advertise pex support */
1087    if( !tr_torrentAllowsPex( msgs->torrent ) )
1088        pex = 0;
1089    else if( msgs->peerSentLtepHandshake )
1090        pex = msgs->peerSupportsPex ? 1 : 0;
1091    else
1092        pex = 1;
1093
1094    tr_bencInitDict( &val, 5 );
1095    tr_bencDictAddInt( &val, "e", msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
1096    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
1097    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
1098    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
1099    m  = tr_bencDictAddDict( &val, "m", 1 );
1100    if( pex )
1101        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
1102    buf = tr_bencSave( &val, &len );
1103
1104    tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + len );
1105    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
1106    tr_peerIoWriteUint8 ( msgs->peer->io, out, LTEP_HANDSHAKE );
1107    tr_peerIoWriteBytes ( msgs->peer->io, out, buf, len );
1108    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1109    dbgOutMessageLen( msgs );
1110
1111    /* cleanup */
1112    tr_bencFree( &val );
1113    tr_free( buf );
1114}
1115
1116static void
1117parseLtepHandshake( tr_peermsgs *     msgs,
1118                    int               len,
1119                    struct evbuffer * inbuf )
1120{
1121    int64_t   i;
1122    tr_benc   val, * sub;
1123    uint8_t * tmp = tr_new( uint8_t, len );
1124
1125    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
1126    msgs->peerSentLtepHandshake = 1;
1127
1128    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
1129    {
1130        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1131        tr_free( tmp );
1132        return;
1133    }
1134
1135    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
1136
1137    /* does the peer prefer encrypted connections? */
1138    if( tr_bencDictFindInt( &val, "e", &i ) )
1139        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1140                                              : ENCRYPTION_PREFERENCE_NO;
1141
1142    /* check supported messages for utorrent pex */
1143    msgs->peerSupportsPex = 0;
1144    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
1145        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
1146            msgs->ut_pex_id = (uint8_t) i;
1147            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1148            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1149        }
1150    }
1151
1152    /* look for upload_only (BEP 21) */
1153    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
1154        fireUploadOnly( msgs, i!=0 );
1155
1156    /* get peer's listening port */
1157    if( tr_bencDictFindInt( &val, "p", &i ) ) {
1158        msgs->peer->port = htons( (uint16_t)i );
1159        dbgmsg( msgs, "msgs->port is now %hu", msgs->peer->port );
1160    }
1161
1162    /* get peer's maximum request queue size */
1163    if( tr_bencDictFindInt( &val, "reqq", &i ) )
1164        msgs->reqq = i;
1165
1166    tr_bencFree( &val );
1167    tr_free( tmp );
1168}
1169
1170static void
1171parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1172{
1173    int loaded = 0;
1174    uint8_t * tmp = tr_new( uint8_t, msglen );
1175    tr_benc val;
1176    const tr_torrent * tor = msgs->torrent;
1177    const uint8_t * added;
1178    size_t added_len;
1179
1180    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1181
1182    if( tr_torrentAllowsPex( tor )
1183      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1184    {
1185        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1186        {
1187            const uint8_t * added_f = NULL;
1188            tr_pex *        pex;
1189            size_t          i, n;
1190            size_t          added_f_len = 0;
1191            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1192            pex =
1193                tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len,
1194                                        &n );
1195            for( i = 0; i < n; ++i )
1196                tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1197                                  TR_PEER_FROM_PEX, pex + i );
1198            tr_free( pex );
1199        }
1200       
1201        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1202        {
1203            const uint8_t * added_f = NULL;
1204            tr_pex *        pex;
1205            size_t          i, n;
1206            size_t          added_f_len = 0;
1207            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1208            pex =
1209                tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len,
1210                                         &n );
1211            for( i = 0; i < n; ++i )
1212                tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1213                                  TR_PEER_FROM_PEX, pex + i );
1214            tr_free( pex );
1215        }
1216       
1217    }
1218
1219    if( loaded )
1220        tr_bencFree( &val );
1221    tr_free( tmp );
1222}
1223
1224static void sendPex( tr_peermsgs * msgs );
1225
1226static void
1227parseLtep( tr_peermsgs *     msgs,
1228           int               msglen,
1229           struct evbuffer * inbuf )
1230{
1231    uint8_t ltep_msgid;
1232
1233    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1234    msglen--;
1235
1236    if( ltep_msgid == LTEP_HANDSHAKE )
1237    {
1238        dbgmsg( msgs, "got ltep handshake" );
1239        parseLtepHandshake( msgs, msglen, inbuf );
1240        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1241        {
1242            sendLtepHandshake( msgs );
1243            sendPex( msgs );
1244        }
1245    }
1246    else if( ltep_msgid == TR_LTEP_PEX )
1247    {
1248        dbgmsg( msgs, "got ut pex" );
1249        msgs->peerSupportsPex = 1;
1250        parseUtPex( msgs, msglen, inbuf );
1251    }
1252    else
1253    {
1254        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1255        evbuffer_drain( inbuf, msglen );
1256    }
1257}
1258
1259static int
1260readBtLength( tr_peermsgs *     msgs,
1261              struct evbuffer * inbuf,
1262              size_t            inlen )
1263{
1264    uint32_t len;
1265
1266    if( inlen < sizeof( len ) )
1267        return READ_LATER;
1268
1269    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1270    msgs->incoming.length = len;
1271
1272    if( len == 0 ) /* peer sent us a keepalive message */
1273        dbgmsg( msgs, "got KeepAlive" );
1274
1275    return READ_NOW;
1276}
1277
1278static void
1279updatePeerProgress( tr_peermsgs * msgs )
1280{
1281    msgs->peer->progress = tr_bitfieldCountTrueBits( msgs->peer->have ) / (float)msgs->torrent->info.pieceCount;
1282    dbgmsg( msgs, "peer progress is %f", msgs->peer->progress );
1283    updateFastSet( msgs );
1284    updateInterest( msgs );
1285    firePeerProgress( msgs );
1286}
1287
1288static void
1289peerMadeRequest( tr_peermsgs *               msgs,
1290                 const struct peer_request * req )
1291{
1292    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1293    const int reqIsValid = requestIsValid( msgs, req );
1294    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
1295    const int peerIsChoked = msgs->peer->peerIsChoked;
1296
1297    int allow = FALSE;
1298
1299    if( !reqIsValid )
1300        dbgmsg( msgs, "rejecting an invalid request." );
1301    else if( !clientHasPiece )
1302        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1303    else if( peerIsChoked )
1304        dbgmsg( msgs, "rejecting request from choked peer" );
1305    else
1306        allow = TRUE;
1307
1308    if( allow )
1309        reqListAppend( &msgs->peerAskedFor, req );
1310    else if( fext )
1311        protocolSendReject( msgs, req );
1312}
1313
1314static int
1315messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1316{
1317    switch( id )
1318    {
1319        case BT_CHOKE:
1320        case BT_UNCHOKE:
1321        case BT_INTERESTED:
1322        case BT_NOT_INTERESTED:
1323        case BT_FEXT_HAVE_ALL:
1324        case BT_FEXT_HAVE_NONE:
1325            return len == 1;
1326
1327        case BT_HAVE:
1328        case BT_FEXT_SUGGEST:
1329        case BT_FEXT_ALLOWED_FAST:
1330            return len == 5;
1331
1332        case BT_BITFIELD:
1333            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1334
1335        case BT_REQUEST:
1336        case BT_CANCEL:
1337        case BT_FEXT_REJECT:
1338            return len == 13;
1339
1340        case BT_PIECE:
1341            return len > 9 && len <= 16393;
1342
1343        case BT_PORT:
1344            return len == 3;
1345
1346        case BT_LTEP:
1347            return len >= 2;
1348
1349        default:
1350            return FALSE;
1351    }
1352}
1353
1354static int clientGotBlock( tr_peermsgs *               msgs,
1355                           const uint8_t *             block,
1356                           const struct peer_request * req );
1357
1358static int
1359readBtPiece( tr_peermsgs      * msgs,
1360             struct evbuffer  * inbuf,
1361             size_t             inlen,
1362             size_t           * setme_piece_bytes_read )
1363{
1364    struct peer_request req;
1365
1366    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1367    dbgmsg( msgs, "In readBtPiece" );
1368
1369    tr_peerIoReadUint32( msgs->peer->io, inbuf, &req.index );
1370    tr_peerIoReadUint32( msgs->peer->io, inbuf, &req.offset );
1371    req.length = msgs->incoming.length - 9;
1372    dbgmsg( msgs, "got incoming block header %u:%u->%u", req.index, req.offset, req.length );
1373
1374    { 
1375        int err;
1376
1377        /* decrypt the whole block in one go */
1378        evbuffer_expand( msgs->incoming.block, req.length );
1379        tr_peerIoReadBytes( msgs->peer->io, inbuf, EVBUFFER_DATA( msgs->incoming.block ), req.length );
1380        EVBUFFER_LENGTH( msgs->incoming.block ) += req.length;
1381
1382        fireClientGotData( msgs, req.length, TRUE );
1383        *setme_piece_bytes_read += req.length;
1384        dbgmsg( msgs, "got block %u:%u->%u", req.index, req.offset, req.length );
1385        assert( EVBUFFER_LENGTH( msgs->incoming.block ) == req.length );
1386
1387        /* we've got the whole block ... process it */
1388        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), &req );
1389
1390        /* cleanup */
1391        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH( msgs->incoming.block ) );
1392        if( !err )
1393            return READ_NOW;
1394        else {
1395            fireError( msgs, err );
1396            return READ_ERR;
1397        }
1398    }
1399}
1400
1401static int
1402readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen, size_t * piece )
1403{
1404    int           ret = READ_NOW;
1405    uint8_t       id;
1406    uint32_t      ui32;
1407    uint32_t      msglen = msgs->incoming.length;
1408    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1409    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1410
1411    if( inlen < msglen )
1412        return READ_LATER;
1413
1414    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1415
1416    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1417
1418    if( !messageLengthIsCorrect( msgs, id, msglen ) )
1419    {
1420        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1421        fireError( msgs, EMSGSIZE );
1422        return READ_ERR;
1423    }
1424
1425    --msglen;
1426
1427    switch( id )
1428    {
1429        case BT_CHOKE:
1430            dbgmsg( msgs, "got Choke" );
1431            msgs->peer->clientIsChoked = 1;
1432            if( !fext )
1433                cancelAllRequestsToPeer( msgs, FALSE );
1434            break;
1435
1436        case BT_UNCHOKE:
1437            dbgmsg( msgs, "got Unchoke" );
1438            msgs->peer->clientIsChoked = 0;
1439            fireNeedReq( msgs );
1440            break;
1441
1442        case BT_INTERESTED:
1443            dbgmsg( msgs, "got Interested" );
1444            msgs->peer->peerIsInterested = 1;
1445            break;
1446
1447        case BT_NOT_INTERESTED:
1448            dbgmsg( msgs, "got Not Interested" );
1449            msgs->peer->peerIsInterested = 0;
1450            break;
1451
1452        case BT_HAVE:
1453            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1454            dbgmsg( msgs, "got Have: %u", ui32 );
1455            if( tr_bitfieldAdd( msgs->peer->have, ui32 ) ) {
1456                fireError( msgs, ERANGE );
1457                return READ_ERR;
1458            }
1459            updatePeerProgress( msgs );
1460            tr_rcTransferred( msgs->torrent->swarmSpeed,
1461                              msgs->torrent->info.pieceSize );
1462            break;
1463
1464        case BT_BITFIELD:
1465        {
1466            dbgmsg( msgs, "got a bitfield" );
1467            tr_peerIoReadBytes( msgs->peer->io, inbuf, msgs->peer->have->bits, msglen );
1468            updatePeerProgress( msgs );
1469            fireNeedReq( msgs );
1470            break;
1471        }
1472
1473        case BT_REQUEST:
1474        {
1475            struct peer_request r;
1476            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1477            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1478            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1479            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1480            peerMadeRequest( msgs, &r );
1481            break;
1482        }
1483
1484        case BT_CANCEL:
1485        {
1486            struct peer_request r;
1487            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1488            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1489            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1490            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1491            if( reqListRemove( &msgs->peerAskedFor, &r ) && fext )
1492                protocolSendReject( msgs, &r );
1493            break;
1494        }
1495
1496        case BT_PIECE:
1497            ret = readBtPiece( msgs, inbuf, msglen, piece );
1498            break;
1499
1500        case BT_PORT:
1501            dbgmsg( msgs, "Got a BT_PORT" );
1502            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->port );
1503            break;
1504
1505        case BT_FEXT_SUGGEST:
1506            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1507            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1508            if( fext )
1509                fireClientGotSuggest( msgs, ui32 );
1510            else {
1511                fireError( msgs, EMSGSIZE );
1512                return READ_ERR;
1513            }
1514            break;
1515
1516        case BT_FEXT_ALLOWED_FAST:
1517            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1518            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1519            if( fext )
1520                fireClientGotAllowedFast( msgs, ui32 );
1521            else {
1522                fireError( msgs, EMSGSIZE );
1523                return READ_ERR;
1524            }
1525            break;
1526
1527        case BT_FEXT_HAVE_ALL:
1528            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1529            if( fext ) {
1530                tr_bitfieldAddRange( msgs->peer->have, 0, msgs->torrent->info.pieceCount );
1531                updatePeerProgress( msgs );
1532            } else {
1533                fireError( msgs, EMSGSIZE );
1534                return READ_ERR;
1535            }
1536            break;
1537
1538        case BT_FEXT_HAVE_NONE:
1539            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1540            if( fext ) {
1541                tr_bitfieldClear( msgs->peer->have );
1542                updatePeerProgress( msgs );
1543            } else {
1544                fireError( msgs, EMSGSIZE );
1545                return READ_ERR;
1546            }
1547            break;
1548
1549        case BT_FEXT_REJECT:
1550        {
1551            struct peer_request r;
1552            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1553            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1554            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1555            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1556            if( fext )
1557                reqListRemove( &msgs->clientAskedFor, &r );
1558            else {
1559                fireError( msgs, EMSGSIZE );
1560                return READ_ERR;
1561            }
1562            break;
1563        }
1564
1565        case BT_LTEP:
1566            dbgmsg( msgs, "Got a BT_LTEP" );
1567            parseLtep( msgs, msglen, inbuf );
1568            break;
1569
1570        default:
1571            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1572            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1573            break;
1574    }
1575
1576    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen - 1 );
1577
1578    msgs->incoming.length = 0;
1579    return ret;
1580}
1581
1582static inline void
1583decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1584{
1585    tr_torrent * tor = msgs->torrent;
1586
1587    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1588}
1589
1590static inline void
1591clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1592{
1593    decrementDownloadedCount( msgs, req->length );
1594}
1595
1596static void
1597addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1598{
1599    if( !msgs->peer->blame )
1600         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1601    tr_bitfieldAdd( msgs->peer->blame, index );
1602}
1603
1604/* returns 0 on success, or an errno on failure */
1605static int
1606clientGotBlock( tr_peermsgs *               msgs,
1607                const uint8_t *             data,
1608                const struct peer_request * req )
1609{
1610    int err;
1611    tr_torrent * tor = msgs->torrent;
1612    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1613
1614    assert( msgs );
1615    assert( req );
1616
1617    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1618        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1619                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1620        return EMSGSIZE;
1621    }
1622
1623    /* save the block */
1624    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1625
1626    /**
1627    *** Remove the block from our `we asked for this' list
1628    **/
1629
1630    if( !reqListRemove( &msgs->clientAskedFor, req ) ) {
1631        clientGotUnwantedBlock( msgs, req );
1632        dbgmsg( msgs, "we didn't ask for this message..." );
1633        return 0;
1634    }
1635
1636    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1637            msgs->clientAskedFor.count );
1638
1639    /**
1640    *** Error checks
1641    **/
1642
1643    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1644        dbgmsg( msgs, "we have this block already..." );
1645        clientGotUnwantedBlock( msgs, req );
1646        return 0;
1647    }
1648
1649    /**
1650    ***  Save the block
1651    **/
1652
1653    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1654        return err;
1655
1656    addPeerToBlamefield( msgs, req->index );
1657    fireGotBlock( msgs, req );
1658    return 0;
1659}
1660
1661static int peerPulse( void * vmsgs );
1662
1663static void
1664didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1665{
1666    tr_peermsgs * msgs = vmsgs;
1667    firePeerGotData( msgs, bytesWritten, wasPieceData );
1668    peerPulse( msgs );
1669}
1670
1671static ReadState
1672canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1673{
1674    ReadState         ret;
1675    tr_peermsgs *     msgs = vmsgs;
1676    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1677    const size_t      inlen = EVBUFFER_LENGTH( in );
1678
1679    if( !inlen )
1680        return READ_LATER;
1681
1682    /* Incoming data is processed in two stages. First the length is read
1683     * and then readBtMessage() waits until all the data has arrived in
1684     * the input buffer before starting to parse it */
1685    if( msgs->incoming.length == 0 )
1686        ret = readBtLength ( msgs, in, inlen );
1687    else
1688        ret = readBtMessage( msgs, in, inlen, piece );
1689
1690    /* log the raw data that was read */
1691    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1692        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1693
1694    return ret;
1695}
1696
1697/**
1698***
1699**/
1700
1701static int
1702ratePulse( void * vmsgs )
1703{
1704    tr_peermsgs * msgs = vmsgs;
1705    const double rateToClient = tr_peerGetPieceSpeed( msgs->peer, TR_PEER_TO_CLIENT );
1706    const int seconds = 10;
1707    const int floor = 8;
1708    const int estimatedBlocksInPeriod = ( rateToClient * seconds * 1024 ) / msgs->torrent->blockSize;
1709
1710    msgs->maxActiveRequests = floor + estimatedBlocksInPeriod;
1711
1712    if( msgs->reqq > 0 )
1713        msgs->maxActiveRequests = MIN( msgs->maxActiveRequests, msgs->reqq );
1714
1715    return TRUE;
1716}
1717
1718static size_t
1719fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1720{
1721    size_t bytesWritten = 0;
1722    struct peer_request req;
1723    const tr_bool haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1724    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1725
1726    /**
1727    ***  Protocol messages
1728    **/
1729
1730    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1731    {
1732        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1733        msgs->outMessagesBatchedAt = now;
1734    }
1735    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1736    {
1737        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1738        /* flush the protocol messages */
1739        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1740        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1741        msgs->clientSentAnythingAt = now;
1742        msgs->outMessagesBatchedAt = 0;
1743        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1744        bytesWritten +=  len;
1745    }
1746
1747    /**
1748    ***  Blocks
1749    **/
1750
1751    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io ) >= msgs->torrent->blockSize )
1752        && popNextRequest( msgs, &req ) )
1753    {
1754        if( requestIsValid( msgs, &req )
1755            && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1756        {
1757            int err;
1758            static uint8_t * buf = NULL;
1759
1760            if( buf == NULL )
1761                buf = tr_new( uint8_t, MAX_BLOCK_SIZE );
1762
1763            /* send a block */
1764            if(( err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf ))) {
1765                fireError( msgs, err );
1766                bytesWritten = 0;
1767                msgs = NULL;
1768            } else {
1769                tr_peerIo * io = msgs->peer->io;
1770                struct evbuffer * out = tr_getBuffer( );
1771                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1772                tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1773                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1774                tr_peerIoWriteUint32( io, out, req.index );
1775                tr_peerIoWriteUint32( io, out, req.offset );
1776                tr_peerIoWriteBytes ( io, out, buf, req.length );
1777                tr_peerIoWriteBuf( io, out, TRUE );
1778                bytesWritten += EVBUFFER_LENGTH( out );
1779                msgs->clientSentAnythingAt = now;
1780                tr_releaseBuffer( out );
1781            }
1782        }
1783        else if( fext ) /* peer needs a reject message */
1784        {
1785            protocolSendReject( msgs, &req );
1786        }
1787    }
1788
1789    /**
1790    ***  Keepalive
1791    **/
1792
1793    if( ( msgs != NULL )
1794        && ( msgs->clientSentAnythingAt != 0 )
1795        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1796    {
1797        dbgmsg( msgs, "sending a keepalive message" );
1798        tr_peerIoWriteUint32( msgs->peer->io, msgs->outMessages, 0 );
1799        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1800    }
1801
1802    return bytesWritten;
1803}
1804
1805static int
1806peerPulse( void * vmsgs )
1807{
1808    tr_peermsgs * msgs = vmsgs;
1809    const time_t  now = time( NULL );
1810
1811    ratePulse( msgs );
1812
1813    pumpRequestQueue( msgs, now );
1814    expireOldRequests( msgs, now );
1815
1816    for( ;; )
1817        if( fillOutputBuffer( msgs, now ) < 1 )
1818            break;
1819
1820    return TRUE; /* loop forever */
1821}
1822
1823void
1824tr_peerMsgsPulse( tr_peermsgs * msgs )
1825{
1826    if( msgs != NULL )
1827        peerPulse( msgs );
1828}
1829
1830static void
1831gotError( tr_peerIo  * io UNUSED,
1832          short        what,
1833          void       * vmsgs )
1834{
1835    if( what & EVBUFFER_TIMEOUT )
1836        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1837    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1838        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1839               what, errno, tr_strerror( errno ) );
1840    fireError( vmsgs, ENOTCONN );
1841}
1842
1843static void
1844sendBitfield( tr_peermsgs * msgs )
1845{
1846    struct evbuffer * out = msgs->outMessages;
1847    tr_bitfield *     field;
1848    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1849    size_t            i;
1850    size_t            lazyCount = 0;
1851
1852    field = tr_bitfieldDup( tr_cpPieceBitfield( msgs->torrent->completion ) );
1853
1854    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1855    {
1856        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1857            speed over a truly random sample -- let's limit the pool size to
1858            the first 1000 pieces so large torrents don't bog things down */
1859        size_t poolSize;
1860        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1861        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1862
1863        /* build the pool */
1864        for( i=poolSize=0; i<maxPoolSize; ++i )
1865            if( tr_bitfieldHas( field, i ) )
1866                pool[poolSize++] = i;
1867
1868        /* pull random piece indices from the pool */
1869        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1870        {
1871            const int pos = tr_cryptoWeakRandInt( poolSize );
1872            const tr_piece_index_t piece = pool[pos];
1873            tr_bitfieldRem( field, piece );
1874            lazyPieces[lazyCount++] = piece;
1875            pool[pos] = pool[--poolSize];
1876        }
1877
1878        /* cleanup */
1879        tr_free( pool );
1880    }
1881
1882    tr_peerIoWriteUint32( msgs->peer->io, out,
1883                          sizeof( uint8_t ) + field->byteCount );
1884    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_BITFIELD );
1885    tr_peerIoWriteBytes ( msgs->peer->io, out, field->bits, field->byteCount );
1886    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
1887            EVBUFFER_LENGTH( out ) );
1888    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1889
1890    for( i = 0; i < lazyCount; ++i )
1891        protocolSendHave( msgs, lazyPieces[i] );
1892
1893    tr_bitfieldFree( field );
1894}
1895
1896static void
1897tellPeerWhatWeHave( tr_peermsgs * msgs )
1898{
1899    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1900
1901    if( fext && ( tr_cpGetStatus( msgs->torrent->completion ) == TR_SEED ) )
1902    {
1903        protocolSendHaveAll( msgs );
1904    }
1905    else if( fext && ( tr_cpHaveValid( msgs->torrent->completion ) == 0 ) )
1906    {
1907        protocolSendHaveNone( msgs );
1908    }
1909    else
1910    {
1911        sendBitfield( msgs );
1912    }
1913}
1914
1915/**
1916***
1917**/
1918
1919/* some peers give us error messages if we send
1920   more than this many peers in a single pex message
1921   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1922#define MAX_PEX_ADDED 50
1923#define MAX_PEX_DROPPED 50
1924
1925typedef struct
1926{
1927    tr_pex *  added;
1928    tr_pex *  dropped;
1929    tr_pex *  elements;
1930    int       addedCount;
1931    int       droppedCount;
1932    int       elementCount;
1933}
1934PexDiffs;
1935
1936static void
1937pexAddedCb( void * vpex,
1938            void * userData )
1939{
1940    PexDiffs * diffs = userData;
1941    tr_pex *   pex = vpex;
1942
1943    if( diffs->addedCount < MAX_PEX_ADDED )
1944    {
1945        diffs->added[diffs->addedCount++] = *pex;
1946        diffs->elements[diffs->elementCount++] = *pex;
1947    }
1948}
1949
1950static inline void
1951pexDroppedCb( void * vpex,
1952              void * userData )
1953{
1954    PexDiffs * diffs = userData;
1955    tr_pex *   pex = vpex;
1956
1957    if( diffs->droppedCount < MAX_PEX_DROPPED )
1958    {
1959        diffs->dropped[diffs->droppedCount++] = *pex;
1960    }
1961}
1962
1963static inline void
1964pexElementCb( void * vpex,
1965              void * userData )
1966{
1967    PexDiffs * diffs = userData;
1968    tr_pex * pex = vpex;
1969
1970    diffs->elements[diffs->elementCount++] = *pex;
1971}
1972
1973static void
1974sendPex( tr_peermsgs * msgs )
1975{
1976    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1977    {
1978        PexDiffs diffs;
1979        PexDiffs diffs6;
1980        tr_pex * newPex = NULL;
1981        tr_pex * newPex6 = NULL;
1982        const int newCount = tr_peerMgrGetPeers( msgs->session->peerMgr,
1983                                                 msgs->torrent->info.hash,
1984                                                 &newPex, TR_AF_INET );
1985        const int newCount6 = tr_peerMgrGetPeers( msgs->session->peerMgr,
1986                                                  msgs->torrent->info.hash,
1987                                                  &newPex6, TR_AF_INET6 );
1988
1989        /* build the diffs */
1990        diffs.added = tr_new( tr_pex, newCount );
1991        diffs.addedCount = 0;
1992        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1993        diffs.droppedCount = 0;
1994        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1995        diffs.elementCount = 0;
1996        tr_set_compare( msgs->pex, msgs->pexCount,
1997                        newPex, newCount,
1998                        tr_pexCompare, sizeof( tr_pex ),
1999                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2000        diffs6.added = tr_new( tr_pex, newCount6 );
2001        diffs6.addedCount = 0;
2002        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2003        diffs6.droppedCount = 0;
2004        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2005        diffs6.elementCount = 0;
2006        tr_set_compare( msgs->pex6, msgs->pexCount6,
2007                        newPex6, newCount6,
2008                        tr_pexCompare, sizeof( tr_pex ),
2009                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2010        dbgmsg(
2011            msgs,
2012            "pex: old peer count %d, new peer count %d, added %d, removed %d",
2013            msgs->pexCount, newCount + newCount6,
2014            diffs.addedCount + diffs6.addedCount,
2015            diffs.droppedCount + diffs6.droppedCount );
2016
2017        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2018            !diffs6.droppedCount )
2019        {
2020            tr_free( diffs.elements );
2021            tr_free( diffs6.elements );
2022        }
2023        else
2024        {
2025            int  i;
2026            tr_benc val;
2027            char * benc;
2028            int bencLen;
2029            uint8_t * tmp, *walk;
2030            struct evbuffer * out = msgs->outMessages;
2031
2032            /* update peer */
2033            tr_free( msgs->pex );
2034            msgs->pex = diffs.elements;
2035            msgs->pexCount = diffs.elementCount;
2036            tr_free( msgs->pex6 );
2037            msgs->pex6 = diffs6.elements;
2038            msgs->pexCount6 = diffs6.elementCount;
2039
2040            /* build the pex payload */
2041            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2042                                         * speed vs. likelihood? */
2043
2044            /* "added" */
2045            tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2046            for( i = 0; i < diffs.addedCount; ++i )
2047            {
2048                tr_suspectAddress( &diffs.added[i].addr, "pex" );
2049                memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2050                memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2051            }
2052            assert( ( walk - tmp ) == diffs.addedCount * 6 );
2053            tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2054            tr_free( tmp );
2055
2056            /* "added.f" */
2057            tmp = walk = tr_new( uint8_t, diffs.addedCount );
2058            for( i = 0; i < diffs.addedCount; ++i )
2059                *walk++ = diffs.added[i].flags;
2060            assert( ( walk - tmp ) == diffs.addedCount );
2061            tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2062            tr_free( tmp );
2063
2064            /* "dropped" */
2065            tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2066            for( i = 0; i < diffs.droppedCount; ++i )
2067            {
2068                memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2069                memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2070            }
2071            assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2072            tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2073            tr_free( tmp );
2074           
2075            /* "added6" */
2076            tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2077            for( i = 0; i < diffs6.addedCount; ++i )
2078            {
2079                tr_suspectAddress( &diffs6.added[i].addr, "pex6" );
2080                memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2081                walk += 16;
2082                memcpy( walk, &diffs6.added[i].port, 2 );
2083                walk += 2;
2084            }
2085            assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2086            tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2087            tr_free( tmp );
2088           
2089            /* "added6.f" */
2090            tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2091            for( i = 0; i < diffs6.addedCount; ++i )
2092                *walk++ = diffs6.added[i].flags;
2093            assert( ( walk - tmp ) == diffs6.addedCount );
2094            tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2095            tr_free( tmp );
2096           
2097            /* "dropped6" */
2098            tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2099            for( i = 0; i < diffs6.droppedCount; ++i )
2100            {
2101                memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2102                walk += 16;
2103                memcpy( walk, &diffs6.dropped[i].port, 2 );
2104                walk += 2;
2105            }
2106            assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2107            tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2108            tr_free( tmp );
2109
2110            /* write the pex message */
2111            benc = tr_bencSave( &val, &bencLen );
2112            tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + bencLen );
2113            tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
2114            tr_peerIoWriteUint8 ( msgs->peer->io, out, msgs->ut_pex_id );
2115            tr_peerIoWriteBytes ( msgs->peer->io, out, benc, bencLen );
2116            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2117            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2118
2119            tr_free( benc );
2120            tr_bencFree( &val );
2121        }
2122
2123        /* cleanup */
2124        tr_free( diffs.added );
2125        tr_free( diffs.dropped );
2126        tr_free( newPex );
2127        tr_free( diffs6.added );
2128        tr_free( diffs6.dropped );
2129        tr_free( newPex6 );
2130
2131        msgs->clientSentPexAt = time( NULL );
2132    }
2133}
2134
2135static inline int
2136pexPulse( void * vpeer )
2137{
2138    sendPex( vpeer );
2139    return TRUE;
2140}
2141
2142/**
2143***
2144**/
2145
2146tr_peermsgs*
2147tr_peerMsgsNew( struct tr_torrent * torrent,
2148                struct tr_peer    * peer,
2149                tr_delivery_func    func,
2150                void              * userData,
2151                tr_publisher_tag  * setme )
2152{
2153    tr_peermsgs * m;
2154
2155    assert( peer );
2156    assert( peer->io );
2157
2158    m = tr_new0( tr_peermsgs, 1 );
2159    m->publisher = TR_PUBLISHER_INIT;
2160    m->peer = peer;
2161    m->session = torrent->session;
2162    m->torrent = torrent;
2163    m->peer->clientIsChoked = 1;
2164    m->peer->peerIsChoked = 1;
2165    m->peer->clientIsInterested = 0;
2166    m->peer->peerIsInterested = 0;
2167    m->peer->have = tr_bitfieldNew( torrent->info.pieceCount );
2168    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
2169    m->outMessages = evbuffer_new( );
2170    m->outMessagesBatchedAt = 0;
2171    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2172    m->incoming.block = evbuffer_new( );
2173    m->peerAskedFor = REQUEST_LIST_INIT;
2174    m->clientAskedFor = REQUEST_LIST_INIT;
2175    m->clientWillAskFor = REQUEST_LIST_INIT;
2176    peer->msgs = m;
2177
2178    *setme = tr_publisherSubscribe( &m->publisher, func, userData );
2179
2180    if( tr_peerIoSupportsLTEP( peer->io ) )
2181        sendLtepHandshake( m );
2182
2183    tellPeerWhatWeHave( m );
2184
2185    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2186    ratePulse( m );
2187
2188    return m;
2189}
2190
2191void
2192tr_peerMsgsFree( tr_peermsgs* msgs )
2193{
2194    if( msgs )
2195    {
2196        tr_timerFree( &msgs->pexTimer );
2197        tr_publisherDestruct( &msgs->publisher );
2198        reqListClear( &msgs->clientWillAskFor );
2199        reqListClear( &msgs->clientAskedFor );
2200        reqListClear( &msgs->peerAskedFor );
2201
2202        evbuffer_free( msgs->incoming.block );
2203        evbuffer_free( msgs->outMessages );
2204        tr_free( msgs->pex6 );
2205        tr_free( msgs->pex );
2206
2207        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2208        tr_free( msgs );
2209    }
2210}
2211
2212void
2213tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2214                        tr_publisher_tag tag )
2215{
2216    tr_publisherUnsubscribe( &peer->publisher, tag );
2217}
2218
Note: See TracBrowser for help on using the repository browser.