source: trunk/libtransmission/peer-msgs.c @ 7606

Last change on this file since 7606 was 7606, checked in by charles, 12 years ago

(trunk libT) minor improvement to testing to see if two requests are equal

  • Property svn:keywords set to Date Rev Author Id
File size: 65.2 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 7606 2009-01-04 02:49:30Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "session.h"
24#include "bencode.h"
25#include "completion.h"
26#include "crypto.h"
27#include "inout.h"
28#ifdef WIN32
29#include "net.h" /* for ECONN */
30#endif
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "platform.h" /* MAX_STACK_ARRAY_SIZE */
36#include "ratecontrol.h"
37#include "stats.h"
38#include "torrent.h"
39#include "trevent.h"
40#include "utils.h"
41
42/**
43***
44**/
45
46enum
47{
48    BT_CHOKE                = 0,
49    BT_UNCHOKE              = 1,
50    BT_INTERESTED           = 2,
51    BT_NOT_INTERESTED       = 3,
52    BT_HAVE                 = 4,
53    BT_BITFIELD             = 5,
54    BT_REQUEST              = 6,
55    BT_PIECE                = 7,
56    BT_CANCEL               = 8,
57    BT_PORT                 = 9,
58
59    BT_FEXT_SUGGEST         = 13,
60    BT_FEXT_HAVE_ALL        = 14,
61    BT_FEXT_HAVE_NONE       = 15,
62    BT_FEXT_REJECT          = 16,
63    BT_FEXT_ALLOWED_FAST    = 17,
64
65    BT_LTEP                 = 20,
66
67    LTEP_HANDSHAKE          = 0,
68
69    TR_LTEP_PEX             = 1,
70
71
72
73    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
74
75    /* idle seconds before we send a keepalive */
76    KEEPALIVE_INTERVAL_SECS = 100,
77
78    PEX_INTERVAL            = ( 90 * 1000 ), /* msec between sendPex() calls */
79
80
81    MAX_BLOCK_SIZE          = ( 1024 * 16 ),
82
83
84    /* how long an unsent request can stay queued before it's returned
85       back to the peer-mgr's pool of requests */
86    QUEUED_REQUEST_TTL_SECS = 20,
87
88    /* how long a sent request can stay queued before it's returned
89       back to the peer-mgr's pool of requests */
90    SENT_REQUEST_TTL_SECS = 240,
91
92    /* used in lowering the outMessages queue period */
93    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
94    HIGH_PRIORITY_INTERVAL_SECS = 2,
95    LOW_PRIORITY_INTERVAL_SECS = 20,
96
97    /* number of pieces to remove from the bitfield when
98     * lazy bitfields are turned on */
99    LAZY_PIECE_COUNT = 26,
100
101    /* number of pieces we'll allow in our fast set */
102    MAX_FAST_SET_SIZE = 3
103};
104
105/**
106***  REQUEST MANAGEMENT
107**/
108
109enum
110{
111    AWAITING_BT_LENGTH,
112    AWAITING_BT_ID,
113    AWAITING_BT_MESSAGE,
114    AWAITING_BT_PIECE
115};
116
117struct peer_request
118{
119    uint32_t    index;
120    uint32_t    offset;
121    uint32_t    length;
122    time_t      time_requested;
123};
124
125static inline tr_bool
126requestsMatch( const struct peer_request * a, const struct peer_request * b )
127{
128    return (a->index==b->index) && (a->offset==b->offset) && (a->length==b->length);
129}
130
131struct request_list
132{
133    uint16_t               count;
134    uint16_t               max;
135    struct peer_request *  requests;
136};
137
138static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
139
140static void
141reqListReserve( struct request_list * list,
142                uint16_t              max )
143{
144    if( list->max < max )
145    {
146        list->max = max;
147        list->requests = tr_renew( struct peer_request,
148                                   list->requests,
149                                   list->max );
150    }
151}
152
153static void
154reqListClear( struct request_list * list )
155{
156    tr_free( list->requests );
157    *list = REQUEST_LIST_INIT;
158}
159
160static void
161reqListCopy( struct request_list * dest, const struct request_list * src )
162{
163    dest->count = dest->max = src->count;
164    dest->requests = tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
165}
166
167static void
168reqListRemoveOne( struct request_list * list,
169                  int                   i )
170{
171    assert( 0 <= i && i < list->count );
172
173    memmove( &list->requests[i],
174             &list->requests[i + 1],
175             sizeof( struct peer_request ) * ( --list->count - i ) );
176}
177
178static void
179reqListAppend( struct request_list *       list,
180               const struct peer_request * req )
181{
182    if( ++list->count >= list->max )
183        reqListReserve( list, list->max + 8 );
184
185    list->requests[list->count - 1] = *req;
186}
187
188static int
189reqListPop( struct request_list * list,
190            struct peer_request * setme )
191{
192    int success;
193
194    if( !list->count )
195        success = FALSE;
196    else {
197        *setme = list->requests[0];
198        reqListRemoveOne( list, 0 );
199        success = TRUE;
200    }
201
202    return success;
203}
204
205static int
206reqListFind( struct request_list *       list,
207             const struct peer_request * key )
208{
209    uint16_t i;
210
211    for( i = 0; i < list->count; ++i )
212        if( requestsMatch( key, list->requests + i ) )
213            return i;
214
215    return -1;
216}
217
218static int
219reqListRemove( struct request_list *       list,
220               const struct peer_request * key )
221{
222    int success;
223    const int i = reqListFind( list, key );
224
225    if( i < 0 )
226        success = FALSE;
227    else {
228        reqListRemoveOne( list, i );
229        success = TRUE;
230    }
231
232    return success;
233}
234
235/**
236***
237**/
238
239/* this is raw, unchanged data from the peer regarding
240 * the current message that it's sending us. */
241struct tr_incoming
242{
243    uint8_t                id;
244    uint32_t               length; /* includes the +1 for id length */
245    struct peer_request    blockReq; /* metadata for incoming blocks */
246    struct evbuffer *      block; /* piece data for incoming blocks */
247};
248
249/**
250 * Low-level communication state information about a connected peer.
251 *
252 * This structure remembers the low-level protocol states that we're
253 * in with this peer, such as active requests, pex messages, and so on.
254 * Its fields are all private to peer-msgs.c.
255 *
256 * Data not directly involved with sending & receiving messages is
257 * stored in tr_peer, where it can be accessed by both peermsgs and
258 * the peer manager.
259 *
260 * @see struct peer_atom
261 * @see tr_peer
262 */
263struct tr_peermsgs
264{
265    tr_bool         peerSupportsPex;
266    tr_bool         clientSentLtepHandshake;
267    tr_bool         peerSentLtepHandshake;
268    tr_bool         haveFastSet;
269
270    uint8_t         state;
271    uint8_t         ut_pex_id;
272    uint16_t        pexCount;
273    uint16_t        pexCount6;
274    uint16_t        maxActiveRequests;
275
276    size_t                 fastsetSize;
277    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
278
279    /* how long the outMessages batch should be allowed to grow before
280     * it's flushed -- some messages (like requests >:) should be sent
281     * very quickly; others aren't as urgent. */
282    int                    outMessagesBatchPeriod;
283
284    tr_peer *              peer;
285
286    tr_session *           session;
287    tr_torrent *           torrent;
288
289    tr_publisher           publisher;
290
291    struct evbuffer *      outMessages; /* all the non-piece messages */
292
293    struct request_list    peerAskedFor;
294    struct request_list    clientAskedFor;
295    struct request_list    clientWillAskFor;
296
297    tr_timer             * pexTimer;
298    tr_pex               * pex;
299    tr_pex               * pex6;
300
301    time_t                 clientSentPexAt;
302    time_t                 clientSentAnythingAt;
303
304    /* when we started batching the outMessages */
305    time_t                outMessagesBatchedAt;
306
307    struct tr_incoming    incoming;
308
309    /* if the peer supports the Extension Protocol in BEP 10 and
310       supplied a reqq argument, it's stored here.  otherwise the
311       value is zero and should be ignored. */
312    int64_t               reqq;
313};
314
315/**
316***
317**/
318
319static void
320myDebug( const char * file, int line,
321         const struct tr_peermsgs * msgs,
322         const char * fmt, ... )
323{
324    FILE * fp = tr_getLog( );
325
326    if( fp )
327    {
328        va_list           args;
329        char              timestr[64];
330        struct evbuffer * buf = tr_getBuffer( );
331        char *            base = tr_basename( file );
332
333        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
334                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
335                             msgs->torrent->info.name,
336                             tr_peerIoGetAddrStr( msgs->peer->io ),
337                             msgs->peer->client );
338        va_start( args, fmt );
339        evbuffer_add_vprintf( buf, fmt, args );
340        va_end( args );
341        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
342        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
343
344        tr_free( base );
345        tr_releaseBuffer( buf );
346    }
347}
348
349#define dbgmsg( msgs, ... ) \
350    do { \
351        if( tr_deepLoggingIsActive( ) ) \
352            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
353    } while( 0 )
354
355/**
356***
357**/
358
359static void
360pokeBatchPeriod( tr_peermsgs * msgs,
361                 int           interval )
362{
363    if( msgs->outMessagesBatchPeriod > interval )
364    {
365        msgs->outMessagesBatchPeriod = interval;
366        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
367    }
368}
369
370static inline void
371dbgOutMessageLen( tr_peermsgs * msgs )
372{
373    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
374}
375
376static void
377protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
378{
379    tr_peerIo       * io  = msgs->peer->io;
380    struct evbuffer * out = msgs->outMessages;
381
382    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
383
384    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
385    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
386    tr_peerIoWriteUint32( io, out, req->index );
387    tr_peerIoWriteUint32( io, out, req->offset );
388    tr_peerIoWriteUint32( io, out, req->length );
389
390    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
391    dbgOutMessageLen( msgs );
392}
393
394static void
395protocolSendRequest( tr_peermsgs               * msgs,
396                     const struct peer_request * req )
397{
398    tr_peerIo       * io  = msgs->peer->io;
399    struct evbuffer * out = msgs->outMessages;
400
401    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
402    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
403    tr_peerIoWriteUint32( io, out, req->index );
404    tr_peerIoWriteUint32( io, out, req->offset );
405    tr_peerIoWriteUint32( io, out, req->length );
406
407    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
408    dbgOutMessageLen( msgs );
409    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
410}
411
412static void
413protocolSendCancel( tr_peermsgs               * msgs,
414                    const struct peer_request * req )
415{
416    tr_peerIo       * io  = msgs->peer->io;
417    struct evbuffer * out = msgs->outMessages;
418
419    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
420    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
421    tr_peerIoWriteUint32( io, out, req->index );
422    tr_peerIoWriteUint32( io, out, req->offset );
423    tr_peerIoWriteUint32( io, out, req->length );
424
425    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
426    dbgOutMessageLen( msgs );
427    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
428}
429
430static void
431protocolSendHave( tr_peermsgs * msgs,
432                  uint32_t      index )
433{
434    tr_peerIo       * io  = msgs->peer->io;
435    struct evbuffer * out = msgs->outMessages;
436
437    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
438    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
439    tr_peerIoWriteUint32( io, out, index );
440
441    dbgmsg( msgs, "sending Have %u", index );
442    dbgOutMessageLen( msgs );
443    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
444}
445
446#if 0
447static void
448protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
449{
450    tr_peerIo       * io  = msgs->peer->io;
451    struct evbuffer * out = msgs->outMessages;
452
453    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
454
455    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
456    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
457    tr_peerIoWriteUint32( io, out, pieceIndex );
458
459    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
460    dbgOutMessageLen( msgs );
461}
462#endif
463
464static void
465protocolSendChoke( tr_peermsgs * msgs,
466                   int           choke )
467{
468    tr_peerIo       * io  = msgs->peer->io;
469    struct evbuffer * out = msgs->outMessages;
470
471    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
472    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
473
474    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
475    dbgOutMessageLen( msgs );
476    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
477}
478
479static void
480protocolSendHaveAll( tr_peermsgs * msgs )
481{
482    tr_peerIo       * io  = msgs->peer->io;
483    struct evbuffer * out = msgs->outMessages;
484
485    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
486
487    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
488    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
489
490    dbgmsg( msgs, "sending HAVE_ALL..." );
491    dbgOutMessageLen( msgs );
492    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
493}
494
495static void
496protocolSendHaveNone( tr_peermsgs * msgs )
497{
498    tr_peerIo       * io  = msgs->peer->io;
499    struct evbuffer * out = msgs->outMessages;
500
501    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
502
503    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
504    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
505
506    dbgmsg( msgs, "sending HAVE_NONE..." );
507    dbgOutMessageLen( msgs );
508    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
509}
510
511/**
512***  EVENTS
513**/
514
515static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0 };
516
517static void
518publish( tr_peermsgs * msgs, tr_peer_event * e )
519{
520    assert( msgs->peer );
521    assert( msgs->peer->msgs == msgs );
522
523    tr_publisherPublish( &msgs->publisher, msgs->peer, e );
524}
525
526static void
527fireError( tr_peermsgs * msgs, int err )
528{
529    tr_peer_event e = blankEvent;
530    e.eventType = TR_PEER_ERROR;
531    e.err = err;
532    publish( msgs, &e );
533}
534
535static void
536fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly )
537{
538    tr_peer_event e = blankEvent;
539    e.eventType = TR_PEER_UPLOAD_ONLY;
540    e.uploadOnly = uploadOnly;
541    publish( msgs, &e );
542}
543
544static void
545fireNeedReq( tr_peermsgs * msgs )
546{
547    tr_peer_event e = blankEvent;
548    e.eventType = TR_PEER_NEED_REQ;
549    publish( msgs, &e );
550}
551
552static void
553firePeerProgress( tr_peermsgs * msgs )
554{
555    tr_peer_event e = blankEvent;
556    e.eventType = TR_PEER_PEER_PROGRESS;
557    e.progress = msgs->peer->progress;
558    publish( msgs, &e );
559}
560
561static void
562fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
563{
564    tr_peer_event e = blankEvent;
565    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
566    e.pieceIndex = req->index;
567    e.offset = req->offset;
568    e.length = req->length;
569    publish( msgs, &e );
570}
571
572static void
573fireClientGotData( tr_peermsgs * msgs,
574                   uint32_t      length,
575                   int           wasPieceData )
576{
577    tr_peer_event e = blankEvent;
578
579    e.length = length;
580    e.eventType = TR_PEER_CLIENT_GOT_DATA;
581    e.wasPieceData = wasPieceData;
582    publish( msgs, &e );
583}
584
585static void
586fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
587{
588    tr_peer_event e = blankEvent;
589    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
590    e.pieceIndex = pieceIndex;
591    publish( msgs, &e );
592}
593
594static void
595fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
596{
597    tr_peer_event e = blankEvent;
598    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
599    e.pieceIndex = pieceIndex;
600    publish( msgs, &e );
601}
602
603static void
604firePeerGotData( tr_peermsgs  * msgs,
605                 uint32_t       length,
606                 int            wasPieceData )
607{
608    tr_peer_event e = blankEvent;
609
610    e.length = length;
611    e.eventType = TR_PEER_PEER_GOT_DATA;
612    e.wasPieceData = wasPieceData;
613
614    publish( msgs, &e );
615}
616
617static void
618fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
619{
620    tr_peer_event e = blankEvent;
621    e.eventType = TR_PEER_CANCEL;
622    e.pieceIndex = req->index;
623    e.offset = req->offset;
624    e.length = req->length;
625    publish( msgs, &e );
626}
627
628/**
629***  ALLOWED FAST SET
630***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
631**/
632
633size_t
634tr_generateAllowedSet( tr_piece_index_t * setmePieces,
635                       size_t             desiredSetSize,
636                       size_t             pieceCount,
637                       const uint8_t    * infohash,
638                       const tr_address * addr )
639{
640    size_t setSize = 0;
641
642    assert( setmePieces );
643    assert( desiredSetSize <= pieceCount );
644    assert( desiredSetSize );
645    assert( pieceCount );
646    assert( infohash );
647    assert( addr );
648
649    if( addr->type == TR_AF_INET )
650    {
651        uint8_t w[SHA_DIGEST_LENGTH + 4];
652        uint8_t x[SHA_DIGEST_LENGTH];
653
654        *(uint32_t*)w = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
655        memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );                /* (2) */
656        tr_sha1( x, w, sizeof( w ), NULL );                          /* (3) */
657
658        while( setSize<desiredSetSize )
659        {
660            int i;
661            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
662            {
663                size_t k;
664                uint32_t j = i * 4;                                  /* (5) */
665                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
666                uint32_t index = y % pieceCount;                     /* (7) */
667
668                for( k=0; k<setSize; ++k )                           /* (8) */
669                    if( setmePieces[k] == index )
670                        break;
671
672                if( k == setSize )
673                    setmePieces[setSize++] = index;                  /* (9) */
674            }
675
676            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
677        }
678    }
679
680    return setSize;
681}
682
683static void
684updateFastSet( tr_peermsgs * msgs UNUSED )
685{
686#if 0
687    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
688    const int peerIsNeedy = msgs->peer->progress < 0.10;
689
690    if( fext && peerIsNeedy && !msgs->haveFastSet )
691    {
692        size_t i;
693        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
694        const tr_info * inf = &msgs->torrent->info;
695        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
696
697        /* build the fast set */
698        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
699        msgs->haveFastSet = 1;
700
701        /* send it to the peer */
702        for( i=0; i<msgs->fastsetSize; ++i )
703            protocolSendAllowedFast( msgs, msgs->fastset[i] );
704    }
705#endif
706}
707
708/**
709***  INTEREST
710**/
711
712static int
713isPieceInteresting( const tr_peermsgs * msgs,
714                    tr_piece_index_t    piece )
715{
716    const tr_torrent * torrent = msgs->torrent;
717
718    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
719          && ( !tr_cpPieceIsComplete( &torrent->completion, piece ) ) /* !have */
720          && ( tr_bitfieldHas( msgs->peer->have, piece ) );    /* peer has it */
721}
722
723/* "interested" means we'll ask for piece data if they unchoke us */
724static int
725isPeerInteresting( const tr_peermsgs * msgs )
726{
727    tr_piece_index_t    i;
728    const tr_torrent *  torrent;
729    const tr_bitfield * bitfield;
730    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
731
732    if( clientIsSeed )
733        return FALSE;
734
735    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
736        return FALSE;
737
738    torrent = msgs->torrent;
739    bitfield = tr_cpPieceBitfield( &torrent->completion );
740
741    if( !msgs->peer->have )
742        return TRUE;
743
744    assert( bitfield->byteCount == msgs->peer->have->byteCount );
745
746    for( i = 0; i < torrent->info.pieceCount; ++i )
747        if( isPieceInteresting( msgs, i ) )
748            return TRUE;
749
750    return FALSE;
751}
752
753static void
754sendInterest( tr_peermsgs * msgs,
755              int           weAreInterested )
756{
757    struct evbuffer * out = msgs->outMessages;
758
759    assert( msgs );
760    assert( weAreInterested == 0 || weAreInterested == 1 );
761
762    msgs->peer->clientIsInterested = weAreInterested;
763    dbgmsg( msgs, "Sending %s", weAreInterested ? "Interested" : "Not Interested" );
764    tr_peerIoWriteUint32( msgs->peer->io, out, sizeof( uint8_t ) );
765    tr_peerIoWriteUint8 ( msgs->peer->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
766
767    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
768    dbgOutMessageLen( msgs );
769}
770
771static void
772updateInterest( tr_peermsgs * msgs )
773{
774    const int i = isPeerInteresting( msgs );
775
776    if( i != msgs->peer->clientIsInterested )
777        sendInterest( msgs, i );
778    if( i )
779        fireNeedReq( msgs );
780}
781
782static int
783popNextRequest( tr_peermsgs *         msgs,
784                struct peer_request * setme )
785{
786    return reqListPop( &msgs->peerAskedFor, setme );
787}
788
789static void
790cancelAllRequestsToClient( tr_peermsgs * msgs )
791{
792    struct peer_request req;
793    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
794
795    while( popNextRequest( msgs, &req ) )
796        if( mustSendCancel )
797            protocolSendReject( msgs, &req );
798}
799
800void
801tr_peerMsgsSetChoke( tr_peermsgs * msgs,
802                     int           choke )
803{
804    const time_t now = time( NULL );
805    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
806
807    assert( msgs );
808    assert( msgs->peer );
809    assert( choke == 0 || choke == 1 );
810
811    if( msgs->peer->chokeChangedAt > fibrillationTime )
812    {
813        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
814    }
815    else if( msgs->peer->peerIsChoked != choke )
816    {
817        msgs->peer->peerIsChoked = choke;
818        if( choke )
819            cancelAllRequestsToClient( msgs );
820        protocolSendChoke( msgs, choke );
821        msgs->peer->chokeChangedAt = now;
822    }
823}
824
825/**
826***
827**/
828
829void
830tr_peerMsgsHave( tr_peermsgs * msgs,
831                 uint32_t      index )
832{
833    protocolSendHave( msgs, index );
834
835    /* since we have more pieces now, we might not be interested in this peer */
836    updateInterest( msgs );
837}
838
839/**
840***
841**/
842
843static int
844reqIsValid( const tr_peermsgs * peer,
845            uint32_t            index,
846            uint32_t            offset,
847            uint32_t            length )
848{
849    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
850}
851
852static int
853requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
854{
855    return reqIsValid( msgs, req->index, req->offset, req->length );
856}
857
858static void
859expireFromList( tr_peermsgs          * msgs,
860                struct request_list  * list,
861                const time_t           oldestAllowed )
862{
863    int i;
864
865    /* walk through the list, looking for the first req that's too old */
866    for( i=0; i<list->count; ++i ) {
867        const struct peer_request * req = &list->requests[i];
868        if( req->time_requested < oldestAllowed )
869            break;
870    }
871
872    /* if we found one too old, start pruning them */
873    if( i < list->count ) {
874        struct request_list tmp = REQUEST_LIST_INIT;
875        reqListCopy( &tmp, list );
876        for( ; i<tmp.count; ++i ) {
877            const struct peer_request * req = &tmp.requests[i];
878            if( req->time_requested < oldestAllowed )
879                tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
880        }
881        reqListClear( &tmp );
882    }
883}
884
885static void
886expireOldRequests( tr_peermsgs * msgs, const time_t now  )
887{
888    time_t oldestAllowed;
889    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
890    dbgmsg( msgs, "entering `expire old requests' block" );
891
892    /* cancel requests that have been queued for too long */
893    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
894    expireFromList( msgs, &msgs->clientWillAskFor, oldestAllowed );
895
896    /* if the peer doesn't support "Reject Request",
897     * cancel requests that were sent too long ago. */
898    if( !fext ) {
899        oldestAllowed = now - SENT_REQUEST_TTL_SECS;
900        expireFromList( msgs, &msgs->clientAskedFor, oldestAllowed );
901    }
902
903    dbgmsg( msgs, "leaving `expire old requests' block" );
904}
905
906static void
907pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
908{
909    const int           max = msgs->maxActiveRequests;
910    int                 sent = 0;
911    int                 count = msgs->clientAskedFor.count;
912    struct peer_request req;
913
914    if( msgs->peer->clientIsChoked )
915        return;
916    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
917        return;
918
919    while( ( count < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
920    {
921        const tr_block_index_t block = _tr_block( msgs->torrent, req.index, req.offset );
922
923        assert( requestIsValid( msgs, &req ) );
924        assert( tr_bitfieldHas( msgs->peer->have, req.index ) );
925
926        /* don't ask for it if we've already got it... this block may have
927         * come in from a different peer after we cancelled a request for it */
928        if( !tr_cpBlockIsComplete( &msgs->torrent->completion, block ) )
929        {
930            protocolSendRequest( msgs, &req );
931            req.time_requested = now;
932            reqListAppend( &msgs->clientAskedFor, &req );
933
934            ++count;
935            ++sent;
936        }
937    }
938
939    if( sent )
940        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
941                sent, msgs->clientAskedFor.count, msgs->clientWillAskFor.count );
942
943    if( count < max )
944        fireNeedReq( msgs );
945}
946
947static inline int
948requestQueueIsFull( const tr_peermsgs * msgs )
949{
950    const int req_max = msgs->maxActiveRequests;
951    return msgs->clientWillAskFor.count >= req_max;
952}
953
954tr_addreq_t
955tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
956                       uint32_t         index,
957                       uint32_t         offset,
958                       uint32_t         length )
959{
960    struct peer_request req;
961
962    assert( msgs );
963    assert( msgs->torrent );
964    assert( reqIsValid( msgs, index, offset, length ) );
965
966    /**
967    ***  Reasons to decline the request
968    **/
969
970    /* don't send requests to choked clients */
971    if( msgs->peer->clientIsChoked ) {
972        dbgmsg( msgs, "declining request because they're choking us" );
973        return TR_ADDREQ_CLIENT_CHOKED;
974    }
975
976    /* peer doesn't have this piece */
977    if( !tr_bitfieldHas( msgs->peer->have, index ) )
978        return TR_ADDREQ_MISSING;
979
980    /* peer's queue is full */
981    if( requestQueueIsFull( msgs ) ) {
982        dbgmsg( msgs, "declining request because we're full" );
983        return TR_ADDREQ_FULL;
984    }
985
986    /* have we already asked for this piece? */
987    req.index = index;
988    req.offset = offset;
989    req.length = length;
990    if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
991        dbgmsg( msgs, "declining because it's a duplicate" );
992        return TR_ADDREQ_DUPLICATE;
993    }
994    if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
995        dbgmsg( msgs, "declining because it's a duplicate" );
996        return TR_ADDREQ_DUPLICATE;
997    }
998
999    /**
1000    ***  Accept this request
1001    **/
1002
1003    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
1004            index, offset, length );
1005    req.time_requested = time( NULL );
1006    reqListAppend( &msgs->clientWillAskFor, &req );
1007    return TR_ADDREQ_OK;
1008}
1009
1010static void
1011cancelAllRequestsToPeer( tr_peermsgs * msgs, tr_bool sendCancel )
1012{
1013    int i;
1014    struct request_list a = msgs->clientWillAskFor;
1015    struct request_list b = msgs->clientAskedFor;
1016    dbgmsg( msgs, "cancelling all requests to peer" );
1017
1018    msgs->clientAskedFor = REQUEST_LIST_INIT;
1019    msgs->clientWillAskFor = REQUEST_LIST_INIT;
1020
1021    for( i=0; i<a.count; ++i )
1022        fireCancelledReq( msgs, &a.requests[i] );
1023
1024    for( i = 0; i < b.count; ++i ) {
1025        fireCancelledReq( msgs, &b.requests[i] );
1026        if( sendCancel )
1027            protocolSendCancel( msgs, &b.requests[i] );
1028    }
1029
1030    reqListClear( &a );
1031    reqListClear( &b );
1032}
1033
1034void
1035tr_peerMsgsCancel( tr_peermsgs * msgs,
1036                   uint32_t      pieceIndex,
1037                   uint32_t      offset,
1038                   uint32_t      length )
1039{
1040    struct peer_request req;
1041
1042    assert( msgs != NULL );
1043    assert( length > 0 );
1044
1045
1046    /* have we asked the peer for this piece? */
1047    req.index = pieceIndex;
1048    req.offset = offset;
1049    req.length = length;
1050
1051    /* if it's only in the queue and hasn't been sent yet, free it */
1052    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
1053        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
1054        fireCancelledReq( msgs, &req );
1055    }
1056
1057    /* if it's already been sent, send a cancel message too */
1058    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
1059        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
1060        protocolSendCancel( msgs, &req );
1061        fireCancelledReq( msgs, &req );
1062    }
1063}
1064
1065
1066/**
1067***
1068**/
1069
1070static void
1071sendLtepHandshake( tr_peermsgs * msgs )
1072{
1073    tr_benc val, *m;
1074    char * buf;
1075    int len;
1076    int pex;
1077    struct evbuffer * out = msgs->outMessages;
1078
1079    if( msgs->clientSentLtepHandshake )
1080        return;
1081
1082    dbgmsg( msgs, "sending an ltep handshake" );
1083    msgs->clientSentLtepHandshake = 1;
1084
1085    /* decide if we want to advertise pex support */
1086    if( !tr_torrentAllowsPex( msgs->torrent ) )
1087        pex = 0;
1088    else if( msgs->peerSentLtepHandshake )
1089        pex = msgs->peerSupportsPex ? 1 : 0;
1090    else
1091        pex = 1;
1092
1093    tr_bencInitDict( &val, 5 );
1094    tr_bencDictAddInt( &val, "e", msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
1095    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
1096    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
1097    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
1098    m  = tr_bencDictAddDict( &val, "m", 1 );
1099    if( pex )
1100        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
1101    buf = tr_bencSave( &val, &len );
1102
1103    tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + len );
1104    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
1105    tr_peerIoWriteUint8 ( msgs->peer->io, out, LTEP_HANDSHAKE );
1106    tr_peerIoWriteBytes ( msgs->peer->io, out, buf, len );
1107    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1108    dbgOutMessageLen( msgs );
1109
1110    /* cleanup */
1111    tr_bencFree( &val );
1112    tr_free( buf );
1113}
1114
1115static void
1116parseLtepHandshake( tr_peermsgs *     msgs,
1117                    int               len,
1118                    struct evbuffer * inbuf )
1119{
1120    int64_t   i;
1121    tr_benc   val, * sub;
1122    uint8_t * tmp = tr_new( uint8_t, len );
1123
1124    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
1125    msgs->peerSentLtepHandshake = 1;
1126
1127    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
1128    {
1129        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1130        tr_free( tmp );
1131        return;
1132    }
1133
1134    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
1135
1136    /* does the peer prefer encrypted connections? */
1137    if( tr_bencDictFindInt( &val, "e", &i ) )
1138        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1139                                              : ENCRYPTION_PREFERENCE_NO;
1140
1141    /* check supported messages for utorrent pex */
1142    msgs->peerSupportsPex = 0;
1143    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
1144        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
1145            msgs->ut_pex_id = (uint8_t) i;
1146            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1147            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1148        }
1149    }
1150
1151    /* look for upload_only (BEP 21) */
1152    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
1153        fireUploadOnly( msgs, i!=0 );
1154
1155    /* get peer's listening port */
1156    if( tr_bencDictFindInt( &val, "p", &i ) ) {
1157        msgs->peer->port = htons( (uint16_t)i );
1158        dbgmsg( msgs, "msgs->port is now %hu", msgs->peer->port );
1159    }
1160
1161    /* get peer's maximum request queue size */
1162    if( tr_bencDictFindInt( &val, "reqq", &i ) )
1163        msgs->reqq = i;
1164
1165    tr_bencFree( &val );
1166    tr_free( tmp );
1167}
1168
1169static void
1170parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1171{
1172    int loaded = 0;
1173    uint8_t * tmp = tr_new( uint8_t, msglen );
1174    tr_benc val;
1175    const tr_torrent * tor = msgs->torrent;
1176    const uint8_t * added;
1177    size_t added_len;
1178
1179    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1180
1181    if( tr_torrentAllowsPex( tor )
1182      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1183    {
1184        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1185        {
1186            const uint8_t * added_f = NULL;
1187            tr_pex *        pex;
1188            size_t          i, n;
1189            size_t          added_f_len = 0;
1190            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1191            pex =
1192                tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len,
1193                                        &n );
1194            for( i = 0; i < n; ++i )
1195                tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1196                                  TR_PEER_FROM_PEX, pex + i );
1197            tr_free( pex );
1198        }
1199       
1200        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1201        {
1202            const uint8_t * added_f = NULL;
1203            tr_pex *        pex;
1204            size_t          i, n;
1205            size_t          added_f_len = 0;
1206            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1207            pex =
1208                tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len,
1209                                         &n );
1210            for( i = 0; i < n; ++i )
1211                tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1212                                  TR_PEER_FROM_PEX, pex + i );
1213            tr_free( pex );
1214        }
1215       
1216    }
1217
1218    if( loaded )
1219        tr_bencFree( &val );
1220    tr_free( tmp );
1221}
1222
1223static void sendPex( tr_peermsgs * msgs );
1224
1225static void
1226parseLtep( tr_peermsgs *     msgs,
1227           int               msglen,
1228           struct evbuffer * inbuf )
1229{
1230    uint8_t ltep_msgid;
1231
1232    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1233    msglen--;
1234
1235    if( ltep_msgid == LTEP_HANDSHAKE )
1236    {
1237        dbgmsg( msgs, "got ltep handshake" );
1238        parseLtepHandshake( msgs, msglen, inbuf );
1239        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1240        {
1241            sendLtepHandshake( msgs );
1242            sendPex( msgs );
1243        }
1244    }
1245    else if( ltep_msgid == TR_LTEP_PEX )
1246    {
1247        dbgmsg( msgs, "got ut pex" );
1248        msgs->peerSupportsPex = 1;
1249        parseUtPex( msgs, msglen, inbuf );
1250    }
1251    else
1252    {
1253        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1254        evbuffer_drain( inbuf, msglen );
1255    }
1256}
1257
1258static int
1259readBtLength( tr_peermsgs *     msgs,
1260              struct evbuffer * inbuf,
1261              size_t            inlen )
1262{
1263    uint32_t len;
1264
1265    if( inlen < sizeof( len ) )
1266        return READ_LATER;
1267
1268    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1269
1270    if( len == 0 ) /* peer sent us a keepalive message */
1271        dbgmsg( msgs, "got KeepAlive" );
1272    else
1273    {
1274        msgs->incoming.length = len;
1275        msgs->state = AWAITING_BT_ID;
1276    }
1277
1278    return READ_NOW;
1279}
1280
1281static int readBtMessage( tr_peermsgs *     msgs,
1282                          struct evbuffer * inbuf,
1283                          size_t            inlen );
1284
1285static int
1286readBtId( tr_peermsgs *     msgs,
1287          struct evbuffer * inbuf,
1288          size_t            inlen )
1289{
1290    uint8_t id;
1291
1292    if( inlen < sizeof( uint8_t ) )
1293        return READ_LATER;
1294
1295    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1296    msgs->incoming.id = id;
1297
1298    if( id == BT_PIECE )
1299    {
1300        msgs->state = AWAITING_BT_PIECE;
1301        return READ_NOW;
1302    }
1303    else if( msgs->incoming.length != 1 )
1304    {
1305        msgs->state = AWAITING_BT_MESSAGE;
1306        return READ_NOW;
1307    }
1308    else return readBtMessage( msgs, inbuf, inlen - 1 );
1309}
1310
1311static void
1312updatePeerProgress( tr_peermsgs * msgs )
1313{
1314    msgs->peer->progress = tr_bitfieldCountTrueBits( msgs->peer->have ) / (float)msgs->torrent->info.pieceCount;
1315    dbgmsg( msgs, "peer progress is %f", msgs->peer->progress );
1316    updateFastSet( msgs );
1317    updateInterest( msgs );
1318    firePeerProgress( msgs );
1319}
1320
1321static void
1322peerMadeRequest( tr_peermsgs *               msgs,
1323                 const struct peer_request * req )
1324{
1325    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1326    const int reqIsValid = requestIsValid( msgs, req );
1327    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1328    const int peerIsChoked = msgs->peer->peerIsChoked;
1329
1330    int allow = FALSE;
1331
1332    if( !reqIsValid )
1333        dbgmsg( msgs, "rejecting an invalid request." );
1334    else if( !clientHasPiece )
1335        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1336    else if( peerIsChoked )
1337        dbgmsg( msgs, "rejecting request from choked peer" );
1338    else
1339        allow = TRUE;
1340
1341    if( allow )
1342        reqListAppend( &msgs->peerAskedFor, req );
1343    else if( fext )
1344        protocolSendReject( msgs, req );
1345}
1346
1347static int
1348messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1349{
1350    switch( id )
1351    {
1352        case BT_CHOKE:
1353        case BT_UNCHOKE:
1354        case BT_INTERESTED:
1355        case BT_NOT_INTERESTED:
1356        case BT_FEXT_HAVE_ALL:
1357        case BT_FEXT_HAVE_NONE:
1358            return len == 1;
1359
1360        case BT_HAVE:
1361        case BT_FEXT_SUGGEST:
1362        case BT_FEXT_ALLOWED_FAST:
1363            return len == 5;
1364
1365        case BT_BITFIELD:
1366            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1367
1368        case BT_REQUEST:
1369        case BT_CANCEL:
1370        case BT_FEXT_REJECT:
1371            return len == 13;
1372
1373        case BT_PIECE:
1374            return len > 9 && len <= 16393;
1375
1376        case BT_PORT:
1377            return len == 3;
1378
1379        case BT_LTEP:
1380            return len >= 2;
1381
1382        default:
1383            return FALSE;
1384    }
1385}
1386
1387static int clientGotBlock( tr_peermsgs *               msgs,
1388                           const uint8_t *             block,
1389                           const struct peer_request * req );
1390
1391static int
1392readBtPiece( tr_peermsgs      * msgs,
1393             struct evbuffer  * inbuf,
1394             size_t             inlen,
1395             size_t           * setme_piece_bytes_read )
1396{
1397    struct peer_request * req = &msgs->incoming.blockReq;
1398
1399    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1400    dbgmsg( msgs, "In readBtPiece" );
1401
1402    if( !req->length )
1403    {
1404        if( inlen < 8 )
1405            return READ_LATER;
1406
1407        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1408        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1409        req->length = msgs->incoming.length - 9;
1410        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1411        return READ_NOW;
1412    }
1413    else
1414    {
1415        int err;
1416
1417        /* read in another chunk of data */
1418        const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
1419        size_t n = MIN( nLeft, inlen );
1420        size_t i = n;
1421
1422        while( i > 0 )
1423        {
1424            uint8_t buf[MAX_STACK_ARRAY_SIZE];
1425            const size_t thisPass = MIN( i, sizeof( buf ) );
1426            tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass );
1427            evbuffer_add( msgs->incoming.block, buf, thisPass );
1428            i -= thisPass;
1429        }
1430
1431        fireClientGotData( msgs, n, TRUE );
1432        *setme_piece_bytes_read += n;
1433        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1434               n, req->index, req->offset, req->length,
1435               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1436        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1437            return READ_LATER;
1438
1439        /* we've got the whole block ... process it */
1440        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );
1441
1442        /* cleanup */
1443        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH( msgs->incoming.block ) );
1444        req->length = 0;
1445        msgs->state = AWAITING_BT_LENGTH;
1446        if( !err )
1447            return READ_NOW;
1448        else {
1449            fireError( msgs, err );
1450            return READ_ERR;
1451        }
1452    }
1453}
1454
1455static int
1456readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1457{
1458    uint32_t      ui32;
1459    uint32_t      msglen = msgs->incoming.length;
1460    const uint8_t id = msgs->incoming.id;
1461    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1462    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1463
1464    --msglen; /* id length */
1465
1466    if( inlen < msglen )
1467        return READ_LATER;
1468
1469    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1470
1471    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1472    {
1473        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1474        fireError( msgs, EMSGSIZE );
1475        return READ_ERR;
1476    }
1477
1478    switch( id )
1479    {
1480        case BT_CHOKE:
1481            dbgmsg( msgs, "got Choke" );
1482            msgs->peer->clientIsChoked = 1;
1483            if( !fext )
1484                cancelAllRequestsToPeer( msgs, FALSE );
1485            break;
1486
1487        case BT_UNCHOKE:
1488            dbgmsg( msgs, "got Unchoke" );
1489            msgs->peer->clientIsChoked = 0;
1490            fireNeedReq( msgs );
1491            break;
1492
1493        case BT_INTERESTED:
1494            dbgmsg( msgs, "got Interested" );
1495            msgs->peer->peerIsInterested = 1;
1496            break;
1497
1498        case BT_NOT_INTERESTED:
1499            dbgmsg( msgs, "got Not Interested" );
1500            msgs->peer->peerIsInterested = 0;
1501            break;
1502
1503        case BT_HAVE:
1504            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1505            dbgmsg( msgs, "got Have: %u", ui32 );
1506            if( tr_bitfieldAdd( msgs->peer->have, ui32 ) ) {
1507                fireError( msgs, ERANGE );
1508                return READ_ERR;
1509            }
1510            updatePeerProgress( msgs );
1511            tr_rcTransferred( &msgs->torrent->swarmSpeed,
1512                              msgs->torrent->info.pieceSize );
1513            break;
1514
1515        case BT_BITFIELD:
1516        {
1517            dbgmsg( msgs, "got a bitfield" );
1518            tr_peerIoReadBytes( msgs->peer->io, inbuf, msgs->peer->have->bits, msglen );
1519            updatePeerProgress( msgs );
1520            fireNeedReq( msgs );
1521            break;
1522        }
1523
1524        case BT_REQUEST:
1525        {
1526            struct peer_request r;
1527            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1528            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1529            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1530            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1531            peerMadeRequest( msgs, &r );
1532            break;
1533        }
1534
1535        case BT_CANCEL:
1536        {
1537            struct peer_request r;
1538            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1539            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1540            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1541            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1542            if( reqListRemove( &msgs->peerAskedFor, &r ) && fext )
1543                protocolSendReject( msgs, &r );
1544            break;
1545        }
1546
1547        case BT_PIECE:
1548            assert( 0 ); /* handled elsewhere! */
1549            break;
1550
1551        case BT_PORT:
1552            dbgmsg( msgs, "Got a BT_PORT" );
1553            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->port );
1554            break;
1555
1556        case BT_FEXT_SUGGEST:
1557            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1558            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1559            if( fext )
1560                fireClientGotSuggest( msgs, ui32 );
1561            else {
1562                fireError( msgs, EMSGSIZE );
1563                return READ_ERR;
1564            }
1565            break;
1566
1567        case BT_FEXT_ALLOWED_FAST:
1568            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1569            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1570            if( fext )
1571                fireClientGotAllowedFast( msgs, ui32 );
1572            else {
1573                fireError( msgs, EMSGSIZE );
1574                return READ_ERR;
1575            }
1576            break;
1577
1578        case BT_FEXT_HAVE_ALL:
1579            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1580            if( fext ) {
1581                tr_bitfieldAddRange( msgs->peer->have, 0, msgs->torrent->info.pieceCount );
1582                updatePeerProgress( msgs );
1583            } else {
1584                fireError( msgs, EMSGSIZE );
1585                return READ_ERR;
1586            }
1587            break;
1588
1589        case BT_FEXT_HAVE_NONE:
1590            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1591            if( fext ) {
1592                tr_bitfieldClear( msgs->peer->have );
1593                updatePeerProgress( msgs );
1594            } else {
1595                fireError( msgs, EMSGSIZE );
1596                return READ_ERR;
1597            }
1598            break;
1599
1600        case BT_FEXT_REJECT:
1601        {
1602            struct peer_request r;
1603            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1604            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1605            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1606            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1607            if( fext )
1608                reqListRemove( &msgs->clientAskedFor, &r );
1609            else {
1610                fireError( msgs, EMSGSIZE );
1611                return READ_ERR;
1612            }
1613            break;
1614        }
1615
1616        case BT_LTEP:
1617            dbgmsg( msgs, "Got a BT_LTEP" );
1618            parseLtep( msgs, msglen, inbuf );
1619            break;
1620
1621        default:
1622            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1623            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1624            break;
1625    }
1626
1627    assert( msglen + 1 == msgs->incoming.length );
1628    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1629
1630    msgs->state = AWAITING_BT_LENGTH;
1631    return READ_NOW;
1632}
1633
1634static inline void
1635decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1636{
1637    tr_torrent * tor = msgs->torrent;
1638
1639    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1640}
1641
1642static inline void
1643clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1644{
1645    decrementDownloadedCount( msgs, req->length );
1646}
1647
1648static void
1649addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1650{
1651    if( !msgs->peer->blame )
1652         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1653    tr_bitfieldAdd( msgs->peer->blame, index );
1654}
1655
1656/* returns 0 on success, or an errno on failure */
1657static int
1658clientGotBlock( tr_peermsgs *               msgs,
1659                const uint8_t *             data,
1660                const struct peer_request * req )
1661{
1662    int err;
1663    tr_torrent * tor = msgs->torrent;
1664    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1665
1666    assert( msgs );
1667    assert( req );
1668
1669    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1670        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1671                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1672        return EMSGSIZE;
1673    }
1674
1675    /* save the block */
1676    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1677
1678    /**
1679    *** Remove the block from our `we asked for this' list
1680    **/
1681
1682    if( !reqListRemove( &msgs->clientAskedFor, req ) ) {
1683        clientGotUnwantedBlock( msgs, req );
1684        dbgmsg( msgs, "we didn't ask for this message..." );
1685        return 0;
1686    }
1687
1688    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1689            msgs->clientAskedFor.count );
1690
1691    /**
1692    *** Error checks
1693    **/
1694
1695    if( tr_cpBlockIsComplete( &tor->completion, block ) ) {
1696        dbgmsg( msgs, "we have this block already..." );
1697        clientGotUnwantedBlock( msgs, req );
1698        return 0;
1699    }
1700
1701    /**
1702    ***  Save the block
1703    **/
1704
1705    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1706        return err;
1707
1708    addPeerToBlamefield( msgs, req->index );
1709    fireGotBlock( msgs, req );
1710    return 0;
1711}
1712
1713static int peerPulse( void * vmsgs );
1714
1715static void
1716didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1717{
1718    tr_peermsgs * msgs = vmsgs;
1719    firePeerGotData( msgs, bytesWritten, wasPieceData );
1720    peerPulse( msgs );
1721}
1722
1723static ReadState
1724canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1725{
1726    ReadState         ret;
1727    tr_peermsgs *     msgs = vmsgs;
1728    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1729    const size_t      inlen = EVBUFFER_LENGTH( in );
1730
1731    if( !inlen )
1732    {
1733        ret = READ_LATER;
1734    }
1735    else if( msgs->state == AWAITING_BT_PIECE )
1736    {
1737        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1738    }
1739    else switch( msgs->state )
1740    {
1741        case AWAITING_BT_LENGTH:
1742            ret = readBtLength ( msgs, in, inlen ); break;
1743
1744        case AWAITING_BT_ID:
1745            ret = readBtId     ( msgs, in, inlen ); break;
1746
1747        case AWAITING_BT_MESSAGE:
1748            ret = readBtMessage( msgs, in, inlen ); break;
1749
1750        default:
1751            assert( 0 );
1752    }
1753
1754    /* log the raw data that was read */
1755    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1756        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1757
1758    return ret;
1759}
1760
1761/**
1762***
1763**/
1764
1765static int
1766ratePulse( void * vmsgs )
1767{
1768    tr_peermsgs * msgs = vmsgs;
1769    const double rateToClient = tr_peerGetPieceSpeed( msgs->peer, TR_PEER_TO_CLIENT );
1770    const int seconds = 10;
1771    const int floor = 8;
1772    const int estimatedBlocksInPeriod = ( rateToClient * seconds * 1024 ) / msgs->torrent->blockSize;
1773
1774    msgs->maxActiveRequests = floor + estimatedBlocksInPeriod;
1775
1776    if( msgs->reqq > 0 )
1777        msgs->maxActiveRequests = MIN( msgs->maxActiveRequests, msgs->reqq );
1778
1779    return TRUE;
1780}
1781
1782static size_t
1783fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1784{
1785    size_t bytesWritten = 0;
1786    struct peer_request req;
1787    const tr_bool haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1788    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1789
1790    /**
1791    ***  Protocol messages
1792    **/
1793
1794    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1795    {
1796        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1797        msgs->outMessagesBatchedAt = now;
1798    }
1799    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1800    {
1801        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1802        /* flush the protocol messages */
1803        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1804        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1805        msgs->clientSentAnythingAt = now;
1806        msgs->outMessagesBatchedAt = 0;
1807        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1808        bytesWritten +=  len;
1809    }
1810
1811    /**
1812    ***  Blocks
1813    **/
1814
1815    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io ) >= msgs->torrent->blockSize )
1816        && popNextRequest( msgs, &req ) )
1817    {
1818        if( requestIsValid( msgs, &req )
1819            && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1820        {
1821            int err;
1822            static uint8_t * buf = NULL;
1823
1824            if( buf == NULL )
1825                buf = tr_new( uint8_t, MAX_BLOCK_SIZE );
1826
1827            /* send a block */
1828            if(( err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf ))) {
1829                fireError( msgs, err );
1830                bytesWritten = 0;
1831                msgs = NULL;
1832            } else {
1833                tr_peerIo * io = msgs->peer->io;
1834                struct evbuffer * out = tr_getBuffer( );
1835                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1836                tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1837                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1838                tr_peerIoWriteUint32( io, out, req.index );
1839                tr_peerIoWriteUint32( io, out, req.offset );
1840                tr_peerIoWriteBytes ( io, out, buf, req.length );
1841                tr_peerIoWriteBuf( io, out, TRUE );
1842                bytesWritten += EVBUFFER_LENGTH( out );
1843                msgs->clientSentAnythingAt = now;
1844                tr_releaseBuffer( out );
1845            }
1846        }
1847        else if( fext ) /* peer needs a reject message */
1848        {
1849            protocolSendReject( msgs, &req );
1850        }
1851    }
1852
1853    /**
1854    ***  Keepalive
1855    **/
1856
1857    if( ( msgs != NULL )
1858        && ( msgs->clientSentAnythingAt != 0 )
1859        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1860    {
1861        dbgmsg( msgs, "sending a keepalive message" );
1862        tr_peerIoWriteUint32( msgs->peer->io, msgs->outMessages, 0 );
1863        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1864    }
1865
1866    return bytesWritten;
1867}
1868
1869static int
1870peerPulse( void * vmsgs )
1871{
1872    tr_peermsgs * msgs = vmsgs;
1873    const time_t  now = time( NULL );
1874
1875    ratePulse( msgs );
1876
1877    pumpRequestQueue( msgs, now );
1878    expireOldRequests( msgs, now );
1879
1880    for( ;; )
1881        if( fillOutputBuffer( msgs, now ) < 1 )
1882            break;
1883
1884    return TRUE; /* loop forever */
1885}
1886
1887void
1888tr_peerMsgsPulse( tr_peermsgs * msgs )
1889{
1890    if( msgs != NULL )
1891        peerPulse( msgs );
1892}
1893
1894static void
1895gotError( tr_peerIo  * io UNUSED,
1896          short        what,
1897          void       * vmsgs )
1898{
1899    if( what & EVBUFFER_TIMEOUT )
1900        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1901    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1902        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1903               what, errno, tr_strerror( errno ) );
1904    fireError( vmsgs, ENOTCONN );
1905}
1906
1907static void
1908sendBitfield( tr_peermsgs * msgs )
1909{
1910    struct evbuffer * out = msgs->outMessages;
1911    tr_bitfield *     field;
1912    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1913    size_t            i;
1914    size_t            lazyCount = 0;
1915
1916    field = tr_bitfieldDup( tr_cpPieceBitfield( &msgs->torrent->completion ) );
1917
1918    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1919    {
1920        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1921            speed over a truly random sample -- let's limit the pool size to
1922            the first 1000 pieces so large torrents don't bog things down */
1923        size_t poolSize;
1924        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1925        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1926
1927        /* build the pool */
1928        for( i=poolSize=0; i<maxPoolSize; ++i )
1929            if( tr_bitfieldHas( field, i ) )
1930                pool[poolSize++] = i;
1931
1932        /* pull random piece indices from the pool */
1933        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1934        {
1935            const int pos = tr_cryptoWeakRandInt( poolSize );
1936            const tr_piece_index_t piece = pool[pos];
1937            tr_bitfieldRem( field, piece );
1938            lazyPieces[lazyCount++] = piece;
1939            pool[pos] = pool[--poolSize];
1940        }
1941
1942        /* cleanup */
1943        tr_free( pool );
1944    }
1945
1946    tr_peerIoWriteUint32( msgs->peer->io, out,
1947                          sizeof( uint8_t ) + field->byteCount );
1948    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_BITFIELD );
1949    tr_peerIoWriteBytes ( msgs->peer->io, out, field->bits, field->byteCount );
1950    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
1951            EVBUFFER_LENGTH( out ) );
1952    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1953
1954    for( i = 0; i < lazyCount; ++i )
1955        protocolSendHave( msgs, lazyPieces[i] );
1956
1957    tr_bitfieldFree( field );
1958}
1959
1960static void
1961tellPeerWhatWeHave( tr_peermsgs * msgs )
1962{
1963    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1964
1965    if( fext && ( tr_cpGetStatus( &msgs->torrent->completion ) == TR_SEED ) )
1966    {
1967        protocolSendHaveAll( msgs );
1968    }
1969    else if( fext && ( tr_cpHaveValid( &msgs->torrent->completion ) == 0 ) )
1970    {
1971        protocolSendHaveNone( msgs );
1972    }
1973    else
1974    {
1975        sendBitfield( msgs );
1976    }
1977}
1978
1979/**
1980***
1981**/
1982
1983/* some peers give us error messages if we send
1984   more than this many peers in a single pex message
1985   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1986#define MAX_PEX_ADDED 50
1987#define MAX_PEX_DROPPED 50
1988
1989typedef struct
1990{
1991    tr_pex *  added;
1992    tr_pex *  dropped;
1993    tr_pex *  elements;
1994    int       addedCount;
1995    int       droppedCount;
1996    int       elementCount;
1997}
1998PexDiffs;
1999
2000static void
2001pexAddedCb( void * vpex,
2002            void * userData )
2003{
2004    PexDiffs * diffs = userData;
2005    tr_pex *   pex = vpex;
2006
2007    if( diffs->addedCount < MAX_PEX_ADDED )
2008    {
2009        diffs->added[diffs->addedCount++] = *pex;
2010        diffs->elements[diffs->elementCount++] = *pex;
2011    }
2012}
2013
2014static inline void
2015pexDroppedCb( void * vpex,
2016              void * userData )
2017{
2018    PexDiffs * diffs = userData;
2019    tr_pex *   pex = vpex;
2020
2021    if( diffs->droppedCount < MAX_PEX_DROPPED )
2022    {
2023        diffs->dropped[diffs->droppedCount++] = *pex;
2024    }
2025}
2026
2027static inline void
2028pexElementCb( void * vpex,
2029              void * userData )
2030{
2031    PexDiffs * diffs = userData;
2032    tr_pex * pex = vpex;
2033
2034    diffs->elements[diffs->elementCount++] = *pex;
2035}
2036
2037static void
2038sendPex( tr_peermsgs * msgs )
2039{
2040    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2041    {
2042        PexDiffs diffs;
2043        PexDiffs diffs6;
2044        tr_pex * newPex = NULL;
2045        tr_pex * newPex6 = NULL;
2046        const int newCount = tr_peerMgrGetPeers( msgs->session->peerMgr,
2047                                                 msgs->torrent->info.hash,
2048                                                 &newPex, TR_AF_INET );
2049        const int newCount6 = tr_peerMgrGetPeers( msgs->session->peerMgr,
2050                                                  msgs->torrent->info.hash,
2051                                                  &newPex6, TR_AF_INET6 );
2052
2053        /* build the diffs */
2054        diffs.added = tr_new( tr_pex, newCount );
2055        diffs.addedCount = 0;
2056        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2057        diffs.droppedCount = 0;
2058        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2059        diffs.elementCount = 0;
2060        tr_set_compare( msgs->pex, msgs->pexCount,
2061                        newPex, newCount,
2062                        tr_pexCompare, sizeof( tr_pex ),
2063                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2064        diffs6.added = tr_new( tr_pex, newCount6 );
2065        diffs6.addedCount = 0;
2066        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2067        diffs6.droppedCount = 0;
2068        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2069        diffs6.elementCount = 0;
2070        tr_set_compare( msgs->pex6, msgs->pexCount6,
2071                        newPex6, newCount6,
2072                        tr_pexCompare, sizeof( tr_pex ),
2073                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2074        dbgmsg(
2075            msgs,
2076            "pex: old peer count %d, new peer count %d, added %d, removed %d",
2077            msgs->pexCount, newCount + newCount6,
2078            diffs.addedCount + diffs6.addedCount,
2079            diffs.droppedCount + diffs6.droppedCount );
2080
2081        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2082            !diffs6.droppedCount )
2083        {
2084            tr_free( diffs.elements );
2085            tr_free( diffs6.elements );
2086        }
2087        else
2088        {
2089            int  i;
2090            tr_benc val;
2091            char * benc;
2092            int bencLen;
2093            uint8_t * tmp, *walk;
2094            struct evbuffer * out = msgs->outMessages;
2095
2096            /* update peer */
2097            tr_free( msgs->pex );
2098            msgs->pex = diffs.elements;
2099            msgs->pexCount = diffs.elementCount;
2100            tr_free( msgs->pex6 );
2101            msgs->pex6 = diffs6.elements;
2102            msgs->pexCount6 = diffs6.elementCount;
2103
2104            /* build the pex payload */
2105            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2106                                         * speed vs. likelihood? */
2107
2108            /* "added" */
2109            tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2110            for( i = 0; i < diffs.addedCount; ++i )
2111            {
2112                tr_suspectAddress( &diffs.added[i].addr, "pex" );
2113                memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2114                memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2115            }
2116            assert( ( walk - tmp ) == diffs.addedCount * 6 );
2117            tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2118            tr_free( tmp );
2119
2120            /* "added.f" */
2121            tmp = walk = tr_new( uint8_t, diffs.addedCount );
2122            for( i = 0; i < diffs.addedCount; ++i )
2123                *walk++ = diffs.added[i].flags;
2124            assert( ( walk - tmp ) == diffs.addedCount );
2125            tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2126            tr_free( tmp );
2127
2128            /* "dropped" */
2129            tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2130            for( i = 0; i < diffs.droppedCount; ++i )
2131            {
2132                memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2133                memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2134            }
2135            assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2136            tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2137            tr_free( tmp );
2138           
2139            /* "added6" */
2140            tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2141            for( i = 0; i < diffs6.addedCount; ++i )
2142            {
2143                tr_suspectAddress( &diffs6.added[i].addr, "pex6" );
2144                memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2145                walk += 16;
2146                memcpy( walk, &diffs6.added[i].port, 2 );
2147                walk += 2;
2148            }
2149            assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2150            tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2151            tr_free( tmp );
2152           
2153            /* "added6.f" */
2154            tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2155            for( i = 0; i < diffs6.addedCount; ++i )
2156                *walk++ = diffs6.added[i].flags;
2157            assert( ( walk - tmp ) == diffs6.addedCount );
2158            tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2159            tr_free( tmp );
2160           
2161            /* "dropped6" */
2162            tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2163            for( i = 0; i < diffs6.droppedCount; ++i )
2164            {
2165                memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2166                walk += 16;
2167                memcpy( walk, &diffs6.dropped[i].port, 2 );
2168                walk += 2;
2169            }
2170            assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2171            tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2172            tr_free( tmp );
2173
2174            /* write the pex message */
2175            benc = tr_bencSave( &val, &bencLen );
2176            tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + bencLen );
2177            tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
2178            tr_peerIoWriteUint8 ( msgs->peer->io, out, msgs->ut_pex_id );
2179            tr_peerIoWriteBytes ( msgs->peer->io, out, benc, bencLen );
2180            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2181            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2182
2183            tr_free( benc );
2184            tr_bencFree( &val );
2185        }
2186
2187        /* cleanup */
2188        tr_free( diffs.added );
2189        tr_free( diffs.dropped );
2190        tr_free( newPex );
2191        tr_free( diffs6.added );
2192        tr_free( diffs6.dropped );
2193        tr_free( newPex6 );
2194
2195        msgs->clientSentPexAt = time( NULL );
2196    }
2197}
2198
2199static inline int
2200pexPulse( void * vpeer )
2201{
2202    sendPex( vpeer );
2203    return TRUE;
2204}
2205
2206/**
2207***
2208**/
2209
2210tr_peermsgs*
2211tr_peerMsgsNew( struct tr_torrent * torrent,
2212                struct tr_peer    * peer,
2213                tr_delivery_func    func,
2214                void              * userData,
2215                tr_publisher_tag  * setme )
2216{
2217    tr_peermsgs * m;
2218
2219    assert( peer );
2220    assert( peer->io );
2221
2222    m = tr_new0( tr_peermsgs, 1 );
2223    m->publisher = TR_PUBLISHER_INIT;
2224    m->peer = peer;
2225    m->session = torrent->session;
2226    m->torrent = torrent;
2227    m->peer->clientIsChoked = 1;
2228    m->peer->peerIsChoked = 1;
2229    m->peer->clientIsInterested = 0;
2230    m->peer->peerIsInterested = 0;
2231    m->peer->have = tr_bitfieldNew( torrent->info.pieceCount );
2232    m->state = AWAITING_BT_LENGTH;
2233    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
2234    m->outMessages = evbuffer_new( );
2235    m->outMessagesBatchedAt = 0;
2236    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2237    m->incoming.block = evbuffer_new( );
2238    m->peerAskedFor = REQUEST_LIST_INIT;
2239    m->clientAskedFor = REQUEST_LIST_INIT;
2240    m->clientWillAskFor = REQUEST_LIST_INIT;
2241    peer->msgs = m;
2242
2243    *setme = tr_publisherSubscribe( &m->publisher, func, userData );
2244
2245    if( tr_peerIoSupportsLTEP( peer->io ) )
2246        sendLtepHandshake( m );
2247
2248    tellPeerWhatWeHave( m );
2249
2250    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2251    ratePulse( m );
2252
2253    return m;
2254}
2255
2256void
2257tr_peerMsgsFree( tr_peermsgs* msgs )
2258{
2259    if( msgs )
2260    {
2261        tr_timerFree( &msgs->pexTimer );
2262        tr_publisherDestruct( &msgs->publisher );
2263        reqListClear( &msgs->clientWillAskFor );
2264        reqListClear( &msgs->clientAskedFor );
2265        reqListClear( &msgs->peerAskedFor );
2266
2267        evbuffer_free( msgs->incoming.block );
2268        evbuffer_free( msgs->outMessages );
2269        tr_free( msgs->pex6 );
2270        tr_free( msgs->pex );
2271
2272        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2273        tr_free( msgs );
2274    }
2275}
2276
2277void
2278tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2279                        tr_publisher_tag tag )
2280{
2281    tr_publisherUnsubscribe( &peer->publisher, tag );
2282}
2283
Note: See TracBrowser for help on using the repository browser.