source: branches/1.4x/libtransmission/peer-msgs.c @ 7455

Last change on this file since 7455 was 7455, checked in by charles, 12 years ago

(1.4x libT) backport handshake, peer, bandwidth, peer-io to 1.4x.

  • Property svn:keywords set to Date Rev Author Id
File size: 61.2 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 7455 2008-12-22 00:51:14Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "crypto.h"
26#include "inout.h"
27#ifdef WIN32
28#include "net.h" /* for ECONN */
29#endif
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-mgr-private.h"
33#include "peer-msgs.h"
34#include "ratecontrol.h"
35#include "stats.h"
36#include "torrent.h"
37#include "trevent.h"
38#include "utils.h"
39
40/**
41***
42**/
43
44enum
45{
46    BT_CHOKE                = 0,
47    BT_UNCHOKE              = 1,
48    BT_INTERESTED           = 2,
49    BT_NOT_INTERESTED       = 3,
50    BT_HAVE                 = 4,
51    BT_BITFIELD             = 5,
52    BT_REQUEST              = 6,
53    BT_PIECE                = 7,
54    BT_CANCEL               = 8,
55    BT_PORT                 = 9,
56
57    BT_FEXT_SUGGEST         = 13,
58    BT_FEXT_HAVE_ALL        = 14,
59    BT_FEXT_HAVE_NONE       = 15,
60    BT_FEXT_REJECT          = 16,
61    BT_FEXT_ALLOWED_FAST    = 17,
62
63    BT_LTEP                 = 20,
64
65    LTEP_HANDSHAKE          = 0,
66
67    TR_LTEP_PEX             = 1,
68
69
70
71    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
72
73    /* idle seconds before we send a keepalive */
74    KEEPALIVE_INTERVAL_SECS = 100,
75
76    PEX_INTERVAL            = ( 90 * 1000 ), /* msec between sendPex() calls */
77
78    MAX_QUEUE_SIZE          = ( 100 ),
79
80    /* how long an unsent request can stay queued before it's returned
81       back to the peer-mgr's pool of requests */
82    QUEUED_REQUEST_TTL_SECS = 20,
83
84    /* how long a sent request can stay queued before it's returned
85       back to the peer-mgr's pool of requests */
86    SENT_REQUEST_TTL_SECS = 240,
87
88    /* used in lowering the outMessages queue period */
89    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
90    HIGH_PRIORITY_INTERVAL_SECS = 2,
91    LOW_PRIORITY_INTERVAL_SECS = 20,
92
93    /* number of pieces to remove from the bitfield when
94     * lazy bitfields are turned on */
95    LAZY_PIECE_COUNT = 26,
96
97    /* number of pieces we'll allow in our fast set */
98    MAX_FAST_SET_SIZE = 3
99};
100
101/**
102***  REQUEST MANAGEMENT
103**/
104
105enum
106{
107    AWAITING_BT_LENGTH,
108    AWAITING_BT_ID,
109    AWAITING_BT_MESSAGE,
110    AWAITING_BT_PIECE
111};
112
113struct peer_request
114{
115    uint32_t    index;
116    uint32_t    offset;
117    uint32_t    length;
118    time_t      time_requested;
119};
120
121static int
122compareRequest( const void * va, const void * vb )
123{
124    const struct peer_request * a = va;
125    const struct peer_request * b = vb;
126
127    if( a->index != b->index )
128        return a->index < b->index ? -1 : 1;
129
130    if( a->offset != b->offset )
131        return a->offset < b->offset ? -1 : 1;
132
133    if( a->length != b->length )
134        return a->length < b->length ? -1 : 1;
135
136    return 0;
137}
138
139struct request_list
140{
141    uint16_t               count;
142    uint16_t               max;
143    struct peer_request *  requests;
144};
145
146static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
147
148static void
149reqListReserve( struct request_list * list,
150                uint16_t              max )
151{
152    if( list->max < max )
153    {
154        list->max = max;
155        list->requests = tr_renew( struct peer_request,
156                                   list->requests,
157                                   list->max );
158    }
159}
160
161static void
162reqListClear( struct request_list * list )
163{
164    tr_free( list->requests );
165    *list = REQUEST_LIST_INIT;
166}
167
168static void
169reqListCopy( struct request_list * dest, const struct request_list * src )
170{
171    dest->count = dest->max = src->count;
172    dest->requests = tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
173}
174
175static void
176reqListRemoveOne( struct request_list * list,
177                  int                   i )
178{
179    assert( 0 <= i && i < list->count );
180
181    memmove( &list->requests[i],
182             &list->requests[i + 1],
183             sizeof( struct peer_request ) * ( --list->count - i ) );
184}
185
186static void
187reqListAppend( struct request_list *       list,
188               const struct peer_request * req )
189{
190    if( ++list->count >= list->max )
191        reqListReserve( list, list->max + 8 );
192
193    list->requests[list->count - 1] = *req;
194}
195
196static int
197reqListPop( struct request_list * list,
198            struct peer_request * setme )
199{
200    int success;
201
202    if( !list->count )
203        success = FALSE;
204    else {
205        *setme = list->requests[0];
206        reqListRemoveOne( list, 0 );
207        success = TRUE;
208    }
209
210    return success;
211}
212
213static int
214reqListFind( struct request_list *       list,
215             const struct peer_request * key )
216{
217    uint16_t i;
218
219    for( i = 0; i < list->count; ++i )
220        if( !compareRequest( key, list->requests + i ) )
221            return i;
222
223    return -1;
224}
225
226static int
227reqListRemove( struct request_list *       list,
228               const struct peer_request * key )
229{
230    int success;
231    const int i = reqListFind( list, key );
232
233    if( i < 0 )
234        success = FALSE;
235    else {
236        reqListRemoveOne( list, i );
237        success = TRUE;
238    }
239
240    return success;
241}
242
243/**
244***
245**/
246
247/* this is raw, unchanged data from the peer regarding
248 * the current message that it's sending us. */
249struct tr_incoming
250{
251    uint8_t                id;
252    uint32_t               length; /* includes the +1 for id length */
253    struct peer_request    blockReq; /* metadata for incoming blocks */
254    struct evbuffer *      block; /* piece data for incoming blocks */
255};
256
257/**
258 * Low-level communication state information about a connected peer.
259 *
260 * This structure remembers the low-level protocol states that we're
261 * in with this peer, such as active requests, pex messages, and so on.
262 * Its fields are all private to peer-msgs.c.
263 *
264 * Data not directly involved with sending & receiving messages is
265 * stored in tr_peer, where it can be accessed by both peermsgs and
266 * the peer manager.
267 *
268 * @see struct peer_atom
269 * @see tr_peer
270 */
271struct tr_peermsgs
272{
273    tr_bool         peerSentBitfield;
274    tr_bool         peerSupportsPex;
275    tr_bool         clientSentLtepHandshake;
276    tr_bool         peerSentLtepHandshake;
277    tr_bool         haveFastSet;
278
279    uint8_t         state;
280    uint8_t         ut_pex_id;
281    uint16_t        pexCount;
282    uint16_t        maxActiveRequests;
283
284    size_t                 fastsetSize;
285    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
286
287    /* how long the outMessages batch should be allowed to grow before
288     * it's flushed -- some messages (like requests >:) should be sent
289     * very quickly; others aren't as urgent. */
290    int                    outMessagesBatchPeriod;
291
292    tr_peer *              peer;
293
294    tr_session *           session;
295    tr_torrent *           torrent;
296
297    tr_publisher_t *       publisher;
298
299    struct evbuffer *      outMessages; /* all the non-piece messages */
300
301    struct request_list    peerAskedFor;
302    struct request_list    clientAskedFor;
303    struct request_list    clientWillAskFor;
304
305    tr_timer             * pexTimer;
306    tr_pex               * pex;
307
308    time_t                 clientSentPexAt;
309    time_t                 clientSentAnythingAt;
310
311    /* when we started batching the outMessages */
312    time_t                outMessagesBatchedAt;
313
314    struct tr_incoming    incoming;
315
316    /* if the peer supports the Extension Protocol in BEP 10 and
317       supplied a reqq argument, it's stored here.  otherwise the
318       value is zero and should be ignored. */
319    int64_t               reqq;
320};
321
322/**
323***
324**/
325
326static void
327myDebug( const char * file, int line,
328         const struct tr_peermsgs * msgs,
329         const char * fmt, ... )
330{
331    FILE * fp = tr_getLog( );
332
333    if( fp )
334    {
335        va_list           args;
336        char              timestr[64];
337        struct evbuffer * buf = evbuffer_new( );
338        char *            base = tr_basename( file );
339
340        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
341                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
342                             msgs->torrent->info.name,
343                             tr_peerIoGetAddrStr( msgs->peer->io ),
344                             msgs->peer->client );
345        va_start( args, fmt );
346        evbuffer_add_vprintf( buf, fmt, args );
347        va_end( args );
348        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
349        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
350
351        tr_free( base );
352        evbuffer_free( buf );
353    }
354}
355
356#define dbgmsg( msgs, ... ) \
357    do { \
358        if( tr_deepLoggingIsActive( ) ) \
359            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
360    } while( 0 )
361
362/**
363***
364**/
365
366static void
367pokeBatchPeriod( tr_peermsgs * msgs,
368                 int           interval )
369{
370    if( msgs->outMessagesBatchPeriod > interval )
371    {
372        msgs->outMessagesBatchPeriod = interval;
373        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
374    }
375}
376
377static void
378dbgOutMessageLen( tr_peermsgs * msgs )
379{
380    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
381}
382
383static void
384protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
385{
386    tr_peerIo       * io  = msgs->peer->io;
387    struct evbuffer * out = msgs->outMessages;
388
389    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
390
391    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
392    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
393    tr_peerIoWriteUint32( io, out, req->index );
394    tr_peerIoWriteUint32( io, out, req->offset );
395    tr_peerIoWriteUint32( io, out, req->length );
396
397    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
398    dbgOutMessageLen( msgs );
399}
400
401static void
402protocolSendRequest( tr_peermsgs               * msgs,
403                     const struct peer_request * req )
404{
405    tr_peerIo       * io  = msgs->peer->io;
406    struct evbuffer * out = msgs->outMessages;
407
408    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
409    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
410    tr_peerIoWriteUint32( io, out, req->index );
411    tr_peerIoWriteUint32( io, out, req->offset );
412    tr_peerIoWriteUint32( io, out, req->length );
413
414    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
415    dbgOutMessageLen( msgs );
416    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
417}
418
419static void
420protocolSendCancel( tr_peermsgs               * msgs,
421                    const struct peer_request * req )
422{
423    tr_peerIo       * io  = msgs->peer->io;
424    struct evbuffer * out = msgs->outMessages;
425
426    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
427    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
428    tr_peerIoWriteUint32( io, out, req->index );
429    tr_peerIoWriteUint32( io, out, req->offset );
430    tr_peerIoWriteUint32( io, out, req->length );
431
432    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
433    dbgOutMessageLen( msgs );
434    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
435}
436
437static void
438protocolSendHave( tr_peermsgs * msgs,
439                  uint32_t      index )
440{
441    tr_peerIo       * io  = msgs->peer->io;
442    struct evbuffer * out = msgs->outMessages;
443
444    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
445    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
446    tr_peerIoWriteUint32( io, out, index );
447
448    dbgmsg( msgs, "sending Have %u", index );
449    dbgOutMessageLen( msgs );
450    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
451}
452
453#if 0
454static void
455protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
456{
457    tr_peerIo       * io  = msgs->peer->io;
458    struct evbuffer * out = msgs->outMessages;
459
460    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
461
462    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
463    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
464    tr_peerIoWriteUint32( io, out, pieceIndex );
465
466    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
467    dbgOutMessageLen( msgs );
468}
469#endif
470
471static void
472protocolSendChoke( tr_peermsgs * msgs,
473                   int           choke )
474{
475    tr_peerIo       * io  = msgs->peer->io;
476    struct evbuffer * out = msgs->outMessages;
477
478    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
479    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
480
481    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
482    dbgOutMessageLen( msgs );
483    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
484}
485
486static void
487protocolSendHaveAll( tr_peermsgs * msgs )
488{
489    tr_peerIo       * io  = msgs->peer->io;
490    struct evbuffer * out = msgs->outMessages;
491
492    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
493
494    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
495    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
496
497    dbgmsg( msgs, "sending HAVE_ALL..." );
498    dbgOutMessageLen( msgs );
499    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
500}
501
502static void
503protocolSendHaveNone( tr_peermsgs * msgs )
504{
505    tr_peerIo       * io  = msgs->peer->io;
506    struct evbuffer * out = msgs->outMessages;
507
508    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
509
510    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
511    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
512
513    dbgmsg( msgs, "sending HAVE_NONE..." );
514    dbgOutMessageLen( msgs );
515    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
516}
517
518/**
519***  EVENTS
520**/
521
522static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0 };
523
524static void
525publish( tr_peermsgs * msgs, tr_peer_event * e )
526{
527    assert( msgs->peer );
528    assert( msgs->peer->msgs == msgs );
529
530    tr_publisherPublish( msgs->publisher, msgs->peer, e );
531}
532
533static void
534fireError( tr_peermsgs * msgs, int err )
535{
536    tr_peer_event e = blankEvent;
537    e.eventType = TR_PEER_ERROR;
538    e.err = err;
539    publish( msgs, &e );
540}
541
542static void
543fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly )
544{
545    tr_peer_event e = blankEvent;
546    e.eventType = TR_PEER_UPLOAD_ONLY;
547    e.uploadOnly = uploadOnly;
548    publish( msgs, &e );
549}
550
551static void
552fireNeedReq( tr_peermsgs * msgs )
553{
554    tr_peer_event e = blankEvent;
555    e.eventType = TR_PEER_NEED_REQ;
556    publish( msgs, &e );
557}
558
559static void
560firePeerProgress( tr_peermsgs * msgs )
561{
562    tr_peer_event e = blankEvent;
563    e.eventType = TR_PEER_PEER_PROGRESS;
564    e.progress = msgs->peer->progress;
565    publish( msgs, &e );
566}
567
568static void
569fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
570{
571    tr_peer_event e = blankEvent;
572    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
573    e.pieceIndex = req->index;
574    e.offset = req->offset;
575    e.length = req->length;
576    publish( msgs, &e );
577}
578
579static void
580fireClientGotData( tr_peermsgs * msgs,
581                   uint32_t      length,
582                   int           wasPieceData )
583{
584    tr_peer_event e = blankEvent;
585
586    e.length = length;
587    e.eventType = TR_PEER_CLIENT_GOT_DATA;
588    e.wasPieceData = wasPieceData;
589    publish( msgs, &e );
590}
591
592static void
593fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
594{
595    tr_peer_event e = blankEvent;
596    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
597    e.pieceIndex = pieceIndex;
598    publish( msgs, &e );
599}
600
601static void
602fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
603{
604    tr_peer_event e = blankEvent;
605    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
606    e.pieceIndex = pieceIndex;
607    publish( msgs, &e );
608}
609
610static void
611firePeerGotData( tr_peermsgs  * msgs,
612                 uint32_t       length,
613                 int            wasPieceData )
614{
615    tr_peer_event e = blankEvent;
616
617    e.length = length;
618    e.eventType = TR_PEER_PEER_GOT_DATA;
619    e.wasPieceData = wasPieceData;
620
621    publish( msgs, &e );
622}
623
624static void
625fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
626{
627    tr_peer_event e = blankEvent;
628    e.eventType = TR_PEER_CANCEL;
629    e.pieceIndex = req->index;
630    e.offset = req->offset;
631    e.length = req->length;
632    publish( msgs, &e );
633}
634
635/**
636***  ALLOWED FAST SET
637***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
638**/
639
640size_t
641tr_generateAllowedSet( tr_piece_index_t * setmePieces,
642                       size_t             desiredSetSize,
643                       size_t             pieceCount,
644                       const uint8_t    * infohash,
645                       const tr_address * addr )
646{
647    size_t setSize = 0;
648
649    assert( setmePieces );
650    assert( desiredSetSize <= pieceCount );
651    assert( desiredSetSize );
652    assert( pieceCount );
653    assert( infohash );
654    assert( addr );
655
656    if( 1 )
657    {
658        uint8_t w[SHA_DIGEST_LENGTH + 4];
659        uint8_t x[SHA_DIGEST_LENGTH];
660
661        *(uint32_t*)w = ntohl( htonl( addr->s_addr ) & 0xffffff00 ); /* (1) */
662        memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );                /* (2) */
663        tr_sha1( x, w, sizeof( w ), NULL );                          /* (3) */
664
665        while( setSize<desiredSetSize )
666        {
667            int i;
668            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
669            {
670                size_t k;
671                uint32_t j = i * 4;                                  /* (5) */
672                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
673                uint32_t index = y % pieceCount;                     /* (7) */
674
675                for( k=0; k<setSize; ++k )                           /* (8) */
676                    if( setmePieces[k] == index )
677                        break;
678
679                if( k == setSize )
680                    setmePieces[setSize++] = index;                  /* (9) */
681            }
682
683            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
684        }
685    }
686
687    return setSize;
688}
689
690static void
691updateFastSet( tr_peermsgs * msgs UNUSED )
692{
693#if 0
694    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
695    const int peerIsNeedy = msgs->peer->progress < 0.10;
696
697    if( fext && peerIsNeedy && !msgs->haveFastSet )
698    {
699        size_t i;
700        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
701        const tr_info * inf = &msgs->torrent->info;
702        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
703
704        /* build the fast set */
705        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
706        msgs->haveFastSet = 1;
707
708        /* send it to the peer */
709        for( i=0; i<msgs->fastsetSize; ++i )
710            protocolSendAllowedFast( msgs, msgs->fastset[i] );
711    }
712#endif
713}
714
715/**
716***  INTEREST
717**/
718
719static int
720isPieceInteresting( const tr_peermsgs * msgs,
721                    tr_piece_index_t    piece )
722{
723    const tr_torrent * torrent = msgs->torrent;
724
725    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
726          && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have */
727          && ( tr_bitfieldHas( msgs->peer->have, piece ) );    /* peer has it */
728}
729
730/* "interested" means we'll ask for piece data if they unchoke us */
731static int
732isPeerInteresting( const tr_peermsgs * msgs )
733{
734    tr_piece_index_t    i;
735    const tr_torrent *  torrent;
736    const tr_bitfield * bitfield;
737    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
738
739    if( clientIsSeed )
740        return FALSE;
741
742    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
743        return FALSE;
744
745    torrent = msgs->torrent;
746    bitfield = tr_cpPieceBitfield( torrent->completion );
747
748    if( !msgs->peer->have )
749        return TRUE;
750
751    assert( bitfield->byteCount == msgs->peer->have->byteCount );
752
753    for( i = 0; i < torrent->info.pieceCount; ++i )
754        if( isPieceInteresting( msgs, i ) )
755            return TRUE;
756
757    return FALSE;
758}
759
760static void
761sendInterest( tr_peermsgs * msgs,
762              int           weAreInterested )
763{
764    struct evbuffer * out = msgs->outMessages;
765
766    assert( msgs );
767    assert( weAreInterested == 0 || weAreInterested == 1 );
768
769    msgs->peer->clientIsInterested = weAreInterested;
770    dbgmsg( msgs, "Sending %s", weAreInterested ? "Interested" : "Not Interested" );
771    tr_peerIoWriteUint32( msgs->peer->io, out, sizeof( uint8_t ) );
772    tr_peerIoWriteUint8 ( msgs->peer->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
773
774    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
775    dbgOutMessageLen( msgs );
776}
777
778static void
779updateInterest( tr_peermsgs * msgs )
780{
781    const int i = isPeerInteresting( msgs );
782
783    if( i != msgs->peer->clientIsInterested )
784        sendInterest( msgs, i );
785    if( i )
786        fireNeedReq( msgs );
787}
788
789static int
790popNextRequest( tr_peermsgs *         msgs,
791                struct peer_request * setme )
792{
793    return reqListPop( &msgs->peerAskedFor, setme );
794}
795
796static void
797cancelAllRequestsToClient( tr_peermsgs * msgs )
798{
799    struct peer_request req;
800    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
801
802    while( popNextRequest( msgs, &req ) )
803        if( mustSendCancel )
804            protocolSendReject( msgs, &req );
805}
806
807void
808tr_peerMsgsSetChoke( tr_peermsgs * msgs,
809                     int           choke )
810{
811    const time_t now = time( NULL );
812    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
813
814    assert( msgs );
815    assert( msgs->peer );
816    assert( choke == 0 || choke == 1 );
817
818    if( msgs->peer->chokeChangedAt > fibrillationTime )
819    {
820        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
821    }
822    else if( msgs->peer->peerIsChoked != choke )
823    {
824        msgs->peer->peerIsChoked = choke;
825        if( choke )
826            cancelAllRequestsToClient( msgs );
827        protocolSendChoke( msgs, choke );
828        msgs->peer->chokeChangedAt = now;
829    }
830}
831
832/**
833***
834**/
835
836void
837tr_peerMsgsHave( tr_peermsgs * msgs,
838                 uint32_t      index )
839{
840    protocolSendHave( msgs, index );
841
842    /* since we have more pieces now, we might not be interested in this peer */
843    updateInterest( msgs );
844}
845
846/**
847***
848**/
849
850static int
851reqIsValid( const tr_peermsgs * peer,
852            uint32_t            index,
853            uint32_t            offset,
854            uint32_t            length )
855{
856    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
857}
858
859static int
860requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
861{
862    return reqIsValid( msgs, req->index, req->offset, req->length );
863}
864
865static void
866expireOldRequests( tr_peermsgs * msgs, const time_t now  )
867{
868    int i;
869    time_t oldestAllowed;
870    struct request_list tmp = REQUEST_LIST_INIT;
871    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
872    dbgmsg( msgs, "entering `expire old requests' block" );
873
874    /* cancel requests that have been queued for too long */
875    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
876    reqListCopy( &tmp, &msgs->clientWillAskFor );
877    for( i=0; i<tmp.count; ++i ) {
878        const struct peer_request * req = &tmp.requests[i];
879        if( req->time_requested < oldestAllowed )
880            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
881    }
882    reqListClear( &tmp );
883
884    /* if the peer doesn't support "Reject Request",
885     * cancel requests that were sent too long ago. */
886    if( !fext ) {
887        oldestAllowed = now - SENT_REQUEST_TTL_SECS;
888        reqListCopy( &tmp, &msgs->clientAskedFor );
889        for( i=0; i<tmp.count; ++i ) {
890            const struct peer_request * req = &tmp.requests[i];
891            if( req->time_requested < oldestAllowed )
892                tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
893        }
894        reqListClear( &tmp );
895    }
896
897    dbgmsg( msgs, "leaving `expire old requests' block" );
898}
899
900static void
901pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
902{
903    const int           max = msgs->maxActiveRequests;
904    int                 sent = 0;
905    int                 count = msgs->clientAskedFor.count;
906    struct peer_request req;
907
908    if( msgs->peer->clientIsChoked )
909        return;
910    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
911        return;
912
913    while( ( count < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
914    {
915        const tr_block_index_t block = _tr_block( msgs->torrent, req.index, req.offset );
916
917        assert( requestIsValid( msgs, &req ) );
918        assert( tr_bitfieldHas( msgs->peer->have, req.index ) );
919
920        /* don't ask for it if we've already got it... this block may have
921         * come in from a different peer after we cancelled a request for it */
922        if( !tr_cpBlockIsComplete( msgs->torrent->completion, block ) )
923        {
924            protocolSendRequest( msgs, &req );
925            req.time_requested = now;
926            reqListAppend( &msgs->clientAskedFor, &req );
927
928            ++count;
929            ++sent;
930        }
931    }
932
933    if( sent )
934        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
935                sent, msgs->clientAskedFor.count, msgs->clientWillAskFor.count );
936
937    if( count < max )
938        fireNeedReq( msgs );
939}
940
941static int
942requestQueueIsFull( const tr_peermsgs * msgs )
943{
944    const int req_max = msgs->maxActiveRequests;
945    return msgs->clientWillAskFor.count >= req_max;
946}
947
948tr_addreq_t
949tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
950                       uint32_t         index,
951                       uint32_t         offset,
952                       uint32_t         length )
953{
954    struct peer_request req;
955
956    assert( msgs );
957    assert( msgs->torrent );
958    assert( reqIsValid( msgs, index, offset, length ) );
959
960    /**
961    ***  Reasons to decline the request
962    **/
963
964    /* don't send requests to choked clients */
965    if( msgs->peer->clientIsChoked ) {
966        dbgmsg( msgs, "declining request because they're choking us" );
967        return TR_ADDREQ_CLIENT_CHOKED;
968    }
969
970    /* peer doesn't have this piece */
971    if( !tr_bitfieldHas( msgs->peer->have, index ) )
972        return TR_ADDREQ_MISSING;
973
974    /* peer's queue is full */
975    if( requestQueueIsFull( msgs ) ) {
976        dbgmsg( msgs, "declining request because we're full" );
977        return TR_ADDREQ_FULL;
978    }
979
980    /* have we already asked for this piece? */
981    req.index = index;
982    req.offset = offset;
983    req.length = length;
984    if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
985        dbgmsg( msgs, "declining because it's a duplicate" );
986        return TR_ADDREQ_DUPLICATE;
987    }
988    if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
989        dbgmsg( msgs, "declining because it's a duplicate" );
990        return TR_ADDREQ_DUPLICATE;
991    }
992
993    /**
994    ***  Accept this request
995    **/
996
997    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
998            index, offset, length );
999    req.time_requested = time( NULL );
1000    reqListAppend( &msgs->clientWillAskFor, &req );
1001    return TR_ADDREQ_OK;
1002}
1003
1004static void
1005cancelAllRequestsToPeer( tr_peermsgs * msgs, tr_bool sendCancel )
1006{
1007    int i;
1008    struct request_list a = msgs->clientWillAskFor;
1009    struct request_list b = msgs->clientAskedFor;
1010    dbgmsg( msgs, "cancelling all requests to peer" );
1011
1012    msgs->clientAskedFor = REQUEST_LIST_INIT;
1013    msgs->clientWillAskFor = REQUEST_LIST_INIT;
1014
1015    for( i=0; i<a.count; ++i )
1016        fireCancelledReq( msgs, &a.requests[i] );
1017
1018    for( i = 0; i < b.count; ++i ) {
1019        fireCancelledReq( msgs, &b.requests[i] );
1020        if( sendCancel )
1021            protocolSendCancel( msgs, &b.requests[i] );
1022    }
1023
1024    reqListClear( &a );
1025    reqListClear( &b );
1026}
1027
1028void
1029tr_peerMsgsCancel( tr_peermsgs * msgs,
1030                   uint32_t      pieceIndex,
1031                   uint32_t      offset,
1032                   uint32_t      length )
1033{
1034    struct peer_request req;
1035
1036    assert( msgs != NULL );
1037    assert( length > 0 );
1038
1039
1040    /* have we asked the peer for this piece? */
1041    req.index = pieceIndex;
1042    req.offset = offset;
1043    req.length = length;
1044
1045    /* if it's only in the queue and hasn't been sent yet, free it */
1046    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
1047        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
1048        fireCancelledReq( msgs, &req );
1049    }
1050
1051    /* if it's already been sent, send a cancel message too */
1052    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
1053        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
1054        protocolSendCancel( msgs, &req );
1055        fireCancelledReq( msgs, &req );
1056    }
1057}
1058
1059
1060/**
1061***
1062**/
1063
1064static void
1065sendLtepHandshake( tr_peermsgs * msgs )
1066{
1067    tr_benc val, *m;
1068    char * buf;
1069    int len;
1070    int pex;
1071    struct evbuffer * out = msgs->outMessages;
1072
1073    if( msgs->clientSentLtepHandshake )
1074        return;
1075
1076    dbgmsg( msgs, "sending an ltep handshake" );
1077    msgs->clientSentLtepHandshake = 1;
1078
1079    /* decide if we want to advertise pex support */
1080    if( !tr_torrentAllowsPex( msgs->torrent ) )
1081        pex = 0;
1082    else if( msgs->peerSentLtepHandshake )
1083        pex = msgs->peerSupportsPex ? 1 : 0;
1084    else
1085        pex = 1;
1086
1087    tr_bencInitDict( &val, 5 );
1088    tr_bencDictAddInt( &val, "e", msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
1089    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
1090    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
1091    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
1092    m  = tr_bencDictAddDict( &val, "m", 1 );
1093    if( pex )
1094        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
1095    buf = tr_bencSave( &val, &len );
1096
1097    tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + len );
1098    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
1099    tr_peerIoWriteUint8 ( msgs->peer->io, out, LTEP_HANDSHAKE );
1100    tr_peerIoWriteBytes ( msgs->peer->io, out, buf, len );
1101    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1102    dbgOutMessageLen( msgs );
1103
1104    /* cleanup */
1105    tr_bencFree( &val );
1106    tr_free( buf );
1107}
1108
1109static void
1110parseLtepHandshake( tr_peermsgs *     msgs,
1111                    int               len,
1112                    struct evbuffer * inbuf )
1113{
1114    int64_t   i;
1115    tr_benc   val, * sub;
1116    uint8_t * tmp = tr_new( uint8_t, len );
1117
1118    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
1119    msgs->peerSentLtepHandshake = 1;
1120
1121    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
1122    {
1123        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1124        tr_free( tmp );
1125        return;
1126    }
1127
1128    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
1129
1130    /* does the peer prefer encrypted connections? */
1131    if( tr_bencDictFindInt( &val, "e", &i ) )
1132        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1133                                              : ENCRYPTION_PREFERENCE_NO;
1134
1135    /* check supported messages for utorrent pex */
1136    msgs->peerSupportsPex = 0;
1137    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
1138        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
1139            msgs->ut_pex_id = (uint8_t) i;
1140            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1141            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1142        }
1143    }
1144
1145    /* look for upload_only (BEP 21) */
1146    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
1147        fireUploadOnly( msgs, i!=0 );
1148
1149    /* get peer's listening port */
1150    if( tr_bencDictFindInt( &val, "p", &i ) ) {
1151        msgs->peer->port = htons( (uint16_t)i );
1152        dbgmsg( msgs, "msgs->port is now %hu", msgs->peer->port );
1153    }
1154
1155    /* get peer's maximum request queue size */
1156    if( tr_bencDictFindInt( &val, "reqq", &i ) )
1157        msgs->reqq = i;
1158
1159    tr_bencFree( &val );
1160    tr_free( tmp );
1161}
1162
1163static void
1164parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1165{
1166    int loaded = 0;
1167    uint8_t * tmp = tr_new( uint8_t, msglen );
1168    tr_benc val;
1169    const tr_torrent * tor = msgs->torrent;
1170    const uint8_t * added;
1171    size_t added_len;
1172
1173    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1174
1175    if( tr_torrentAllowsPex( tor )
1176      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) )
1177      && tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1178    {
1179        const uint8_t * added_f = NULL;
1180        tr_pex *        pex;
1181        size_t          i, n;
1182        size_t          added_f_len = 0;
1183        tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1184        pex =
1185            tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len,
1186                                    &n );
1187        for( i = 0; i < n; ++i )
1188            tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1189                              TR_PEER_FROM_PEX, pex + i );
1190        tr_free( pex );
1191    }
1192
1193    if( loaded )
1194        tr_bencFree( &val );
1195    tr_free( tmp );
1196}
1197
1198static void sendPex( tr_peermsgs * msgs );
1199
1200static void
1201parseLtep( tr_peermsgs *     msgs,
1202           int               msglen,
1203           struct evbuffer * inbuf )
1204{
1205    uint8_t ltep_msgid;
1206
1207    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1208    msglen--;
1209
1210    if( ltep_msgid == LTEP_HANDSHAKE )
1211    {
1212        dbgmsg( msgs, "got ltep handshake" );
1213        parseLtepHandshake( msgs, msglen, inbuf );
1214        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1215        {
1216            sendLtepHandshake( msgs );
1217            sendPex( msgs );
1218        }
1219    }
1220    else if( ltep_msgid == TR_LTEP_PEX )
1221    {
1222        dbgmsg( msgs, "got ut pex" );
1223        msgs->peerSupportsPex = 1;
1224        parseUtPex( msgs, msglen, inbuf );
1225    }
1226    else
1227    {
1228        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1229        evbuffer_drain( inbuf, msglen );
1230    }
1231}
1232
1233static int
1234readBtLength( tr_peermsgs *     msgs,
1235              struct evbuffer * inbuf,
1236              size_t            inlen )
1237{
1238    uint32_t len;
1239
1240    if( inlen < sizeof( len ) )
1241        return READ_LATER;
1242
1243    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1244
1245    if( len == 0 ) /* peer sent us a keepalive message */
1246        dbgmsg( msgs, "got KeepAlive" );
1247    else
1248    {
1249        msgs->incoming.length = len;
1250        msgs->state = AWAITING_BT_ID;
1251    }
1252
1253    return READ_NOW;
1254}
1255
1256static int readBtMessage( tr_peermsgs *     msgs,
1257                          struct evbuffer * inbuf,
1258                          size_t            inlen );
1259
1260static int
1261readBtId( tr_peermsgs *     msgs,
1262          struct evbuffer * inbuf,
1263          size_t            inlen )
1264{
1265    uint8_t id;
1266
1267    if( inlen < sizeof( uint8_t ) )
1268        return READ_LATER;
1269
1270    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1271    msgs->incoming.id = id;
1272
1273    if( id == BT_PIECE )
1274    {
1275        msgs->state = AWAITING_BT_PIECE;
1276        return READ_NOW;
1277    }
1278    else if( msgs->incoming.length != 1 )
1279    {
1280        msgs->state = AWAITING_BT_MESSAGE;
1281        return READ_NOW;
1282    }
1283    else return readBtMessage( msgs, inbuf, inlen - 1 );
1284}
1285
1286static void
1287updatePeerProgress( tr_peermsgs * msgs )
1288{
1289    msgs->peer->progress = tr_bitfieldCountTrueBits( msgs->peer->have ) / (float)msgs->torrent->info.pieceCount;
1290    dbgmsg( msgs, "peer progress is %f", msgs->peer->progress );
1291    updateFastSet( msgs );
1292    updateInterest( msgs );
1293    firePeerProgress( msgs );
1294}
1295
1296static void
1297peerMadeRequest( tr_peermsgs *               msgs,
1298                 const struct peer_request * req )
1299{
1300    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1301    const int reqIsValid = requestIsValid( msgs, req );
1302    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
1303    const int peerIsChoked = msgs->peer->peerIsChoked;
1304
1305    int allow = FALSE;
1306
1307    if( !reqIsValid )
1308        dbgmsg( msgs, "rejecting an invalid request." );
1309    else if( !clientHasPiece )
1310        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1311    else if( peerIsChoked )
1312        dbgmsg( msgs, "rejecting request from choked peer" );
1313    else
1314        allow = TRUE;
1315
1316    if( allow )
1317        reqListAppend( &msgs->peerAskedFor, req );
1318    else if( fext )
1319        protocolSendReject( msgs, req );
1320}
1321
1322static int
1323messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1324{
1325    switch( id )
1326    {
1327        case BT_CHOKE:
1328        case BT_UNCHOKE:
1329        case BT_INTERESTED:
1330        case BT_NOT_INTERESTED:
1331        case BT_FEXT_HAVE_ALL:
1332        case BT_FEXT_HAVE_NONE:
1333            return len == 1;
1334
1335        case BT_HAVE:
1336        case BT_FEXT_SUGGEST:
1337        case BT_FEXT_ALLOWED_FAST:
1338            return len == 5;
1339
1340        case BT_BITFIELD:
1341            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1342
1343        case BT_REQUEST:
1344        case BT_CANCEL:
1345        case BT_FEXT_REJECT:
1346            return len == 13;
1347
1348        case BT_PIECE:
1349            return len > 9 && len <= 16393;
1350
1351        case BT_PORT:
1352            return len == 3;
1353
1354        case BT_LTEP:
1355            return len >= 2;
1356
1357        default:
1358            return FALSE;
1359    }
1360}
1361
1362static int clientGotBlock( tr_peermsgs *               msgs,
1363                           const uint8_t *             block,
1364                           const struct peer_request * req );
1365
1366static int
1367readBtPiece( tr_peermsgs      * msgs,
1368             struct evbuffer  * inbuf,
1369             size_t             inlen,
1370             size_t           * setme_piece_bytes_read )
1371{
1372    struct peer_request * req = &msgs->incoming.blockReq;
1373
1374    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1375    dbgmsg( msgs, "In readBtPiece" );
1376
1377    if( !req->length )
1378    {
1379        if( inlen < 8 )
1380            return READ_LATER;
1381
1382        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1383        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1384        req->length = msgs->incoming.length - 9;
1385        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1386        return READ_NOW;
1387    }
1388    else
1389    {
1390        int err;
1391
1392        /* read in another chunk of data */
1393        const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
1394        size_t n = MIN( nLeft, inlen );
1395        uint8_t * buf = tr_new( uint8_t, n );
1396        assert( EVBUFFER_LENGTH( inbuf ) >= n );
1397        tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, n );
1398        evbuffer_add( msgs->incoming.block, buf, n );
1399        fireClientGotData( msgs, n, TRUE );
1400        *setme_piece_bytes_read += n;
1401        tr_free( buf );
1402        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1403               n, req->index, req->offset, req->length,
1404               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1405        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1406            return READ_LATER;
1407
1408        /* we've got the whole block ... process it */
1409        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );
1410
1411        /* cleanup */
1412        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH( msgs->incoming.block ) );
1413        req->length = 0;
1414        msgs->state = AWAITING_BT_LENGTH;
1415        if( !err )
1416            return READ_NOW;
1417        else {
1418            fireError( msgs, err );
1419            return READ_ERR;
1420        }
1421    }
1422}
1423
1424static int
1425readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1426{
1427    uint32_t      ui32;
1428    uint32_t      msglen = msgs->incoming.length;
1429    const uint8_t id = msgs->incoming.id;
1430    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1431    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1432
1433    --msglen; /* id length */
1434
1435    if( inlen < msglen )
1436        return READ_LATER;
1437
1438    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1439
1440    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1441    {
1442        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1443        fireError( msgs, EMSGSIZE );
1444        return READ_ERR;
1445    }
1446
1447    switch( id )
1448    {
1449        case BT_CHOKE:
1450            dbgmsg( msgs, "got Choke" );
1451            msgs->peer->clientIsChoked = 1;
1452            if( !fext )
1453                cancelAllRequestsToPeer( msgs, FALSE );
1454            break;
1455
1456        case BT_UNCHOKE:
1457            dbgmsg( msgs, "got Unchoke" );
1458            msgs->peer->clientIsChoked = 0;
1459            fireNeedReq( msgs );
1460            break;
1461
1462        case BT_INTERESTED:
1463            dbgmsg( msgs, "got Interested" );
1464            msgs->peer->peerIsInterested = 1;
1465            break;
1466
1467        case BT_NOT_INTERESTED:
1468            dbgmsg( msgs, "got Not Interested" );
1469            msgs->peer->peerIsInterested = 0;
1470            break;
1471
1472        case BT_HAVE:
1473            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1474            dbgmsg( msgs, "got Have: %u", ui32 );
1475            if( tr_bitfieldAdd( msgs->peer->have, ui32 ) ) {
1476                fireError( msgs, ERANGE );
1477                return READ_ERR;
1478            }
1479            updatePeerProgress( msgs );
1480            tr_rcTransferred( msgs->torrent->swarmSpeed,
1481                              msgs->torrent->info.pieceSize );
1482            break;
1483
1484        case BT_BITFIELD:
1485        {
1486            dbgmsg( msgs, "got a bitfield" );
1487            msgs->peerSentBitfield = 1;
1488            tr_peerIoReadBytes( msgs->peer->io, inbuf, msgs->peer->have->bits, msglen );
1489            updatePeerProgress( msgs );
1490            fireNeedReq( msgs );
1491            break;
1492        }
1493
1494        case BT_REQUEST:
1495        {
1496            struct peer_request r;
1497            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1498            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1499            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1500            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1501            peerMadeRequest( msgs, &r );
1502            break;
1503        }
1504
1505        case BT_CANCEL:
1506        {
1507            struct peer_request r;
1508            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1509            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1510            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1511            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1512            if( reqListRemove( &msgs->peerAskedFor, &r ) && fext )
1513                protocolSendReject( msgs, &r );
1514            break;
1515        }
1516
1517        case BT_PIECE:
1518            assert( 0 ); /* handled elsewhere! */
1519            break;
1520
1521        case BT_PORT:
1522            dbgmsg( msgs, "Got a BT_PORT" );
1523            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->port );
1524            break;
1525
1526        case BT_FEXT_SUGGEST:
1527            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1528            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1529            if( fext )
1530                fireClientGotSuggest( msgs, ui32 );
1531            else {
1532                fireError( msgs, EMSGSIZE );
1533                return READ_ERR;
1534            }
1535            break;
1536
1537        case BT_FEXT_ALLOWED_FAST:
1538            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1539            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1540            if( fext )
1541                fireClientGotAllowedFast( msgs, ui32 );
1542            else {
1543                fireError( msgs, EMSGSIZE );
1544                return READ_ERR;
1545            }
1546            break;
1547
1548        case BT_FEXT_HAVE_ALL:
1549            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1550            if( fext ) {
1551                tr_bitfieldAddRange( msgs->peer->have, 0, msgs->torrent->info.pieceCount );
1552                updatePeerProgress( msgs );
1553            } else {
1554                fireError( msgs, EMSGSIZE );
1555                return READ_ERR;
1556            }
1557            break;
1558
1559        case BT_FEXT_HAVE_NONE:
1560            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1561            if( fext ) {
1562                tr_bitfieldClear( msgs->peer->have );
1563                updatePeerProgress( msgs );
1564            } else {
1565                fireError( msgs, EMSGSIZE );
1566                return READ_ERR;
1567            }
1568            break;
1569
1570        case BT_FEXT_REJECT:
1571        {
1572            struct peer_request r;
1573            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1574            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1575            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1576            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1577            if( fext )
1578                reqListRemove( &msgs->clientAskedFor, &r );
1579            else {
1580                fireError( msgs, EMSGSIZE );
1581                return READ_ERR;
1582            }
1583            break;
1584        }
1585
1586        case BT_LTEP:
1587            dbgmsg( msgs, "Got a BT_LTEP" );
1588            parseLtep( msgs, msglen, inbuf );
1589            break;
1590
1591        default:
1592            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1593            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1594            break;
1595    }
1596
1597    assert( msglen + 1 == msgs->incoming.length );
1598    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1599
1600    msgs->state = AWAITING_BT_LENGTH;
1601    return READ_NOW;
1602}
1603
1604static void
1605decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1606{
1607    tr_torrent * tor = msgs->torrent;
1608
1609    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1610}
1611
1612static void
1613clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1614{
1615    decrementDownloadedCount( msgs, req->length );
1616}
1617
1618static void
1619addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1620{
1621    if( !msgs->peer->blame )
1622         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1623    tr_bitfieldAdd( msgs->peer->blame, index );
1624}
1625
1626/* returns 0 on success, or an errno on failure */
1627static int
1628clientGotBlock( tr_peermsgs *               msgs,
1629                const uint8_t *             data,
1630                const struct peer_request * req )
1631{
1632    int err;
1633    tr_torrent * tor = msgs->torrent;
1634    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1635
1636    assert( msgs );
1637    assert( req );
1638
1639    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1640        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1641                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1642        return EMSGSIZE;
1643    }
1644
1645    /* save the block */
1646    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1647
1648    /**
1649    *** Remove the block from our `we asked for this' list
1650    **/
1651
1652    if( !reqListRemove( &msgs->clientAskedFor, req ) ) {
1653        clientGotUnwantedBlock( msgs, req );
1654        dbgmsg( msgs, "we didn't ask for this message..." );
1655        return 0;
1656    }
1657
1658    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1659            msgs->clientAskedFor.count );
1660
1661    /**
1662    *** Error checks
1663    **/
1664
1665    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1666        dbgmsg( msgs, "we have this block already..." );
1667        clientGotUnwantedBlock( msgs, req );
1668        return 0;
1669    }
1670
1671    /**
1672    ***  Save the block
1673    **/
1674
1675    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1676        return err;
1677
1678    addPeerToBlamefield( msgs, req->index );
1679    fireGotBlock( msgs, req );
1680    return 0;
1681}
1682
1683static int peerPulse( void * vmsgs );
1684
1685static void
1686didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1687{
1688    tr_peermsgs * msgs = vmsgs;
1689    firePeerGotData( msgs, bytesWritten, wasPieceData );
1690    peerPulse( msgs );
1691}
1692
1693static ReadState
1694canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1695{
1696    ReadState         ret;
1697    tr_peermsgs *     msgs = vmsgs;
1698    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1699    const size_t      inlen = EVBUFFER_LENGTH( in );
1700
1701    if( !inlen )
1702    {
1703        ret = READ_LATER;
1704    }
1705    else if( msgs->state == AWAITING_BT_PIECE )
1706    {
1707        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1708    }
1709    else switch( msgs->state )
1710    {
1711        case AWAITING_BT_LENGTH:
1712            ret = readBtLength ( msgs, in, inlen ); break;
1713
1714        case AWAITING_BT_ID:
1715            ret = readBtId     ( msgs, in, inlen ); break;
1716
1717        case AWAITING_BT_MESSAGE:
1718            ret = readBtMessage( msgs, in, inlen ); break;
1719
1720        default:
1721            assert( 0 );
1722    }
1723
1724    /* log the raw data that was read */
1725    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1726        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1727
1728    return ret;
1729}
1730
1731/**
1732***
1733**/
1734
1735static int
1736ratePulse( void * vmsgs )
1737{
1738    tr_peermsgs * msgs = vmsgs;
1739    const double rateToClient = tr_peerGetPieceSpeed( msgs->peer, TR_PEER_TO_CLIENT );
1740    const int seconds = 10;
1741    const int floor = 8;
1742    const int estimatedBlocksInPeriod = ( rateToClient * seconds * 1024 ) / msgs->torrent->blockSize;
1743
1744    msgs->maxActiveRequests = floor + estimatedBlocksInPeriod;
1745
1746    if( msgs->reqq > 0 )
1747        msgs->maxActiveRequests = MIN( msgs->maxActiveRequests, msgs->reqq );
1748
1749    return TRUE;
1750}
1751
1752static size_t
1753fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1754{
1755    size_t bytesWritten = 0;
1756    struct peer_request req;
1757    const tr_bool haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1758    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1759
1760    /**
1761    ***  Protocol messages
1762    **/
1763
1764    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1765    {
1766        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1767        msgs->outMessagesBatchedAt = now;
1768    }
1769    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1770    {
1771        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1772        /* flush the protocol messages */
1773        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1774        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1775        msgs->clientSentAnythingAt = now;
1776        msgs->outMessagesBatchedAt = 0;
1777        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1778        bytesWritten +=  len;
1779    }
1780
1781    /**
1782    ***  Blocks
1783    **/
1784
1785    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io ) >= msgs->torrent->blockSize )
1786        && popNextRequest( msgs, &req ) )
1787    {
1788        if( requestIsValid( msgs, &req )
1789            && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1790        {
1791            /* send a block */
1792            uint8_t * buf = tr_new( uint8_t, req.length );
1793            const int err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf );
1794            if( err ) {
1795                fireError( msgs, err );
1796                bytesWritten = 0;
1797                msgs = NULL;
1798            } else {
1799                tr_peerIo * io = msgs->peer->io;
1800                struct evbuffer * out = evbuffer_new( );
1801                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1802                tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1803                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1804                tr_peerIoWriteUint32( io, out, req.index );
1805                tr_peerIoWriteUint32( io, out, req.offset );
1806                tr_peerIoWriteBytes ( io, out, buf, req.length );
1807                tr_peerIoWriteBuf( io, out, TRUE );
1808                bytesWritten += EVBUFFER_LENGTH( out );
1809                evbuffer_free( out );
1810                msgs->clientSentAnythingAt = now;
1811            }
1812            tr_free( buf );
1813        }
1814        else if( fext ) /* peer needs a reject message */
1815        {
1816            protocolSendReject( msgs, &req );
1817        }
1818    }
1819
1820    /**
1821    ***  Keepalive
1822    **/
1823
1824    if( ( msgs != NULL )
1825        && ( msgs->clientSentAnythingAt != 0 )
1826        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1827    {
1828        dbgmsg( msgs, "sending a keepalive message" );
1829        tr_peerIoWriteUint32( msgs->peer->io, msgs->outMessages, 0 );
1830        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1831    }
1832
1833    return bytesWritten;
1834}
1835
1836static int
1837peerPulse( void * vmsgs )
1838{
1839    tr_peermsgs * msgs = vmsgs;
1840    const time_t  now = time( NULL );
1841
1842    ratePulse( msgs );
1843
1844    pumpRequestQueue( msgs, now );
1845    expireOldRequests( msgs, now );
1846
1847    for( ;; )
1848        if( fillOutputBuffer( msgs, now ) < 1 )
1849            break;
1850
1851    return TRUE; /* loop forever */
1852}
1853
1854void
1855tr_peerMsgsPulse( tr_peermsgs * msgs )
1856{
1857    if( msgs != NULL )
1858        peerPulse( msgs );
1859}
1860
1861static void
1862gotError( tr_peerIo  * io UNUSED,
1863          short        what,
1864          void       * vmsgs )
1865{
1866    if( what & EVBUFFER_TIMEOUT )
1867        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1868    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1869        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1870               what, errno, tr_strerror( errno ) );
1871    fireError( vmsgs, ENOTCONN );
1872}
1873
1874static void
1875sendBitfield( tr_peermsgs * msgs )
1876{
1877    struct evbuffer * out = msgs->outMessages;
1878    tr_bitfield *     field;
1879    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1880    size_t            i;
1881    size_t            lazyCount = 0;
1882
1883    field = tr_bitfieldDup( tr_cpPieceBitfield( msgs->torrent->completion ) );
1884
1885    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1886    {
1887        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1888            speed over a truly random sample -- let's limit the pool size to
1889            the first 1000 pieces so large torrents don't bog things down */
1890        size_t poolSize;
1891        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1892        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1893
1894        /* build the pool */
1895        for( i=poolSize=0; i<maxPoolSize; ++i )
1896            if( tr_bitfieldHas( field, i ) )
1897                pool[poolSize++] = i;
1898
1899        /* pull random piece indices from the pool */
1900        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1901        {
1902            const int pos = tr_cryptoWeakRandInt( poolSize );
1903            const tr_piece_index_t piece = pool[pos];
1904            tr_bitfieldRem( field, piece );
1905            lazyPieces[lazyCount++] = piece;
1906            pool[pos] = pool[--poolSize];
1907        }
1908
1909        /* cleanup */
1910        tr_free( pool );
1911    }
1912
1913    tr_peerIoWriteUint32( msgs->peer->io, out,
1914                          sizeof( uint8_t ) + field->byteCount );
1915    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_BITFIELD );
1916    tr_peerIoWriteBytes ( msgs->peer->io, out, field->bits, field->byteCount );
1917    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
1918            EVBUFFER_LENGTH( out ) );
1919    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1920
1921    for( i = 0; i < lazyCount; ++i )
1922        protocolSendHave( msgs, lazyPieces[i] );
1923
1924    tr_bitfieldFree( field );
1925}
1926
1927static void
1928tellPeerWhatWeHave( tr_peermsgs * msgs )
1929{
1930    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1931
1932    if( fext && ( tr_cpGetStatus( msgs->torrent->completion ) == TR_CP_COMPLETE ) )
1933    {
1934        protocolSendHaveAll( msgs );
1935    }
1936    else if( fext && ( tr_cpHaveValid( msgs->torrent->completion ) == 0 ) )
1937    {
1938        protocolSendHaveNone( msgs );
1939    }
1940    else
1941    {
1942        sendBitfield( msgs );
1943    }
1944}
1945
1946/**
1947***
1948**/
1949
1950/* some peers give us error messages if we send
1951   more than this many peers in a single pex message
1952   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1953#define MAX_PEX_ADDED 50
1954#define MAX_PEX_DROPPED 50
1955
1956typedef struct
1957{
1958    tr_pex *  added;
1959    tr_pex *  dropped;
1960    tr_pex *  elements;
1961    int       addedCount;
1962    int       droppedCount;
1963    int       elementCount;
1964}
1965PexDiffs;
1966
1967static void
1968pexAddedCb( void * vpex,
1969            void * userData )
1970{
1971    PexDiffs * diffs = userData;
1972    tr_pex *   pex = vpex;
1973
1974    if( diffs->addedCount < MAX_PEX_ADDED )
1975    {
1976        diffs->added[diffs->addedCount++] = *pex;
1977        diffs->elements[diffs->elementCount++] = *pex;
1978    }
1979}
1980
1981static void
1982pexDroppedCb( void * vpex,
1983              void * userData )
1984{
1985    PexDiffs * diffs = userData;
1986    tr_pex *   pex = vpex;
1987
1988    if( diffs->droppedCount < MAX_PEX_DROPPED )
1989    {
1990        diffs->dropped[diffs->droppedCount++] = *pex;
1991    }
1992}
1993
1994static void
1995pexElementCb( void * vpex,
1996              void * userData )
1997{
1998    PexDiffs * diffs = userData;
1999    tr_pex *   pex = vpex;
2000
2001    diffs->elements[diffs->elementCount++] = *pex;
2002}
2003
2004static void
2005sendPex( tr_peermsgs * msgs )
2006{
2007    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2008    {
2009        PexDiffs diffs;
2010        tr_pex * newPex = NULL;
2011        const int newCount = tr_peerMgrGetPeers( msgs->session->peerMgr,
2012                                                 msgs->torrent->info.hash,
2013                                                 &newPex );
2014
2015        /* build the diffs */
2016        diffs.added = tr_new( tr_pex, newCount );
2017        diffs.addedCount = 0;
2018        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2019        diffs.droppedCount = 0;
2020        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2021        diffs.elementCount = 0;
2022        tr_set_compare( msgs->pex, msgs->pexCount,
2023                        newPex, newCount,
2024                        tr_pexCompare, sizeof( tr_pex ),
2025                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2026        dbgmsg(
2027            msgs,
2028            "pex: old peer count %d, new peer count %d, added %d, removed %d",
2029            msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
2030
2031        if( !diffs.addedCount && !diffs.droppedCount )
2032        {
2033            tr_free( diffs.elements );
2034        }
2035        else
2036        {
2037            int  i;
2038            tr_benc val;
2039            char * benc;
2040            int bencLen;
2041            uint8_t * tmp, *walk;
2042            struct evbuffer * out = msgs->outMessages;
2043
2044            /* update peer */
2045            tr_free( msgs->pex );
2046            msgs->pex = diffs.elements;
2047            msgs->pexCount = diffs.elementCount;
2048
2049            /* build the pex payload */
2050            tr_bencInitDict( &val, 3 );
2051
2052            /* "added" */
2053            tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2054            for( i = 0; i < diffs.addedCount; ++i ) {
2055                memcpy( walk, &diffs.added[i].addr, 4 ); walk += 4;
2056                memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2057            }
2058            assert( ( walk - tmp ) == diffs.addedCount * 6 );
2059            tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2060            tr_free( tmp );
2061
2062            /* "added.f" */
2063            tmp = walk = tr_new( uint8_t, diffs.addedCount );
2064            for( i = 0; i < diffs.addedCount; ++i )
2065                *walk++ = diffs.added[i].flags;
2066            assert( ( walk - tmp ) == diffs.addedCount );
2067            tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2068            tr_free( tmp );
2069
2070            /* "dropped" */
2071            tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2072            for( i = 0; i < diffs.droppedCount; ++i ) {
2073                memcpy( walk, &diffs.dropped[i].addr, 4 ); walk += 4;
2074                memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2075            }
2076            assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2077            tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2078            tr_free( tmp );
2079
2080            /* write the pex message */
2081            benc = tr_bencSave( &val, &bencLen );
2082            tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + bencLen );
2083            tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
2084            tr_peerIoWriteUint8 ( msgs->peer->io, out, msgs->ut_pex_id );
2085            tr_peerIoWriteBytes ( msgs->peer->io, out, benc, bencLen );
2086            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2087            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2088
2089            tr_free( benc );
2090            tr_bencFree( &val );
2091        }
2092
2093        /* cleanup */
2094        tr_free( diffs.added );
2095        tr_free( diffs.dropped );
2096        tr_free( newPex );
2097
2098        msgs->clientSentPexAt = time( NULL );
2099    }
2100}
2101
2102static int
2103pexPulse( void * vpeer )
2104{
2105    sendPex( vpeer );
2106    return TRUE;
2107}
2108
2109/**
2110***
2111**/
2112
2113tr_peermsgs*
2114tr_peerMsgsNew( struct tr_torrent * torrent,
2115                struct tr_peer    * peer,
2116                tr_delivery_func    func,
2117                void              * userData,
2118                tr_publisher_tag  * setme )
2119{
2120    tr_peermsgs * m;
2121
2122    assert( peer );
2123    assert( peer->io );
2124
2125    m = tr_new0( tr_peermsgs, 1 );
2126    m->publisher = tr_publisherNew( );
2127    m->peer = peer;
2128    m->session = torrent->session;
2129    m->torrent = torrent;
2130    m->peer->clientIsChoked = 1;
2131    m->peer->peerIsChoked = 1;
2132    m->peer->clientIsInterested = 0;
2133    m->peer->peerIsInterested = 0;
2134    m->peer->have = tr_bitfieldNew( torrent->info.pieceCount );
2135    m->state = AWAITING_BT_LENGTH;
2136    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
2137    m->outMessages = evbuffer_new( );
2138    m->outMessagesBatchedAt = 0;
2139    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2140    m->incoming.block = evbuffer_new( );
2141    m->peerAskedFor = REQUEST_LIST_INIT;
2142    m->clientAskedFor = REQUEST_LIST_INIT;
2143    m->clientWillAskFor = REQUEST_LIST_INIT;
2144    peer->msgs = m;
2145
2146    *setme = tr_publisherSubscribe( m->publisher, func, userData );
2147
2148    if( tr_peerIoSupportsLTEP( peer->io ) )
2149        sendLtepHandshake( m );
2150
2151    tellPeerWhatWeHave( m );
2152
2153    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2154    ratePulse( m );
2155
2156    return m;
2157}
2158
2159void
2160tr_peerMsgsFree( tr_peermsgs* msgs )
2161{
2162    if( msgs )
2163    {
2164        tr_timerFree( &msgs->pexTimer );
2165        tr_publisherFree( &msgs->publisher );
2166        reqListClear( &msgs->clientWillAskFor );
2167        reqListClear( &msgs->clientAskedFor );
2168        reqListClear( &msgs->peerAskedFor );
2169
2170        evbuffer_free( msgs->incoming.block );
2171        evbuffer_free( msgs->outMessages );
2172        tr_free( msgs->pex );
2173
2174        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2175        tr_free( msgs );
2176    }
2177}
2178
2179void
2180tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2181                        tr_publisher_tag tag )
2182{
2183    tr_publisherUnsubscribe( peer->publisher, tag );
2184}
2185
Note: See TracBrowser for help on using the repository browser.