source: trunk/libtransmission/peer-msgs.c @ 7445

Last change on this file since 7445 was 7445, checked in by charles, 12 years ago

(trunk libT) remove unused PEER_PULSE_INTERVAL

  • Property svn:keywords set to Date Rev Author Id
File size: 65.1 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 7445 2008-12-21 17:51:46Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "crypto.h"
26#include "inout.h"
27#ifdef WIN32
28#include "net.h" /* for ECONN */
29#endif
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-mgr-private.h"
33#include "peer-msgs.h"
34#include "ratecontrol.h"
35#include "stats.h"
36#include "torrent.h"
37#include "trevent.h"
38#include "utils.h"
39
40/**
41***
42**/
43
44enum
45{
46    BT_CHOKE                = 0,
47    BT_UNCHOKE              = 1,
48    BT_INTERESTED           = 2,
49    BT_NOT_INTERESTED       = 3,
50    BT_HAVE                 = 4,
51    BT_BITFIELD             = 5,
52    BT_REQUEST              = 6,
53    BT_PIECE                = 7,
54    BT_CANCEL               = 8,
55    BT_PORT                 = 9,
56
57    BT_FEXT_SUGGEST         = 13,
58    BT_FEXT_HAVE_ALL        = 14,
59    BT_FEXT_HAVE_NONE       = 15,
60    BT_FEXT_REJECT          = 16,
61    BT_FEXT_ALLOWED_FAST    = 17,
62
63    BT_LTEP                 = 20,
64
65    LTEP_HANDSHAKE          = 0,
66
67    TR_LTEP_PEX             = 1,
68
69
70
71    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
72
73    /* idle seconds before we send a keepalive */
74    KEEPALIVE_INTERVAL_SECS = 100,
75
76    PEX_INTERVAL            = ( 90 * 1000 ), /* msec between sendPex() calls */
77
78    MAX_QUEUE_SIZE          = ( 100 ),
79
80    /* how long an unsent request can stay queued before it's returned
81       back to the peer-mgr's pool of requests */
82    QUEUED_REQUEST_TTL_SECS = 20,
83
84    /* how long a sent request can stay queued before it's returned
85       back to the peer-mgr's pool of requests */
86    SENT_REQUEST_TTL_SECS = 240,
87
88    /* used in lowering the outMessages queue period */
89    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
90    HIGH_PRIORITY_INTERVAL_SECS = 2,
91    LOW_PRIORITY_INTERVAL_SECS = 20,
92
93    /* number of pieces to remove from the bitfield when
94     * lazy bitfields are turned on */
95    LAZY_PIECE_COUNT = 26,
96
97    /* number of pieces we'll allow in our fast set */
98    MAX_FAST_SET_SIZE = 3
99};
100
101/**
102***  REQUEST MANAGEMENT
103**/
104
105enum
106{
107    AWAITING_BT_LENGTH,
108    AWAITING_BT_ID,
109    AWAITING_BT_MESSAGE,
110    AWAITING_BT_PIECE
111};
112
113struct peer_request
114{
115    uint32_t    index;
116    uint32_t    offset;
117    uint32_t    length;
118    time_t      time_requested;
119};
120
121static int
122compareRequest( const void * va, const void * vb )
123{
124    const struct peer_request * a = va;
125    const struct peer_request * b = vb;
126
127    if( a->index != b->index )
128        return a->index < b->index ? -1 : 1;
129
130    if( a->offset != b->offset )
131        return a->offset < b->offset ? -1 : 1;
132
133    if( a->length != b->length )
134        return a->length < b->length ? -1 : 1;
135
136    return 0;
137}
138
139struct request_list
140{
141    uint16_t               count;
142    uint16_t               max;
143    struct peer_request *  requests;
144};
145
146static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
147
148static void
149reqListReserve( struct request_list * list,
150                uint16_t              max )
151{
152    if( list->max < max )
153    {
154        list->max = max;
155        list->requests = tr_renew( struct peer_request,
156                                   list->requests,
157                                   list->max );
158    }
159}
160
161static void
162reqListClear( struct request_list * list )
163{
164    tr_free( list->requests );
165    *list = REQUEST_LIST_INIT;
166}
167
168static void
169reqListCopy( struct request_list * dest, const struct request_list * src )
170{
171    dest->count = dest->max = src->count;
172    dest->requests = tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
173}
174
175static void
176reqListRemoveOne( struct request_list * list,
177                  int                   i )
178{
179    assert( 0 <= i && i < list->count );
180
181    memmove( &list->requests[i],
182             &list->requests[i + 1],
183             sizeof( struct peer_request ) * ( --list->count - i ) );
184}
185
186static void
187reqListAppend( struct request_list *       list,
188               const struct peer_request * req )
189{
190    if( ++list->count >= list->max )
191        reqListReserve( list, list->max + 8 );
192
193    list->requests[list->count - 1] = *req;
194}
195
196static int
197reqListPop( struct request_list * list,
198            struct peer_request * setme )
199{
200    int success;
201
202    if( !list->count )
203        success = FALSE;
204    else {
205        *setme = list->requests[0];
206        reqListRemoveOne( list, 0 );
207        success = TRUE;
208    }
209
210    return success;
211}
212
213static int
214reqListFind( struct request_list *       list,
215             const struct peer_request * key )
216{
217    uint16_t i;
218
219    for( i = 0; i < list->count; ++i )
220        if( !compareRequest( key, list->requests + i ) )
221            return i;
222
223    return -1;
224}
225
226static int
227reqListRemove( struct request_list *       list,
228               const struct peer_request * key )
229{
230    int success;
231    const int i = reqListFind( list, key );
232
233    if( i < 0 )
234        success = FALSE;
235    else {
236        reqListRemoveOne( list, i );
237        success = TRUE;
238    }
239
240    return success;
241}
242
243/**
244***
245**/
246
247/* this is raw, unchanged data from the peer regarding
248 * the current message that it's sending us. */
249struct tr_incoming
250{
251    uint8_t                id;
252    uint32_t               length; /* includes the +1 for id length */
253    struct peer_request    blockReq; /* metadata for incoming blocks */
254    struct evbuffer *      block; /* piece data for incoming blocks */
255};
256
257/**
258 * Low-level communication state information about a connected peer.
259 *
260 * This structure remembers the low-level protocol states that we're
261 * in with this peer, such as active requests, pex messages, and so on.
262 * Its fields are all private to peer-msgs.c.
263 *
264 * Data not directly involved with sending & receiving messages is
265 * stored in tr_peer, where it can be accessed by both peermsgs and
266 * the peer manager.
267 *
268 * @see struct peer_atom
269 * @see tr_peer
270 */
271struct tr_peermsgs
272{
273    tr_bool         peerSentBitfield;
274    tr_bool         peerSupportsPex;
275    tr_bool         clientSentLtepHandshake;
276    tr_bool         peerSentLtepHandshake;
277    tr_bool         haveFastSet;
278
279    uint8_t         state;
280    uint8_t         ut_pex_id;
281    uint16_t        pexCount;
282    uint16_t        pexCount6;
283    uint16_t        minActiveRequests;
284    uint16_t        maxActiveRequests;
285
286    size_t                 fastsetSize;
287    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
288
289    /* how long the outMessages batch should be allowed to grow before
290     * it's flushed -- some messages (like requests >:) should be sent
291     * very quickly; others aren't as urgent. */
292    int                    outMessagesBatchPeriod;
293
294    tr_peer *              peer;
295
296    tr_session *           session;
297    tr_torrent *           torrent;
298
299    tr_publisher_t *       publisher;
300
301    struct evbuffer *      outMessages; /* all the non-piece messages */
302
303    struct request_list    peerAskedFor;
304    struct request_list    clientAskedFor;
305    struct request_list    clientWillAskFor;
306
307    tr_timer             * pexTimer;
308    tr_pex               * pex;
309    tr_pex               * pex6;
310
311    time_t                 clientSentPexAt;
312    time_t                 clientSentAnythingAt;
313
314    /* when we started batching the outMessages */
315    time_t                outMessagesBatchedAt;
316
317    struct tr_incoming    incoming;
318
319    /* if the peer supports the Extension Protocol in BEP 10 and
320       supplied a reqq argument, it's stored here.  otherwise the
321       value is zero and should be ignored. */
322    int64_t               reqq;
323};
324
325/**
326***
327**/
328
329static void
330myDebug( const char * file, int line,
331         const struct tr_peermsgs * msgs,
332         const char * fmt, ... )
333{
334    FILE * fp = tr_getLog( );
335
336    if( fp )
337    {
338        va_list           args;
339        char              timestr[64];
340        struct evbuffer * buf = evbuffer_new( );
341        char *            base = tr_basename( file );
342
343        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
344                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
345                             msgs->torrent->info.name,
346                             tr_peerIoGetAddrStr( msgs->peer->io ),
347                             msgs->peer->client );
348        va_start( args, fmt );
349        evbuffer_add_vprintf( buf, fmt, args );
350        va_end( args );
351        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
352        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
353
354        tr_free( base );
355        evbuffer_free( buf );
356    }
357}
358
359#define dbgmsg( msgs, ... ) \
360    do { \
361        if( tr_deepLoggingIsActive( ) ) \
362            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
363    } while( 0 )
364
365/**
366***
367**/
368
369static void
370pokeBatchPeriod( tr_peermsgs * msgs,
371                 int           interval )
372{
373    if( msgs->outMessagesBatchPeriod > interval )
374    {
375        msgs->outMessagesBatchPeriod = interval;
376        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
377    }
378}
379
380static void
381dbgOutMessageLen( tr_peermsgs * msgs )
382{
383    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
384}
385
386static void
387protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
388{
389    tr_peerIo       * io  = msgs->peer->io;
390    struct evbuffer * out = msgs->outMessages;
391
392    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
393
394    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
395    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
396    tr_peerIoWriteUint32( io, out, req->index );
397    tr_peerIoWriteUint32( io, out, req->offset );
398    tr_peerIoWriteUint32( io, out, req->length );
399
400    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
401    dbgOutMessageLen( msgs );
402}
403
404static void
405protocolSendRequest( tr_peermsgs *               msgs,
406                     const struct peer_request * req )
407{
408    tr_peerIo       * io  = msgs->peer->io;
409    struct evbuffer * out = msgs->outMessages;
410
411    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
412    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
413    tr_peerIoWriteUint32( io, out, req->index );
414    tr_peerIoWriteUint32( io, out, req->offset );
415    tr_peerIoWriteUint32( io, out, req->length );
416
417    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
418    dbgOutMessageLen( msgs );
419    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
420}
421
422static void
423protocolSendCancel( tr_peermsgs *               msgs,
424                    const struct peer_request * req )
425{
426    tr_peerIo       * io  = msgs->peer->io;
427    struct evbuffer * out = msgs->outMessages;
428
429    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
430    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
431    tr_peerIoWriteUint32( io, out, req->index );
432    tr_peerIoWriteUint32( io, out, req->offset );
433    tr_peerIoWriteUint32( io, out, req->length );
434
435    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
436    dbgOutMessageLen( msgs );
437    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
438}
439
440static void
441protocolSendHave( tr_peermsgs * msgs,
442                  uint32_t      index )
443{
444    tr_peerIo       * io  = msgs->peer->io;
445    struct evbuffer * out = msgs->outMessages;
446
447    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
448    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
449    tr_peerIoWriteUint32( io, out, index );
450
451    dbgmsg( msgs, "sending Have %u...", index );
452    dbgOutMessageLen( msgs );
453    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
454}
455
456#if 0
457static void
458protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
459{
460    tr_peerIo       * io  = msgs->peer->io;
461    struct evbuffer * out = msgs->outMessages;
462
463    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
464
465    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
466    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
467    tr_peerIoWriteUint32( io, out, pieceIndex );
468
469    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
470    dbgOutMessageLen( msgs );
471}
472#endif
473
474static void
475protocolSendChoke( tr_peermsgs * msgs,
476                   int           choke )
477{
478    tr_peerIo       * io  = msgs->peer->io;
479    struct evbuffer * out = msgs->outMessages;
480
481    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
482    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
483
484    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
485    dbgOutMessageLen( msgs );
486    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
487}
488
489static void
490protocolSendHaveAll( tr_peermsgs * msgs )
491{
492    tr_peerIo       * io  = msgs->peer->io;
493    struct evbuffer * out = msgs->outMessages;
494
495    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
496
497    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
498    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
499
500    dbgmsg( msgs, "sending HAVE_ALL..." );
501    dbgOutMessageLen( msgs );
502    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
503}
504
505static void
506protocolSendHaveNone( tr_peermsgs * msgs )
507{
508    tr_peerIo       * io  = msgs->peer->io;
509    struct evbuffer * out = msgs->outMessages;
510
511    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
512
513    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
514    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
515
516    dbgmsg( msgs, "sending HAVE_NONE..." );
517    dbgOutMessageLen( msgs );
518    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
519}
520
521/**
522***  EVENTS
523**/
524
525static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0 };
526
527static void
528publish( tr_peermsgs * msgs, tr_peer_event * e )
529{
530    assert( msgs->peer );
531    assert( msgs->peer->msgs == msgs );
532
533    tr_publisherPublish( msgs->publisher, msgs->peer, e );
534}
535
536static void
537fireError( tr_peermsgs * msgs, int err )
538{
539    tr_peer_event e = blankEvent;
540    e.eventType = TR_PEER_ERROR;
541    e.err = err;
542    publish( msgs, &e );
543}
544
545static void
546fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly )
547{
548    tr_peer_event e = blankEvent;
549    e.eventType = TR_PEER_UPLOAD_ONLY;
550    e.uploadOnly = uploadOnly;
551    publish( msgs, &e );
552}
553
554static void
555fireNeedReq( tr_peermsgs * msgs )
556{
557    tr_peer_event e = blankEvent;
558    e.eventType = TR_PEER_NEED_REQ;
559    publish( msgs, &e );
560}
561
562static void
563firePeerProgress( tr_peermsgs * msgs )
564{
565    tr_peer_event e = blankEvent;
566    e.eventType = TR_PEER_PEER_PROGRESS;
567    e.progress = msgs->peer->progress;
568    publish( msgs, &e );
569}
570
571static void
572fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
573{
574    tr_peer_event e = blankEvent;
575    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
576    e.pieceIndex = req->index;
577    e.offset = req->offset;
578    e.length = req->length;
579    publish( msgs, &e );
580}
581
582static void
583fireClientGotData( tr_peermsgs * msgs,
584                   uint32_t      length,
585                   int           wasPieceData )
586{
587    tr_peer_event e = blankEvent;
588
589    e.length = length;
590    e.eventType = TR_PEER_CLIENT_GOT_DATA;
591    e.wasPieceData = wasPieceData;
592    publish( msgs, &e );
593}
594
595static void
596fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
597{
598    tr_peer_event e = blankEvent;
599    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
600    e.pieceIndex = pieceIndex;
601    publish( msgs, &e );
602}
603
604static void
605fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
606{
607    tr_peer_event e = blankEvent;
608    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
609    e.pieceIndex = pieceIndex;
610    publish( msgs, &e );
611}
612
613static void
614firePeerGotData( tr_peermsgs  * msgs,
615                 uint32_t       length,
616                 int            wasPieceData )
617{
618    tr_peer_event e = blankEvent;
619
620    e.length = length;
621    e.eventType = TR_PEER_PEER_GOT_DATA;
622    e.wasPieceData = wasPieceData;
623
624    publish( msgs, &e );
625}
626
627static void
628fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
629{
630    tr_peer_event e = blankEvent;
631    e.eventType = TR_PEER_CANCEL;
632    e.pieceIndex = req->index;
633    e.offset = req->offset;
634    e.length = req->length;
635    publish( msgs, &e );
636}
637
638/**
639***  ALLOWED FAST SET
640***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
641**/
642
643size_t
644tr_generateAllowedSet( tr_piece_index_t * setmePieces,
645                       size_t             desiredSetSize,
646                       size_t             pieceCount,
647                       const uint8_t    * infohash,
648                       const tr_address * addr )
649{
650    size_t setSize = 0;
651
652    assert( setmePieces );
653    assert( desiredSetSize <= pieceCount );
654    assert( desiredSetSize );
655    assert( pieceCount );
656    assert( infohash );
657    assert( addr );
658
659    if( addr->type == TR_AF_INET )
660    {
661        uint8_t w[SHA_DIGEST_LENGTH + 4];
662        uint8_t x[SHA_DIGEST_LENGTH];
663
664        *(uint32_t*)w = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
665        memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );                /* (2) */
666        tr_sha1( x, w, sizeof( w ), NULL );                          /* (3) */
667
668        while( setSize<desiredSetSize )
669        {
670            int i;
671            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
672            {
673                size_t k;
674                uint32_t j = i * 4;                                  /* (5) */
675                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
676                uint32_t index = y % pieceCount;                     /* (7) */
677
678                for( k=0; k<setSize; ++k )                           /* (8) */
679                    if( setmePieces[k] == index )
680                        break;
681
682                if( k == setSize )
683                    setmePieces[setSize++] = index;                  /* (9) */
684            }
685
686            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
687        }
688    }
689
690    return setSize;
691}
692
693static void
694updateFastSet( tr_peermsgs * msgs UNUSED )
695{
696#if 0
697    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
698    const int peerIsNeedy = msgs->peer->progress < 0.10;
699
700    if( fext && peerIsNeedy && !msgs->haveFastSet )
701    {
702        size_t i;
703        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
704        const tr_info * inf = &msgs->torrent->info;
705        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
706
707        /* build the fast set */
708        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
709        msgs->haveFastSet = 1;
710
711        /* send it to the peer */
712        for( i=0; i<msgs->fastsetSize; ++i )
713            protocolSendAllowedFast( msgs, msgs->fastset[i] );
714    }
715#endif
716}
717
718/**
719***  INTEREST
720**/
721
722static int
723isPieceInteresting( const tr_peermsgs * msgs,
724                    tr_piece_index_t    piece )
725{
726    const tr_torrent * torrent = msgs->torrent;
727
728    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
729          && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have */
730          && ( tr_bitfieldHas( msgs->peer->have, piece ) );    /* peer has it */
731}
732
733/* "interested" means we'll ask for piece data if they unchoke us */
734static int
735isPeerInteresting( const tr_peermsgs * msgs )
736{
737    tr_piece_index_t    i;
738    const tr_torrent *  torrent;
739    const tr_bitfield * bitfield;
740    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
741
742    if( clientIsSeed )
743        return FALSE;
744
745    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
746        return FALSE;
747
748    torrent = msgs->torrent;
749    bitfield = tr_cpPieceBitfield( torrent->completion );
750
751    if( !msgs->peer->have )
752        return TRUE;
753
754    assert( bitfield->byteCount == msgs->peer->have->byteCount );
755
756    for( i = 0; i < torrent->info.pieceCount; ++i )
757        if( isPieceInteresting( msgs, i ) )
758            return TRUE;
759
760    return FALSE;
761}
762
763static void
764sendInterest( tr_peermsgs * msgs,
765              int           weAreInterested )
766{
767    struct evbuffer * out = msgs->outMessages;
768
769    assert( msgs );
770    assert( weAreInterested == 0 || weAreInterested == 1 );
771
772    msgs->peer->clientIsInterested = weAreInterested;
773    dbgmsg( msgs, "Sending %s", weAreInterested ? "Interested" : "Not Interested" );
774    tr_peerIoWriteUint32( msgs->peer->io, out, sizeof( uint8_t ) );
775    tr_peerIoWriteUint8 ( msgs->peer->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
776    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
777    dbgOutMessageLen( msgs );
778}
779
780static void
781updateInterest( tr_peermsgs * msgs )
782{
783    const int i = isPeerInteresting( msgs );
784
785    if( i != msgs->peer->clientIsInterested )
786        sendInterest( msgs, i );
787    if( i )
788        fireNeedReq( msgs );
789}
790
791static int
792popNextRequest( tr_peermsgs *         msgs,
793                struct peer_request * setme )
794{
795    return reqListPop( &msgs->peerAskedFor, setme );
796}
797
798static void
799cancelAllRequestsToClient( tr_peermsgs * msgs )
800{
801    struct peer_request req;
802    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
803
804    while( popNextRequest( msgs, &req ) )
805        if( mustSendCancel )
806            protocolSendReject( msgs, &req );
807}
808
809void
810tr_peerMsgsSetChoke( tr_peermsgs * msgs,
811                     int           choke )
812{
813    const time_t now = time( NULL );
814    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
815
816    assert( msgs );
817    assert( msgs->peer );
818    assert( choke == 0 || choke == 1 );
819
820    if( msgs->peer->chokeChangedAt > fibrillationTime )
821    {
822        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
823    }
824    else if( msgs->peer->peerIsChoked != choke )
825    {
826        msgs->peer->peerIsChoked = choke;
827        if( choke )
828            cancelAllRequestsToClient( msgs );
829        protocolSendChoke( msgs, choke );
830        msgs->peer->chokeChangedAt = now;
831    }
832}
833
834/**
835***
836**/
837
838void
839tr_peerMsgsHave( tr_peermsgs * msgs,
840                 uint32_t      index )
841{
842    protocolSendHave( msgs, index );
843
844    /* since we have more pieces now, we might not be interested in this peer */
845    updateInterest( msgs );
846}
847
848/**
849***
850**/
851
852static int
853reqIsValid( const tr_peermsgs * peer,
854            uint32_t            index,
855            uint32_t            offset,
856            uint32_t            length )
857{
858    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
859}
860
861static int
862requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
863{
864    return reqIsValid( msgs, req->index, req->offset, req->length );
865}
866
867static void
868expireOldRequests( tr_peermsgs * msgs, const time_t now  )
869{
870    int i;
871    time_t oldestAllowed;
872    struct request_list tmp = REQUEST_LIST_INIT;
873    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
874    dbgmsg( msgs, "entering `expire old requests' block" );
875
876    /* cancel requests that have been queued for too long */
877    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
878    reqListCopy( &tmp, &msgs->clientWillAskFor );
879    for( i = 0; i < tmp.count; ++i ) {
880        const struct peer_request * req = &tmp.requests[i];
881        if( req->time_requested < oldestAllowed )
882            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
883    }
884    reqListClear( &tmp );
885
886    /* if the peer doesn't support "Reject Request",
887     * cancel requests that were sent too long ago. */
888    if( !fext ) {
889        oldestAllowed = now - SENT_REQUEST_TTL_SECS;
890        reqListCopy( &tmp, &msgs->clientAskedFor );
891        for( i=0; i<tmp.count; ++i ) {
892            const struct peer_request * req = &tmp.requests[i];
893            if( req->time_requested < oldestAllowed )
894                tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
895        }
896        reqListClear( &tmp );
897    }
898
899    dbgmsg( msgs, "leaving `expire old requests' block" );
900}
901
902static void
903pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
904{
905    const int           max = msgs->maxActiveRequests;
906    const int           min = msgs->minActiveRequests;
907    int                 sent = 0;
908    int                 count = msgs->clientAskedFor.count;
909    struct peer_request req;
910
911    if( count > min )
912        return;
913    if( msgs->peer->clientIsChoked )
914        return;
915    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
916        return;
917
918    while( ( count < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
919    {
920        const tr_block_index_t block = _tr_block( msgs->torrent, req.index, req.offset );
921
922        assert( requestIsValid( msgs, &req ) );
923        assert( tr_bitfieldHas( msgs->peer->have, req.index ) );
924
925        /* don't ask for it if we've already got it... this block may have
926         * come in from a different peer after we cancelled a request for it */
927        if( !tr_cpBlockIsComplete( msgs->torrent->completion, block ) )
928        {
929            protocolSendRequest( msgs, &req );
930            req.time_requested = now;
931            reqListAppend( &msgs->clientAskedFor, &req );
932
933            ++count;
934            ++sent;
935        }
936    }
937
938    if( sent )
939        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
940                sent,
941                msgs->clientAskedFor.count,
942                msgs->clientWillAskFor.count );
943
944    if( count < max )
945        fireNeedReq( msgs );
946}
947
948static int
949requestQueueIsFull( const tr_peermsgs * msgs )
950{
951    const int req_max = msgs->maxActiveRequests;
952    return msgs->clientWillAskFor.count >= req_max;
953}
954
955tr_addreq_t
956tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
957                       uint32_t         index,
958                       uint32_t         offset,
959                       uint32_t         length,
960                       int              doForce )
961{
962    struct peer_request req;
963
964    assert( msgs );
965    assert( msgs->torrent );
966    assert( reqIsValid( msgs, index, offset, length ) );
967
968    /**
969    ***  Reasons to decline the request
970    **/
971
972    /* don't send requests to choked clients */
973    if( !doForce && msgs->peer->clientIsChoked ) {
974        dbgmsg( msgs, "declining request because they're choking us" );
975        return TR_ADDREQ_CLIENT_CHOKED;
976    }
977
978    /* peer doesn't have this piece */
979    if( !doForce && !tr_bitfieldHas( msgs->peer->have, index ) )
980        return TR_ADDREQ_MISSING;
981
982    /* peer's queue is full */
983    if( !doForce && requestQueueIsFull( msgs ) ) {
984        dbgmsg( msgs, "declining request because we're full" );
985        return TR_ADDREQ_FULL;
986    }
987
988    /* have we already asked for this piece? */
989    req.index = index;
990    req.offset = offset;
991    req.length = length;
992    if( !doForce ) {
993        if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
994            dbgmsg( msgs, "declining because it's a duplicate" );
995            return TR_ADDREQ_DUPLICATE;
996        }
997        if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
998            dbgmsg( msgs, "declining because it's a duplicate" );
999            return TR_ADDREQ_DUPLICATE;
1000        }
1001    }
1002
1003    /**
1004    ***  Accept this request
1005    **/
1006
1007    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
1008            index, offset, length );
1009    req.time_requested = time( NULL );
1010    reqListAppend( &msgs->clientWillAskFor, &req );
1011    return TR_ADDREQ_OK;
1012}
1013
1014static void
1015cancelAllRequestsToPeer( tr_peermsgs * msgs, tr_bool sendCancel )
1016{
1017    int i;
1018    struct request_list a = msgs->clientWillAskFor;
1019    struct request_list b = msgs->clientAskedFor;
1020    dbgmsg( msgs, "cancelling all requests to peer" );
1021
1022    msgs->clientAskedFor = REQUEST_LIST_INIT;
1023    msgs->clientWillAskFor = REQUEST_LIST_INIT;
1024
1025    for( i=0; i<a.count; ++i )
1026        fireCancelledReq( msgs, &a.requests[i] );
1027
1028    for( i = 0; i < b.count; ++i ) {
1029        fireCancelledReq( msgs, &b.requests[i] );
1030        if( sendCancel )
1031            protocolSendCancel( msgs, &b.requests[i] );
1032    }
1033
1034    reqListClear( &a );
1035    reqListClear( &b );
1036}
1037
1038void
1039tr_peerMsgsCancel( tr_peermsgs * msgs,
1040                   uint32_t      pieceIndex,
1041                   uint32_t      offset,
1042                   uint32_t      length )
1043{
1044    struct peer_request req;
1045
1046    assert( msgs != NULL );
1047    assert( length > 0 );
1048
1049
1050    /* have we asked the peer for this piece? */
1051    req.index = pieceIndex;
1052    req.offset = offset;
1053    req.length = length;
1054
1055    /* if it's only in the queue and hasn't been sent yet, free it */
1056    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
1057        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
1058        fireCancelledReq( msgs, &req );
1059    }
1060
1061    /* if it's already been sent, send a cancel message too */
1062    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
1063        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
1064        protocolSendCancel( msgs, &req );
1065        fireCancelledReq( msgs, &req );
1066    }
1067}
1068
1069
1070/**
1071***
1072**/
1073
1074static void
1075sendLtepHandshake( tr_peermsgs * msgs )
1076{
1077    tr_benc val, *m;
1078    char * buf;
1079    int len;
1080    int pex;
1081    struct evbuffer * out = msgs->outMessages;
1082
1083    if( msgs->clientSentLtepHandshake )
1084        return;
1085
1086    dbgmsg( msgs, "sending an ltep handshake" );
1087    msgs->clientSentLtepHandshake = 1;
1088
1089    /* decide if we want to advertise pex support */
1090    if( !tr_torrentAllowsPex( msgs->torrent ) )
1091        pex = 0;
1092    else if( msgs->peerSentLtepHandshake )
1093        pex = msgs->peerSupportsPex ? 1 : 0;
1094    else
1095        pex = 1;
1096
1097    tr_bencInitDict( &val, 5 );
1098    tr_bencDictAddInt( &val, "e", msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
1099    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
1100    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
1101    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
1102    m  = tr_bencDictAddDict( &val, "m", 1 );
1103    if( pex )
1104        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
1105    buf = tr_bencSave( &val, &len );
1106
1107    tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + len );
1108    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
1109    tr_peerIoWriteUint8 ( msgs->peer->io, out, LTEP_HANDSHAKE );
1110    tr_peerIoWriteBytes ( msgs->peer->io, out, buf, len );
1111    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1112    dbgOutMessageLen( msgs );
1113
1114    /* cleanup */
1115    tr_bencFree( &val );
1116    tr_free( buf );
1117}
1118
1119static void
1120parseLtepHandshake( tr_peermsgs *     msgs,
1121                    int               len,
1122                    struct evbuffer * inbuf )
1123{
1124    int64_t   i;
1125    tr_benc   val, * sub;
1126    uint8_t * tmp = tr_new( uint8_t, len );
1127
1128    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
1129    msgs->peerSentLtepHandshake = 1;
1130
1131    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
1132    {
1133        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1134        tr_free( tmp );
1135        return;
1136    }
1137
1138    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
1139
1140    /* does the peer prefer encrypted connections? */
1141    if( tr_bencDictFindInt( &val, "e", &i ) )
1142        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1143                                              : ENCRYPTION_PREFERENCE_NO;
1144
1145    /* check supported messages for utorrent pex */
1146    msgs->peerSupportsPex = 0;
1147    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
1148        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
1149            msgs->ut_pex_id = (uint8_t) i;
1150            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1151            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1152        }
1153    }
1154
1155    /* look for upload_only (BEP 21) */
1156    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
1157        fireUploadOnly( msgs, i!=0 );
1158
1159    /* get peer's listening port */
1160    if( tr_bencDictFindInt( &val, "p", &i ) ) {
1161        msgs->peer->port = htons( (uint16_t)i );
1162        dbgmsg( msgs, "msgs->port is now %hu", msgs->peer->port );
1163    }
1164
1165    /* get peer's maximum request queue size */
1166    if( tr_bencDictFindInt( &val, "reqq", &i ) )
1167        msgs->reqq = i;
1168
1169    tr_bencFree( &val );
1170    tr_free( tmp );
1171}
1172
1173static void
1174parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1175{
1176    int loaded = 0;
1177    uint8_t * tmp = tr_new( uint8_t, msglen );
1178    tr_benc val;
1179    const tr_torrent * tor = msgs->torrent;
1180    const uint8_t * added;
1181    size_t added_len;
1182
1183    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1184
1185    if( tr_torrentAllowsPex( tor )
1186      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1187    {
1188        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1189        {
1190            const uint8_t * added_f = NULL;
1191            tr_pex *        pex;
1192            size_t          i, n;
1193            size_t          added_f_len = 0;
1194            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1195            pex =
1196                tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len,
1197                                        &n );
1198            for( i = 0; i < n; ++i )
1199                tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1200                                  TR_PEER_FROM_PEX, pex + i );
1201            tr_free( pex );
1202        }
1203       
1204        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1205        {
1206            const uint8_t * added_f = NULL;
1207            tr_pex *        pex;
1208            size_t          i, n;
1209            size_t          added_f_len = 0;
1210            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1211            pex =
1212                tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len,
1213                                         &n );
1214            for( i = 0; i < n; ++i )
1215                tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1216                                  TR_PEER_FROM_PEX, pex + i );
1217            tr_free( pex );
1218        }
1219       
1220    }
1221
1222    if( loaded )
1223        tr_bencFree( &val );
1224    tr_free( tmp );
1225}
1226
1227static void sendPex( tr_peermsgs * msgs );
1228
1229static void
1230parseLtep( tr_peermsgs *     msgs,
1231           int               msglen,
1232           struct evbuffer * inbuf )
1233{
1234    uint8_t ltep_msgid;
1235
1236    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1237    msglen--;
1238
1239    if( ltep_msgid == LTEP_HANDSHAKE )
1240    {
1241        dbgmsg( msgs, "got ltep handshake" );
1242        parseLtepHandshake( msgs, msglen, inbuf );
1243        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1244        {
1245            sendLtepHandshake( msgs );
1246            sendPex( msgs );
1247        }
1248    }
1249    else if( ltep_msgid == TR_LTEP_PEX )
1250    {
1251        dbgmsg( msgs, "got ut pex" );
1252        msgs->peerSupportsPex = 1;
1253        parseUtPex( msgs, msglen, inbuf );
1254    }
1255    else
1256    {
1257        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1258        evbuffer_drain( inbuf, msglen );
1259    }
1260}
1261
1262static int
1263readBtLength( tr_peermsgs *     msgs,
1264              struct evbuffer * inbuf,
1265              size_t            inlen )
1266{
1267    uint32_t len;
1268
1269    if( inlen < sizeof( len ) )
1270        return READ_LATER;
1271
1272    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1273
1274    if( len == 0 ) /* peer sent us a keepalive message */
1275        dbgmsg( msgs, "got KeepAlive" );
1276    else
1277    {
1278        msgs->incoming.length = len;
1279        msgs->state = AWAITING_BT_ID;
1280    }
1281
1282    return READ_NOW;
1283}
1284
1285static int readBtMessage( tr_peermsgs *     msgs,
1286                          struct evbuffer * inbuf,
1287                          size_t            inlen );
1288
1289static int
1290readBtId( tr_peermsgs *     msgs,
1291          struct evbuffer * inbuf,
1292          size_t            inlen )
1293{
1294    uint8_t id;
1295
1296    if( inlen < sizeof( uint8_t ) )
1297        return READ_LATER;
1298
1299    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1300    msgs->incoming.id = id;
1301
1302    if( id == BT_PIECE )
1303    {
1304        msgs->state = AWAITING_BT_PIECE;
1305        return READ_NOW;
1306    }
1307    else if( msgs->incoming.length != 1 )
1308    {
1309        msgs->state = AWAITING_BT_MESSAGE;
1310        return READ_NOW;
1311    }
1312    else return readBtMessage( msgs, inbuf, inlen - 1 );
1313}
1314
1315static void
1316updatePeerProgress( tr_peermsgs * msgs )
1317{
1318    msgs->peer->progress = tr_bitfieldCountTrueBits( msgs->peer->have ) / (float)msgs->torrent->info.pieceCount;
1319    dbgmsg( msgs, "peer progress is %f", msgs->peer->progress );
1320    updateFastSet( msgs );
1321    updateInterest( msgs );
1322    firePeerProgress( msgs );
1323}
1324
1325static void
1326peerMadeRequest( tr_peermsgs *               msgs,
1327                 const struct peer_request * req )
1328{
1329    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1330    const int reqIsValid = requestIsValid( msgs, req );
1331    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
1332    const int peerIsChoked = msgs->peer->peerIsChoked;
1333
1334    int allow = FALSE;
1335
1336    if( !reqIsValid )
1337        dbgmsg( msgs, "rejecting an invalid request." );
1338    else if( !clientHasPiece )
1339        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1340    else if( peerIsChoked )
1341        dbgmsg( msgs, "rejecting request from choked peer" );
1342    else
1343        allow = TRUE;
1344
1345    if( allow )
1346        reqListAppend( &msgs->peerAskedFor, req );
1347    else if( fext )
1348        protocolSendReject( msgs, req );
1349}
1350
1351static int
1352messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1353{
1354    switch( id )
1355    {
1356        case BT_CHOKE:
1357        case BT_UNCHOKE:
1358        case BT_INTERESTED:
1359        case BT_NOT_INTERESTED:
1360        case BT_FEXT_HAVE_ALL:
1361        case BT_FEXT_HAVE_NONE:
1362            return len == 1;
1363
1364        case BT_HAVE:
1365        case BT_FEXT_SUGGEST:
1366        case BT_FEXT_ALLOWED_FAST:
1367            return len == 5;
1368
1369        case BT_BITFIELD:
1370            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1371
1372        case BT_REQUEST:
1373        case BT_CANCEL:
1374        case BT_FEXT_REJECT:
1375            return len == 13;
1376
1377        case BT_PIECE:
1378            return len > 9 && len <= 16393;
1379
1380        case BT_PORT:
1381            return len == 3;
1382
1383        case BT_LTEP:
1384            return len >= 2;
1385
1386        default:
1387            return FALSE;
1388    }
1389}
1390
1391static int clientGotBlock( tr_peermsgs *               msgs,
1392                           const uint8_t *             block,
1393                           const struct peer_request * req );
1394
1395static int
1396readBtPiece( tr_peermsgs      * msgs,
1397             struct evbuffer  * inbuf,
1398             size_t             inlen,
1399             size_t           * setme_piece_bytes_read )
1400{
1401    struct peer_request * req = &msgs->incoming.blockReq;
1402
1403    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1404    dbgmsg( msgs, "In readBtPiece" );
1405
1406    if( !req->length )
1407    {
1408        if( inlen < 8 )
1409            return READ_LATER;
1410
1411        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1412        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1413        req->length = msgs->incoming.length - 9;
1414        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1415        return READ_NOW;
1416    }
1417    else
1418    {
1419        int err;
1420
1421        /* read in another chunk of data */
1422        const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
1423        size_t n = MIN( nLeft, inlen );
1424        uint8_t * buf = tr_new( uint8_t, n );
1425        assert( EVBUFFER_LENGTH( inbuf ) >= n );
1426        tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, n );
1427        evbuffer_add( msgs->incoming.block, buf, n );
1428        fireClientGotData( msgs, n, TRUE );
1429        *setme_piece_bytes_read += n;
1430        tr_free( buf );
1431        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1432               n, req->index, req->offset, req->length,
1433               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1434        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1435            return READ_LATER;
1436
1437        /* we've got the whole block ... process it */
1438        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );
1439
1440        /* cleanup */
1441        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH( msgs->incoming.block ) );
1442        req->length = 0;
1443        msgs->state = AWAITING_BT_LENGTH;
1444        if( !err )
1445            return READ_NOW;
1446        else {
1447            fireError( msgs, err );
1448            return READ_ERR;
1449        }
1450    }
1451}
1452
1453static int
1454readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1455{
1456    uint32_t      ui32;
1457    uint32_t      msglen = msgs->incoming.length;
1458    const uint8_t id = msgs->incoming.id;
1459    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1460    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1461
1462    --msglen; /* id length */
1463
1464    if( inlen < msglen )
1465        return READ_LATER;
1466
1467    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1468
1469    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1470    {
1471        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1472        fireError( msgs, EMSGSIZE );
1473        return READ_ERR;
1474    }
1475
1476    switch( id )
1477    {
1478        case BT_CHOKE:
1479            dbgmsg( msgs, "got Choke" );
1480            msgs->peer->clientIsChoked = 1;
1481            if( !fext )
1482                cancelAllRequestsToPeer( msgs, FALSE );
1483            break;
1484
1485        case BT_UNCHOKE:
1486            dbgmsg( msgs, "got Unchoke" );
1487            msgs->peer->clientIsChoked = 0;
1488            fireNeedReq( msgs );
1489            break;
1490
1491        case BT_INTERESTED:
1492            dbgmsg( msgs, "got Interested" );
1493            msgs->peer->peerIsInterested = 1;
1494            break;
1495
1496        case BT_NOT_INTERESTED:
1497            dbgmsg( msgs, "got Not Interested" );
1498            msgs->peer->peerIsInterested = 0;
1499            break;
1500
1501        case BT_HAVE:
1502            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1503            dbgmsg( msgs, "got Have: %u", ui32 );
1504            if( tr_bitfieldAdd( msgs->peer->have, ui32 ) ) {
1505                fireError( msgs, ERANGE );
1506                return READ_ERR;
1507            }
1508            updatePeerProgress( msgs );
1509            tr_rcTransferred( msgs->torrent->swarmSpeed,
1510                              msgs->torrent->info.pieceSize );
1511            break;
1512
1513        case BT_BITFIELD:
1514        {
1515            dbgmsg( msgs, "got a bitfield" );
1516            msgs->peerSentBitfield = 1;
1517            tr_peerIoReadBytes( msgs->peer->io, inbuf, msgs->peer->have->bits, msglen );
1518            updatePeerProgress( msgs );
1519            fireNeedReq( msgs );
1520            break;
1521        }
1522
1523        case BT_REQUEST:
1524        {
1525            struct peer_request r;
1526            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1527            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1528            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1529            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1530            peerMadeRequest( msgs, &r );
1531            break;
1532        }
1533
1534        case BT_CANCEL:
1535        {
1536            struct peer_request r;
1537            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1538            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1539            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1540            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1541            if( reqListRemove( &msgs->peerAskedFor, &r ) && fext )
1542                protocolSendReject( msgs, &r );
1543            break;
1544        }
1545
1546        case BT_PIECE:
1547            assert( 0 ); /* handled elsewhere! */
1548            break;
1549
1550        case BT_PORT:
1551            dbgmsg( msgs, "Got a BT_PORT" );
1552            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->port );
1553            break;
1554
1555        case BT_FEXT_SUGGEST:
1556            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1557            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1558            if( fext )
1559                fireClientGotSuggest( msgs, ui32 );
1560            else {
1561                fireError( msgs, EMSGSIZE );
1562                return READ_ERR;
1563            }
1564            break;
1565
1566        case BT_FEXT_ALLOWED_FAST:
1567            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1568            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1569            if( fext )
1570                fireClientGotAllowedFast( msgs, ui32 );
1571            else {
1572                fireError( msgs, EMSGSIZE );
1573                return READ_ERR;
1574            }
1575            break;
1576
1577        case BT_FEXT_HAVE_ALL:
1578            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1579            if( fext ) {
1580                tr_bitfieldAddRange( msgs->peer->have, 0, msgs->torrent->info.pieceCount );
1581                updatePeerProgress( msgs );
1582            } else {
1583                fireError( msgs, EMSGSIZE );
1584                return READ_ERR;
1585            }
1586            break;
1587
1588
1589        case BT_FEXT_HAVE_NONE:
1590            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1591            if( fext ) {
1592                tr_bitfieldClear( msgs->peer->have );
1593                updatePeerProgress( msgs );
1594            } else {
1595                fireError( msgs, EMSGSIZE );
1596                return READ_ERR;
1597            }
1598            break;
1599
1600        case BT_FEXT_REJECT:
1601        {
1602            struct peer_request r;
1603            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1604            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1605            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1606            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1607            if( fext )
1608                reqListRemove( &msgs->clientAskedFor, &r );
1609            else {
1610                fireError( msgs, EMSGSIZE );
1611                return READ_ERR;
1612            }
1613            break;
1614        }
1615
1616        case BT_LTEP:
1617            dbgmsg( msgs, "Got a BT_LTEP" );
1618            parseLtep( msgs, msglen, inbuf );
1619            break;
1620
1621        default:
1622            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1623            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1624            break;
1625    }
1626
1627    assert( msglen + 1 == msgs->incoming.length );
1628    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1629
1630    msgs->state = AWAITING_BT_LENGTH;
1631    return READ_NOW;
1632}
1633
1634static void
1635decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1636{
1637    tr_torrent * tor = msgs->torrent;
1638
1639    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1640}
1641
1642static void
1643clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1644{
1645    decrementDownloadedCount( msgs, req->length );
1646}
1647
1648static void
1649addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1650{
1651    if( !msgs->peer->blame )
1652         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1653    tr_bitfieldAdd( msgs->peer->blame, index );
1654}
1655
1656/* returns 0 on success, or an errno on failure */
1657static int
1658clientGotBlock( tr_peermsgs *               msgs,
1659                const uint8_t *             data,
1660                const struct peer_request * req )
1661{
1662    int err;
1663    tr_torrent * tor = msgs->torrent;
1664    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1665
1666    assert( msgs );
1667    assert( req );
1668
1669    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1670        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1671                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1672        return EMSGSIZE;
1673    }
1674
1675    /* save the block */
1676    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1677
1678    /**
1679    *** Remove the block from our `we asked for this' list
1680    **/
1681
1682    if( !reqListRemove( &msgs->clientAskedFor, req ) ) {
1683        clientGotUnwantedBlock( msgs, req );
1684        dbgmsg( msgs, "we didn't ask for this message..." );
1685        return 0;
1686    }
1687
1688    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1689            msgs->clientAskedFor.count );
1690
1691    /**
1692    *** Error checks
1693    **/
1694
1695    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1696        dbgmsg( msgs, "we have this block already..." );
1697        clientGotUnwantedBlock( msgs, req );
1698        return 0;
1699    }
1700
1701    /**
1702    ***  Save the block
1703    **/
1704
1705    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1706        return err;
1707
1708    addPeerToBlamefield( msgs, req->index );
1709    fireGotBlock( msgs, req );
1710    return 0;
1711}
1712
1713static int peerPulse( void * vmsgs );
1714
1715static void
1716didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1717{
1718    tr_peermsgs * msgs = vmsgs;
1719    firePeerGotData( msgs, bytesWritten, wasPieceData );
1720    peerPulse( msgs );
1721}
1722
1723static ReadState
1724canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1725{
1726    ReadState         ret;
1727    tr_peermsgs *     msgs = vmsgs;
1728    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1729    const size_t      inlen = EVBUFFER_LENGTH( in );
1730
1731    if( !inlen )
1732    {
1733        ret = READ_LATER;
1734    }
1735    else if( msgs->state == AWAITING_BT_PIECE )
1736    {
1737        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1738    }
1739    else switch( msgs->state )
1740    {
1741        case AWAITING_BT_LENGTH:
1742            ret = readBtLength ( msgs, in, inlen ); break;
1743
1744        case AWAITING_BT_ID:
1745            ret = readBtId     ( msgs, in, inlen ); break;
1746
1747        case AWAITING_BT_MESSAGE:
1748            ret = readBtMessage( msgs, in, inlen ); break;
1749
1750        default:
1751            assert( 0 );
1752    }
1753
1754    /* log the raw data that was read */
1755    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1756        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1757
1758    return ret;
1759}
1760
1761/**
1762***
1763**/
1764
1765static int
1766ratePulse( void * vmsgs )
1767{
1768    tr_peermsgs * msgs = vmsgs;
1769    const double rateToClient = tr_peerGetPieceSpeed( msgs->peer, TR_PEER_TO_CLIENT );
1770    const int seconds = 10;
1771    const int estimatedBlocksInPeriod = ( rateToClient * seconds * 1024 ) / msgs->torrent->blockSize;
1772
1773    msgs->minActiveRequests = 8;
1774    msgs->maxActiveRequests = msgs->minActiveRequests + estimatedBlocksInPeriod;
1775
1776    if( msgs->reqq > 0 )
1777        msgs->maxActiveRequests = MIN( msgs->maxActiveRequests, msgs->reqq );
1778
1779    return TRUE;
1780}
1781
1782static size_t
1783fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1784{
1785    size_t bytesWritten = 0;
1786    struct peer_request req;
1787    const int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1788    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1789
1790    /**
1791    ***  Protocol messages
1792    **/
1793
1794    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1795    {
1796        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1797        msgs->outMessagesBatchedAt = now;
1798    }
1799    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1800    {
1801        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1802        /* flush the protocol messages */
1803        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1804        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1805        msgs->clientSentAnythingAt = now;
1806        msgs->outMessagesBatchedAt = 0;
1807        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1808        bytesWritten +=  len;
1809    }
1810
1811    /**
1812    ***  Blocks
1813    **/
1814
1815    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io ) >= msgs->torrent->blockSize )
1816        && popNextRequest( msgs, &req ) )
1817    {
1818        if( requestIsValid( msgs, &req )
1819            && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1820        {
1821            /* send a block */
1822            uint8_t * buf = tr_new( uint8_t, req.length );
1823            const int err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf );
1824            if( err ) {
1825                fireError( msgs, err );
1826                bytesWritten = 0;
1827                msgs = NULL;
1828            } else {
1829                tr_peerIo * io = msgs->peer->io;
1830                struct evbuffer * out = evbuffer_new( );
1831                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1832                tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1833                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1834                tr_peerIoWriteUint32( io, out, req.index );
1835                tr_peerIoWriteUint32( io, out, req.offset );
1836                tr_peerIoWriteBytes ( io, out, buf, req.length );
1837                tr_peerIoWriteBuf( io, out, TRUE );
1838                bytesWritten += EVBUFFER_LENGTH( out );
1839                evbuffer_free( out );
1840                msgs->clientSentAnythingAt = now;
1841            }
1842            tr_free( buf );
1843        }
1844        else if( fext ) /* peer needs a reject message */
1845        {
1846            protocolSendReject( msgs, &req );
1847        }
1848    }
1849
1850    /**
1851    ***  Keepalive
1852    **/
1853
1854    if( ( msgs != NULL )
1855        && ( msgs->clientSentAnythingAt != 0 )
1856        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1857    {
1858        dbgmsg( msgs, "sending a keepalive message" );
1859        tr_peerIoWriteUint32( msgs->peer->io, msgs->outMessages, 0 );
1860        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1861    }
1862
1863    return bytesWritten;
1864}
1865
1866static int
1867peerPulse( void * vmsgs )
1868{
1869    tr_peermsgs * msgs = vmsgs;
1870    const time_t  now = time( NULL );
1871
1872    ratePulse( msgs );
1873
1874    pumpRequestQueue( msgs, now );
1875    expireOldRequests( msgs, now );
1876
1877    for( ;; )
1878        if( fillOutputBuffer( msgs, now ) < 1 )
1879            break;
1880
1881    return TRUE; /* loop forever */
1882}
1883
1884void
1885tr_peerMsgsPulse( tr_peermsgs * msgs )
1886{
1887    if( msgs )
1888        peerPulse( msgs );
1889}
1890
1891static void
1892gotError( tr_peerIo  * io UNUSED,
1893          short        what,
1894          void       * vmsgs )
1895{
1896    if( what & EVBUFFER_TIMEOUT )
1897        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1898    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1899        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1900               what, errno, tr_strerror( errno ) );
1901    fireError( vmsgs, ENOTCONN );
1902}
1903
1904static void
1905sendBitfield( tr_peermsgs * msgs )
1906{
1907    struct evbuffer * out = msgs->outMessages;
1908    tr_bitfield *     field;
1909    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1910    size_t            i;
1911    size_t            lazyCount = 0;
1912
1913    field = tr_bitfieldDup( tr_cpPieceBitfield( msgs->torrent->completion ) );
1914
1915    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1916    {
1917        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1918            speed over a truly random sample -- let's limit the pool size to
1919            the first 1000 pieces so large torrents don't bog things down */
1920        size_t poolSize;
1921        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1922        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1923
1924        /* build the pool */
1925        for( i=poolSize=0; i<maxPoolSize; ++i )
1926            if( tr_bitfieldHas( field, i ) )
1927                pool[poolSize++] = i;
1928
1929        /* pull random piece indices from the pool */
1930        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1931        {
1932            const int pos = tr_cryptoWeakRandInt( poolSize );
1933            const tr_piece_index_t piece = pool[pos];
1934            tr_bitfieldRem( field, piece );
1935            lazyPieces[lazyCount++] = piece;
1936            pool[pos] = pool[--poolSize];
1937        }
1938
1939        /* cleanup */
1940        tr_free( pool );
1941    }
1942
1943    tr_peerIoWriteUint32( msgs->peer->io, out,
1944                          sizeof( uint8_t ) + field->byteCount );
1945    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_BITFIELD );
1946    tr_peerIoWriteBytes ( msgs->peer->io, out, field->bits, field->byteCount );
1947    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
1948            EVBUFFER_LENGTH( out ) );
1949    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1950
1951    for( i = 0; i < lazyCount; ++i )
1952        protocolSendHave( msgs, lazyPieces[i] );
1953
1954    tr_bitfieldFree( field );
1955}
1956
1957static void
1958tellPeerWhatWeHave( tr_peermsgs * msgs )
1959{
1960    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1961
1962    if( fext && ( tr_cpGetStatus( msgs->torrent->completion ) == TR_SEED ) )
1963    {
1964        protocolSendHaveAll( msgs );
1965    }
1966    else if( fext && ( tr_cpHaveValid( msgs->torrent->completion ) == 0 ) )
1967    {
1968        protocolSendHaveNone( msgs );
1969    }
1970    else
1971    {
1972        sendBitfield( msgs );
1973    }
1974}
1975
1976/**
1977***
1978**/
1979
1980/* some peers give us error messages if we send
1981   more than this many peers in a single pex message
1982   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1983#define MAX_PEX_ADDED 50
1984#define MAX_PEX_DROPPED 50
1985
1986typedef struct
1987{
1988    tr_pex *  added;
1989    tr_pex *  dropped;
1990    tr_pex *  elements;
1991    int       addedCount;
1992    int       droppedCount;
1993    int       elementCount;
1994}
1995PexDiffs;
1996
1997static void
1998pexAddedCb( void * vpex,
1999            void * userData )
2000{
2001    PexDiffs * diffs = userData;
2002    tr_pex *   pex = vpex;
2003
2004    if( diffs->addedCount < MAX_PEX_ADDED )
2005    {
2006        diffs->added[diffs->addedCount++] = *pex;
2007        diffs->elements[diffs->elementCount++] = *pex;
2008    }
2009}
2010
2011static void
2012pexDroppedCb( void * vpex,
2013              void * userData )
2014{
2015    PexDiffs * diffs = userData;
2016    tr_pex *   pex = vpex;
2017
2018    if( diffs->droppedCount < MAX_PEX_DROPPED )
2019    {
2020        diffs->dropped[diffs->droppedCount++] = *pex;
2021    }
2022}
2023
2024static void
2025pexElementCb( void * vpex,
2026              void * userData )
2027{
2028    PexDiffs * diffs = userData;
2029    tr_pex *   pex = vpex;
2030
2031    diffs->elements[diffs->elementCount++] = *pex;
2032}
2033
2034static void
2035sendPex( tr_peermsgs * msgs )
2036{
2037    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2038    {
2039        PexDiffs diffs;
2040        PexDiffs diffs6;
2041        tr_pex * newPex = NULL;
2042        tr_pex * newPex6 = NULL;
2043        const int newCount = tr_peerMgrGetPeers( msgs->session->peerMgr,
2044                                                 msgs->torrent->info.hash,
2045                                                 &newPex, TR_AF_INET );
2046        const int newCount6 = tr_peerMgrGetPeers( msgs->session->peerMgr,
2047                                                  msgs->torrent->info.hash,
2048                                                  &newPex6, TR_AF_INET6 );
2049
2050        /* build the diffs */
2051        diffs.added = tr_new( tr_pex, newCount );
2052        diffs.addedCount = 0;
2053        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2054        diffs.droppedCount = 0;
2055        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2056        diffs.elementCount = 0;
2057        tr_set_compare( msgs->pex, msgs->pexCount,
2058                        newPex, newCount,
2059                        tr_pexCompare, sizeof( tr_pex ),
2060                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2061        diffs6.added = tr_new( tr_pex, newCount6 );
2062        diffs6.addedCount = 0;
2063        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2064        diffs6.droppedCount = 0;
2065        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2066        diffs6.elementCount = 0;
2067        tr_set_compare( msgs->pex6, msgs->pexCount6,
2068                        newPex6, newCount6,
2069                        tr_pexCompare, sizeof( tr_pex ),
2070                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2071        dbgmsg(
2072            msgs,
2073            "pex: old peer count %d, new peer count %d, added %d, removed %d",
2074            msgs->pexCount, newCount + newCount6,
2075            diffs.addedCount + diffs6.addedCount,
2076            diffs.droppedCount + diffs6.droppedCount );
2077
2078        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2079            !diffs6.droppedCount )
2080        {
2081            tr_free( diffs.elements );
2082            tr_free( diffs6.elements );
2083        }
2084        else
2085        {
2086            int  i;
2087            tr_benc val;
2088            char * benc;
2089            int bencLen;
2090            uint8_t * tmp, *walk;
2091            struct evbuffer * out = msgs->outMessages;
2092
2093            /* update peer */
2094            tr_free( msgs->pex );
2095            msgs->pex = diffs.elements;
2096            msgs->pexCount = diffs.elementCount;
2097            tr_free( msgs->pex6 );
2098            msgs->pex6 = diffs6.elements;
2099            msgs->pexCount6 = diffs6.elementCount;
2100
2101            /* build the pex payload */
2102            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2103                                         * speed vs. likelihood? */
2104
2105            /* "added" */
2106            tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2107            for( i = 0; i < diffs.addedCount; ++i )
2108            {
2109                memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2110                memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2111            }
2112            assert( ( walk - tmp ) == diffs.addedCount * 6 );
2113            tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2114            tr_free( tmp );
2115
2116            /* "added.f" */
2117            tmp = walk = tr_new( uint8_t, diffs.addedCount );
2118            for( i = 0; i < diffs.addedCount; ++i )
2119                *walk++ = diffs.added[i].flags;
2120            assert( ( walk - tmp ) == diffs.addedCount );
2121            tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2122            tr_free( tmp );
2123
2124            /* "dropped" */
2125            tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2126            for( i = 0; i < diffs.droppedCount; ++i )
2127            {
2128                memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2129                memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2130            }
2131            assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2132            tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2133            tr_free( tmp );
2134           
2135            /* "added6" */
2136            tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2137            for( i = 0; i < diffs6.addedCount; ++i )
2138            {
2139                memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2140                walk += 16;
2141                memcpy( walk, &diffs6.added[i].port, 2 );
2142                walk += 2;
2143            }
2144            assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2145            tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2146            tr_free( tmp );
2147           
2148            /* "added6.f" */
2149            tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2150            for( i = 0; i < diffs6.addedCount; ++i )
2151                *walk++ = diffs6.added[i].flags;
2152            assert( ( walk - tmp ) == diffs6.addedCount );
2153            tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2154            tr_free( tmp );
2155           
2156            /* "dropped6" */
2157            tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2158            for( i = 0; i < diffs6.droppedCount; ++i )
2159            {
2160                memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2161                walk += 16;
2162                memcpy( walk, &diffs6.dropped[i].port, 2 );
2163                walk += 2;
2164            }
2165            assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2166            tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2167            tr_free( tmp );
2168
2169            /* write the pex message */
2170            benc = tr_bencSave( &val, &bencLen );
2171            tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + bencLen );
2172            tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
2173            tr_peerIoWriteUint8 ( msgs->peer->io, out, msgs->ut_pex_id );
2174            tr_peerIoWriteBytes ( msgs->peer->io, out, benc, bencLen );
2175            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2176            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2177
2178            tr_free( benc );
2179            tr_bencFree( &val );
2180        }
2181
2182        /* cleanup */
2183        tr_free( diffs.added );
2184        tr_free( diffs.dropped );
2185        tr_free( newPex );
2186        tr_free( diffs6.added );
2187        tr_free( diffs6.dropped );
2188        tr_free( newPex6 );
2189
2190        msgs->clientSentPexAt = time( NULL );
2191    }
2192}
2193
2194static int
2195pexPulse( void * vpeer )
2196{
2197    sendPex( vpeer );
2198    return TRUE;
2199}
2200
2201/**
2202***
2203**/
2204
2205tr_peermsgs*
2206tr_peerMsgsNew( struct tr_torrent * torrent,
2207                struct tr_peer *    peer,
2208                tr_delivery_func    func,
2209                void *              userData,
2210                tr_publisher_tag *  setme )
2211{
2212    tr_peermsgs * m;
2213
2214    assert( peer );
2215    assert( peer->io );
2216
2217    m = tr_new0( tr_peermsgs, 1 );
2218    m->publisher = tr_publisherNew( );
2219    m->peer = peer;
2220    m->session = torrent->session;
2221    m->torrent = torrent;
2222    m->peer->clientIsChoked = 1;
2223    m->peer->peerIsChoked = 1;
2224    m->peer->clientIsInterested = 0;
2225    m->peer->peerIsInterested = 0;
2226    m->peer->have = tr_bitfieldNew( torrent->info.pieceCount );
2227    m->state = AWAITING_BT_LENGTH;
2228    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
2229    m->outMessages = evbuffer_new( );
2230    m->outMessagesBatchedAt = 0;
2231    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2232    m->incoming.block = evbuffer_new( );
2233    m->peerAskedFor = REQUEST_LIST_INIT;
2234    m->clientAskedFor = REQUEST_LIST_INIT;
2235    m->clientWillAskFor = REQUEST_LIST_INIT;
2236    peer->msgs = m;
2237
2238    *setme = tr_publisherSubscribe( m->publisher, func, userData );
2239
2240    if( tr_peerIoSupportsLTEP( peer->io ) )
2241        sendLtepHandshake( m );
2242
2243    tellPeerWhatWeHave( m );
2244
2245    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2246    ratePulse( m );
2247
2248    return m;
2249}
2250
2251void
2252tr_peerMsgsFree( tr_peermsgs* msgs )
2253{
2254    if( msgs )
2255    {
2256        tr_timerFree( &msgs->pexTimer );
2257        tr_publisherFree( &msgs->publisher );
2258        reqListClear( &msgs->clientWillAskFor );
2259        reqListClear( &msgs->clientAskedFor );
2260        reqListClear( &msgs->peerAskedFor );
2261
2262        evbuffer_free( msgs->incoming.block );
2263        evbuffer_free( msgs->outMessages );
2264        tr_free( msgs->pex6 );
2265        tr_free( msgs->pex );
2266
2267        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2268        tr_free( msgs );
2269    }
2270}
2271
2272void
2273tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2274                        tr_publisher_tag tag )
2275{
2276    tr_publisherUnsubscribe( peer->publisher, tag );
2277}
2278
Note: See TracBrowser for help on using the repository browser.