source: trunk/libtransmission/peer-msgs.c @ 7447

Last change on this file since 7447 was 7447, checked in by charles, 12 years ago

(trunk libT) remove tr_peermsgs.minActiveRequests based on wereHamster's feedback

  • Property svn:keywords set to Date Rev Author Id
File size: 64.9 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 7447 2008-12-21 18:31:28Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "crypto.h"
26#include "inout.h"
27#ifdef WIN32
28#include "net.h" /* for ECONN */
29#endif
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-mgr-private.h"
33#include "peer-msgs.h"
34#include "ratecontrol.h"
35#include "stats.h"
36#include "torrent.h"
37#include "trevent.h"
38#include "utils.h"
39
40/**
41***
42**/
43
44enum
45{
46    BT_CHOKE                = 0,
47    BT_UNCHOKE              = 1,
48    BT_INTERESTED           = 2,
49    BT_NOT_INTERESTED       = 3,
50    BT_HAVE                 = 4,
51    BT_BITFIELD             = 5,
52    BT_REQUEST              = 6,
53    BT_PIECE                = 7,
54    BT_CANCEL               = 8,
55    BT_PORT                 = 9,
56
57    BT_FEXT_SUGGEST         = 13,
58    BT_FEXT_HAVE_ALL        = 14,
59    BT_FEXT_HAVE_NONE       = 15,
60    BT_FEXT_REJECT          = 16,
61    BT_FEXT_ALLOWED_FAST    = 17,
62
63    BT_LTEP                 = 20,
64
65    LTEP_HANDSHAKE          = 0,
66
67    TR_LTEP_PEX             = 1,
68
69
70
71    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
72
73    /* idle seconds before we send a keepalive */
74    KEEPALIVE_INTERVAL_SECS = 100,
75
76    PEX_INTERVAL            = ( 90 * 1000 ), /* msec between sendPex() calls */
77
78    MAX_QUEUE_SIZE          = ( 100 ),
79
80    /* how long an unsent request can stay queued before it's returned
81       back to the peer-mgr's pool of requests */
82    QUEUED_REQUEST_TTL_SECS = 20,
83
84    /* how long a sent request can stay queued before it's returned
85       back to the peer-mgr's pool of requests */
86    SENT_REQUEST_TTL_SECS = 240,
87
88    /* used in lowering the outMessages queue period */
89    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
90    HIGH_PRIORITY_INTERVAL_SECS = 2,
91    LOW_PRIORITY_INTERVAL_SECS = 20,
92
93    /* number of pieces to remove from the bitfield when
94     * lazy bitfields are turned on */
95    LAZY_PIECE_COUNT = 26,
96
97    /* number of pieces we'll allow in our fast set */
98    MAX_FAST_SET_SIZE = 3
99};
100
101/**
102***  REQUEST MANAGEMENT
103**/
104
105enum
106{
107    AWAITING_BT_LENGTH,
108    AWAITING_BT_ID,
109    AWAITING_BT_MESSAGE,
110    AWAITING_BT_PIECE
111};
112
113struct peer_request
114{
115    uint32_t    index;
116    uint32_t    offset;
117    uint32_t    length;
118    time_t      time_requested;
119};
120
121static int
122compareRequest( const void * va, const void * vb )
123{
124    const struct peer_request * a = va;
125    const struct peer_request * b = vb;
126
127    if( a->index != b->index )
128        return a->index < b->index ? -1 : 1;
129
130    if( a->offset != b->offset )
131        return a->offset < b->offset ? -1 : 1;
132
133    if( a->length != b->length )
134        return a->length < b->length ? -1 : 1;
135
136    return 0;
137}
138
139struct request_list
140{
141    uint16_t               count;
142    uint16_t               max;
143    struct peer_request *  requests;
144};
145
146static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
147
148static void
149reqListReserve( struct request_list * list,
150                uint16_t              max )
151{
152    if( list->max < max )
153    {
154        list->max = max;
155        list->requests = tr_renew( struct peer_request,
156                                   list->requests,
157                                   list->max );
158    }
159}
160
161static void
162reqListClear( struct request_list * list )
163{
164    tr_free( list->requests );
165    *list = REQUEST_LIST_INIT;
166}
167
168static void
169reqListCopy( struct request_list * dest, const struct request_list * src )
170{
171    dest->count = dest->max = src->count;
172    dest->requests = tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
173}
174
175static void
176reqListRemoveOne( struct request_list * list,
177                  int                   i )
178{
179    assert( 0 <= i && i < list->count );
180
181    memmove( &list->requests[i],
182             &list->requests[i + 1],
183             sizeof( struct peer_request ) * ( --list->count - i ) );
184}
185
186static void
187reqListAppend( struct request_list *       list,
188               const struct peer_request * req )
189{
190    if( ++list->count >= list->max )
191        reqListReserve( list, list->max + 8 );
192
193    list->requests[list->count - 1] = *req;
194}
195
196static int
197reqListPop( struct request_list * list,
198            struct peer_request * setme )
199{
200    int success;
201
202    if( !list->count )
203        success = FALSE;
204    else {
205        *setme = list->requests[0];
206        reqListRemoveOne( list, 0 );
207        success = TRUE;
208    }
209
210    return success;
211}
212
213static int
214reqListFind( struct request_list *       list,
215             const struct peer_request * key )
216{
217    uint16_t i;
218
219    for( i = 0; i < list->count; ++i )
220        if( !compareRequest( key, list->requests + i ) )
221            return i;
222
223    return -1;
224}
225
226static int
227reqListRemove( struct request_list *       list,
228               const struct peer_request * key )
229{
230    int success;
231    const int i = reqListFind( list, key );
232
233    if( i < 0 )
234        success = FALSE;
235    else {
236        reqListRemoveOne( list, i );
237        success = TRUE;
238    }
239
240    return success;
241}
242
243/**
244***
245**/
246
247/* this is raw, unchanged data from the peer regarding
248 * the current message that it's sending us. */
249struct tr_incoming
250{
251    uint8_t                id;
252    uint32_t               length; /* includes the +1 for id length */
253    struct peer_request    blockReq; /* metadata for incoming blocks */
254    struct evbuffer *      block; /* piece data for incoming blocks */
255};
256
257/**
258 * Low-level communication state information about a connected peer.
259 *
260 * This structure remembers the low-level protocol states that we're
261 * in with this peer, such as active requests, pex messages, and so on.
262 * Its fields are all private to peer-msgs.c.
263 *
264 * Data not directly involved with sending & receiving messages is
265 * stored in tr_peer, where it can be accessed by both peermsgs and
266 * the peer manager.
267 *
268 * @see struct peer_atom
269 * @see tr_peer
270 */
271struct tr_peermsgs
272{
273    tr_bool         peerSentBitfield;
274    tr_bool         peerSupportsPex;
275    tr_bool         clientSentLtepHandshake;
276    tr_bool         peerSentLtepHandshake;
277    tr_bool         haveFastSet;
278
279    uint8_t         state;
280    uint8_t         ut_pex_id;
281    uint16_t        pexCount;
282    uint16_t        pexCount6;
283    uint16_t        maxActiveRequests;
284
285    size_t                 fastsetSize;
286    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
287
288    /* how long the outMessages batch should be allowed to grow before
289     * it's flushed -- some messages (like requests >:) should be sent
290     * very quickly; others aren't as urgent. */
291    int                    outMessagesBatchPeriod;
292
293    tr_peer *              peer;
294
295    tr_session *           session;
296    tr_torrent *           torrent;
297
298    tr_publisher_t *       publisher;
299
300    struct evbuffer *      outMessages; /* all the non-piece messages */
301
302    struct request_list    peerAskedFor;
303    struct request_list    clientAskedFor;
304    struct request_list    clientWillAskFor;
305
306    tr_timer             * pexTimer;
307    tr_pex               * pex;
308    tr_pex               * pex6;
309
310    time_t                 clientSentPexAt;
311    time_t                 clientSentAnythingAt;
312
313    /* when we started batching the outMessages */
314    time_t                outMessagesBatchedAt;
315
316    struct tr_incoming    incoming;
317
318    /* if the peer supports the Extension Protocol in BEP 10 and
319       supplied a reqq argument, it's stored here.  otherwise the
320       value is zero and should be ignored. */
321    int64_t               reqq;
322};
323
324/**
325***
326**/
327
328static void
329myDebug( const char * file, int line,
330         const struct tr_peermsgs * msgs,
331         const char * fmt, ... )
332{
333    FILE * fp = tr_getLog( );
334
335    if( fp )
336    {
337        va_list           args;
338        char              timestr[64];
339        struct evbuffer * buf = evbuffer_new( );
340        char *            base = tr_basename( file );
341
342        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
343                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
344                             msgs->torrent->info.name,
345                             tr_peerIoGetAddrStr( msgs->peer->io ),
346                             msgs->peer->client );
347        va_start( args, fmt );
348        evbuffer_add_vprintf( buf, fmt, args );
349        va_end( args );
350        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
351        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
352
353        tr_free( base );
354        evbuffer_free( buf );
355    }
356}
357
358#define dbgmsg( msgs, ... ) \
359    do { \
360        if( tr_deepLoggingIsActive( ) ) \
361            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
362    } while( 0 )
363
364/**
365***
366**/
367
368static void
369pokeBatchPeriod( tr_peermsgs * msgs,
370                 int           interval )
371{
372    if( msgs->outMessagesBatchPeriod > interval )
373    {
374        msgs->outMessagesBatchPeriod = interval;
375        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
376    }
377}
378
379static void
380dbgOutMessageLen( tr_peermsgs * msgs )
381{
382    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
383}
384
385static void
386protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
387{
388    tr_peerIo       * io  = msgs->peer->io;
389    struct evbuffer * out = msgs->outMessages;
390
391    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
392
393    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
394    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
395    tr_peerIoWriteUint32( io, out, req->index );
396    tr_peerIoWriteUint32( io, out, req->offset );
397    tr_peerIoWriteUint32( io, out, req->length );
398
399    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
400    dbgOutMessageLen( msgs );
401}
402
403static void
404protocolSendRequest( tr_peermsgs *               msgs,
405                     const struct peer_request * req )
406{
407    tr_peerIo       * io  = msgs->peer->io;
408    struct evbuffer * out = msgs->outMessages;
409
410    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
411    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
412    tr_peerIoWriteUint32( io, out, req->index );
413    tr_peerIoWriteUint32( io, out, req->offset );
414    tr_peerIoWriteUint32( io, out, req->length );
415
416    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
417    dbgOutMessageLen( msgs );
418    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
419}
420
421static void
422protocolSendCancel( tr_peermsgs *               msgs,
423                    const struct peer_request * req )
424{
425    tr_peerIo       * io  = msgs->peer->io;
426    struct evbuffer * out = msgs->outMessages;
427
428    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
429    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
430    tr_peerIoWriteUint32( io, out, req->index );
431    tr_peerIoWriteUint32( io, out, req->offset );
432    tr_peerIoWriteUint32( io, out, req->length );
433
434    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
435    dbgOutMessageLen( msgs );
436    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
437}
438
439static void
440protocolSendHave( tr_peermsgs * msgs,
441                  uint32_t      index )
442{
443    tr_peerIo       * io  = msgs->peer->io;
444    struct evbuffer * out = msgs->outMessages;
445
446    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
447    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
448    tr_peerIoWriteUint32( io, out, index );
449
450    dbgmsg( msgs, "sending Have %u...", index );
451    dbgOutMessageLen( msgs );
452    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
453}
454
455#if 0
456static void
457protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
458{
459    tr_peerIo       * io  = msgs->peer->io;
460    struct evbuffer * out = msgs->outMessages;
461
462    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
463
464    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
465    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
466    tr_peerIoWriteUint32( io, out, pieceIndex );
467
468    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
469    dbgOutMessageLen( msgs );
470}
471#endif
472
473static void
474protocolSendChoke( tr_peermsgs * msgs,
475                   int           choke )
476{
477    tr_peerIo       * io  = msgs->peer->io;
478    struct evbuffer * out = msgs->outMessages;
479
480    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
481    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
482
483    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
484    dbgOutMessageLen( msgs );
485    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
486}
487
488static void
489protocolSendHaveAll( tr_peermsgs * msgs )
490{
491    tr_peerIo       * io  = msgs->peer->io;
492    struct evbuffer * out = msgs->outMessages;
493
494    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
495
496    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
497    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
498
499    dbgmsg( msgs, "sending HAVE_ALL..." );
500    dbgOutMessageLen( msgs );
501    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
502}
503
504static void
505protocolSendHaveNone( tr_peermsgs * msgs )
506{
507    tr_peerIo       * io  = msgs->peer->io;
508    struct evbuffer * out = msgs->outMessages;
509
510    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
511
512    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
513    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
514
515    dbgmsg( msgs, "sending HAVE_NONE..." );
516    dbgOutMessageLen( msgs );
517    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
518}
519
520/**
521***  EVENTS
522**/
523
524static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0 };
525
526static void
527publish( tr_peermsgs * msgs, tr_peer_event * e )
528{
529    assert( msgs->peer );
530    assert( msgs->peer->msgs == msgs );
531
532    tr_publisherPublish( msgs->publisher, msgs->peer, e );
533}
534
535static void
536fireError( tr_peermsgs * msgs, int err )
537{
538    tr_peer_event e = blankEvent;
539    e.eventType = TR_PEER_ERROR;
540    e.err = err;
541    publish( msgs, &e );
542}
543
544static void
545fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly )
546{
547    tr_peer_event e = blankEvent;
548    e.eventType = TR_PEER_UPLOAD_ONLY;
549    e.uploadOnly = uploadOnly;
550    publish( msgs, &e );
551}
552
553static void
554fireNeedReq( tr_peermsgs * msgs )
555{
556    tr_peer_event e = blankEvent;
557    e.eventType = TR_PEER_NEED_REQ;
558    publish( msgs, &e );
559}
560
561static void
562firePeerProgress( tr_peermsgs * msgs )
563{
564    tr_peer_event e = blankEvent;
565    e.eventType = TR_PEER_PEER_PROGRESS;
566    e.progress = msgs->peer->progress;
567    publish( msgs, &e );
568}
569
570static void
571fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
572{
573    tr_peer_event e = blankEvent;
574    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
575    e.pieceIndex = req->index;
576    e.offset = req->offset;
577    e.length = req->length;
578    publish( msgs, &e );
579}
580
581static void
582fireClientGotData( tr_peermsgs * msgs,
583                   uint32_t      length,
584                   int           wasPieceData )
585{
586    tr_peer_event e = blankEvent;
587
588    e.length = length;
589    e.eventType = TR_PEER_CLIENT_GOT_DATA;
590    e.wasPieceData = wasPieceData;
591    publish( msgs, &e );
592}
593
594static void
595fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
596{
597    tr_peer_event e = blankEvent;
598    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
599    e.pieceIndex = pieceIndex;
600    publish( msgs, &e );
601}
602
603static void
604fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
605{
606    tr_peer_event e = blankEvent;
607    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
608    e.pieceIndex = pieceIndex;
609    publish( msgs, &e );
610}
611
612static void
613firePeerGotData( tr_peermsgs  * msgs,
614                 uint32_t       length,
615                 int            wasPieceData )
616{
617    tr_peer_event e = blankEvent;
618
619    e.length = length;
620    e.eventType = TR_PEER_PEER_GOT_DATA;
621    e.wasPieceData = wasPieceData;
622
623    publish( msgs, &e );
624}
625
626static void
627fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
628{
629    tr_peer_event e = blankEvent;
630    e.eventType = TR_PEER_CANCEL;
631    e.pieceIndex = req->index;
632    e.offset = req->offset;
633    e.length = req->length;
634    publish( msgs, &e );
635}
636
637/**
638***  ALLOWED FAST SET
639***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
640**/
641
642size_t
643tr_generateAllowedSet( tr_piece_index_t * setmePieces,
644                       size_t             desiredSetSize,
645                       size_t             pieceCount,
646                       const uint8_t    * infohash,
647                       const tr_address * addr )
648{
649    size_t setSize = 0;
650
651    assert( setmePieces );
652    assert( desiredSetSize <= pieceCount );
653    assert( desiredSetSize );
654    assert( pieceCount );
655    assert( infohash );
656    assert( addr );
657
658    if( addr->type == TR_AF_INET )
659    {
660        uint8_t w[SHA_DIGEST_LENGTH + 4];
661        uint8_t x[SHA_DIGEST_LENGTH];
662
663        *(uint32_t*)w = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
664        memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );                /* (2) */
665        tr_sha1( x, w, sizeof( w ), NULL );                          /* (3) */
666
667        while( setSize<desiredSetSize )
668        {
669            int i;
670            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
671            {
672                size_t k;
673                uint32_t j = i * 4;                                  /* (5) */
674                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
675                uint32_t index = y % pieceCount;                     /* (7) */
676
677                for( k=0; k<setSize; ++k )                           /* (8) */
678                    if( setmePieces[k] == index )
679                        break;
680
681                if( k == setSize )
682                    setmePieces[setSize++] = index;                  /* (9) */
683            }
684
685            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
686        }
687    }
688
689    return setSize;
690}
691
692static void
693updateFastSet( tr_peermsgs * msgs UNUSED )
694{
695#if 0
696    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
697    const int peerIsNeedy = msgs->peer->progress < 0.10;
698
699    if( fext && peerIsNeedy && !msgs->haveFastSet )
700    {
701        size_t i;
702        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
703        const tr_info * inf = &msgs->torrent->info;
704        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
705
706        /* build the fast set */
707        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
708        msgs->haveFastSet = 1;
709
710        /* send it to the peer */
711        for( i=0; i<msgs->fastsetSize; ++i )
712            protocolSendAllowedFast( msgs, msgs->fastset[i] );
713    }
714#endif
715}
716
717/**
718***  INTEREST
719**/
720
721static int
722isPieceInteresting( const tr_peermsgs * msgs,
723                    tr_piece_index_t    piece )
724{
725    const tr_torrent * torrent = msgs->torrent;
726
727    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
728          && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have */
729          && ( tr_bitfieldHas( msgs->peer->have, piece ) );    /* peer has it */
730}
731
732/* "interested" means we'll ask for piece data if they unchoke us */
733static int
734isPeerInteresting( const tr_peermsgs * msgs )
735{
736    tr_piece_index_t    i;
737    const tr_torrent *  torrent;
738    const tr_bitfield * bitfield;
739    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
740
741    if( clientIsSeed )
742        return FALSE;
743
744    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
745        return FALSE;
746
747    torrent = msgs->torrent;
748    bitfield = tr_cpPieceBitfield( torrent->completion );
749
750    if( !msgs->peer->have )
751        return TRUE;
752
753    assert( bitfield->byteCount == msgs->peer->have->byteCount );
754
755    for( i = 0; i < torrent->info.pieceCount; ++i )
756        if( isPieceInteresting( msgs, i ) )
757            return TRUE;
758
759    return FALSE;
760}
761
762static void
763sendInterest( tr_peermsgs * msgs,
764              int           weAreInterested )
765{
766    struct evbuffer * out = msgs->outMessages;
767
768    assert( msgs );
769    assert( weAreInterested == 0 || weAreInterested == 1 );
770
771    msgs->peer->clientIsInterested = weAreInterested;
772    dbgmsg( msgs, "Sending %s", weAreInterested ? "Interested" : "Not Interested" );
773    tr_peerIoWriteUint32( msgs->peer->io, out, sizeof( uint8_t ) );
774    tr_peerIoWriteUint8 ( msgs->peer->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
775    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
776    dbgOutMessageLen( msgs );
777}
778
779static void
780updateInterest( tr_peermsgs * msgs )
781{
782    const int i = isPeerInteresting( msgs );
783
784    if( i != msgs->peer->clientIsInterested )
785        sendInterest( msgs, i );
786    if( i )
787        fireNeedReq( msgs );
788}
789
790static int
791popNextRequest( tr_peermsgs *         msgs,
792                struct peer_request * setme )
793{
794    return reqListPop( &msgs->peerAskedFor, setme );
795}
796
797static void
798cancelAllRequestsToClient( tr_peermsgs * msgs )
799{
800    struct peer_request req;
801    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
802
803    while( popNextRequest( msgs, &req ) )
804        if( mustSendCancel )
805            protocolSendReject( msgs, &req );
806}
807
808void
809tr_peerMsgsSetChoke( tr_peermsgs * msgs,
810                     int           choke )
811{
812    const time_t now = time( NULL );
813    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
814
815    assert( msgs );
816    assert( msgs->peer );
817    assert( choke == 0 || choke == 1 );
818
819    if( msgs->peer->chokeChangedAt > fibrillationTime )
820    {
821        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
822    }
823    else if( msgs->peer->peerIsChoked != choke )
824    {
825        msgs->peer->peerIsChoked = choke;
826        if( choke )
827            cancelAllRequestsToClient( msgs );
828        protocolSendChoke( msgs, choke );
829        msgs->peer->chokeChangedAt = now;
830    }
831}
832
833/**
834***
835**/
836
837void
838tr_peerMsgsHave( tr_peermsgs * msgs,
839                 uint32_t      index )
840{
841    protocolSendHave( msgs, index );
842
843    /* since we have more pieces now, we might not be interested in this peer */
844    updateInterest( msgs );
845}
846
847/**
848***
849**/
850
851static int
852reqIsValid( const tr_peermsgs * peer,
853            uint32_t            index,
854            uint32_t            offset,
855            uint32_t            length )
856{
857    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
858}
859
860static int
861requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
862{
863    return reqIsValid( msgs, req->index, req->offset, req->length );
864}
865
866static void
867expireOldRequests( tr_peermsgs * msgs, const time_t now  )
868{
869    int i;
870    time_t oldestAllowed;
871    struct request_list tmp = REQUEST_LIST_INIT;
872    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
873    dbgmsg( msgs, "entering `expire old requests' block" );
874
875    /* cancel requests that have been queued for too long */
876    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
877    reqListCopy( &tmp, &msgs->clientWillAskFor );
878    for( i = 0; i < tmp.count; ++i ) {
879        const struct peer_request * req = &tmp.requests[i];
880        if( req->time_requested < oldestAllowed )
881            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
882    }
883    reqListClear( &tmp );
884
885    /* if the peer doesn't support "Reject Request",
886     * cancel requests that were sent too long ago. */
887    if( !fext ) {
888        oldestAllowed = now - SENT_REQUEST_TTL_SECS;
889        reqListCopy( &tmp, &msgs->clientAskedFor );
890        for( i=0; i<tmp.count; ++i ) {
891            const struct peer_request * req = &tmp.requests[i];
892            if( req->time_requested < oldestAllowed )
893                tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
894        }
895        reqListClear( &tmp );
896    }
897
898    dbgmsg( msgs, "leaving `expire old requests' block" );
899}
900
901static void
902pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
903{
904    const int           max = msgs->maxActiveRequests;
905    int                 sent = 0;
906    int                 count = msgs->clientAskedFor.count;
907    struct peer_request req;
908
909    if( msgs->peer->clientIsChoked )
910        return;
911    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
912        return;
913
914    while( ( count < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
915    {
916        const tr_block_index_t block = _tr_block( msgs->torrent, req.index, req.offset );
917
918        assert( requestIsValid( msgs, &req ) );
919        assert( tr_bitfieldHas( msgs->peer->have, req.index ) );
920
921        /* don't ask for it if we've already got it... this block may have
922         * come in from a different peer after we cancelled a request for it */
923        if( !tr_cpBlockIsComplete( msgs->torrent->completion, block ) )
924        {
925            protocolSendRequest( msgs, &req );
926            req.time_requested = now;
927            reqListAppend( &msgs->clientAskedFor, &req );
928
929            ++count;
930            ++sent;
931        }
932    }
933
934    if( sent )
935        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
936                sent,
937                msgs->clientAskedFor.count,
938                msgs->clientWillAskFor.count );
939
940    if( count < max )
941        fireNeedReq( msgs );
942}
943
944static int
945requestQueueIsFull( const tr_peermsgs * msgs )
946{
947    const int req_max = msgs->maxActiveRequests;
948    return msgs->clientWillAskFor.count >= req_max;
949}
950
951tr_addreq_t
952tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
953                       uint32_t         index,
954                       uint32_t         offset,
955                       uint32_t         length,
956                       int              doForce )
957{
958    struct peer_request req;
959
960    assert( msgs );
961    assert( msgs->torrent );
962    assert( reqIsValid( msgs, index, offset, length ) );
963
964    /**
965    ***  Reasons to decline the request
966    **/
967
968    /* don't send requests to choked clients */
969    if( !doForce && msgs->peer->clientIsChoked ) {
970        dbgmsg( msgs, "declining request because they're choking us" );
971        return TR_ADDREQ_CLIENT_CHOKED;
972    }
973
974    /* peer doesn't have this piece */
975    if( !doForce && !tr_bitfieldHas( msgs->peer->have, index ) )
976        return TR_ADDREQ_MISSING;
977
978    /* peer's queue is full */
979    if( !doForce && requestQueueIsFull( msgs ) ) {
980        dbgmsg( msgs, "declining request because we're full" );
981        return TR_ADDREQ_FULL;
982    }
983
984    /* have we already asked for this piece? */
985    req.index = index;
986    req.offset = offset;
987    req.length = length;
988    if( !doForce ) {
989        if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
990            dbgmsg( msgs, "declining because it's a duplicate" );
991            return TR_ADDREQ_DUPLICATE;
992        }
993        if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
994            dbgmsg( msgs, "declining because it's a duplicate" );
995            return TR_ADDREQ_DUPLICATE;
996        }
997    }
998
999    /**
1000    ***  Accept this request
1001    **/
1002
1003    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
1004            index, offset, length );
1005    req.time_requested = time( NULL );
1006    reqListAppend( &msgs->clientWillAskFor, &req );
1007    return TR_ADDREQ_OK;
1008}
1009
1010static void
1011cancelAllRequestsToPeer( tr_peermsgs * msgs, tr_bool sendCancel )
1012{
1013    int i;
1014    struct request_list a = msgs->clientWillAskFor;
1015    struct request_list b = msgs->clientAskedFor;
1016    dbgmsg( msgs, "cancelling all requests to peer" );
1017
1018    msgs->clientAskedFor = REQUEST_LIST_INIT;
1019    msgs->clientWillAskFor = REQUEST_LIST_INIT;
1020
1021    for( i=0; i<a.count; ++i )
1022        fireCancelledReq( msgs, &a.requests[i] );
1023
1024    for( i = 0; i < b.count; ++i ) {
1025        fireCancelledReq( msgs, &b.requests[i] );
1026        if( sendCancel )
1027            protocolSendCancel( msgs, &b.requests[i] );
1028    }
1029
1030    reqListClear( &a );
1031    reqListClear( &b );
1032}
1033
1034void
1035tr_peerMsgsCancel( tr_peermsgs * msgs,
1036                   uint32_t      pieceIndex,
1037                   uint32_t      offset,
1038                   uint32_t      length )
1039{
1040    struct peer_request req;
1041
1042    assert( msgs != NULL );
1043    assert( length > 0 );
1044
1045
1046    /* have we asked the peer for this piece? */
1047    req.index = pieceIndex;
1048    req.offset = offset;
1049    req.length = length;
1050
1051    /* if it's only in the queue and hasn't been sent yet, free it */
1052    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
1053        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
1054        fireCancelledReq( msgs, &req );
1055    }
1056
1057    /* if it's already been sent, send a cancel message too */
1058    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
1059        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
1060        protocolSendCancel( msgs, &req );
1061        fireCancelledReq( msgs, &req );
1062    }
1063}
1064
1065
1066/**
1067***
1068**/
1069
1070static void
1071sendLtepHandshake( tr_peermsgs * msgs )
1072{
1073    tr_benc val, *m;
1074    char * buf;
1075    int len;
1076    int pex;
1077    struct evbuffer * out = msgs->outMessages;
1078
1079    if( msgs->clientSentLtepHandshake )
1080        return;
1081
1082    dbgmsg( msgs, "sending an ltep handshake" );
1083    msgs->clientSentLtepHandshake = 1;
1084
1085    /* decide if we want to advertise pex support */
1086    if( !tr_torrentAllowsPex( msgs->torrent ) )
1087        pex = 0;
1088    else if( msgs->peerSentLtepHandshake )
1089        pex = msgs->peerSupportsPex ? 1 : 0;
1090    else
1091        pex = 1;
1092
1093    tr_bencInitDict( &val, 5 );
1094    tr_bencDictAddInt( &val, "e", msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
1095    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
1096    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
1097    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
1098    m  = tr_bencDictAddDict( &val, "m", 1 );
1099    if( pex )
1100        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
1101    buf = tr_bencSave( &val, &len );
1102
1103    tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + len );
1104    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
1105    tr_peerIoWriteUint8 ( msgs->peer->io, out, LTEP_HANDSHAKE );
1106    tr_peerIoWriteBytes ( msgs->peer->io, out, buf, len );
1107    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1108    dbgOutMessageLen( msgs );
1109
1110    /* cleanup */
1111    tr_bencFree( &val );
1112    tr_free( buf );
1113}
1114
1115static void
1116parseLtepHandshake( tr_peermsgs *     msgs,
1117                    int               len,
1118                    struct evbuffer * inbuf )
1119{
1120    int64_t   i;
1121    tr_benc   val, * sub;
1122    uint8_t * tmp = tr_new( uint8_t, len );
1123
1124    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
1125    msgs->peerSentLtepHandshake = 1;
1126
1127    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
1128    {
1129        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1130        tr_free( tmp );
1131        return;
1132    }
1133
1134    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
1135
1136    /* does the peer prefer encrypted connections? */
1137    if( tr_bencDictFindInt( &val, "e", &i ) )
1138        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1139                                              : ENCRYPTION_PREFERENCE_NO;
1140
1141    /* check supported messages for utorrent pex */
1142    msgs->peerSupportsPex = 0;
1143    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
1144        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
1145            msgs->ut_pex_id = (uint8_t) i;
1146            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1147            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1148        }
1149    }
1150
1151    /* look for upload_only (BEP 21) */
1152    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
1153        fireUploadOnly( msgs, i!=0 );
1154
1155    /* get peer's listening port */
1156    if( tr_bencDictFindInt( &val, "p", &i ) ) {
1157        msgs->peer->port = htons( (uint16_t)i );
1158        dbgmsg( msgs, "msgs->port is now %hu", msgs->peer->port );
1159    }
1160
1161    /* get peer's maximum request queue size */
1162    if( tr_bencDictFindInt( &val, "reqq", &i ) )
1163        msgs->reqq = i;
1164
1165    tr_bencFree( &val );
1166    tr_free( tmp );
1167}
1168
1169static void
1170parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1171{
1172    int loaded = 0;
1173    uint8_t * tmp = tr_new( uint8_t, msglen );
1174    tr_benc val;
1175    const tr_torrent * tor = msgs->torrent;
1176    const uint8_t * added;
1177    size_t added_len;
1178
1179    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1180
1181    if( tr_torrentAllowsPex( tor )
1182      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1183    {
1184        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1185        {
1186            const uint8_t * added_f = NULL;
1187            tr_pex *        pex;
1188            size_t          i, n;
1189            size_t          added_f_len = 0;
1190            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1191            pex =
1192                tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len,
1193                                        &n );
1194            for( i = 0; i < n; ++i )
1195                tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1196                                  TR_PEER_FROM_PEX, pex + i );
1197            tr_free( pex );
1198        }
1199       
1200        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1201        {
1202            const uint8_t * added_f = NULL;
1203            tr_pex *        pex;
1204            size_t          i, n;
1205            size_t          added_f_len = 0;
1206            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1207            pex =
1208                tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len,
1209                                         &n );
1210            for( i = 0; i < n; ++i )
1211                tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1212                                  TR_PEER_FROM_PEX, pex + i );
1213            tr_free( pex );
1214        }
1215       
1216    }
1217
1218    if( loaded )
1219        tr_bencFree( &val );
1220    tr_free( tmp );
1221}
1222
1223static void sendPex( tr_peermsgs * msgs );
1224
1225static void
1226parseLtep( tr_peermsgs *     msgs,
1227           int               msglen,
1228           struct evbuffer * inbuf )
1229{
1230    uint8_t ltep_msgid;
1231
1232    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1233    msglen--;
1234
1235    if( ltep_msgid == LTEP_HANDSHAKE )
1236    {
1237        dbgmsg( msgs, "got ltep handshake" );
1238        parseLtepHandshake( msgs, msglen, inbuf );
1239        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1240        {
1241            sendLtepHandshake( msgs );
1242            sendPex( msgs );
1243        }
1244    }
1245    else if( ltep_msgid == TR_LTEP_PEX )
1246    {
1247        dbgmsg( msgs, "got ut pex" );
1248        msgs->peerSupportsPex = 1;
1249        parseUtPex( msgs, msglen, inbuf );
1250    }
1251    else
1252    {
1253        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1254        evbuffer_drain( inbuf, msglen );
1255    }
1256}
1257
1258static int
1259readBtLength( tr_peermsgs *     msgs,
1260              struct evbuffer * inbuf,
1261              size_t            inlen )
1262{
1263    uint32_t len;
1264
1265    if( inlen < sizeof( len ) )
1266        return READ_LATER;
1267
1268    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1269
1270    if( len == 0 ) /* peer sent us a keepalive message */
1271        dbgmsg( msgs, "got KeepAlive" );
1272    else
1273    {
1274        msgs->incoming.length = len;
1275        msgs->state = AWAITING_BT_ID;
1276    }
1277
1278    return READ_NOW;
1279}
1280
1281static int readBtMessage( tr_peermsgs *     msgs,
1282                          struct evbuffer * inbuf,
1283                          size_t            inlen );
1284
1285static int
1286readBtId( tr_peermsgs *     msgs,
1287          struct evbuffer * inbuf,
1288          size_t            inlen )
1289{
1290    uint8_t id;
1291
1292    if( inlen < sizeof( uint8_t ) )
1293        return READ_LATER;
1294
1295    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1296    msgs->incoming.id = id;
1297
1298    if( id == BT_PIECE )
1299    {
1300        msgs->state = AWAITING_BT_PIECE;
1301        return READ_NOW;
1302    }
1303    else if( msgs->incoming.length != 1 )
1304    {
1305        msgs->state = AWAITING_BT_MESSAGE;
1306        return READ_NOW;
1307    }
1308    else return readBtMessage( msgs, inbuf, inlen - 1 );
1309}
1310
1311static void
1312updatePeerProgress( tr_peermsgs * msgs )
1313{
1314    msgs->peer->progress = tr_bitfieldCountTrueBits( msgs->peer->have ) / (float)msgs->torrent->info.pieceCount;
1315    dbgmsg( msgs, "peer progress is %f", msgs->peer->progress );
1316    updateFastSet( msgs );
1317    updateInterest( msgs );
1318    firePeerProgress( msgs );
1319}
1320
1321static void
1322peerMadeRequest( tr_peermsgs *               msgs,
1323                 const struct peer_request * req )
1324{
1325    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1326    const int reqIsValid = requestIsValid( msgs, req );
1327    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
1328    const int peerIsChoked = msgs->peer->peerIsChoked;
1329
1330    int allow = FALSE;
1331
1332    if( !reqIsValid )
1333        dbgmsg( msgs, "rejecting an invalid request." );
1334    else if( !clientHasPiece )
1335        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1336    else if( peerIsChoked )
1337        dbgmsg( msgs, "rejecting request from choked peer" );
1338    else
1339        allow = TRUE;
1340
1341    if( allow )
1342        reqListAppend( &msgs->peerAskedFor, req );
1343    else if( fext )
1344        protocolSendReject( msgs, req );
1345}
1346
1347static int
1348messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1349{
1350    switch( id )
1351    {
1352        case BT_CHOKE:
1353        case BT_UNCHOKE:
1354        case BT_INTERESTED:
1355        case BT_NOT_INTERESTED:
1356        case BT_FEXT_HAVE_ALL:
1357        case BT_FEXT_HAVE_NONE:
1358            return len == 1;
1359
1360        case BT_HAVE:
1361        case BT_FEXT_SUGGEST:
1362        case BT_FEXT_ALLOWED_FAST:
1363            return len == 5;
1364
1365        case BT_BITFIELD:
1366            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1367
1368        case BT_REQUEST:
1369        case BT_CANCEL:
1370        case BT_FEXT_REJECT:
1371            return len == 13;
1372
1373        case BT_PIECE:
1374            return len > 9 && len <= 16393;
1375
1376        case BT_PORT:
1377            return len == 3;
1378
1379        case BT_LTEP:
1380            return len >= 2;
1381
1382        default:
1383            return FALSE;
1384    }
1385}
1386
1387static int clientGotBlock( tr_peermsgs *               msgs,
1388                           const uint8_t *             block,
1389                           const struct peer_request * req );
1390
1391static int
1392readBtPiece( tr_peermsgs      * msgs,
1393             struct evbuffer  * inbuf,
1394             size_t             inlen,
1395             size_t           * setme_piece_bytes_read )
1396{
1397    struct peer_request * req = &msgs->incoming.blockReq;
1398
1399    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1400    dbgmsg( msgs, "In readBtPiece" );
1401
1402    if( !req->length )
1403    {
1404        if( inlen < 8 )
1405            return READ_LATER;
1406
1407        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1408        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1409        req->length = msgs->incoming.length - 9;
1410        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1411        return READ_NOW;
1412    }
1413    else
1414    {
1415        int err;
1416
1417        /* read in another chunk of data */
1418        const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
1419        size_t n = MIN( nLeft, inlen );
1420        uint8_t * buf = tr_new( uint8_t, n );
1421        assert( EVBUFFER_LENGTH( inbuf ) >= n );
1422        tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, n );
1423        evbuffer_add( msgs->incoming.block, buf, n );
1424        fireClientGotData( msgs, n, TRUE );
1425        *setme_piece_bytes_read += n;
1426        tr_free( buf );
1427        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1428               n, req->index, req->offset, req->length,
1429               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1430        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1431            return READ_LATER;
1432
1433        /* we've got the whole block ... process it */
1434        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );
1435
1436        /* cleanup */
1437        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH( msgs->incoming.block ) );
1438        req->length = 0;
1439        msgs->state = AWAITING_BT_LENGTH;
1440        if( !err )
1441            return READ_NOW;
1442        else {
1443            fireError( msgs, err );
1444            return READ_ERR;
1445        }
1446    }
1447}
1448
1449static int
1450readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1451{
1452    uint32_t      ui32;
1453    uint32_t      msglen = msgs->incoming.length;
1454    const uint8_t id = msgs->incoming.id;
1455    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1456    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1457
1458    --msglen; /* id length */
1459
1460    if( inlen < msglen )
1461        return READ_LATER;
1462
1463    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1464
1465    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1466    {
1467        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1468        fireError( msgs, EMSGSIZE );
1469        return READ_ERR;
1470    }
1471
1472    switch( id )
1473    {
1474        case BT_CHOKE:
1475            dbgmsg( msgs, "got Choke" );
1476            msgs->peer->clientIsChoked = 1;
1477            if( !fext )
1478                cancelAllRequestsToPeer( msgs, FALSE );
1479            break;
1480
1481        case BT_UNCHOKE:
1482            dbgmsg( msgs, "got Unchoke" );
1483            msgs->peer->clientIsChoked = 0;
1484            fireNeedReq( msgs );
1485            break;
1486
1487        case BT_INTERESTED:
1488            dbgmsg( msgs, "got Interested" );
1489            msgs->peer->peerIsInterested = 1;
1490            break;
1491
1492        case BT_NOT_INTERESTED:
1493            dbgmsg( msgs, "got Not Interested" );
1494            msgs->peer->peerIsInterested = 0;
1495            break;
1496
1497        case BT_HAVE:
1498            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1499            dbgmsg( msgs, "got Have: %u", ui32 );
1500            if( tr_bitfieldAdd( msgs->peer->have, ui32 ) ) {
1501                fireError( msgs, ERANGE );
1502                return READ_ERR;
1503            }
1504            updatePeerProgress( msgs );
1505            tr_rcTransferred( msgs->torrent->swarmSpeed,
1506                              msgs->torrent->info.pieceSize );
1507            break;
1508
1509        case BT_BITFIELD:
1510        {
1511            dbgmsg( msgs, "got a bitfield" );
1512            msgs->peerSentBitfield = 1;
1513            tr_peerIoReadBytes( msgs->peer->io, inbuf, msgs->peer->have->bits, msglen );
1514            updatePeerProgress( msgs );
1515            fireNeedReq( msgs );
1516            break;
1517        }
1518
1519        case BT_REQUEST:
1520        {
1521            struct peer_request r;
1522            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1523            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1524            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1525            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1526            peerMadeRequest( msgs, &r );
1527            break;
1528        }
1529
1530        case BT_CANCEL:
1531        {
1532            struct peer_request r;
1533            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1534            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1535            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1536            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1537            if( reqListRemove( &msgs->peerAskedFor, &r ) && fext )
1538                protocolSendReject( msgs, &r );
1539            break;
1540        }
1541
1542        case BT_PIECE:
1543            assert( 0 ); /* handled elsewhere! */
1544            break;
1545
1546        case BT_PORT:
1547            dbgmsg( msgs, "Got a BT_PORT" );
1548            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->port );
1549            break;
1550
1551        case BT_FEXT_SUGGEST:
1552            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1553            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1554            if( fext )
1555                fireClientGotSuggest( msgs, ui32 );
1556            else {
1557                fireError( msgs, EMSGSIZE );
1558                return READ_ERR;
1559            }
1560            break;
1561
1562        case BT_FEXT_ALLOWED_FAST:
1563            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1564            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1565            if( fext )
1566                fireClientGotAllowedFast( msgs, ui32 );
1567            else {
1568                fireError( msgs, EMSGSIZE );
1569                return READ_ERR;
1570            }
1571            break;
1572
1573        case BT_FEXT_HAVE_ALL:
1574            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1575            if( fext ) {
1576                tr_bitfieldAddRange( msgs->peer->have, 0, msgs->torrent->info.pieceCount );
1577                updatePeerProgress( msgs );
1578            } else {
1579                fireError( msgs, EMSGSIZE );
1580                return READ_ERR;
1581            }
1582            break;
1583
1584
1585        case BT_FEXT_HAVE_NONE:
1586            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1587            if( fext ) {
1588                tr_bitfieldClear( msgs->peer->have );
1589                updatePeerProgress( msgs );
1590            } else {
1591                fireError( msgs, EMSGSIZE );
1592                return READ_ERR;
1593            }
1594            break;
1595
1596        case BT_FEXT_REJECT:
1597        {
1598            struct peer_request r;
1599            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1600            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1601            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1602            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1603            if( fext )
1604                reqListRemove( &msgs->clientAskedFor, &r );
1605            else {
1606                fireError( msgs, EMSGSIZE );
1607                return READ_ERR;
1608            }
1609            break;
1610        }
1611
1612        case BT_LTEP:
1613            dbgmsg( msgs, "Got a BT_LTEP" );
1614            parseLtep( msgs, msglen, inbuf );
1615            break;
1616
1617        default:
1618            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1619            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1620            break;
1621    }
1622
1623    assert( msglen + 1 == msgs->incoming.length );
1624    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1625
1626    msgs->state = AWAITING_BT_LENGTH;
1627    return READ_NOW;
1628}
1629
1630static void
1631decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1632{
1633    tr_torrent * tor = msgs->torrent;
1634
1635    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1636}
1637
1638static void
1639clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1640{
1641    decrementDownloadedCount( msgs, req->length );
1642}
1643
1644static void
1645addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1646{
1647    if( !msgs->peer->blame )
1648         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1649    tr_bitfieldAdd( msgs->peer->blame, index );
1650}
1651
1652/* returns 0 on success, or an errno on failure */
1653static int
1654clientGotBlock( tr_peermsgs *               msgs,
1655                const uint8_t *             data,
1656                const struct peer_request * req )
1657{
1658    int err;
1659    tr_torrent * tor = msgs->torrent;
1660    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1661
1662    assert( msgs );
1663    assert( req );
1664
1665    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1666        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1667                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1668        return EMSGSIZE;
1669    }
1670
1671    /* save the block */
1672    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1673
1674    /**
1675    *** Remove the block from our `we asked for this' list
1676    **/
1677
1678    if( !reqListRemove( &msgs->clientAskedFor, req ) ) {
1679        clientGotUnwantedBlock( msgs, req );
1680        dbgmsg( msgs, "we didn't ask for this message..." );
1681        return 0;
1682    }
1683
1684    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1685            msgs->clientAskedFor.count );
1686
1687    /**
1688    *** Error checks
1689    **/
1690
1691    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1692        dbgmsg( msgs, "we have this block already..." );
1693        clientGotUnwantedBlock( msgs, req );
1694        return 0;
1695    }
1696
1697    /**
1698    ***  Save the block
1699    **/
1700
1701    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1702        return err;
1703
1704    addPeerToBlamefield( msgs, req->index );
1705    fireGotBlock( msgs, req );
1706    return 0;
1707}
1708
1709static int peerPulse( void * vmsgs );
1710
1711static void
1712didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1713{
1714    tr_peermsgs * msgs = vmsgs;
1715    firePeerGotData( msgs, bytesWritten, wasPieceData );
1716    peerPulse( msgs );
1717}
1718
1719static ReadState
1720canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1721{
1722    ReadState         ret;
1723    tr_peermsgs *     msgs = vmsgs;
1724    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1725    const size_t      inlen = EVBUFFER_LENGTH( in );
1726
1727    if( !inlen )
1728    {
1729        ret = READ_LATER;
1730    }
1731    else if( msgs->state == AWAITING_BT_PIECE )
1732    {
1733        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1734    }
1735    else switch( msgs->state )
1736    {
1737        case AWAITING_BT_LENGTH:
1738            ret = readBtLength ( msgs, in, inlen ); break;
1739
1740        case AWAITING_BT_ID:
1741            ret = readBtId     ( msgs, in, inlen ); break;
1742
1743        case AWAITING_BT_MESSAGE:
1744            ret = readBtMessage( msgs, in, inlen ); break;
1745
1746        default:
1747            assert( 0 );
1748    }
1749
1750    /* log the raw data that was read */
1751    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1752        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1753
1754    return ret;
1755}
1756
1757/**
1758***
1759**/
1760
1761static int
1762ratePulse( void * vmsgs )
1763{
1764    tr_peermsgs * msgs = vmsgs;
1765    const double rateToClient = tr_peerGetPieceSpeed( msgs->peer, TR_PEER_TO_CLIENT );
1766    const int seconds = 10;
1767    const int floor = 8;
1768    const int estimatedBlocksInPeriod = ( rateToClient * seconds * 1024 ) / msgs->torrent->blockSize;
1769
1770    msgs->maxActiveRequests = floor + estimatedBlocksInPeriod;
1771
1772    if( msgs->reqq > 0 )
1773        msgs->maxActiveRequests = MIN( msgs->maxActiveRequests, msgs->reqq );
1774
1775    return TRUE;
1776}
1777
1778static size_t
1779fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1780{
1781    size_t bytesWritten = 0;
1782    struct peer_request req;
1783    const int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1784    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1785
1786    /**
1787    ***  Protocol messages
1788    **/
1789
1790    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1791    {
1792        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1793        msgs->outMessagesBatchedAt = now;
1794    }
1795    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1796    {
1797        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1798        /* flush the protocol messages */
1799        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1800        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1801        msgs->clientSentAnythingAt = now;
1802        msgs->outMessagesBatchedAt = 0;
1803        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1804        bytesWritten +=  len;
1805    }
1806
1807    /**
1808    ***  Blocks
1809    **/
1810
1811    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io ) >= msgs->torrent->blockSize )
1812        && popNextRequest( msgs, &req ) )
1813    {
1814        if( requestIsValid( msgs, &req )
1815            && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1816        {
1817            /* send a block */
1818            uint8_t * buf = tr_new( uint8_t, req.length );
1819            const int err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf );
1820            if( err ) {
1821                fireError( msgs, err );
1822                bytesWritten = 0;
1823                msgs = NULL;
1824            } else {
1825                tr_peerIo * io = msgs->peer->io;
1826                struct evbuffer * out = evbuffer_new( );
1827                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1828                tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1829                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1830                tr_peerIoWriteUint32( io, out, req.index );
1831                tr_peerIoWriteUint32( io, out, req.offset );
1832                tr_peerIoWriteBytes ( io, out, buf, req.length );
1833                tr_peerIoWriteBuf( io, out, TRUE );
1834                bytesWritten += EVBUFFER_LENGTH( out );
1835                evbuffer_free( out );
1836                msgs->clientSentAnythingAt = now;
1837            }
1838            tr_free( buf );
1839        }
1840        else if( fext ) /* peer needs a reject message */
1841        {
1842            protocolSendReject( msgs, &req );
1843        }
1844    }
1845
1846    /**
1847    ***  Keepalive
1848    **/
1849
1850    if( ( msgs != NULL )
1851        && ( msgs->clientSentAnythingAt != 0 )
1852        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1853    {
1854        dbgmsg( msgs, "sending a keepalive message" );
1855        tr_peerIoWriteUint32( msgs->peer->io, msgs->outMessages, 0 );
1856        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1857    }
1858
1859    return bytesWritten;
1860}
1861
1862static int
1863peerPulse( void * vmsgs )
1864{
1865    tr_peermsgs * msgs = vmsgs;
1866    const time_t  now = time( NULL );
1867
1868    ratePulse( msgs );
1869
1870    pumpRequestQueue( msgs, now );
1871    expireOldRequests( msgs, now );
1872
1873    for( ;; )
1874        if( fillOutputBuffer( msgs, now ) < 1 )
1875            break;
1876
1877    return TRUE; /* loop forever */
1878}
1879
1880void
1881tr_peerMsgsPulse( tr_peermsgs * msgs )
1882{
1883    if( msgs )
1884        peerPulse( msgs );
1885}
1886
1887static void
1888gotError( tr_peerIo  * io UNUSED,
1889          short        what,
1890          void       * vmsgs )
1891{
1892    if( what & EVBUFFER_TIMEOUT )
1893        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1894    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1895        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1896               what, errno, tr_strerror( errno ) );
1897    fireError( vmsgs, ENOTCONN );
1898}
1899
1900static void
1901sendBitfield( tr_peermsgs * msgs )
1902{
1903    struct evbuffer * out = msgs->outMessages;
1904    tr_bitfield *     field;
1905    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1906    size_t            i;
1907    size_t            lazyCount = 0;
1908
1909    field = tr_bitfieldDup( tr_cpPieceBitfield( msgs->torrent->completion ) );
1910
1911    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1912    {
1913        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1914            speed over a truly random sample -- let's limit the pool size to
1915            the first 1000 pieces so large torrents don't bog things down */
1916        size_t poolSize;
1917        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1918        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1919
1920        /* build the pool */
1921        for( i=poolSize=0; i<maxPoolSize; ++i )
1922            if( tr_bitfieldHas( field, i ) )
1923                pool[poolSize++] = i;
1924
1925        /* pull random piece indices from the pool */
1926        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1927        {
1928            const int pos = tr_cryptoWeakRandInt( poolSize );
1929            const tr_piece_index_t piece = pool[pos];
1930            tr_bitfieldRem( field, piece );
1931            lazyPieces[lazyCount++] = piece;
1932            pool[pos] = pool[--poolSize];
1933        }
1934
1935        /* cleanup */
1936        tr_free( pool );
1937    }
1938
1939    tr_peerIoWriteUint32( msgs->peer->io, out,
1940                          sizeof( uint8_t ) + field->byteCount );
1941    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_BITFIELD );
1942    tr_peerIoWriteBytes ( msgs->peer->io, out, field->bits, field->byteCount );
1943    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
1944            EVBUFFER_LENGTH( out ) );
1945    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1946
1947    for( i = 0; i < lazyCount; ++i )
1948        protocolSendHave( msgs, lazyPieces[i] );
1949
1950    tr_bitfieldFree( field );
1951}
1952
1953static void
1954tellPeerWhatWeHave( tr_peermsgs * msgs )
1955{
1956    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1957
1958    if( fext && ( tr_cpGetStatus( msgs->torrent->completion ) == TR_SEED ) )
1959    {
1960        protocolSendHaveAll( msgs );
1961    }
1962    else if( fext && ( tr_cpHaveValid( msgs->torrent->completion ) == 0 ) )
1963    {
1964        protocolSendHaveNone( msgs );
1965    }
1966    else
1967    {
1968        sendBitfield( msgs );
1969    }
1970}
1971
1972/**
1973***
1974**/
1975
1976/* some peers give us error messages if we send
1977   more than this many peers in a single pex message
1978   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1979#define MAX_PEX_ADDED 50
1980#define MAX_PEX_DROPPED 50
1981
1982typedef struct
1983{
1984    tr_pex *  added;
1985    tr_pex *  dropped;
1986    tr_pex *  elements;
1987    int       addedCount;
1988    int       droppedCount;
1989    int       elementCount;
1990}
1991PexDiffs;
1992
1993static void
1994pexAddedCb( void * vpex,
1995            void * userData )
1996{
1997    PexDiffs * diffs = userData;
1998    tr_pex *   pex = vpex;
1999
2000    if( diffs->addedCount < MAX_PEX_ADDED )
2001    {
2002        diffs->added[diffs->addedCount++] = *pex;
2003        diffs->elements[diffs->elementCount++] = *pex;
2004    }
2005}
2006
2007static void
2008pexDroppedCb( void * vpex,
2009              void * userData )
2010{
2011    PexDiffs * diffs = userData;
2012    tr_pex *   pex = vpex;
2013
2014    if( diffs->droppedCount < MAX_PEX_DROPPED )
2015    {
2016        diffs->dropped[diffs->droppedCount++] = *pex;
2017    }
2018}
2019
2020static void
2021pexElementCb( void * vpex,
2022              void * userData )
2023{
2024    PexDiffs * diffs = userData;
2025    tr_pex *   pex = vpex;
2026
2027    diffs->elements[diffs->elementCount++] = *pex;
2028}
2029
2030static void
2031sendPex( tr_peermsgs * msgs )
2032{
2033    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2034    {
2035        PexDiffs diffs;
2036        PexDiffs diffs6;
2037        tr_pex * newPex = NULL;
2038        tr_pex * newPex6 = NULL;
2039        const int newCount = tr_peerMgrGetPeers( msgs->session->peerMgr,
2040                                                 msgs->torrent->info.hash,
2041                                                 &newPex, TR_AF_INET );
2042        const int newCount6 = tr_peerMgrGetPeers( msgs->session->peerMgr,
2043                                                  msgs->torrent->info.hash,
2044                                                  &newPex6, TR_AF_INET6 );
2045
2046        /* build the diffs */
2047        diffs.added = tr_new( tr_pex, newCount );
2048        diffs.addedCount = 0;
2049        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2050        diffs.droppedCount = 0;
2051        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2052        diffs.elementCount = 0;
2053        tr_set_compare( msgs->pex, msgs->pexCount,
2054                        newPex, newCount,
2055                        tr_pexCompare, sizeof( tr_pex ),
2056                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2057        diffs6.added = tr_new( tr_pex, newCount6 );
2058        diffs6.addedCount = 0;
2059        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2060        diffs6.droppedCount = 0;
2061        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2062        diffs6.elementCount = 0;
2063        tr_set_compare( msgs->pex6, msgs->pexCount6,
2064                        newPex6, newCount6,
2065                        tr_pexCompare, sizeof( tr_pex ),
2066                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2067        dbgmsg(
2068            msgs,
2069            "pex: old peer count %d, new peer count %d, added %d, removed %d",
2070            msgs->pexCount, newCount + newCount6,
2071            diffs.addedCount + diffs6.addedCount,
2072            diffs.droppedCount + diffs6.droppedCount );
2073
2074        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2075            !diffs6.droppedCount )
2076        {
2077            tr_free( diffs.elements );
2078            tr_free( diffs6.elements );
2079        }
2080        else
2081        {
2082            int  i;
2083            tr_benc val;
2084            char * benc;
2085            int bencLen;
2086            uint8_t * tmp, *walk;
2087            struct evbuffer * out = msgs->outMessages;
2088
2089            /* update peer */
2090            tr_free( msgs->pex );
2091            msgs->pex = diffs.elements;
2092            msgs->pexCount = diffs.elementCount;
2093            tr_free( msgs->pex6 );
2094            msgs->pex6 = diffs6.elements;
2095            msgs->pexCount6 = diffs6.elementCount;
2096
2097            /* build the pex payload */
2098            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2099                                         * speed vs. likelihood? */
2100
2101            /* "added" */
2102            tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2103            for( i = 0; i < diffs.addedCount; ++i )
2104            {
2105                memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2106                memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2107            }
2108            assert( ( walk - tmp ) == diffs.addedCount * 6 );
2109            tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2110            tr_free( tmp );
2111
2112            /* "added.f" */
2113            tmp = walk = tr_new( uint8_t, diffs.addedCount );
2114            for( i = 0; i < diffs.addedCount; ++i )
2115                *walk++ = diffs.added[i].flags;
2116            assert( ( walk - tmp ) == diffs.addedCount );
2117            tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2118            tr_free( tmp );
2119
2120            /* "dropped" */
2121            tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2122            for( i = 0; i < diffs.droppedCount; ++i )
2123            {
2124                memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2125                memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2126            }
2127            assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2128            tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2129            tr_free( tmp );
2130           
2131            /* "added6" */
2132            tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2133            for( i = 0; i < diffs6.addedCount; ++i )
2134            {
2135                memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2136                walk += 16;
2137                memcpy( walk, &diffs6.added[i].port, 2 );
2138                walk += 2;
2139            }
2140            assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2141            tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2142            tr_free( tmp );
2143           
2144            /* "added6.f" */
2145            tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2146            for( i = 0; i < diffs6.addedCount; ++i )
2147                *walk++ = diffs6.added[i].flags;
2148            assert( ( walk - tmp ) == diffs6.addedCount );
2149            tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2150            tr_free( tmp );
2151           
2152            /* "dropped6" */
2153            tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2154            for( i = 0; i < diffs6.droppedCount; ++i )
2155            {
2156                memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2157                walk += 16;
2158                memcpy( walk, &diffs6.dropped[i].port, 2 );
2159                walk += 2;
2160            }
2161            assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2162            tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2163            tr_free( tmp );
2164
2165            /* write the pex message */
2166            benc = tr_bencSave( &val, &bencLen );
2167            tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + bencLen );
2168            tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
2169            tr_peerIoWriteUint8 ( msgs->peer->io, out, msgs->ut_pex_id );
2170            tr_peerIoWriteBytes ( msgs->peer->io, out, benc, bencLen );
2171            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2172            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2173
2174            tr_free( benc );
2175            tr_bencFree( &val );
2176        }
2177
2178        /* cleanup */
2179        tr_free( diffs.added );
2180        tr_free( diffs.dropped );
2181        tr_free( newPex );
2182        tr_free( diffs6.added );
2183        tr_free( diffs6.dropped );
2184        tr_free( newPex6 );
2185
2186        msgs->clientSentPexAt = time( NULL );
2187    }
2188}
2189
2190static int
2191pexPulse( void * vpeer )
2192{
2193    sendPex( vpeer );
2194    return TRUE;
2195}
2196
2197/**
2198***
2199**/
2200
2201tr_peermsgs*
2202tr_peerMsgsNew( struct tr_torrent * torrent,
2203                struct tr_peer *    peer,
2204                tr_delivery_func    func,
2205                void *              userData,
2206                tr_publisher_tag *  setme )
2207{
2208    tr_peermsgs * m;
2209
2210    assert( peer );
2211    assert( peer->io );
2212
2213    m = tr_new0( tr_peermsgs, 1 );
2214    m->publisher = tr_publisherNew( );
2215    m->peer = peer;
2216    m->session = torrent->session;
2217    m->torrent = torrent;
2218    m->peer->clientIsChoked = 1;
2219    m->peer->peerIsChoked = 1;
2220    m->peer->clientIsInterested = 0;
2221    m->peer->peerIsInterested = 0;
2222    m->peer->have = tr_bitfieldNew( torrent->info.pieceCount );
2223    m->state = AWAITING_BT_LENGTH;
2224    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
2225    m->outMessages = evbuffer_new( );
2226    m->outMessagesBatchedAt = 0;
2227    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2228    m->incoming.block = evbuffer_new( );
2229    m->peerAskedFor = REQUEST_LIST_INIT;
2230    m->clientAskedFor = REQUEST_LIST_INIT;
2231    m->clientWillAskFor = REQUEST_LIST_INIT;
2232    peer->msgs = m;
2233
2234    *setme = tr_publisherSubscribe( m->publisher, func, userData );
2235
2236    if( tr_peerIoSupportsLTEP( peer->io ) )
2237        sendLtepHandshake( m );
2238
2239    tellPeerWhatWeHave( m );
2240
2241    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2242    ratePulse( m );
2243
2244    return m;
2245}
2246
2247void
2248tr_peerMsgsFree( tr_peermsgs* msgs )
2249{
2250    if( msgs )
2251    {
2252        tr_timerFree( &msgs->pexTimer );
2253        tr_publisherFree( &msgs->publisher );
2254        reqListClear( &msgs->clientWillAskFor );
2255        reqListClear( &msgs->clientAskedFor );
2256        reqListClear( &msgs->peerAskedFor );
2257
2258        evbuffer_free( msgs->incoming.block );
2259        evbuffer_free( msgs->outMessages );
2260        tr_free( msgs->pex6 );
2261        tr_free( msgs->pex );
2262
2263        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2264        tr_free( msgs );
2265    }
2266}
2267
2268void
2269tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2270                        tr_publisher_tag tag )
2271{
2272    tr_publisherUnsubscribe( peer->publisher, tag );
2273}
2274
Note: See TracBrowser for help on using the repository browser.