source: trunk/libtransmission/peer-msgs.c @ 7456

Last change on this file since 7456 was 7456, checked in by charles, 12 years ago

(trunk libT) minor cleanups found while diffing for backport to 1.4x in r7455

  • Property svn:keywords set to Date Rev Author Id
File size: 64.9 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 7456 2008-12-22 00:52:44Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "crypto.h"
26#include "inout.h"
27#ifdef WIN32
28#include "net.h" /* for ECONN */
29#endif
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-mgr-private.h"
33#include "peer-msgs.h"
34#include "ratecontrol.h"
35#include "stats.h"
36#include "torrent.h"
37#include "trevent.h"
38#include "utils.h"
39
40/**
41***
42**/
43
44enum
45{
46    BT_CHOKE                = 0,
47    BT_UNCHOKE              = 1,
48    BT_INTERESTED           = 2,
49    BT_NOT_INTERESTED       = 3,
50    BT_HAVE                 = 4,
51    BT_BITFIELD             = 5,
52    BT_REQUEST              = 6,
53    BT_PIECE                = 7,
54    BT_CANCEL               = 8,
55    BT_PORT                 = 9,
56
57    BT_FEXT_SUGGEST         = 13,
58    BT_FEXT_HAVE_ALL        = 14,
59    BT_FEXT_HAVE_NONE       = 15,
60    BT_FEXT_REJECT          = 16,
61    BT_FEXT_ALLOWED_FAST    = 17,
62
63    BT_LTEP                 = 20,
64
65    LTEP_HANDSHAKE          = 0,
66
67    TR_LTEP_PEX             = 1,
68
69
70
71    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
72
73    /* idle seconds before we send a keepalive */
74    KEEPALIVE_INTERVAL_SECS = 100,
75
76    PEX_INTERVAL            = ( 90 * 1000 ), /* msec between sendPex() calls */
77
78    MAX_QUEUE_SIZE          = ( 100 ),
79
80    /* how long an unsent request can stay queued before it's returned
81       back to the peer-mgr's pool of requests */
82    QUEUED_REQUEST_TTL_SECS = 20,
83
84    /* how long a sent request can stay queued before it's returned
85       back to the peer-mgr's pool of requests */
86    SENT_REQUEST_TTL_SECS = 240,
87
88    /* used in lowering the outMessages queue period */
89    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
90    HIGH_PRIORITY_INTERVAL_SECS = 2,
91    LOW_PRIORITY_INTERVAL_SECS = 20,
92
93    /* number of pieces to remove from the bitfield when
94     * lazy bitfields are turned on */
95    LAZY_PIECE_COUNT = 26,
96
97    /* number of pieces we'll allow in our fast set */
98    MAX_FAST_SET_SIZE = 3
99};
100
101/**
102***  REQUEST MANAGEMENT
103**/
104
105enum
106{
107    AWAITING_BT_LENGTH,
108    AWAITING_BT_ID,
109    AWAITING_BT_MESSAGE,
110    AWAITING_BT_PIECE
111};
112
113struct peer_request
114{
115    uint32_t    index;
116    uint32_t    offset;
117    uint32_t    length;
118    time_t      time_requested;
119};
120
121static int
122compareRequest( const void * va, const void * vb )
123{
124    const struct peer_request * a = va;
125    const struct peer_request * b = vb;
126
127    if( a->index != b->index )
128        return a->index < b->index ? -1 : 1;
129
130    if( a->offset != b->offset )
131        return a->offset < b->offset ? -1 : 1;
132
133    if( a->length != b->length )
134        return a->length < b->length ? -1 : 1;
135
136    return 0;
137}
138
139struct request_list
140{
141    uint16_t               count;
142    uint16_t               max;
143    struct peer_request *  requests;
144};
145
146static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
147
148static void
149reqListReserve( struct request_list * list,
150                uint16_t              max )
151{
152    if( list->max < max )
153    {
154        list->max = max;
155        list->requests = tr_renew( struct peer_request,
156                                   list->requests,
157                                   list->max );
158    }
159}
160
161static void
162reqListClear( struct request_list * list )
163{
164    tr_free( list->requests );
165    *list = REQUEST_LIST_INIT;
166}
167
168static void
169reqListCopy( struct request_list * dest, const struct request_list * src )
170{
171    dest->count = dest->max = src->count;
172    dest->requests = tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
173}
174
175static void
176reqListRemoveOne( struct request_list * list,
177                  int                   i )
178{
179    assert( 0 <= i && i < list->count );
180
181    memmove( &list->requests[i],
182             &list->requests[i + 1],
183             sizeof( struct peer_request ) * ( --list->count - i ) );
184}
185
186static void
187reqListAppend( struct request_list *       list,
188               const struct peer_request * req )
189{
190    if( ++list->count >= list->max )
191        reqListReserve( list, list->max + 8 );
192
193    list->requests[list->count - 1] = *req;
194}
195
196static int
197reqListPop( struct request_list * list,
198            struct peer_request * setme )
199{
200    int success;
201
202    if( !list->count )
203        success = FALSE;
204    else {
205        *setme = list->requests[0];
206        reqListRemoveOne( list, 0 );
207        success = TRUE;
208    }
209
210    return success;
211}
212
213static int
214reqListFind( struct request_list *       list,
215             const struct peer_request * key )
216{
217    uint16_t i;
218
219    for( i = 0; i < list->count; ++i )
220        if( !compareRequest( key, list->requests + i ) )
221            return i;
222
223    return -1;
224}
225
226static int
227reqListRemove( struct request_list *       list,
228               const struct peer_request * key )
229{
230    int success;
231    const int i = reqListFind( list, key );
232
233    if( i < 0 )
234        success = FALSE;
235    else {
236        reqListRemoveOne( list, i );
237        success = TRUE;
238    }
239
240    return success;
241}
242
243/**
244***
245**/
246
247/* this is raw, unchanged data from the peer regarding
248 * the current message that it's sending us. */
249struct tr_incoming
250{
251    uint8_t                id;
252    uint32_t               length; /* includes the +1 for id length */
253    struct peer_request    blockReq; /* metadata for incoming blocks */
254    struct evbuffer *      block; /* piece data for incoming blocks */
255};
256
257/**
258 * Low-level communication state information about a connected peer.
259 *
260 * This structure remembers the low-level protocol states that we're
261 * in with this peer, such as active requests, pex messages, and so on.
262 * Its fields are all private to peer-msgs.c.
263 *
264 * Data not directly involved with sending & receiving messages is
265 * stored in tr_peer, where it can be accessed by both peermsgs and
266 * the peer manager.
267 *
268 * @see struct peer_atom
269 * @see tr_peer
270 */
271struct tr_peermsgs
272{
273    tr_bool         peerSentBitfield;
274    tr_bool         peerSupportsPex;
275    tr_bool         clientSentLtepHandshake;
276    tr_bool         peerSentLtepHandshake;
277    tr_bool         haveFastSet;
278
279    uint8_t         state;
280    uint8_t         ut_pex_id;
281    uint16_t        pexCount;
282    uint16_t        pexCount6;
283    uint16_t        maxActiveRequests;
284
285    size_t                 fastsetSize;
286    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
287
288    /* how long the outMessages batch should be allowed to grow before
289     * it's flushed -- some messages (like requests >:) should be sent
290     * very quickly; others aren't as urgent. */
291    int                    outMessagesBatchPeriod;
292
293    tr_peer *              peer;
294
295    tr_session *           session;
296    tr_torrent *           torrent;
297
298    tr_publisher_t *       publisher;
299
300    struct evbuffer *      outMessages; /* all the non-piece messages */
301
302    struct request_list    peerAskedFor;
303    struct request_list    clientAskedFor;
304    struct request_list    clientWillAskFor;
305
306    tr_timer             * pexTimer;
307    tr_pex               * pex;
308    tr_pex               * pex6;
309
310    time_t                 clientSentPexAt;
311    time_t                 clientSentAnythingAt;
312
313    /* when we started batching the outMessages */
314    time_t                outMessagesBatchedAt;
315
316    struct tr_incoming    incoming;
317
318    /* if the peer supports the Extension Protocol in BEP 10 and
319       supplied a reqq argument, it's stored here.  otherwise the
320       value is zero and should be ignored. */
321    int64_t               reqq;
322};
323
324/**
325***
326**/
327
328static void
329myDebug( const char * file, int line,
330         const struct tr_peermsgs * msgs,
331         const char * fmt, ... )
332{
333    FILE * fp = tr_getLog( );
334
335    if( fp )
336    {
337        va_list           args;
338        char              timestr[64];
339        struct evbuffer * buf = evbuffer_new( );
340        char *            base = tr_basename( file );
341
342        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
343                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
344                             msgs->torrent->info.name,
345                             tr_peerIoGetAddrStr( msgs->peer->io ),
346                             msgs->peer->client );
347        va_start( args, fmt );
348        evbuffer_add_vprintf( buf, fmt, args );
349        va_end( args );
350        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
351        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
352
353        tr_free( base );
354        evbuffer_free( buf );
355    }
356}
357
358#define dbgmsg( msgs, ... ) \
359    do { \
360        if( tr_deepLoggingIsActive( ) ) \
361            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
362    } while( 0 )
363
364/**
365***
366**/
367
368static void
369pokeBatchPeriod( tr_peermsgs * msgs,
370                 int           interval )
371{
372    if( msgs->outMessagesBatchPeriod > interval )
373    {
374        msgs->outMessagesBatchPeriod = interval;
375        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
376    }
377}
378
379static void
380dbgOutMessageLen( tr_peermsgs * msgs )
381{
382    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
383}
384
385static void
386protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
387{
388    tr_peerIo       * io  = msgs->peer->io;
389    struct evbuffer * out = msgs->outMessages;
390
391    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
392
393    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
394    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
395    tr_peerIoWriteUint32( io, out, req->index );
396    tr_peerIoWriteUint32( io, out, req->offset );
397    tr_peerIoWriteUint32( io, out, req->length );
398
399    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
400    dbgOutMessageLen( msgs );
401}
402
403static void
404protocolSendRequest( tr_peermsgs               * msgs,
405                     const struct peer_request * req )
406{
407    tr_peerIo       * io  = msgs->peer->io;
408    struct evbuffer * out = msgs->outMessages;
409
410    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
411    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
412    tr_peerIoWriteUint32( io, out, req->index );
413    tr_peerIoWriteUint32( io, out, req->offset );
414    tr_peerIoWriteUint32( io, out, req->length );
415
416    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
417    dbgOutMessageLen( msgs );
418    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
419}
420
421static void
422protocolSendCancel( tr_peermsgs               * msgs,
423                    const struct peer_request * req )
424{
425    tr_peerIo       * io  = msgs->peer->io;
426    struct evbuffer * out = msgs->outMessages;
427
428    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
429    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
430    tr_peerIoWriteUint32( io, out, req->index );
431    tr_peerIoWriteUint32( io, out, req->offset );
432    tr_peerIoWriteUint32( io, out, req->length );
433
434    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
435    dbgOutMessageLen( msgs );
436    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
437}
438
439static void
440protocolSendHave( tr_peermsgs * msgs,
441                  uint32_t      index )
442{
443    tr_peerIo       * io  = msgs->peer->io;
444    struct evbuffer * out = msgs->outMessages;
445
446    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
447    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
448    tr_peerIoWriteUint32( io, out, index );
449
450    dbgmsg( msgs, "sending Have %u", index );
451    dbgOutMessageLen( msgs );
452    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
453}
454
455#if 0
456static void
457protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
458{
459    tr_peerIo       * io  = msgs->peer->io;
460    struct evbuffer * out = msgs->outMessages;
461
462    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
463
464    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
465    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
466    tr_peerIoWriteUint32( io, out, pieceIndex );
467
468    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
469    dbgOutMessageLen( msgs );
470}
471#endif
472
473static void
474protocolSendChoke( tr_peermsgs * msgs,
475                   int           choke )
476{
477    tr_peerIo       * io  = msgs->peer->io;
478    struct evbuffer * out = msgs->outMessages;
479
480    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
481    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
482
483    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
484    dbgOutMessageLen( msgs );
485    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
486}
487
488static void
489protocolSendHaveAll( tr_peermsgs * msgs )
490{
491    tr_peerIo       * io  = msgs->peer->io;
492    struct evbuffer * out = msgs->outMessages;
493
494    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
495
496    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
497    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
498
499    dbgmsg( msgs, "sending HAVE_ALL..." );
500    dbgOutMessageLen( msgs );
501    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
502}
503
504static void
505protocolSendHaveNone( tr_peermsgs * msgs )
506{
507    tr_peerIo       * io  = msgs->peer->io;
508    struct evbuffer * out = msgs->outMessages;
509
510    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
511
512    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
513    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
514
515    dbgmsg( msgs, "sending HAVE_NONE..." );
516    dbgOutMessageLen( msgs );
517    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
518}
519
520/**
521***  EVENTS
522**/
523
524static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0 };
525
526static void
527publish( tr_peermsgs * msgs, tr_peer_event * e )
528{
529    assert( msgs->peer );
530    assert( msgs->peer->msgs == msgs );
531
532    tr_publisherPublish( msgs->publisher, msgs->peer, e );
533}
534
535static void
536fireError( tr_peermsgs * msgs, int err )
537{
538    tr_peer_event e = blankEvent;
539    e.eventType = TR_PEER_ERROR;
540    e.err = err;
541    publish( msgs, &e );
542}
543
544static void
545fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly )
546{
547    tr_peer_event e = blankEvent;
548    e.eventType = TR_PEER_UPLOAD_ONLY;
549    e.uploadOnly = uploadOnly;
550    publish( msgs, &e );
551}
552
553static void
554fireNeedReq( tr_peermsgs * msgs )
555{
556    tr_peer_event e = blankEvent;
557    e.eventType = TR_PEER_NEED_REQ;
558    publish( msgs, &e );
559}
560
561static void
562firePeerProgress( tr_peermsgs * msgs )
563{
564    tr_peer_event e = blankEvent;
565    e.eventType = TR_PEER_PEER_PROGRESS;
566    e.progress = msgs->peer->progress;
567    publish( msgs, &e );
568}
569
570static void
571fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
572{
573    tr_peer_event e = blankEvent;
574    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
575    e.pieceIndex = req->index;
576    e.offset = req->offset;
577    e.length = req->length;
578    publish( msgs, &e );
579}
580
581static void
582fireClientGotData( tr_peermsgs * msgs,
583                   uint32_t      length,
584                   int           wasPieceData )
585{
586    tr_peer_event e = blankEvent;
587
588    e.length = length;
589    e.eventType = TR_PEER_CLIENT_GOT_DATA;
590    e.wasPieceData = wasPieceData;
591    publish( msgs, &e );
592}
593
594static void
595fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
596{
597    tr_peer_event e = blankEvent;
598    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
599    e.pieceIndex = pieceIndex;
600    publish( msgs, &e );
601}
602
603static void
604fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
605{
606    tr_peer_event e = blankEvent;
607    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
608    e.pieceIndex = pieceIndex;
609    publish( msgs, &e );
610}
611
612static void
613firePeerGotData( tr_peermsgs  * msgs,
614                 uint32_t       length,
615                 int            wasPieceData )
616{
617    tr_peer_event e = blankEvent;
618
619    e.length = length;
620    e.eventType = TR_PEER_PEER_GOT_DATA;
621    e.wasPieceData = wasPieceData;
622
623    publish( msgs, &e );
624}
625
626static void
627fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
628{
629    tr_peer_event e = blankEvent;
630    e.eventType = TR_PEER_CANCEL;
631    e.pieceIndex = req->index;
632    e.offset = req->offset;
633    e.length = req->length;
634    publish( msgs, &e );
635}
636
637/**
638***  ALLOWED FAST SET
639***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
640**/
641
642size_t
643tr_generateAllowedSet( tr_piece_index_t * setmePieces,
644                       size_t             desiredSetSize,
645                       size_t             pieceCount,
646                       const uint8_t    * infohash,
647                       const tr_address * addr )
648{
649    size_t setSize = 0;
650
651    assert( setmePieces );
652    assert( desiredSetSize <= pieceCount );
653    assert( desiredSetSize );
654    assert( pieceCount );
655    assert( infohash );
656    assert( addr );
657
658    if( addr->type == TR_AF_INET )
659    {
660        uint8_t w[SHA_DIGEST_LENGTH + 4];
661        uint8_t x[SHA_DIGEST_LENGTH];
662
663        *(uint32_t*)w = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
664        memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );                /* (2) */
665        tr_sha1( x, w, sizeof( w ), NULL );                          /* (3) */
666
667        while( setSize<desiredSetSize )
668        {
669            int i;
670            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
671            {
672                size_t k;
673                uint32_t j = i * 4;                                  /* (5) */
674                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
675                uint32_t index = y % pieceCount;                     /* (7) */
676
677                for( k=0; k<setSize; ++k )                           /* (8) */
678                    if( setmePieces[k] == index )
679                        break;
680
681                if( k == setSize )
682                    setmePieces[setSize++] = index;                  /* (9) */
683            }
684
685            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
686        }
687    }
688
689    return setSize;
690}
691
692static void
693updateFastSet( tr_peermsgs * msgs UNUSED )
694{
695#if 0
696    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
697    const int peerIsNeedy = msgs->peer->progress < 0.10;
698
699    if( fext && peerIsNeedy && !msgs->haveFastSet )
700    {
701        size_t i;
702        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
703        const tr_info * inf = &msgs->torrent->info;
704        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
705
706        /* build the fast set */
707        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
708        msgs->haveFastSet = 1;
709
710        /* send it to the peer */
711        for( i=0; i<msgs->fastsetSize; ++i )
712            protocolSendAllowedFast( msgs, msgs->fastset[i] );
713    }
714#endif
715}
716
717/**
718***  INTEREST
719**/
720
721static int
722isPieceInteresting( const tr_peermsgs * msgs,
723                    tr_piece_index_t    piece )
724{
725    const tr_torrent * torrent = msgs->torrent;
726
727    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
728          && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have */
729          && ( tr_bitfieldHas( msgs->peer->have, piece ) );    /* peer has it */
730}
731
732/* "interested" means we'll ask for piece data if they unchoke us */
733static int
734isPeerInteresting( const tr_peermsgs * msgs )
735{
736    tr_piece_index_t    i;
737    const tr_torrent *  torrent;
738    const tr_bitfield * bitfield;
739    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
740
741    if( clientIsSeed )
742        return FALSE;
743
744    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
745        return FALSE;
746
747    torrent = msgs->torrent;
748    bitfield = tr_cpPieceBitfield( torrent->completion );
749
750    if( !msgs->peer->have )
751        return TRUE;
752
753    assert( bitfield->byteCount == msgs->peer->have->byteCount );
754
755    for( i = 0; i < torrent->info.pieceCount; ++i )
756        if( isPieceInteresting( msgs, i ) )
757            return TRUE;
758
759    return FALSE;
760}
761
762static void
763sendInterest( tr_peermsgs * msgs,
764              int           weAreInterested )
765{
766    struct evbuffer * out = msgs->outMessages;
767
768    assert( msgs );
769    assert( weAreInterested == 0 || weAreInterested == 1 );
770
771    msgs->peer->clientIsInterested = weAreInterested;
772    dbgmsg( msgs, "Sending %s", weAreInterested ? "Interested" : "Not Interested" );
773    tr_peerIoWriteUint32( msgs->peer->io, out, sizeof( uint8_t ) );
774    tr_peerIoWriteUint8 ( msgs->peer->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
775
776    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
777    dbgOutMessageLen( msgs );
778}
779
780static void
781updateInterest( tr_peermsgs * msgs )
782{
783    const int i = isPeerInteresting( msgs );
784
785    if( i != msgs->peer->clientIsInterested )
786        sendInterest( msgs, i );
787    if( i )
788        fireNeedReq( msgs );
789}
790
791static int
792popNextRequest( tr_peermsgs *         msgs,
793                struct peer_request * setme )
794{
795    return reqListPop( &msgs->peerAskedFor, setme );
796}
797
798static void
799cancelAllRequestsToClient( tr_peermsgs * msgs )
800{
801    struct peer_request req;
802    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
803
804    while( popNextRequest( msgs, &req ) )
805        if( mustSendCancel )
806            protocolSendReject( msgs, &req );
807}
808
809void
810tr_peerMsgsSetChoke( tr_peermsgs * msgs,
811                     int           choke )
812{
813    const time_t now = time( NULL );
814    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
815
816    assert( msgs );
817    assert( msgs->peer );
818    assert( choke == 0 || choke == 1 );
819
820    if( msgs->peer->chokeChangedAt > fibrillationTime )
821    {
822        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
823    }
824    else if( msgs->peer->peerIsChoked != choke )
825    {
826        msgs->peer->peerIsChoked = choke;
827        if( choke )
828            cancelAllRequestsToClient( msgs );
829        protocolSendChoke( msgs, choke );
830        msgs->peer->chokeChangedAt = now;
831    }
832}
833
834/**
835***
836**/
837
838void
839tr_peerMsgsHave( tr_peermsgs * msgs,
840                 uint32_t      index )
841{
842    protocolSendHave( msgs, index );
843
844    /* since we have more pieces now, we might not be interested in this peer */
845    updateInterest( msgs );
846}
847
848/**
849***
850**/
851
852static int
853reqIsValid( const tr_peermsgs * peer,
854            uint32_t            index,
855            uint32_t            offset,
856            uint32_t            length )
857{
858    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
859}
860
861static int
862requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
863{
864    return reqIsValid( msgs, req->index, req->offset, req->length );
865}
866
867static void
868expireOldRequests( tr_peermsgs * msgs, const time_t now  )
869{
870    int i;
871    time_t oldestAllowed;
872    struct request_list tmp = REQUEST_LIST_INIT;
873    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
874    dbgmsg( msgs, "entering `expire old requests' block" );
875
876    /* cancel requests that have been queued for too long */
877    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
878    reqListCopy( &tmp, &msgs->clientWillAskFor );
879    for( i=0; i<tmp.count; ++i ) {
880        const struct peer_request * req = &tmp.requests[i];
881        if( req->time_requested < oldestAllowed )
882            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
883    }
884    reqListClear( &tmp );
885
886    /* if the peer doesn't support "Reject Request",
887     * cancel requests that were sent too long ago. */
888    if( !fext ) {
889        oldestAllowed = now - SENT_REQUEST_TTL_SECS;
890        reqListCopy( &tmp, &msgs->clientAskedFor );
891        for( i=0; i<tmp.count; ++i ) {
892            const struct peer_request * req = &tmp.requests[i];
893            if( req->time_requested < oldestAllowed )
894                tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
895        }
896        reqListClear( &tmp );
897    }
898
899    dbgmsg( msgs, "leaving `expire old requests' block" );
900}
901
902static void
903pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
904{
905    const int           max = msgs->maxActiveRequests;
906    int                 sent = 0;
907    int                 count = msgs->clientAskedFor.count;
908    struct peer_request req;
909
910    if( msgs->peer->clientIsChoked )
911        return;
912    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
913        return;
914
915    while( ( count < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
916    {
917        const tr_block_index_t block = _tr_block( msgs->torrent, req.index, req.offset );
918
919        assert( requestIsValid( msgs, &req ) );
920        assert( tr_bitfieldHas( msgs->peer->have, req.index ) );
921
922        /* don't ask for it if we've already got it... this block may have
923         * come in from a different peer after we cancelled a request for it */
924        if( !tr_cpBlockIsComplete( msgs->torrent->completion, block ) )
925        {
926            protocolSendRequest( msgs, &req );
927            req.time_requested = now;
928            reqListAppend( &msgs->clientAskedFor, &req );
929
930            ++count;
931            ++sent;
932        }
933    }
934
935    if( sent )
936        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
937                sent, msgs->clientAskedFor.count, msgs->clientWillAskFor.count );
938
939    if( count < max )
940        fireNeedReq( msgs );
941}
942
943static int
944requestQueueIsFull( const tr_peermsgs * msgs )
945{
946    const int req_max = msgs->maxActiveRequests;
947    return msgs->clientWillAskFor.count >= req_max;
948}
949
950tr_addreq_t
951tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
952                       uint32_t         index,
953                       uint32_t         offset,
954                       uint32_t         length )
955{
956    struct peer_request req;
957
958    assert( msgs );
959    assert( msgs->torrent );
960    assert( reqIsValid( msgs, index, offset, length ) );
961
962    /**
963    ***  Reasons to decline the request
964    **/
965
966    /* don't send requests to choked clients */
967    if( msgs->peer->clientIsChoked ) {
968        dbgmsg( msgs, "declining request because they're choking us" );
969        return TR_ADDREQ_CLIENT_CHOKED;
970    }
971
972    /* peer doesn't have this piece */
973    if( !tr_bitfieldHas( msgs->peer->have, index ) )
974        return TR_ADDREQ_MISSING;
975
976    /* peer's queue is full */
977    if( requestQueueIsFull( msgs ) ) {
978        dbgmsg( msgs, "declining request because we're full" );
979        return TR_ADDREQ_FULL;
980    }
981
982    /* have we already asked for this piece? */
983    req.index = index;
984    req.offset = offset;
985    req.length = length;
986    if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
987        dbgmsg( msgs, "declining because it's a duplicate" );
988        return TR_ADDREQ_DUPLICATE;
989    }
990    if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
991        dbgmsg( msgs, "declining because it's a duplicate" );
992        return TR_ADDREQ_DUPLICATE;
993    }
994
995    /**
996    ***  Accept this request
997    **/
998
999    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
1000            index, offset, length );
1001    req.time_requested = time( NULL );
1002    reqListAppend( &msgs->clientWillAskFor, &req );
1003    return TR_ADDREQ_OK;
1004}
1005
1006static void
1007cancelAllRequestsToPeer( tr_peermsgs * msgs, tr_bool sendCancel )
1008{
1009    int i;
1010    struct request_list a = msgs->clientWillAskFor;
1011    struct request_list b = msgs->clientAskedFor;
1012    dbgmsg( msgs, "cancelling all requests to peer" );
1013
1014    msgs->clientAskedFor = REQUEST_LIST_INIT;
1015    msgs->clientWillAskFor = REQUEST_LIST_INIT;
1016
1017    for( i=0; i<a.count; ++i )
1018        fireCancelledReq( msgs, &a.requests[i] );
1019
1020    for( i = 0; i < b.count; ++i ) {
1021        fireCancelledReq( msgs, &b.requests[i] );
1022        if( sendCancel )
1023            protocolSendCancel( msgs, &b.requests[i] );
1024    }
1025
1026    reqListClear( &a );
1027    reqListClear( &b );
1028}
1029
1030void
1031tr_peerMsgsCancel( tr_peermsgs * msgs,
1032                   uint32_t      pieceIndex,
1033                   uint32_t      offset,
1034                   uint32_t      length )
1035{
1036    struct peer_request req;
1037
1038    assert( msgs != NULL );
1039    assert( length > 0 );
1040
1041
1042    /* have we asked the peer for this piece? */
1043    req.index = pieceIndex;
1044    req.offset = offset;
1045    req.length = length;
1046
1047    /* if it's only in the queue and hasn't been sent yet, free it */
1048    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
1049        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
1050        fireCancelledReq( msgs, &req );
1051    }
1052
1053    /* if it's already been sent, send a cancel message too */
1054    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
1055        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
1056        protocolSendCancel( msgs, &req );
1057        fireCancelledReq( msgs, &req );
1058    }
1059}
1060
1061
1062/**
1063***
1064**/
1065
1066static void
1067sendLtepHandshake( tr_peermsgs * msgs )
1068{
1069    tr_benc val, *m;
1070    char * buf;
1071    int len;
1072    int pex;
1073    struct evbuffer * out = msgs->outMessages;
1074
1075    if( msgs->clientSentLtepHandshake )
1076        return;
1077
1078    dbgmsg( msgs, "sending an ltep handshake" );
1079    msgs->clientSentLtepHandshake = 1;
1080
1081    /* decide if we want to advertise pex support */
1082    if( !tr_torrentAllowsPex( msgs->torrent ) )
1083        pex = 0;
1084    else if( msgs->peerSentLtepHandshake )
1085        pex = msgs->peerSupportsPex ? 1 : 0;
1086    else
1087        pex = 1;
1088
1089    tr_bencInitDict( &val, 5 );
1090    tr_bencDictAddInt( &val, "e", msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
1091    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
1092    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
1093    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
1094    m  = tr_bencDictAddDict( &val, "m", 1 );
1095    if( pex )
1096        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
1097    buf = tr_bencSave( &val, &len );
1098
1099    tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + len );
1100    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
1101    tr_peerIoWriteUint8 ( msgs->peer->io, out, LTEP_HANDSHAKE );
1102    tr_peerIoWriteBytes ( msgs->peer->io, out, buf, len );
1103    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1104    dbgOutMessageLen( msgs );
1105
1106    /* cleanup */
1107    tr_bencFree( &val );
1108    tr_free( buf );
1109}
1110
1111static void
1112parseLtepHandshake( tr_peermsgs *     msgs,
1113                    int               len,
1114                    struct evbuffer * inbuf )
1115{
1116    int64_t   i;
1117    tr_benc   val, * sub;
1118    uint8_t * tmp = tr_new( uint8_t, len );
1119
1120    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
1121    msgs->peerSentLtepHandshake = 1;
1122
1123    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
1124    {
1125        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1126        tr_free( tmp );
1127        return;
1128    }
1129
1130    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
1131
1132    /* does the peer prefer encrypted connections? */
1133    if( tr_bencDictFindInt( &val, "e", &i ) )
1134        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1135                                              : ENCRYPTION_PREFERENCE_NO;
1136
1137    /* check supported messages for utorrent pex */
1138    msgs->peerSupportsPex = 0;
1139    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
1140        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
1141            msgs->ut_pex_id = (uint8_t) i;
1142            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1143            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1144        }
1145    }
1146
1147    /* look for upload_only (BEP 21) */
1148    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
1149        fireUploadOnly( msgs, i!=0 );
1150
1151    /* get peer's listening port */
1152    if( tr_bencDictFindInt( &val, "p", &i ) ) {
1153        msgs->peer->port = htons( (uint16_t)i );
1154        dbgmsg( msgs, "msgs->port is now %hu", msgs->peer->port );
1155    }
1156
1157    /* get peer's maximum request queue size */
1158    if( tr_bencDictFindInt( &val, "reqq", &i ) )
1159        msgs->reqq = i;
1160
1161    tr_bencFree( &val );
1162    tr_free( tmp );
1163}
1164
1165static void
1166parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1167{
1168    int loaded = 0;
1169    uint8_t * tmp = tr_new( uint8_t, msglen );
1170    tr_benc val;
1171    const tr_torrent * tor = msgs->torrent;
1172    const uint8_t * added;
1173    size_t added_len;
1174
1175    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1176
1177    if( tr_torrentAllowsPex( tor )
1178      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1179    {
1180        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1181        {
1182            const uint8_t * added_f = NULL;
1183            tr_pex *        pex;
1184            size_t          i, n;
1185            size_t          added_f_len = 0;
1186            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1187            pex =
1188                tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len,
1189                                        &n );
1190            for( i = 0; i < n; ++i )
1191                tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1192                                  TR_PEER_FROM_PEX, pex + i );
1193            tr_free( pex );
1194        }
1195       
1196        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1197        {
1198            const uint8_t * added_f = NULL;
1199            tr_pex *        pex;
1200            size_t          i, n;
1201            size_t          added_f_len = 0;
1202            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1203            pex =
1204                tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len,
1205                                         &n );
1206            for( i = 0; i < n; ++i )
1207                tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1208                                  TR_PEER_FROM_PEX, pex + i );
1209            tr_free( pex );
1210        }
1211       
1212    }
1213
1214    if( loaded )
1215        tr_bencFree( &val );
1216    tr_free( tmp );
1217}
1218
1219static void sendPex( tr_peermsgs * msgs );
1220
1221static void
1222parseLtep( tr_peermsgs *     msgs,
1223           int               msglen,
1224           struct evbuffer * inbuf )
1225{
1226    uint8_t ltep_msgid;
1227
1228    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1229    msglen--;
1230
1231    if( ltep_msgid == LTEP_HANDSHAKE )
1232    {
1233        dbgmsg( msgs, "got ltep handshake" );
1234        parseLtepHandshake( msgs, msglen, inbuf );
1235        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1236        {
1237            sendLtepHandshake( msgs );
1238            sendPex( msgs );
1239        }
1240    }
1241    else if( ltep_msgid == TR_LTEP_PEX )
1242    {
1243        dbgmsg( msgs, "got ut pex" );
1244        msgs->peerSupportsPex = 1;
1245        parseUtPex( msgs, msglen, inbuf );
1246    }
1247    else
1248    {
1249        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1250        evbuffer_drain( inbuf, msglen );
1251    }
1252}
1253
1254static int
1255readBtLength( tr_peermsgs *     msgs,
1256              struct evbuffer * inbuf,
1257              size_t            inlen )
1258{
1259    uint32_t len;
1260
1261    if( inlen < sizeof( len ) )
1262        return READ_LATER;
1263
1264    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1265
1266    if( len == 0 ) /* peer sent us a keepalive message */
1267        dbgmsg( msgs, "got KeepAlive" );
1268    else
1269    {
1270        msgs->incoming.length = len;
1271        msgs->state = AWAITING_BT_ID;
1272    }
1273
1274    return READ_NOW;
1275}
1276
1277static int readBtMessage( tr_peermsgs *     msgs,
1278                          struct evbuffer * inbuf,
1279                          size_t            inlen );
1280
1281static int
1282readBtId( tr_peermsgs *     msgs,
1283          struct evbuffer * inbuf,
1284          size_t            inlen )
1285{
1286    uint8_t id;
1287
1288    if( inlen < sizeof( uint8_t ) )
1289        return READ_LATER;
1290
1291    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1292    msgs->incoming.id = id;
1293
1294    if( id == BT_PIECE )
1295    {
1296        msgs->state = AWAITING_BT_PIECE;
1297        return READ_NOW;
1298    }
1299    else if( msgs->incoming.length != 1 )
1300    {
1301        msgs->state = AWAITING_BT_MESSAGE;
1302        return READ_NOW;
1303    }
1304    else return readBtMessage( msgs, inbuf, inlen - 1 );
1305}
1306
1307static void
1308updatePeerProgress( tr_peermsgs * msgs )
1309{
1310    msgs->peer->progress = tr_bitfieldCountTrueBits( msgs->peer->have ) / (float)msgs->torrent->info.pieceCount;
1311    dbgmsg( msgs, "peer progress is %f", msgs->peer->progress );
1312    updateFastSet( msgs );
1313    updateInterest( msgs );
1314    firePeerProgress( msgs );
1315}
1316
1317static void
1318peerMadeRequest( tr_peermsgs *               msgs,
1319                 const struct peer_request * req )
1320{
1321    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1322    const int reqIsValid = requestIsValid( msgs, req );
1323    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
1324    const int peerIsChoked = msgs->peer->peerIsChoked;
1325
1326    int allow = FALSE;
1327
1328    if( !reqIsValid )
1329        dbgmsg( msgs, "rejecting an invalid request." );
1330    else if( !clientHasPiece )
1331        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1332    else if( peerIsChoked )
1333        dbgmsg( msgs, "rejecting request from choked peer" );
1334    else
1335        allow = TRUE;
1336
1337    if( allow )
1338        reqListAppend( &msgs->peerAskedFor, req );
1339    else if( fext )
1340        protocolSendReject( msgs, req );
1341}
1342
1343static int
1344messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1345{
1346    switch( id )
1347    {
1348        case BT_CHOKE:
1349        case BT_UNCHOKE:
1350        case BT_INTERESTED:
1351        case BT_NOT_INTERESTED:
1352        case BT_FEXT_HAVE_ALL:
1353        case BT_FEXT_HAVE_NONE:
1354            return len == 1;
1355
1356        case BT_HAVE:
1357        case BT_FEXT_SUGGEST:
1358        case BT_FEXT_ALLOWED_FAST:
1359            return len == 5;
1360
1361        case BT_BITFIELD:
1362            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1363
1364        case BT_REQUEST:
1365        case BT_CANCEL:
1366        case BT_FEXT_REJECT:
1367            return len == 13;
1368
1369        case BT_PIECE:
1370            return len > 9 && len <= 16393;
1371
1372        case BT_PORT:
1373            return len == 3;
1374
1375        case BT_LTEP:
1376            return len >= 2;
1377
1378        default:
1379            return FALSE;
1380    }
1381}
1382
1383static int clientGotBlock( tr_peermsgs *               msgs,
1384                           const uint8_t *             block,
1385                           const struct peer_request * req );
1386
1387static int
1388readBtPiece( tr_peermsgs      * msgs,
1389             struct evbuffer  * inbuf,
1390             size_t             inlen,
1391             size_t           * setme_piece_bytes_read )
1392{
1393    struct peer_request * req = &msgs->incoming.blockReq;
1394
1395    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1396    dbgmsg( msgs, "In readBtPiece" );
1397
1398    if( !req->length )
1399    {
1400        if( inlen < 8 )
1401            return READ_LATER;
1402
1403        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1404        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1405        req->length = msgs->incoming.length - 9;
1406        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1407        return READ_NOW;
1408    }
1409    else
1410    {
1411        int err;
1412
1413        /* read in another chunk of data */
1414        const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
1415        size_t n = MIN( nLeft, inlen );
1416        uint8_t * buf = tr_new( uint8_t, n );
1417        assert( EVBUFFER_LENGTH( inbuf ) >= n );
1418        tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, n );
1419        evbuffer_add( msgs->incoming.block, buf, n );
1420        fireClientGotData( msgs, n, TRUE );
1421        *setme_piece_bytes_read += n;
1422        tr_free( buf );
1423        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1424               n, req->index, req->offset, req->length,
1425               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1426        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1427            return READ_LATER;
1428
1429        /* we've got the whole block ... process it */
1430        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );
1431
1432        /* cleanup */
1433        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH( msgs->incoming.block ) );
1434        req->length = 0;
1435        msgs->state = AWAITING_BT_LENGTH;
1436        if( !err )
1437            return READ_NOW;
1438        else {
1439            fireError( msgs, err );
1440            return READ_ERR;
1441        }
1442    }
1443}
1444
1445static int
1446readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1447{
1448    uint32_t      ui32;
1449    uint32_t      msglen = msgs->incoming.length;
1450    const uint8_t id = msgs->incoming.id;
1451    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1452    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1453
1454    --msglen; /* id length */
1455
1456    if( inlen < msglen )
1457        return READ_LATER;
1458
1459    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1460
1461    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1462    {
1463        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1464        fireError( msgs, EMSGSIZE );
1465        return READ_ERR;
1466    }
1467
1468    switch( id )
1469    {
1470        case BT_CHOKE:
1471            dbgmsg( msgs, "got Choke" );
1472            msgs->peer->clientIsChoked = 1;
1473            if( !fext )
1474                cancelAllRequestsToPeer( msgs, FALSE );
1475            break;
1476
1477        case BT_UNCHOKE:
1478            dbgmsg( msgs, "got Unchoke" );
1479            msgs->peer->clientIsChoked = 0;
1480            fireNeedReq( msgs );
1481            break;
1482
1483        case BT_INTERESTED:
1484            dbgmsg( msgs, "got Interested" );
1485            msgs->peer->peerIsInterested = 1;
1486            break;
1487
1488        case BT_NOT_INTERESTED:
1489            dbgmsg( msgs, "got Not Interested" );
1490            msgs->peer->peerIsInterested = 0;
1491            break;
1492
1493        case BT_HAVE:
1494            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1495            dbgmsg( msgs, "got Have: %u", ui32 );
1496            if( tr_bitfieldAdd( msgs->peer->have, ui32 ) ) {
1497                fireError( msgs, ERANGE );
1498                return READ_ERR;
1499            }
1500            updatePeerProgress( msgs );
1501            tr_rcTransferred( msgs->torrent->swarmSpeed,
1502                              msgs->torrent->info.pieceSize );
1503            break;
1504
1505        case BT_BITFIELD:
1506        {
1507            dbgmsg( msgs, "got a bitfield" );
1508            msgs->peerSentBitfield = 1;
1509            tr_peerIoReadBytes( msgs->peer->io, inbuf, msgs->peer->have->bits, msglen );
1510            updatePeerProgress( msgs );
1511            fireNeedReq( msgs );
1512            break;
1513        }
1514
1515        case BT_REQUEST:
1516        {
1517            struct peer_request r;
1518            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1519            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1520            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1521            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1522            peerMadeRequest( msgs, &r );
1523            break;
1524        }
1525
1526        case BT_CANCEL:
1527        {
1528            struct peer_request r;
1529            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1530            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1531            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1532            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1533            if( reqListRemove( &msgs->peerAskedFor, &r ) && fext )
1534                protocolSendReject( msgs, &r );
1535            break;
1536        }
1537
1538        case BT_PIECE:
1539            assert( 0 ); /* handled elsewhere! */
1540            break;
1541
1542        case BT_PORT:
1543            dbgmsg( msgs, "Got a BT_PORT" );
1544            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->port );
1545            break;
1546
1547        case BT_FEXT_SUGGEST:
1548            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1549            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1550            if( fext )
1551                fireClientGotSuggest( msgs, ui32 );
1552            else {
1553                fireError( msgs, EMSGSIZE );
1554                return READ_ERR;
1555            }
1556            break;
1557
1558        case BT_FEXT_ALLOWED_FAST:
1559            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1560            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1561            if( fext )
1562                fireClientGotAllowedFast( msgs, ui32 );
1563            else {
1564                fireError( msgs, EMSGSIZE );
1565                return READ_ERR;
1566            }
1567            break;
1568
1569        case BT_FEXT_HAVE_ALL:
1570            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1571            if( fext ) {
1572                tr_bitfieldAddRange( msgs->peer->have, 0, msgs->torrent->info.pieceCount );
1573                updatePeerProgress( msgs );
1574            } else {
1575                fireError( msgs, EMSGSIZE );
1576                return READ_ERR;
1577            }
1578            break;
1579
1580        case BT_FEXT_HAVE_NONE:
1581            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1582            if( fext ) {
1583                tr_bitfieldClear( msgs->peer->have );
1584                updatePeerProgress( msgs );
1585            } else {
1586                fireError( msgs, EMSGSIZE );
1587                return READ_ERR;
1588            }
1589            break;
1590
1591        case BT_FEXT_REJECT:
1592        {
1593            struct peer_request r;
1594            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1595            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1596            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1597            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1598            if( fext )
1599                reqListRemove( &msgs->clientAskedFor, &r );
1600            else {
1601                fireError( msgs, EMSGSIZE );
1602                return READ_ERR;
1603            }
1604            break;
1605        }
1606
1607        case BT_LTEP:
1608            dbgmsg( msgs, "Got a BT_LTEP" );
1609            parseLtep( msgs, msglen, inbuf );
1610            break;
1611
1612        default:
1613            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1614            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1615            break;
1616    }
1617
1618    assert( msglen + 1 == msgs->incoming.length );
1619    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1620
1621    msgs->state = AWAITING_BT_LENGTH;
1622    return READ_NOW;
1623}
1624
1625static void
1626decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1627{
1628    tr_torrent * tor = msgs->torrent;
1629
1630    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1631}
1632
1633static void
1634clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1635{
1636    decrementDownloadedCount( msgs, req->length );
1637}
1638
1639static void
1640addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1641{
1642    if( !msgs->peer->blame )
1643         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1644    tr_bitfieldAdd( msgs->peer->blame, index );
1645}
1646
1647/* returns 0 on success, or an errno on failure */
1648static int
1649clientGotBlock( tr_peermsgs *               msgs,
1650                const uint8_t *             data,
1651                const struct peer_request * req )
1652{
1653    int err;
1654    tr_torrent * tor = msgs->torrent;
1655    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1656
1657    assert( msgs );
1658    assert( req );
1659
1660    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1661        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1662                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1663        return EMSGSIZE;
1664    }
1665
1666    /* save the block */
1667    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1668
1669    /**
1670    *** Remove the block from our `we asked for this' list
1671    **/
1672
1673    if( !reqListRemove( &msgs->clientAskedFor, req ) ) {
1674        clientGotUnwantedBlock( msgs, req );
1675        dbgmsg( msgs, "we didn't ask for this message..." );
1676        return 0;
1677    }
1678
1679    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1680            msgs->clientAskedFor.count );
1681
1682    /**
1683    *** Error checks
1684    **/
1685
1686    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1687        dbgmsg( msgs, "we have this block already..." );
1688        clientGotUnwantedBlock( msgs, req );
1689        return 0;
1690    }
1691
1692    /**
1693    ***  Save the block
1694    **/
1695
1696    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1697        return err;
1698
1699    addPeerToBlamefield( msgs, req->index );
1700    fireGotBlock( msgs, req );
1701    return 0;
1702}
1703
1704static int peerPulse( void * vmsgs );
1705
1706static void
1707didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1708{
1709    tr_peermsgs * msgs = vmsgs;
1710    firePeerGotData( msgs, bytesWritten, wasPieceData );
1711    peerPulse( msgs );
1712}
1713
1714static ReadState
1715canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1716{
1717    ReadState         ret;
1718    tr_peermsgs *     msgs = vmsgs;
1719    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1720    const size_t      inlen = EVBUFFER_LENGTH( in );
1721
1722    if( !inlen )
1723    {
1724        ret = READ_LATER;
1725    }
1726    else if( msgs->state == AWAITING_BT_PIECE )
1727    {
1728        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1729    }
1730    else switch( msgs->state )
1731    {
1732        case AWAITING_BT_LENGTH:
1733            ret = readBtLength ( msgs, in, inlen ); break;
1734
1735        case AWAITING_BT_ID:
1736            ret = readBtId     ( msgs, in, inlen ); break;
1737
1738        case AWAITING_BT_MESSAGE:
1739            ret = readBtMessage( msgs, in, inlen ); break;
1740
1741        default:
1742            assert( 0 );
1743    }
1744
1745    /* log the raw data that was read */
1746    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1747        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1748
1749    return ret;
1750}
1751
1752/**
1753***
1754**/
1755
1756static int
1757ratePulse( void * vmsgs )
1758{
1759    tr_peermsgs * msgs = vmsgs;
1760    const double rateToClient = tr_peerGetPieceSpeed( msgs->peer, TR_PEER_TO_CLIENT );
1761    const int seconds = 10;
1762    const int floor = 8;
1763    const int estimatedBlocksInPeriod = ( rateToClient * seconds * 1024 ) / msgs->torrent->blockSize;
1764
1765    msgs->maxActiveRequests = floor + estimatedBlocksInPeriod;
1766
1767    if( msgs->reqq > 0 )
1768        msgs->maxActiveRequests = MIN( msgs->maxActiveRequests, msgs->reqq );
1769
1770    return TRUE;
1771}
1772
1773static size_t
1774fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1775{
1776    size_t bytesWritten = 0;
1777    struct peer_request req;
1778    const tr_bool haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1779    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1780
1781    /**
1782    ***  Protocol messages
1783    **/
1784
1785    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1786    {
1787        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1788        msgs->outMessagesBatchedAt = now;
1789    }
1790    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1791    {
1792        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1793        /* flush the protocol messages */
1794        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1795        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1796        msgs->clientSentAnythingAt = now;
1797        msgs->outMessagesBatchedAt = 0;
1798        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1799        bytesWritten +=  len;
1800    }
1801
1802    /**
1803    ***  Blocks
1804    **/
1805
1806    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io ) >= msgs->torrent->blockSize )
1807        && popNextRequest( msgs, &req ) )
1808    {
1809        if( requestIsValid( msgs, &req )
1810            && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1811        {
1812            /* send a block */
1813            uint8_t * buf = tr_new( uint8_t, req.length );
1814            const int err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf );
1815            if( err ) {
1816                fireError( msgs, err );
1817                bytesWritten = 0;
1818                msgs = NULL;
1819            } else {
1820                tr_peerIo * io = msgs->peer->io;
1821                struct evbuffer * out = evbuffer_new( );
1822                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1823                tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1824                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1825                tr_peerIoWriteUint32( io, out, req.index );
1826                tr_peerIoWriteUint32( io, out, req.offset );
1827                tr_peerIoWriteBytes ( io, out, buf, req.length );
1828                tr_peerIoWriteBuf( io, out, TRUE );
1829                bytesWritten += EVBUFFER_LENGTH( out );
1830                evbuffer_free( out );
1831                msgs->clientSentAnythingAt = now;
1832            }
1833            tr_free( buf );
1834        }
1835        else if( fext ) /* peer needs a reject message */
1836        {
1837            protocolSendReject( msgs, &req );
1838        }
1839    }
1840
1841    /**
1842    ***  Keepalive
1843    **/
1844
1845    if( ( msgs != NULL )
1846        && ( msgs->clientSentAnythingAt != 0 )
1847        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1848    {
1849        dbgmsg( msgs, "sending a keepalive message" );
1850        tr_peerIoWriteUint32( msgs->peer->io, msgs->outMessages, 0 );
1851        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1852    }
1853
1854    return bytesWritten;
1855}
1856
1857static int
1858peerPulse( void * vmsgs )
1859{
1860    tr_peermsgs * msgs = vmsgs;
1861    const time_t  now = time( NULL );
1862
1863    ratePulse( msgs );
1864
1865    pumpRequestQueue( msgs, now );
1866    expireOldRequests( msgs, now );
1867
1868    for( ;; )
1869        if( fillOutputBuffer( msgs, now ) < 1 )
1870            break;
1871
1872    return TRUE; /* loop forever */
1873}
1874
1875void
1876tr_peerMsgsPulse( tr_peermsgs * msgs )
1877{
1878    if( msgs != NULL )
1879        peerPulse( msgs );
1880}
1881
1882static void
1883gotError( tr_peerIo  * io UNUSED,
1884          short        what,
1885          void       * vmsgs )
1886{
1887    if( what & EVBUFFER_TIMEOUT )
1888        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1889    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1890        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1891               what, errno, tr_strerror( errno ) );
1892    fireError( vmsgs, ENOTCONN );
1893}
1894
1895static void
1896sendBitfield( tr_peermsgs * msgs )
1897{
1898    struct evbuffer * out = msgs->outMessages;
1899    tr_bitfield *     field;
1900    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1901    size_t            i;
1902    size_t            lazyCount = 0;
1903
1904    field = tr_bitfieldDup( tr_cpPieceBitfield( msgs->torrent->completion ) );
1905
1906    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1907    {
1908        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1909            speed over a truly random sample -- let's limit the pool size to
1910            the first 1000 pieces so large torrents don't bog things down */
1911        size_t poolSize;
1912        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1913        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1914
1915        /* build the pool */
1916        for( i=poolSize=0; i<maxPoolSize; ++i )
1917            if( tr_bitfieldHas( field, i ) )
1918                pool[poolSize++] = i;
1919
1920        /* pull random piece indices from the pool */
1921        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1922        {
1923            const int pos = tr_cryptoWeakRandInt( poolSize );
1924            const tr_piece_index_t piece = pool[pos];
1925            tr_bitfieldRem( field, piece );
1926            lazyPieces[lazyCount++] = piece;
1927            pool[pos] = pool[--poolSize];
1928        }
1929
1930        /* cleanup */
1931        tr_free( pool );
1932    }
1933
1934    tr_peerIoWriteUint32( msgs->peer->io, out,
1935                          sizeof( uint8_t ) + field->byteCount );
1936    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_BITFIELD );
1937    tr_peerIoWriteBytes ( msgs->peer->io, out, field->bits, field->byteCount );
1938    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
1939            EVBUFFER_LENGTH( out ) );
1940    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1941
1942    for( i = 0; i < lazyCount; ++i )
1943        protocolSendHave( msgs, lazyPieces[i] );
1944
1945    tr_bitfieldFree( field );
1946}
1947
1948static void
1949tellPeerWhatWeHave( tr_peermsgs * msgs )
1950{
1951    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1952
1953    if( fext && ( tr_cpGetStatus( msgs->torrent->completion ) == TR_SEED ) )
1954    {
1955        protocolSendHaveAll( msgs );
1956    }
1957    else if( fext && ( tr_cpHaveValid( msgs->torrent->completion ) == 0 ) )
1958    {
1959        protocolSendHaveNone( msgs );
1960    }
1961    else
1962    {
1963        sendBitfield( msgs );
1964    }
1965}
1966
1967/**
1968***
1969**/
1970
1971/* some peers give us error messages if we send
1972   more than this many peers in a single pex message
1973   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1974#define MAX_PEX_ADDED 50
1975#define MAX_PEX_DROPPED 50
1976
1977typedef struct
1978{
1979    tr_pex *  added;
1980    tr_pex *  dropped;
1981    tr_pex *  elements;
1982    int       addedCount;
1983    int       droppedCount;
1984    int       elementCount;
1985}
1986PexDiffs;
1987
1988static void
1989pexAddedCb( void * vpex,
1990            void * userData )
1991{
1992    PexDiffs * diffs = userData;
1993    tr_pex *   pex = vpex;
1994
1995    if( diffs->addedCount < MAX_PEX_ADDED )
1996    {
1997        diffs->added[diffs->addedCount++] = *pex;
1998        diffs->elements[diffs->elementCount++] = *pex;
1999    }
2000}
2001
2002static void
2003pexDroppedCb( void * vpex,
2004              void * userData )
2005{
2006    PexDiffs * diffs = userData;
2007    tr_pex *   pex = vpex;
2008
2009    if( diffs->droppedCount < MAX_PEX_DROPPED )
2010    {
2011        diffs->dropped[diffs->droppedCount++] = *pex;
2012    }
2013}
2014
2015static void
2016pexElementCb( void * vpex,
2017              void * userData )
2018{
2019    PexDiffs * diffs = userData;
2020    tr_pex *   pex = vpex;
2021
2022    diffs->elements[diffs->elementCount++] = *pex;
2023}
2024
2025static void
2026sendPex( tr_peermsgs * msgs )
2027{
2028    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2029    {
2030        PexDiffs diffs;
2031        PexDiffs diffs6;
2032        tr_pex * newPex = NULL;
2033        tr_pex * newPex6 = NULL;
2034        const int newCount = tr_peerMgrGetPeers( msgs->session->peerMgr,
2035                                                 msgs->torrent->info.hash,
2036                                                 &newPex, TR_AF_INET );
2037        const int newCount6 = tr_peerMgrGetPeers( msgs->session->peerMgr,
2038                                                  msgs->torrent->info.hash,
2039                                                  &newPex6, TR_AF_INET6 );
2040
2041        /* build the diffs */
2042        diffs.added = tr_new( tr_pex, newCount );
2043        diffs.addedCount = 0;
2044        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2045        diffs.droppedCount = 0;
2046        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2047        diffs.elementCount = 0;
2048        tr_set_compare( msgs->pex, msgs->pexCount,
2049                        newPex, newCount,
2050                        tr_pexCompare, sizeof( tr_pex ),
2051                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2052        diffs6.added = tr_new( tr_pex, newCount6 );
2053        diffs6.addedCount = 0;
2054        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2055        diffs6.droppedCount = 0;
2056        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2057        diffs6.elementCount = 0;
2058        tr_set_compare( msgs->pex6, msgs->pexCount6,
2059                        newPex6, newCount6,
2060                        tr_pexCompare, sizeof( tr_pex ),
2061                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2062        dbgmsg(
2063            msgs,
2064            "pex: old peer count %d, new peer count %d, added %d, removed %d",
2065            msgs->pexCount, newCount + newCount6,
2066            diffs.addedCount + diffs6.addedCount,
2067            diffs.droppedCount + diffs6.droppedCount );
2068
2069        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2070            !diffs6.droppedCount )
2071        {
2072            tr_free( diffs.elements );
2073            tr_free( diffs6.elements );
2074        }
2075        else
2076        {
2077            int  i;
2078            tr_benc val;
2079            char * benc;
2080            int bencLen;
2081            uint8_t * tmp, *walk;
2082            struct evbuffer * out = msgs->outMessages;
2083
2084            /* update peer */
2085            tr_free( msgs->pex );
2086            msgs->pex = diffs.elements;
2087            msgs->pexCount = diffs.elementCount;
2088            tr_free( msgs->pex6 );
2089            msgs->pex6 = diffs6.elements;
2090            msgs->pexCount6 = diffs6.elementCount;
2091
2092            /* build the pex payload */
2093            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2094                                         * speed vs. likelihood? */
2095
2096            /* "added" */
2097            tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2098            for( i = 0; i < diffs.addedCount; ++i )
2099            {
2100                tr_suspectAddress( &diffs.added[i].addr, "pex" );
2101                memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2102                memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2103            }
2104            assert( ( walk - tmp ) == diffs.addedCount * 6 );
2105            tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2106            tr_free( tmp );
2107
2108            /* "added.f" */
2109            tmp = walk = tr_new( uint8_t, diffs.addedCount );
2110            for( i = 0; i < diffs.addedCount; ++i )
2111                *walk++ = diffs.added[i].flags;
2112            assert( ( walk - tmp ) == diffs.addedCount );
2113            tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2114            tr_free( tmp );
2115
2116            /* "dropped" */
2117            tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2118            for( i = 0; i < diffs.droppedCount; ++i )
2119            {
2120                memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2121                memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2122            }
2123            assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2124            tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2125            tr_free( tmp );
2126           
2127            /* "added6" */
2128            tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2129            for( i = 0; i < diffs6.addedCount; ++i )
2130            {
2131                tr_suspectAddress( &diffs6.added[i].addr, "pex6" );
2132                memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2133                walk += 16;
2134                memcpy( walk, &diffs6.added[i].port, 2 );
2135                walk += 2;
2136            }
2137            assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2138            tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2139            tr_free( tmp );
2140           
2141            /* "added6.f" */
2142            tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2143            for( i = 0; i < diffs6.addedCount; ++i )
2144                *walk++ = diffs6.added[i].flags;
2145            assert( ( walk - tmp ) == diffs6.addedCount );
2146            tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2147            tr_free( tmp );
2148           
2149            /* "dropped6" */
2150            tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2151            for( i = 0; i < diffs6.droppedCount; ++i )
2152            {
2153                memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2154                walk += 16;
2155                memcpy( walk, &diffs6.dropped[i].port, 2 );
2156                walk += 2;
2157            }
2158            assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2159            tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2160            tr_free( tmp );
2161
2162            /* write the pex message */
2163            benc = tr_bencSave( &val, &bencLen );
2164            tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + bencLen );
2165            tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
2166            tr_peerIoWriteUint8 ( msgs->peer->io, out, msgs->ut_pex_id );
2167            tr_peerIoWriteBytes ( msgs->peer->io, out, benc, bencLen );
2168            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2169            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2170
2171            tr_free( benc );
2172            tr_bencFree( &val );
2173        }
2174
2175        /* cleanup */
2176        tr_free( diffs.added );
2177        tr_free( diffs.dropped );
2178        tr_free( newPex );
2179        tr_free( diffs6.added );
2180        tr_free( diffs6.dropped );
2181        tr_free( newPex6 );
2182
2183        msgs->clientSentPexAt = time( NULL );
2184    }
2185}
2186
2187static int
2188pexPulse( void * vpeer )
2189{
2190    sendPex( vpeer );
2191    return TRUE;
2192}
2193
2194/**
2195***
2196**/
2197
2198tr_peermsgs*
2199tr_peerMsgsNew( struct tr_torrent * torrent,
2200                struct tr_peer    * peer,
2201                tr_delivery_func    func,
2202                void              * userData,
2203                tr_publisher_tag  * setme )
2204{
2205    tr_peermsgs * m;
2206
2207    assert( peer );
2208    assert( peer->io );
2209
2210    m = tr_new0( tr_peermsgs, 1 );
2211    m->publisher = tr_publisherNew( );
2212    m->peer = peer;
2213    m->session = torrent->session;
2214    m->torrent = torrent;
2215    m->peer->clientIsChoked = 1;
2216    m->peer->peerIsChoked = 1;
2217    m->peer->clientIsInterested = 0;
2218    m->peer->peerIsInterested = 0;
2219    m->peer->have = tr_bitfieldNew( torrent->info.pieceCount );
2220    m->state = AWAITING_BT_LENGTH;
2221    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
2222    m->outMessages = evbuffer_new( );
2223    m->outMessagesBatchedAt = 0;
2224    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2225    m->incoming.block = evbuffer_new( );
2226    m->peerAskedFor = REQUEST_LIST_INIT;
2227    m->clientAskedFor = REQUEST_LIST_INIT;
2228    m->clientWillAskFor = REQUEST_LIST_INIT;
2229    peer->msgs = m;
2230
2231    *setme = tr_publisherSubscribe( m->publisher, func, userData );
2232
2233    if( tr_peerIoSupportsLTEP( peer->io ) )
2234        sendLtepHandshake( m );
2235
2236    tellPeerWhatWeHave( m );
2237
2238    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2239    ratePulse( m );
2240
2241    return m;
2242}
2243
2244void
2245tr_peerMsgsFree( tr_peermsgs* msgs )
2246{
2247    if( msgs )
2248    {
2249        tr_timerFree( &msgs->pexTimer );
2250        tr_publisherFree( &msgs->publisher );
2251        reqListClear( &msgs->clientWillAskFor );
2252        reqListClear( &msgs->clientAskedFor );
2253        reqListClear( &msgs->peerAskedFor );
2254
2255        evbuffer_free( msgs->incoming.block );
2256        evbuffer_free( msgs->outMessages );
2257        tr_free( msgs->pex6 );
2258        tr_free( msgs->pex );
2259
2260        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2261        tr_free( msgs );
2262    }
2263}
2264
2265void
2266tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2267                        tr_publisher_tag tag )
2268{
2269    tr_publisherUnsubscribe( peer->publisher, tag );
2270}
2271
Note: See TracBrowser for help on using the repository browser.