source: trunk/libtransmission/peer-msgs.c @ 7357

Last change on this file since 7357 was 7357, checked in by charles, 12 years ago

(libT) add some documentation about the three separate peer structs and how they are related.

  • Property svn:keywords set to Date Rev Author Id
File size: 61.4 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 7357 2008-12-11 17:02:34Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "crypto.h"
26#include "inout.h"
27#include "iobuf.h"
28#ifdef WIN32
29#include "net.h" /* for ECONN */
30#endif
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "ratecontrol.h"
36#include "stats.h"
37#include "torrent.h"
38#include "trevent.h"
39#include "utils.h"
40
41/**
42***
43**/
44
45enum
46{
47    BT_CHOKE                = 0,
48    BT_UNCHOKE              = 1,
49    BT_INTERESTED           = 2,
50    BT_NOT_INTERESTED       = 3,
51    BT_HAVE                 = 4,
52    BT_BITFIELD             = 5,
53    BT_REQUEST              = 6,
54    BT_PIECE                = 7,
55    BT_CANCEL               = 8,
56    BT_PORT                 = 9,
57
58    BT_FEXT_SUGGEST         = 13,
59    BT_FEXT_HAVE_ALL        = 14,
60    BT_FEXT_HAVE_NONE       = 15,
61    BT_FEXT_REJECT          = 16,
62    BT_FEXT_ALLOWED_FAST    = 17,
63
64    BT_LTEP                 = 20,
65
66    LTEP_HANDSHAKE          = 0,
67
68    TR_LTEP_PEX             = 1,
69
70
71
72    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
73
74    /* idle seconds before we send a keepalive */
75    KEEPALIVE_INTERVAL_SECS = 100,
76
77    PEX_INTERVAL            = ( 90 * 1000 ), /* msec between sendPex() calls */
78    PEER_PULSE_INTERVAL     = ( 250 ),       /* msec between peerPulse() calls
79                                               */
80
81    MAX_QUEUE_SIZE          = ( 100 ),
82
83    /* how long an unsent request can stay queued before it's returned
84       back to the peer-mgr's pool of requests */
85    QUEUED_REQUEST_TTL_SECS = 20,
86
87    /* how long a sent request can stay queued before it's returned
88       back to the peer-mgr's pool of requests */
89    SENT_REQUEST_TTL_SECS = 240,
90
91    /* used in lowering the outMessages queue period */
92    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
93    HIGH_PRIORITY_INTERVAL_SECS = 2,
94    LOW_PRIORITY_INTERVAL_SECS = 20,
95
96    /* number of pieces to remove from the bitfield when
97     * lazy bitfields are turned on */
98    LAZY_PIECE_COUNT = 26,
99
100    /* number of pieces we'll allow in our fast set */
101    MAX_FAST_SET_SIZE = 3
102};
103
104/**
105***  REQUEST MANAGEMENT
106**/
107
108enum
109{
110    AWAITING_BT_LENGTH,
111    AWAITING_BT_ID,
112    AWAITING_BT_MESSAGE,
113    AWAITING_BT_PIECE
114};
115
116struct peer_request
117{
118    uint32_t    index;
119    uint32_t    offset;
120    uint32_t    length;
121    time_t      time_requested;
122};
123
124static int
125compareRequest( const void * va, const void * vb )
126{
127    const struct peer_request * a = va;
128    const struct peer_request * b = vb;
129
130    if( a->index != b->index )
131        return a->index < b->index ? -1 : 1;
132
133    if( a->offset != b->offset )
134        return a->offset < b->offset ? -1 : 1;
135
136    if( a->length != b->length )
137        return a->length < b->length ? -1 : 1;
138
139    return 0;
140}
141
142struct request_list
143{
144    uint16_t               count;
145    uint16_t               max;
146    struct peer_request *  requests;
147};
148
149static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
150
151static void
152reqListReserve( struct request_list * list,
153                uint16_t              max )
154{
155    if( list->max < max )
156    {
157        list->max = max;
158        list->requests = tr_renew( struct peer_request,
159                                   list->requests,
160                                   list->max );
161    }
162}
163
164static void
165reqListClear( struct request_list * list )
166{
167    tr_free( list->requests );
168    *list = REQUEST_LIST_INIT;
169}
170
171static void
172reqListCopy( struct request_list * dest, const struct request_list * src )
173{
174    dest->count = dest->max = src->count;
175    dest->requests = tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
176}
177
178static void
179reqListRemoveOne( struct request_list * list,
180                  int                   i )
181{
182    assert( 0 <= i && i < list->count );
183
184    memmove( &list->requests[i],
185             &list->requests[i + 1],
186             sizeof( struct peer_request ) * ( --list->count - i ) );
187}
188
189static void
190reqListAppend( struct request_list *       list,
191               const struct peer_request * req )
192{
193    if( ++list->count >= list->max )
194        reqListReserve( list, list->max + 8 );
195
196    list->requests[list->count - 1] = *req;
197}
198
199static int
200reqListPop( struct request_list * list,
201            struct peer_request * setme )
202{
203    int success;
204
205    if( !list->count )
206        success = FALSE;
207    else {
208        *setme = list->requests[0];
209        reqListRemoveOne( list, 0 );
210        success = TRUE;
211    }
212
213    return success;
214}
215
216static int
217reqListFind( struct request_list *       list,
218             const struct peer_request * key )
219{
220    uint16_t i;
221
222    for( i = 0; i < list->count; ++i )
223        if( !compareRequest( key, list->requests + i ) )
224            return i;
225
226    return -1;
227}
228
229static int
230reqListRemove( struct request_list *       list,
231               const struct peer_request * key )
232{
233    int success;
234    const int i = reqListFind( list, key );
235
236    if( i < 0 )
237        success = FALSE;
238    else {
239        reqListRemoveOne( list, i );
240        success = TRUE;
241    }
242
243    return success;
244}
245
246/**
247***
248**/
249
250/* this is raw, unchanged data from the peer regarding
251 * the current message that it's sending us. */
252struct tr_incoming
253{
254    uint8_t                id;
255    uint32_t               length; /* includes the +1 for id length */
256    struct peer_request    blockReq; /* metadata for incoming blocks */
257    struct evbuffer *      block; /* piece data for incoming blocks */
258};
259
260/**
261 * Low-level communication state information about a connected peer.
262 *
263 * This structure remembers the low-level protocol states that we're
264 * in with this peer, such as active requests, pex messages, and so on.
265 * Its fields are all private to peer-msgs.c.
266 *
267 * Data not directly involved with sending & receiving messages is
268 * stored in tr_peer, where it can be accessed by both peermsgs and
269 * the peer manager.
270 *
271 * @see struct peer_atom
272 * @see tr_peer
273 */
274struct tr_peermsgs
275{
276    tr_bool         peerSentBitfield;
277    tr_bool         peerSupportsPex;
278    tr_bool         clientSentLtepHandshake;
279    tr_bool         peerSentLtepHandshake;
280    tr_bool         haveFastSet;
281
282    uint8_t         state;
283    uint8_t         ut_pex_id;
284    uint16_t        pexCount;
285    uint16_t        minActiveRequests;
286    uint16_t        maxActiveRequests;
287
288    size_t                 fastsetSize;
289    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
290
291    /* how long the outMessages batch should be allowed to grow before
292     * it's flushed -- some messages (like requests >:) should be sent
293     * very quickly; others aren't as urgent. */
294    int                    outMessagesBatchPeriod;
295
296    tr_peer *              info;
297
298    tr_session *           session;
299    tr_torrent *           torrent;
300    tr_peerIo *            io;
301
302    tr_publisher_t *       publisher;
303
304    struct evbuffer *      outMessages; /* all the non-piece messages */
305
306    struct request_list    peerAskedFor;
307    struct request_list    clientAskedFor;
308    struct request_list    clientWillAskFor;
309
310    tr_timer *             pexTimer;
311
312    time_t                 clientSentPexAt;
313    time_t                 clientSentAnythingAt;
314
315    /* when we started batching the outMessages */
316    time_t                outMessagesBatchedAt;
317
318    tr_bitfield *         peerAllowedPieces;
319
320    struct tr_incoming    incoming;
321
322    tr_pex *              pex;
323
324    /* if the peer supports the Extension Protocol in BEP 10 and
325       supplied a reqq argument, it's stored here.  otherwise the
326       value is zero and should be ignored. */
327    int64_t               reqq;
328};
329
330/**
331***
332**/
333
334static void
335myDebug( const char * file, int line,
336         const struct tr_peermsgs * msgs,
337         const char * fmt, ... )
338{
339    FILE * fp = tr_getLog( );
340
341    if( fp )
342    {
343        va_list           args;
344        char              timestr[64];
345        struct evbuffer * buf = evbuffer_new( );
346        char *            base = tr_basename( file );
347
348        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
349                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
350                             msgs->torrent->info.name,
351                             tr_peerIoGetAddrStr( msgs->io ),
352                             msgs->info->client );
353        va_start( args, fmt );
354        evbuffer_add_vprintf( buf, fmt, args );
355        va_end( args );
356        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
357        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
358
359        tr_free( base );
360        evbuffer_free( buf );
361    }
362}
363
364#define dbgmsg( msgs, ... ) \
365    do { \
366        if( tr_deepLoggingIsActive( ) ) \
367            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
368    } while( 0 )
369
370/**
371***
372**/
373
374static void
375pokeBatchPeriod( tr_peermsgs * msgs,
376                 int           interval )
377{
378    if( msgs->outMessagesBatchPeriod > interval )
379    {
380        msgs->outMessagesBatchPeriod = interval;
381        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
382    }
383}
384
385static void
386dbgOutMessageLen( tr_peermsgs * msgs )
387{
388    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
389}
390
391static void
392protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
393{
394    tr_peerIo       * io  = msgs->io;
395    struct evbuffer * out = msgs->outMessages;
396
397    assert( tr_peerIoSupportsFEXT( msgs->io ) );
398
399    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
400    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
401    tr_peerIoWriteUint32( io, out, req->index );
402    tr_peerIoWriteUint32( io, out, req->offset );
403    tr_peerIoWriteUint32( io, out, req->length );
404
405    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
406    dbgOutMessageLen( msgs );
407}
408
409static void
410protocolSendRequest( tr_peermsgs *               msgs,
411                     const struct peer_request * req )
412{
413    tr_peerIo       * io  = msgs->io;
414    struct evbuffer * out = msgs->outMessages;
415
416    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
417    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
418    tr_peerIoWriteUint32( io, out, req->index );
419    tr_peerIoWriteUint32( io, out, req->offset );
420    tr_peerIoWriteUint32( io, out, req->length );
421
422    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
423    dbgOutMessageLen( msgs );
424    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
425}
426
427static void
428protocolSendCancel( tr_peermsgs *               msgs,
429                    const struct peer_request * req )
430{
431    tr_peerIo       * io  = msgs->io;
432    struct evbuffer * out = msgs->outMessages;
433
434    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
435    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
436    tr_peerIoWriteUint32( io, out, req->index );
437    tr_peerIoWriteUint32( io, out, req->offset );
438    tr_peerIoWriteUint32( io, out, req->length );
439
440    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
441    dbgOutMessageLen( msgs );
442    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
443}
444
445static void
446protocolSendHave( tr_peermsgs * msgs,
447                  uint32_t      index )
448{
449    tr_peerIo       * io  = msgs->io;
450    struct evbuffer * out = msgs->outMessages;
451
452    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
453    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
454    tr_peerIoWriteUint32( io, out, index );
455
456    dbgmsg( msgs, "sending Have %u...", index );
457    dbgOutMessageLen( msgs );
458    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
459}
460
461#if 0
462static void
463protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
464{
465    tr_peerIo       * io  = msgs->io;
466    struct evbuffer * out = msgs->outMessages;
467
468    assert( tr_peerIoSupportsFEXT( msgs->io ) );
469
470    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
471    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
472    tr_peerIoWriteUint32( io, out, pieceIndex );
473
474    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
475    dbgOutMessageLen( msgs );
476}
477#endif
478
479static void
480protocolSendChoke( tr_peermsgs * msgs,
481                   int           choke )
482{
483    tr_peerIo       * io  = msgs->io;
484    struct evbuffer * out = msgs->outMessages;
485
486    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
487    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
488
489    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
490    dbgOutMessageLen( msgs );
491    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
492}
493
494static void
495protocolSendHaveAll( tr_peermsgs * msgs )
496{
497    tr_peerIo       * io  = msgs->io;
498    struct evbuffer * out = msgs->outMessages;
499
500    assert( tr_peerIoSupportsFEXT( msgs->io ) );
501
502    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
503    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
504
505    dbgmsg( msgs, "sending HAVE_ALL..." );
506    dbgOutMessageLen( msgs );
507    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
508}
509
510static void
511protocolSendHaveNone( tr_peermsgs * msgs )
512{
513    tr_peerIo       * io  = msgs->io;
514    struct evbuffer * out = msgs->outMessages;
515
516    assert( tr_peerIoSupportsFEXT( msgs->io ) );
517
518    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
519    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
520
521    dbgmsg( msgs, "sending HAVE_NONE..." );
522    dbgOutMessageLen( msgs );
523    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
524}
525
526/**
527***  EVENTS
528**/
529
530static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0 };
531
532static void
533publish( tr_peermsgs * msgs, tr_peer_event * e )
534{
535    assert( msgs->info );
536    assert( msgs->info->msgs == msgs );
537
538    tr_publisherPublish( msgs->publisher, msgs->info, e );
539}
540
541static void
542fireError( tr_peermsgs * msgs, int err )
543{
544    tr_peer_event e = blankEvent;
545    e.eventType = TR_PEER_ERROR;
546    e.err = err;
547    publish( msgs, &e );
548}
549
550static void
551fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly )
552{
553    tr_peer_event e = blankEvent;
554    e.eventType = TR_PEER_UPLOAD_ONLY;
555    e.uploadOnly = uploadOnly;
556    publish( msgs, &e );
557}
558
559static void
560fireNeedReq( tr_peermsgs * msgs )
561{
562    tr_peer_event e = blankEvent;
563    e.eventType = TR_PEER_NEED_REQ;
564    publish( msgs, &e );
565}
566
567static void
568firePeerProgress( tr_peermsgs * msgs )
569{
570    tr_peer_event e = blankEvent;
571    e.eventType = TR_PEER_PEER_PROGRESS;
572    e.progress = msgs->info->progress;
573    publish( msgs, &e );
574}
575
576static void
577fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
578{
579    tr_peer_event e = blankEvent;
580    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
581    e.pieceIndex = req->index;
582    e.offset = req->offset;
583    e.length = req->length;
584    publish( msgs, &e );
585}
586
587static void
588fireClientGotData( tr_peermsgs * msgs,
589                   uint32_t      length,
590                   int           wasPieceData )
591{
592    tr_peer_event e = blankEvent;
593
594    e.length = length;
595    e.eventType = TR_PEER_CLIENT_GOT_DATA;
596    e.wasPieceData = wasPieceData;
597    publish( msgs, &e );
598}
599
600static void
601fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
602{
603    tr_peer_event e = blankEvent;
604    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
605    e.pieceIndex = pieceIndex;
606    publish( msgs, &e );
607}
608
609static void
610fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
611{
612    tr_peer_event e = blankEvent;
613    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
614    e.pieceIndex = pieceIndex;
615    publish( msgs, &e );
616}
617
618static void
619firePeerGotData( tr_peermsgs  * msgs,
620                 uint32_t       length,
621                 int            wasPieceData )
622{
623    tr_peer_event e = blankEvent;
624
625    e.length = length;
626    e.eventType = TR_PEER_PEER_GOT_DATA;
627    e.wasPieceData = wasPieceData;
628
629    publish( msgs, &e );
630}
631
632static void
633fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
634{
635    tr_peer_event e = blankEvent;
636    e.eventType = TR_PEER_CANCEL;
637    e.pieceIndex = req->index;
638    e.offset = req->offset;
639    e.length = req->length;
640    publish( msgs, &e );
641}
642
643/**
644***  ALLOWED FAST SET
645***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
646**/
647
648size_t
649tr_generateAllowedSet( tr_piece_index_t * setmePieces,
650                       size_t             desiredSetSize,
651                       size_t             pieceCount,
652                       const uint8_t    * infohash,
653                       const tr_address * addr )
654{
655    size_t setSize = 0;
656
657    assert( setmePieces );
658    assert( desiredSetSize <= pieceCount );
659    assert( desiredSetSize );
660    assert( pieceCount );
661    assert( infohash );
662    assert( addr );
663
664    if( addr->type == TR_AF_INET )
665    {
666        uint8_t w[SHA_DIGEST_LENGTH + 4];
667        uint8_t x[SHA_DIGEST_LENGTH];
668
669        *(uint32_t*)w = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
670        memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );                /* (2) */
671        tr_sha1( x, w, sizeof( w ), NULL );                          /* (3) */
672
673        while( setSize<desiredSetSize )
674        {
675            int i;
676            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
677            {
678                size_t k;
679                uint32_t j = i * 4;                                  /* (5) */
680                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
681                uint32_t index = y % pieceCount;                     /* (7) */
682
683                for( k=0; k<setSize; ++k )                           /* (8) */
684                    if( setmePieces[k] == index )
685                        break;
686
687                if( k == setSize )
688                    setmePieces[setSize++] = index;                  /* (9) */
689            }
690
691            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
692        }
693    }
694
695    return setSize;
696}
697
698static void
699updateFastSet( tr_peermsgs * msgs UNUSED )
700{
701#if 0
702    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->io );
703    const int peerIsNeedy = msgs->info->progress < 0.10;
704
705    if( fext && peerIsNeedy && !msgs->haveFastSet )
706    {
707        size_t i;
708        const struct tr_address * addr = tr_peerIoGetAddress( msgs->io, NULL );
709        const tr_info * inf = &msgs->torrent->info;
710        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
711
712        /* build the fast set */
713        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
714        msgs->haveFastSet = 1;
715
716        /* send it to the peer */
717        for( i=0; i<msgs->fastsetSize; ++i )
718            protocolSendAllowedFast( msgs, msgs->fastset[i] );
719    }
720#endif
721}
722
723/**
724***  INTEREST
725**/
726
727static int
728isPieceInteresting( const tr_peermsgs * peer,
729                    tr_piece_index_t    piece )
730{
731    const tr_torrent * torrent = peer->torrent;
732
733    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
734          && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have */
735          && ( tr_bitfieldHas( peer->info->have, piece ) );    /* peer has it */
736}
737
738/* "interested" means we'll ask for piece data if they unchoke us */
739static int
740isPeerInteresting( const tr_peermsgs * msgs )
741{
742    tr_piece_index_t    i;
743    const tr_torrent *  torrent;
744    const tr_bitfield * bitfield;
745    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
746
747    if( clientIsSeed )
748        return FALSE;
749
750    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
751        return FALSE;
752
753    torrent = msgs->torrent;
754    bitfield = tr_cpPieceBitfield( torrent->completion );
755
756    if( !msgs->info->have )
757        return TRUE;
758
759    assert( bitfield->byteCount == msgs->info->have->byteCount );
760
761    for( i = 0; i < torrent->info.pieceCount; ++i )
762        if( isPieceInteresting( msgs, i ) )
763            return TRUE;
764
765    return FALSE;
766}
767
768static void
769sendInterest( tr_peermsgs * msgs,
770              int           weAreInterested )
771{
772    struct evbuffer * out = msgs->outMessages;
773
774    assert( msgs );
775    assert( weAreInterested == 0 || weAreInterested == 1 );
776
777    msgs->info->clientIsInterested = weAreInterested;
778    dbgmsg( msgs, "Sending %s", weAreInterested ? "Interested" : "Not Interested" );
779    tr_peerIoWriteUint32( msgs->io, out, sizeof( uint8_t ) );
780    tr_peerIoWriteUint8 ( msgs->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
781    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
782    dbgOutMessageLen( msgs );
783}
784
785static void
786updateInterest( tr_peermsgs * msgs )
787{
788    const int i = isPeerInteresting( msgs );
789
790    if( i != msgs->info->clientIsInterested )
791        sendInterest( msgs, i );
792    if( i )
793        fireNeedReq( msgs );
794}
795
796static int
797popNextRequest( tr_peermsgs *         msgs,
798                struct peer_request * setme )
799{
800    return reqListPop( &msgs->peerAskedFor, setme );
801}
802
803static void
804cancelAllRequestsToClient( tr_peermsgs * msgs )
805{
806    struct peer_request req;
807    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->io );
808
809    while( popNextRequest( msgs, &req ) )
810        if( mustSendCancel )
811            protocolSendReject( msgs, &req );
812}
813
814void
815tr_peerMsgsSetChoke( tr_peermsgs * msgs,
816                     int           choke )
817{
818    const time_t now = time( NULL );
819    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
820
821    assert( msgs );
822    assert( msgs->info );
823    assert( choke == 0 || choke == 1 );
824
825    if( msgs->info->chokeChangedAt > fibrillationTime )
826    {
827        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
828    }
829    else if( msgs->info->peerIsChoked != choke )
830    {
831        msgs->info->peerIsChoked = choke;
832        if( choke )
833            cancelAllRequestsToClient( msgs );
834        protocolSendChoke( msgs, choke );
835        msgs->info->chokeChangedAt = now;
836    }
837}
838
839/**
840***
841**/
842
843void
844tr_peerMsgsHave( tr_peermsgs * msgs,
845                 uint32_t      index )
846{
847    protocolSendHave( msgs, index );
848
849    /* since we have more pieces now, we might not be interested in this peer */
850    updateInterest( msgs );
851}
852
853/**
854***
855**/
856
857static int
858reqIsValid( const tr_peermsgs * peer,
859            uint32_t            index,
860            uint32_t            offset,
861            uint32_t            length )
862{
863    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
864}
865
866static int
867requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
868{
869    return reqIsValid( msgs, req->index, req->offset, req->length );
870}
871
872static void
873expireOldRequests( tr_peermsgs * msgs, const time_t now  )
874{
875    int i;
876    time_t oldestAllowed;
877    struct request_list tmp = REQUEST_LIST_INIT;
878    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->io );
879
880    /* cancel requests that have been queued for too long */
881    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
882    reqListCopy( &tmp, &msgs->clientWillAskFor );
883    for( i = 0; i < tmp.count; ++i ) {
884        const struct peer_request * req = &tmp.requests[i];
885        if( req->time_requested < oldestAllowed )
886            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
887    }
888    reqListClear( &tmp );
889
890    /* if the peer doesn't support "Reject Request",
891     * cancel requests that were sent too long ago. */
892    if( !fext ) {
893        oldestAllowed = now - SENT_REQUEST_TTL_SECS;
894        reqListCopy( &tmp, &msgs->clientAskedFor );
895        for( i=0; i<tmp.count; ++i ) {
896            const struct peer_request * req = &tmp.requests[i];
897            if( req->time_requested < oldestAllowed )
898                tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
899        }
900        reqListClear( &tmp );
901    }
902}
903
904static void
905pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
906{
907    const int           max = msgs->maxActiveRequests;
908    const int           min = msgs->minActiveRequests;
909    int                 sent = 0;
910    int                 count = msgs->clientAskedFor.count;
911    struct peer_request req;
912
913    if( count > min )
914        return;
915    if( msgs->info->clientIsChoked )
916        return;
917    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
918        return;
919
920    while( ( count < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
921    {
922        const tr_block_index_t block = _tr_block( msgs->torrent, req.index, req.offset );
923
924        assert( requestIsValid( msgs, &req ) );
925        assert( tr_bitfieldHas( msgs->info->have, req.index ) );
926
927        /* don't ask for it if we've already got it... this block may have
928         * come in from a different peer after we cancelled a request for it */
929        if( !tr_cpBlockIsComplete( msgs->torrent->completion, block ) )
930        {
931            protocolSendRequest( msgs, &req );
932            req.time_requested = now;
933            reqListAppend( &msgs->clientAskedFor, &req );
934
935            ++count;
936            ++sent;
937        }
938    }
939
940    if( sent )
941        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
942                sent,
943                msgs->clientAskedFor.count,
944                msgs->clientWillAskFor.count );
945
946    if( count < max )
947        fireNeedReq( msgs );
948}
949
950static int
951requestQueueIsFull( const tr_peermsgs * msgs )
952{
953    const int req_max = msgs->maxActiveRequests;
954    return msgs->clientWillAskFor.count >= req_max;
955}
956
957tr_addreq_t
958tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
959                       uint32_t         index,
960                       uint32_t         offset,
961                       uint32_t         length,
962                       int              doForce )
963{
964    struct peer_request req;
965
966    assert( msgs );
967    assert( msgs->torrent );
968    assert( reqIsValid( msgs, index, offset, length ) );
969
970    /**
971    ***  Reasons to decline the request
972    **/
973
974    /* don't send requests to choked clients */
975    if( !doForce && msgs->info->clientIsChoked ) {
976        dbgmsg( msgs, "declining request because they're choking us" );
977        return TR_ADDREQ_CLIENT_CHOKED;
978    }
979
980    /* peer doesn't have this piece */
981    if( !doForce && !tr_bitfieldHas( msgs->info->have, index ) )
982        return TR_ADDREQ_MISSING;
983
984    /* peer's queue is full */
985    if( !doForce && requestQueueIsFull( msgs ) ) {
986        dbgmsg( msgs, "declining request because we're full" );
987        return TR_ADDREQ_FULL;
988    }
989
990    /* have we already asked for this piece? */
991    req.index = index;
992    req.offset = offset;
993    req.length = length;
994    if( !doForce ) {
995        if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
996            dbgmsg( msgs, "declining because it's a duplicate" );
997            return TR_ADDREQ_DUPLICATE;
998        }
999        if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
1000            dbgmsg( msgs, "declining because it's a duplicate" );
1001            return TR_ADDREQ_DUPLICATE;
1002        }
1003    }
1004
1005    /**
1006    ***  Accept this request
1007    **/
1008
1009    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
1010            index, offset, length );
1011    req.time_requested = time( NULL );
1012    reqListAppend( &msgs->clientWillAskFor, &req );
1013    return TR_ADDREQ_OK;
1014}
1015
1016static void
1017cancelAllRequestsToPeer( tr_peermsgs * msgs, tr_bool sendCancel )
1018{
1019    int i;
1020    struct request_list a = msgs->clientWillAskFor;
1021    struct request_list b = msgs->clientAskedFor;
1022    dbgmsg( msgs, "cancelling all requests to peer" );
1023
1024    msgs->clientAskedFor = REQUEST_LIST_INIT;
1025    msgs->clientWillAskFor = REQUEST_LIST_INIT;
1026
1027    for( i=0; i<a.count; ++i )
1028        fireCancelledReq( msgs, &a.requests[i] );
1029
1030    for( i = 0; i < b.count; ++i ) {
1031        fireCancelledReq( msgs, &b.requests[i] );
1032        if( sendCancel )
1033            protocolSendCancel( msgs, &b.requests[i] );
1034    }
1035
1036    reqListClear( &a );
1037    reqListClear( &b );
1038}
1039
1040void
1041tr_peerMsgsCancel( tr_peermsgs * msgs,
1042                   uint32_t      pieceIndex,
1043                   uint32_t      offset,
1044                   uint32_t      length )
1045{
1046    struct peer_request req;
1047
1048    assert( msgs != NULL );
1049    assert( length > 0 );
1050
1051
1052    /* have we asked the peer for this piece? */
1053    req.index = pieceIndex;
1054    req.offset = offset;
1055    req.length = length;
1056
1057    /* if it's only in the queue and hasn't been sent yet, free it */
1058    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
1059        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32"\n", pieceIndex, offset, length );
1060        fireCancelledReq( msgs, &req );
1061    }
1062
1063    /* if it's already been sent, send a cancel message too */
1064    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
1065        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32"\n", pieceIndex, offset, length );
1066        protocolSendCancel( msgs, &req );
1067        fireCancelledReq( msgs, &req );
1068    }
1069}
1070
1071
1072/**
1073***
1074**/
1075
1076static void
1077sendLtepHandshake( tr_peermsgs * msgs )
1078{
1079    tr_benc val, *m;
1080    char * buf;
1081    int len;
1082    int pex;
1083    struct evbuffer * out = msgs->outMessages;
1084
1085    if( msgs->clientSentLtepHandshake )
1086        return;
1087
1088    dbgmsg( msgs, "sending an ltep handshake" );
1089    msgs->clientSentLtepHandshake = 1;
1090
1091    /* decide if we want to advertise pex support */
1092    if( !tr_torrentAllowsPex( msgs->torrent ) )
1093        pex = 0;
1094    else if( msgs->peerSentLtepHandshake )
1095        pex = msgs->peerSupportsPex ? 1 : 0;
1096    else
1097        pex = 1;
1098
1099    tr_bencInitDict( &val, 5 );
1100    tr_bencDictAddInt( &val, "e", msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
1101    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
1102    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
1103    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
1104    m  = tr_bencDictAddDict( &val, "m", 1 );
1105    if( pex )
1106        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
1107    buf = tr_bencSave( &val, &len );
1108
1109    tr_peerIoWriteUint32( msgs->io, out, 2 * sizeof( uint8_t ) + len );
1110    tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
1111    tr_peerIoWriteUint8 ( msgs->io, out, LTEP_HANDSHAKE );
1112    tr_peerIoWriteBytes ( msgs->io, out, buf, len );
1113    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1114    dbgOutMessageLen( msgs );
1115
1116    /* cleanup */
1117    tr_bencFree( &val );
1118    tr_free( buf );
1119}
1120
1121static void
1122parseLtepHandshake( tr_peermsgs *     msgs,
1123                    int               len,
1124                    struct evbuffer * inbuf )
1125{
1126    int64_t   i;
1127    tr_benc   val, * sub;
1128    uint8_t * tmp = tr_new( uint8_t, len );
1129
1130    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
1131    msgs->peerSentLtepHandshake = 1;
1132
1133    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
1134    {
1135        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1136        tr_free( tmp );
1137        return;
1138    }
1139
1140    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
1141
1142    /* does the peer prefer encrypted connections? */
1143    if( tr_bencDictFindInt( &val, "e", &i ) )
1144        msgs->info->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1145                                              : ENCRYPTION_PREFERENCE_NO;
1146
1147    /* check supported messages for utorrent pex */
1148    msgs->peerSupportsPex = 0;
1149    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
1150        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
1151            msgs->ut_pex_id = (uint8_t) i;
1152            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1153            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1154        }
1155    }
1156
1157    /* look for upload_only (BEP 21) */
1158    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
1159        fireUploadOnly( msgs, i!=0 );
1160
1161    /* get peer's listening port */
1162    if( tr_bencDictFindInt( &val, "p", &i ) ) {
1163        msgs->info->port = htons( (uint16_t)i );
1164        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
1165    }
1166
1167    /* get peer's maximum request queue size */
1168    if( tr_bencDictFindInt( &val, "reqq", &i ) )
1169        msgs->reqq = i;
1170
1171    tr_bencFree( &val );
1172    tr_free( tmp );
1173}
1174
1175static void
1176parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1177{
1178    int loaded = 0;
1179    uint8_t * tmp = tr_new( uint8_t, msglen );
1180    tr_benc val;
1181    const tr_torrent * tor = msgs->torrent;
1182    const uint8_t * added;
1183    size_t added_len;
1184
1185    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
1186
1187    if( tr_torrentAllowsPex( tor )
1188      && (( loaded = !tr_bencLoad( tmp, msglen, &val, NULL )))
1189      && tr_bencDictFindRaw( &val, "added", &added, &added_len ))
1190    {
1191        const uint8_t * added_f = NULL;
1192        tr_pex *        pex;
1193        size_t          i, n;
1194        size_t          added_f_len = 0;
1195        tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1196        pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1197        for( i=0; i<n; ++i )
1198            tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1199                              TR_PEER_FROM_PEX, pex + i );
1200        tr_free( pex );
1201    }
1202
1203    if( loaded )
1204        tr_bencFree( &val );
1205    tr_free( tmp );
1206}
1207
1208static void sendPex( tr_peermsgs * msgs );
1209
1210static void
1211parseLtep( tr_peermsgs *     msgs,
1212           int               msglen,
1213           struct evbuffer * inbuf )
1214{
1215    uint8_t ltep_msgid;
1216
1217    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
1218    msglen--;
1219
1220    if( ltep_msgid == LTEP_HANDSHAKE )
1221    {
1222        dbgmsg( msgs, "got ltep handshake" );
1223        parseLtepHandshake( msgs, msglen, inbuf );
1224        if( tr_peerIoSupportsLTEP( msgs->io ) )
1225        {
1226            sendLtepHandshake( msgs );
1227            sendPex( msgs );
1228        }
1229    }
1230    else if( ltep_msgid == TR_LTEP_PEX )
1231    {
1232        dbgmsg( msgs, "got ut pex" );
1233        msgs->peerSupportsPex = 1;
1234        parseUtPex( msgs, msglen, inbuf );
1235    }
1236    else
1237    {
1238        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1239        evbuffer_drain( inbuf, msglen );
1240    }
1241}
1242
1243static int
1244readBtLength( tr_peermsgs *     msgs,
1245              struct evbuffer * inbuf,
1246              size_t            inlen )
1247{
1248    uint32_t len;
1249
1250    if( inlen < sizeof( len ) )
1251        return READ_LATER;
1252
1253    tr_peerIoReadUint32( msgs->io, inbuf, &len );
1254
1255    if( len == 0 ) /* peer sent us a keepalive message */
1256        dbgmsg( msgs, "got KeepAlive" );
1257    else
1258    {
1259        msgs->incoming.length = len;
1260        msgs->state = AWAITING_BT_ID;
1261    }
1262
1263    return READ_NOW;
1264}
1265
1266static int readBtMessage( tr_peermsgs *     msgs,
1267                          struct evbuffer * inbuf,
1268                          size_t            inlen );
1269
1270static int
1271readBtId( tr_peermsgs *     msgs,
1272          struct evbuffer * inbuf,
1273          size_t            inlen )
1274{
1275    uint8_t id;
1276
1277    if( inlen < sizeof( uint8_t ) )
1278        return READ_LATER;
1279
1280    tr_peerIoReadUint8( msgs->io, inbuf, &id );
1281    msgs->incoming.id = id;
1282
1283    if( id == BT_PIECE )
1284    {
1285        msgs->state = AWAITING_BT_PIECE;
1286        return READ_NOW;
1287    }
1288    else if( msgs->incoming.length != 1 )
1289    {
1290        msgs->state = AWAITING_BT_MESSAGE;
1291        return READ_NOW;
1292    }
1293    else return readBtMessage( msgs, inbuf, inlen - 1 );
1294}
1295
1296static void
1297updatePeerProgress( tr_peermsgs * msgs )
1298{
1299    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
1300    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
1301    updateFastSet( msgs );
1302    updateInterest( msgs );
1303    firePeerProgress( msgs );
1304}
1305
1306static void
1307peerMadeRequest( tr_peermsgs *               msgs,
1308                 const struct peer_request * req )
1309{
1310    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->io );
1311    const int reqIsValid = requestIsValid( msgs, req );
1312    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
1313    const int peerIsChoked = msgs->info->peerIsChoked;
1314
1315    int allow = FALSE;
1316
1317    if( !reqIsValid )
1318        dbgmsg( msgs, "rejecting an invalid request." );
1319    else if( !clientHasPiece )
1320        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1321    else if( peerIsChoked )
1322        dbgmsg( msgs, "rejecting request from choked peer" );
1323    else
1324        allow = TRUE;
1325
1326    if( allow )
1327        reqListAppend( &msgs->peerAskedFor, req );
1328    else if( fext )
1329        protocolSendReject( msgs, req );
1330}
1331
1332static int
1333messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1334{
1335    switch( id )
1336    {
1337        case BT_CHOKE:
1338        case BT_UNCHOKE:
1339        case BT_INTERESTED:
1340        case BT_NOT_INTERESTED:
1341        case BT_FEXT_HAVE_ALL:
1342        case BT_FEXT_HAVE_NONE:
1343            return len == 1;
1344
1345        case BT_HAVE:
1346        case BT_FEXT_SUGGEST:
1347        case BT_FEXT_ALLOWED_FAST:
1348            return len == 5;
1349
1350        case BT_BITFIELD:
1351            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1352
1353        case BT_REQUEST:
1354        case BT_CANCEL:
1355        case BT_FEXT_REJECT:
1356            return len == 13;
1357
1358        case BT_PIECE:
1359            return len > 9 && len <= 16393;
1360
1361        case BT_PORT:
1362            return len == 3;
1363
1364        case BT_LTEP:
1365            return len >= 2;
1366
1367        default:
1368            return FALSE;
1369    }
1370}
1371
1372static int clientGotBlock( tr_peermsgs *               msgs,
1373                           const uint8_t *             block,
1374                           const struct peer_request * req );
1375
1376static int
1377readBtPiece( tr_peermsgs      * msgs,
1378             struct evbuffer  * inbuf,
1379             size_t             inlen,
1380             size_t           * setme_piece_bytes_read )
1381{
1382    struct peer_request * req = &msgs->incoming.blockReq;
1383
1384    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1385    dbgmsg( msgs, "In readBtPiece" );
1386
1387    if( !req->length )
1388    {
1389        if( inlen < 8 )
1390            return READ_LATER;
1391
1392        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1393        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1394        req->length = msgs->incoming.length - 9;
1395        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1396        return READ_NOW;
1397    }
1398    else
1399    {
1400        int err;
1401
1402        /* read in another chunk of data */
1403        const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
1404        size_t n = MIN( nLeft, inlen );
1405        uint8_t * buf = tr_new( uint8_t, n );
1406        assert( EVBUFFER_LENGTH( inbuf ) >= n );
1407        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1408        evbuffer_add( msgs->incoming.block, buf, n );
1409        fireClientGotData( msgs, n, TRUE );
1410        *setme_piece_bytes_read += n;
1411        tr_free( buf );
1412        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1413               n, req->index, req->offset, req->length,
1414               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1415        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1416            return READ_LATER;
1417
1418        /* we've got the whole block ... process it */
1419        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );
1420
1421        /* cleanup */
1422        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH( msgs->incoming.block ) );
1423        req->length = 0;
1424        msgs->state = AWAITING_BT_LENGTH;
1425        if( !err )
1426            return READ_NOW;
1427        else {
1428            fireError( msgs, err );
1429            return READ_ERR;
1430        }
1431    }
1432}
1433
1434static int
1435readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1436{
1437    uint32_t      ui32;
1438    uint32_t      msglen = msgs->incoming.length;
1439    const uint8_t id = msgs->incoming.id;
1440    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1441    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->io );
1442
1443    --msglen; /* id length */
1444
1445    if( inlen < msglen )
1446        return READ_LATER;
1447
1448    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1449
1450    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1451    {
1452        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1453        fireError( msgs, EMSGSIZE );
1454        return READ_ERR;
1455    }
1456
1457    switch( id )
1458    {
1459        case BT_CHOKE:
1460            dbgmsg( msgs, "got Choke" );
1461            msgs->info->clientIsChoked = 1;
1462            if( !fext )
1463                cancelAllRequestsToPeer( msgs, FALSE );
1464            break;
1465
1466        case BT_UNCHOKE:
1467            dbgmsg( msgs, "got Unchoke" );
1468            msgs->info->clientIsChoked = 0;
1469            fireNeedReq( msgs );
1470            break;
1471
1472        case BT_INTERESTED:
1473            dbgmsg( msgs, "got Interested" );
1474            msgs->info->peerIsInterested = 1;
1475            break;
1476
1477        case BT_NOT_INTERESTED:
1478            dbgmsg( msgs, "got Not Interested" );
1479            msgs->info->peerIsInterested = 0;
1480            break;
1481
1482        case BT_HAVE:
1483            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1484            dbgmsg( msgs, "got Have: %u", ui32 );
1485            if( tr_bitfieldAdd( msgs->info->have, ui32 ) ) {
1486                fireError( msgs, ERANGE );
1487                return READ_ERR;
1488            }
1489            updatePeerProgress( msgs );
1490            tr_rcTransferred( msgs->torrent->swarmSpeed,
1491                              msgs->torrent->info.pieceSize );
1492            break;
1493
1494        case BT_BITFIELD:
1495        {
1496            dbgmsg( msgs, "got a bitfield" );
1497            msgs->peerSentBitfield = 1;
1498            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
1499            updatePeerProgress( msgs );
1500            fireNeedReq( msgs );
1501            break;
1502        }
1503
1504        case BT_REQUEST:
1505        {
1506            struct peer_request r;
1507            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1508            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1509            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1510            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1511            peerMadeRequest( msgs, &r );
1512            break;
1513        }
1514
1515        case BT_CANCEL:
1516        {
1517            struct peer_request r;
1518            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1519            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1520            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1521            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1522            if( reqListRemove( &msgs->peerAskedFor, &r ) && fext )
1523                protocolSendReject( msgs, &r );
1524            break;
1525        }
1526
1527        case BT_PIECE:
1528            assert( 0 ); /* handled elsewhere! */
1529            break;
1530
1531        case BT_PORT:
1532            dbgmsg( msgs, "Got a BT_PORT" );
1533            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1534            break;
1535
1536        case BT_FEXT_SUGGEST:
1537            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1538            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1539            if( fext )
1540                fireClientGotSuggest( msgs, ui32 );
1541            else {
1542                fireError( msgs, EMSGSIZE );
1543                return READ_ERR;
1544            }
1545            break;
1546
1547        case BT_FEXT_ALLOWED_FAST:
1548            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1549            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1550            if( fext )
1551                fireClientGotAllowedFast( msgs, ui32 );
1552            else {
1553                fireError( msgs, EMSGSIZE );
1554                return READ_ERR;
1555            }
1556            break;
1557
1558        case BT_FEXT_HAVE_ALL:
1559            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1560            if( fext ) {
1561                tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1562                updatePeerProgress( msgs );
1563            } else {
1564                fireError( msgs, EMSGSIZE );
1565                return READ_ERR;
1566            }
1567            break;
1568
1569
1570        case BT_FEXT_HAVE_NONE:
1571            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1572            if( fext ) {
1573                tr_bitfieldClear( msgs->info->have );
1574                updatePeerProgress( msgs );
1575            } else {
1576                fireError( msgs, EMSGSIZE );
1577                return READ_ERR;
1578            }
1579            break;
1580
1581        case BT_FEXT_REJECT:
1582        {
1583            struct peer_request r;
1584            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1585            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1586            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1587            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1588            if( fext )
1589                reqListRemove( &msgs->clientAskedFor, &r );
1590            else {
1591                fireError( msgs, EMSGSIZE );
1592                return READ_ERR;
1593            }
1594            break;
1595        }
1596
1597        case BT_LTEP:
1598            dbgmsg( msgs, "Got a BT_LTEP" );
1599            parseLtep( msgs, msglen, inbuf );
1600            break;
1601
1602        default:
1603            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1604            tr_peerIoDrain( msgs->io, inbuf, msglen );
1605            break;
1606    }
1607
1608    assert( msglen + 1 == msgs->incoming.length );
1609    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1610
1611    msgs->state = AWAITING_BT_LENGTH;
1612    return READ_NOW;
1613}
1614
1615static void
1616decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1617{
1618    tr_torrent * tor = msgs->torrent;
1619
1620    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1621}
1622
1623static void
1624clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1625{
1626    decrementDownloadedCount( msgs, req->length );
1627}
1628
1629static void
1630addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1631{
1632    if( !msgs->info->blame )
1633         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1634    tr_bitfieldAdd( msgs->info->blame, index );
1635}
1636
1637/* returns 0 on success, or an errno on failure */
1638static int
1639clientGotBlock( tr_peermsgs *               msgs,
1640                const uint8_t *             data,
1641                const struct peer_request * req )
1642{
1643    int err;
1644    tr_torrent * tor = msgs->torrent;
1645    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1646
1647    assert( msgs );
1648    assert( req );
1649
1650    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1651        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1652                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1653        return EMSGSIZE;
1654    }
1655
1656    /* save the block */
1657    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1658
1659    /**
1660    *** Remove the block from our `we asked for this' list
1661    **/
1662
1663    if( !reqListRemove( &msgs->clientAskedFor, req ) ) {
1664        clientGotUnwantedBlock( msgs, req );
1665        dbgmsg( msgs, "we didn't ask for this message..." );
1666        return 0;
1667    }
1668
1669    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1670            msgs->clientAskedFor.count );
1671
1672    /**
1673    *** Error checks
1674    **/
1675
1676    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1677        dbgmsg( msgs, "we have this block already..." );
1678        clientGotUnwantedBlock( msgs, req );
1679        return 0;
1680    }
1681
1682    /**
1683    ***  Save the block
1684    **/
1685
1686    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1687        return err;
1688
1689    addPeerToBlamefield( msgs, req->index );
1690    fireGotBlock( msgs, req );
1691    return 0;
1692}
1693
1694static int peerPulse( void * vmsgs );
1695
1696static void
1697didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1698{
1699    tr_peermsgs * msgs = vmsgs;
1700    firePeerGotData( msgs, bytesWritten, wasPieceData );
1701    peerPulse( msgs );
1702}
1703
1704static ReadState
1705canRead( struct tr_iobuf * iobuf, void * vmsgs, size_t * piece )
1706{
1707    ReadState         ret;
1708    tr_peermsgs *     msgs = vmsgs;
1709    struct evbuffer * in = tr_iobuf_input( iobuf );
1710    const size_t      inlen = EVBUFFER_LENGTH( in );
1711
1712    if( !inlen )
1713    {
1714        ret = READ_LATER;
1715    }
1716    else if( msgs->state == AWAITING_BT_PIECE )
1717    {
1718        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1719    }
1720    else switch( msgs->state )
1721    {
1722        case AWAITING_BT_LENGTH:
1723            ret = readBtLength ( msgs, in, inlen ); break;
1724
1725        case AWAITING_BT_ID:
1726            ret = readBtId     ( msgs, in, inlen ); break;
1727
1728        case AWAITING_BT_MESSAGE:
1729            ret = readBtMessage( msgs, in, inlen ); break;
1730
1731        default:
1732            assert( 0 );
1733    }
1734
1735    /* log the raw data that was read */
1736    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1737        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1738
1739    return ret;
1740}
1741
1742/**
1743***
1744**/
1745
1746static int
1747ratePulse( void * vmsgs )
1748{
1749    tr_peermsgs * msgs = vmsgs;
1750    const double rateToClient = tr_peerGetPieceSpeed( msgs->info, TR_PEER_TO_CLIENT );
1751    const int estimatedBlocksInNext30Seconds =
1752                  ( rateToClient * 30 * 1024 ) / msgs->torrent->blockSize;
1753    msgs->minActiveRequests = 8;
1754    msgs->maxActiveRequests = msgs->minActiveRequests + estimatedBlocksInNext30Seconds;
1755    if( msgs->reqq > 0 )
1756        msgs->maxActiveRequests = MIN( msgs->maxActiveRequests, msgs->reqq );
1757    return TRUE;
1758}
1759
1760static size_t
1761fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1762{
1763    size_t bytesWritten = 0;
1764    struct peer_request req;
1765    const int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1766    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->io );
1767
1768    /**
1769    ***  Protocol messages
1770    **/
1771
1772    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1773    {
1774        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1775        msgs->outMessagesBatchedAt = now;
1776    }
1777    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1778    {
1779        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1780        /* flush the protocol messages */
1781        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->io, len );
1782        tr_peerIoWriteBuf( msgs->io, msgs->outMessages, FALSE );
1783        msgs->clientSentAnythingAt = now;
1784        msgs->outMessagesBatchedAt = 0;
1785        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1786        bytesWritten +=  len;
1787    }
1788
1789    /**
1790    ***  Blocks
1791    **/
1792
1793    if( ( tr_peerIoGetWriteBufferSpace( msgs->io ) >= msgs->torrent->blockSize )
1794        && popNextRequest( msgs, &req ) )
1795    {
1796        if( requestIsValid( msgs, &req )
1797            && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1798        {
1799            /* send a block */
1800            uint8_t * buf = tr_new( uint8_t, req.length );
1801            const int err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf );
1802            if( err ) {
1803                fireError( msgs, err );
1804                bytesWritten = 0;
1805                msgs = NULL;
1806            } else {
1807                tr_peerIo * io = msgs->io;
1808                struct evbuffer * out = evbuffer_new( );
1809                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1810                tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1811                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1812                tr_peerIoWriteUint32( io, out, req.index );
1813                tr_peerIoWriteUint32( io, out, req.offset );
1814                tr_peerIoWriteBytes ( io, out, buf, req.length );
1815                tr_peerIoWriteBuf( io, out, TRUE );
1816                bytesWritten += EVBUFFER_LENGTH( out );
1817                evbuffer_free( out );
1818                msgs->clientSentAnythingAt = now;
1819            }
1820            tr_free( buf );
1821        }
1822        else if( fext ) /* peer needs a reject message */
1823        {
1824            protocolSendReject( msgs, &req );
1825        }
1826    }
1827
1828    /**
1829    ***  Keepalive
1830    **/
1831
1832    if( ( msgs != NULL )
1833        && ( msgs->clientSentAnythingAt != 0 )
1834        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1835    {
1836        dbgmsg( msgs, "sending a keepalive message" );
1837        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1838        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1839    }
1840
1841    return bytesWritten;
1842}
1843
1844static int
1845peerPulse( void * vmsgs )
1846{
1847    tr_peermsgs * msgs = vmsgs;
1848    const time_t  now = time( NULL );
1849
1850    ratePulse( msgs );
1851
1852    pumpRequestQueue( msgs, now );
1853    expireOldRequests( msgs, now );
1854
1855    for( ;; )
1856        if( fillOutputBuffer( msgs, now ) < 1 )
1857            break;
1858
1859    return TRUE; /* loop forever */
1860}
1861
1862void
1863tr_peerMsgsPulse( tr_peermsgs * msgs )
1864{
1865    if( msgs != NULL )
1866        peerPulse( msgs );
1867}
1868
1869static void
1870gotError( struct tr_iobuf  * iobuf UNUSED,
1871          short              what,
1872          void             * vmsgs )
1873{
1874    if( what & EVBUFFER_TIMEOUT )
1875        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1876    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1877        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1878               what, errno, tr_strerror( errno ) );
1879    fireError( vmsgs, ENOTCONN );
1880}
1881
1882static void
1883sendBitfield( tr_peermsgs * msgs )
1884{
1885    struct evbuffer * out = msgs->outMessages;
1886    tr_bitfield *     field;
1887    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1888    size_t            i;
1889    size_t            lazyCount = 0;
1890
1891    field = tr_bitfieldDup( tr_cpPieceBitfield( msgs->torrent->completion ) );
1892
1893    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1894    {
1895        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1896            speed over a truly random sample -- let's limit the pool size to
1897            the first 1000 pieces so large torrents don't bog things down */
1898        size_t poolSize;
1899        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1900        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1901
1902        /* build the pool */
1903        for( i=poolSize=0; i<maxPoolSize; ++i )
1904            if( tr_bitfieldHas( field, i ) )
1905                pool[poolSize++] = i;
1906
1907        /* pull random piece indices from the pool */
1908        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1909        {
1910            const int pos = tr_cryptoWeakRandInt( poolSize );
1911            const tr_piece_index_t piece = pool[pos];
1912            tr_bitfieldRem( field, piece );
1913            lazyPieces[lazyCount++] = piece;
1914            pool[pos] = pool[--poolSize];
1915        }
1916
1917        /* cleanup */
1918        tr_free( pool );
1919    }
1920
1921    tr_peerIoWriteUint32( msgs->io, out,
1922                          sizeof( uint8_t ) + field->byteCount );
1923    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1924    tr_peerIoWriteBytes ( msgs->io, out, field->bits, field->byteCount );
1925    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
1926            EVBUFFER_LENGTH( out ) );
1927    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1928
1929    for( i = 0; i < lazyCount; ++i )
1930        protocolSendHave( msgs, lazyPieces[i] );
1931
1932    tr_bitfieldFree( field );
1933}
1934
1935static void
1936tellPeerWhatWeHave( tr_peermsgs * msgs )
1937{
1938    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->io );
1939
1940    if( fext && ( tr_cpGetStatus( msgs->torrent->completion ) == TR_SEED ) )
1941    {
1942        protocolSendHaveAll( msgs );
1943    }
1944    else if( fext && ( tr_cpHaveValid( msgs->torrent->completion ) == 0 ) )
1945    {
1946        protocolSendHaveNone( msgs );
1947    }
1948    else
1949    {
1950        sendBitfield( msgs );
1951    }
1952}
1953
1954/**
1955***
1956**/
1957
1958/* some peers give us error messages if we send
1959   more than this many peers in a single pex message
1960   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1961#define MAX_PEX_ADDED 50
1962#define MAX_PEX_DROPPED 50
1963
1964typedef struct
1965{
1966    tr_pex *  added;
1967    tr_pex *  dropped;
1968    tr_pex *  elements;
1969    int       addedCount;
1970    int       droppedCount;
1971    int       elementCount;
1972}
1973PexDiffs;
1974
1975static void
1976pexAddedCb( void * vpex,
1977            void * userData )
1978{
1979    PexDiffs * diffs = userData;
1980    tr_pex *   pex = vpex;
1981
1982    if( diffs->addedCount < MAX_PEX_ADDED )
1983    {
1984        diffs->added[diffs->addedCount++] = *pex;
1985        diffs->elements[diffs->elementCount++] = *pex;
1986    }
1987}
1988
1989static void
1990pexDroppedCb( void * vpex,
1991              void * userData )
1992{
1993    PexDiffs * diffs = userData;
1994    tr_pex *   pex = vpex;
1995
1996    if( diffs->droppedCount < MAX_PEX_DROPPED )
1997    {
1998        diffs->dropped[diffs->droppedCount++] = *pex;
1999    }
2000}
2001
2002static void
2003pexElementCb( void * vpex,
2004              void * userData )
2005{
2006    PexDiffs * diffs = userData;
2007    tr_pex *   pex = vpex;
2008
2009    diffs->elements[diffs->elementCount++] = *pex;
2010}
2011
2012/* TODO: ipv6 pex */
2013static void
2014sendPex( tr_peermsgs * msgs )
2015{
2016    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2017    {
2018        PexDiffs diffs;
2019        tr_pex * newPex = NULL;
2020        const int newCount = tr_peerMgrGetPeers( msgs->session->peerMgr,
2021                                                 msgs->torrent->info.hash,
2022                                                 &newPex );
2023
2024        /* build the diffs */
2025        diffs.added = tr_new( tr_pex, newCount );
2026        diffs.addedCount = 0;
2027        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2028        diffs.droppedCount = 0;
2029        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2030        diffs.elementCount = 0;
2031        tr_set_compare( msgs->pex, msgs->pexCount,
2032                        newPex, newCount,
2033                        tr_pexCompare, sizeof( tr_pex ),
2034                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2035        dbgmsg(
2036            msgs,
2037            "pex: old peer count %d, new peer count %d, added %d, removed %d",
2038            msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
2039
2040        if( !diffs.addedCount && !diffs.droppedCount )
2041        {
2042            tr_free( diffs.elements );
2043        }
2044        else
2045        {
2046            int  i;
2047            tr_benc val;
2048            char * benc;
2049            int bencLen;
2050            uint8_t * tmp, *walk;
2051            struct evbuffer * out = msgs->outMessages;
2052
2053            /* update peer */
2054            tr_free( msgs->pex );
2055            msgs->pex = diffs.elements;
2056            msgs->pexCount = diffs.elementCount;
2057
2058            /* build the pex payload */
2059            tr_bencInitDict( &val, 3 );
2060
2061            /* "added" */
2062            tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2063            for( i = 0; i < diffs.addedCount; ++i ) {
2064                memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2065                memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2066            }
2067            assert( ( walk - tmp ) == diffs.addedCount * 6 );
2068            tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2069            tr_free( tmp );
2070
2071            /* "added.f" */
2072            tmp = walk = tr_new( uint8_t, diffs.addedCount );
2073            for( i = 0; i < diffs.addedCount; ++i )
2074                *walk++ = diffs.added[i].flags;
2075            assert( ( walk - tmp ) == diffs.addedCount );
2076            tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2077            tr_free( tmp );
2078
2079            /* "dropped" */
2080            tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2081            for( i = 0; i < diffs.droppedCount; ++i ) {
2082                memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2083                memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2084            }
2085            assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2086            tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2087            tr_free( tmp );
2088
2089            /* write the pex message */
2090            benc = tr_bencSave( &val, &bencLen );
2091            tr_peerIoWriteUint32( msgs->io, out, 2 * sizeof( uint8_t ) + bencLen );
2092            tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
2093            tr_peerIoWriteUint8 ( msgs->io, out, msgs->ut_pex_id );
2094            tr_peerIoWriteBytes ( msgs->io, out, benc, bencLen );
2095            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2096            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2097
2098            tr_free( benc );
2099            tr_bencFree( &val );
2100        }
2101
2102        /* cleanup */
2103        tr_free( diffs.added );
2104        tr_free( diffs.dropped );
2105        tr_free( newPex );
2106
2107        msgs->clientSentPexAt = time( NULL );
2108    }
2109}
2110
2111static int
2112pexPulse( void * vpeer )
2113{
2114    sendPex( vpeer );
2115    return TRUE;
2116}
2117
2118/**
2119***
2120**/
2121
2122tr_peermsgs*
2123tr_peerMsgsNew( struct tr_torrent * torrent,
2124                struct tr_peer *    peer,
2125                tr_delivery_func    func,
2126                void *              userData,
2127                tr_publisher_tag *  setme )
2128{
2129    tr_peermsgs * m;
2130
2131    assert( peer );
2132    assert( peer->io );
2133
2134    m = tr_new0( tr_peermsgs, 1 );
2135    m->publisher = tr_publisherNew( );
2136    m->info = peer;
2137    m->session = torrent->session;
2138    m->torrent = torrent;
2139    m->io = peer->io;
2140    m->info->clientIsChoked = 1;
2141    m->info->peerIsChoked = 1;
2142    m->info->clientIsInterested = 0;
2143    m->info->peerIsInterested = 0;
2144    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
2145    m->state = AWAITING_BT_LENGTH;
2146    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
2147    m->outMessages = evbuffer_new( );
2148    m->outMessagesBatchedAt = 0;
2149    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2150    m->incoming.block = evbuffer_new( );
2151    m->peerAllowedPieces = NULL;
2152    m->peerAskedFor = REQUEST_LIST_INIT;
2153    m->clientAskedFor = REQUEST_LIST_INIT;
2154    m->clientWillAskFor = REQUEST_LIST_INIT;
2155    peer->msgs = m;
2156
2157    *setme = tr_publisherSubscribe( m->publisher, func, userData );
2158
2159    if( tr_peerIoSupportsLTEP( m->io ) )
2160        sendLtepHandshake( m );
2161
2162    tellPeerWhatWeHave( m );
2163
2164    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */
2165    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
2166    ratePulse( m );
2167
2168    return m;
2169}
2170
2171void
2172tr_peerMsgsFree( tr_peermsgs* msgs )
2173{
2174    if( msgs )
2175    {
2176        tr_timerFree( &msgs->pexTimer );
2177        tr_publisherFree( &msgs->publisher );
2178        reqListClear( &msgs->clientWillAskFor );
2179        reqListClear( &msgs->clientAskedFor );
2180        reqListClear( &msgs->peerAskedFor );
2181
2182        tr_bitfieldFree( msgs->peerAllowedPieces );
2183        evbuffer_free( msgs->incoming.block );
2184        evbuffer_free( msgs->outMessages );
2185        tr_free( msgs->pex );
2186
2187        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2188        tr_free( msgs );
2189    }
2190}
2191
2192void
2193tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2194                        tr_publisher_tag tag )
2195{
2196    tr_publisherUnsubscribe( peer->publisher, tag );
2197}
2198
Note: See TracBrowser for help on using the repository browser.