source: trunk/libtransmission/peer-msgs.c @ 7353

Last change on this file since 7353 was 7353, checked in by charles, 12 years ago

(trunk libT) fix bug which caused libtransmission to hold onto nonproductive peers for longer than it should've

  • Property svn:keywords set to Date Rev Author Id
File size: 61.0 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 7353 2008-12-11 07:04:46Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "crypto.h"
26#include "inout.h"
27#include "iobuf.h"
28#ifdef WIN32
29#include "net.h" /* for ECONN */
30#endif
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "ratecontrol.h"
36#include "stats.h"
37#include "torrent.h"
38#include "trevent.h"
39#include "utils.h"
40
41/**
42***
43**/
44
45enum
46{
47    BT_CHOKE                = 0,
48    BT_UNCHOKE              = 1,
49    BT_INTERESTED           = 2,
50    BT_NOT_INTERESTED       = 3,
51    BT_HAVE                 = 4,
52    BT_BITFIELD             = 5,
53    BT_REQUEST              = 6,
54    BT_PIECE                = 7,
55    BT_CANCEL               = 8,
56    BT_PORT                 = 9,
57
58    BT_FEXT_SUGGEST         = 13,
59    BT_FEXT_HAVE_ALL        = 14,
60    BT_FEXT_HAVE_NONE       = 15,
61    BT_FEXT_REJECT          = 16,
62    BT_FEXT_ALLOWED_FAST    = 17,
63
64    BT_LTEP                 = 20,
65
66    LTEP_HANDSHAKE          = 0,
67
68    TR_LTEP_PEX             = 1,
69
70
71
72    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
73
74    /* idle seconds before we send a keepalive */
75    KEEPALIVE_INTERVAL_SECS = 100,
76
77    PEX_INTERVAL            = ( 90 * 1000 ), /* msec between sendPex() calls */
78    PEER_PULSE_INTERVAL     = ( 250 ),       /* msec between peerPulse() calls
79                                               */
80
81    MAX_QUEUE_SIZE          = ( 100 ),
82
83    /* how long an unsent request can stay queued before it's returned
84       back to the peer-mgr's pool of requests */
85    QUEUED_REQUEST_TTL_SECS = 20,
86
87    /* how long a sent request can stay queued before it's returned
88       back to the peer-mgr's pool of requests */
89    SENT_REQUEST_TTL_SECS = 240,
90
91    /* used in lowering the outMessages queue period */
92    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
93    HIGH_PRIORITY_INTERVAL_SECS = 2,
94    LOW_PRIORITY_INTERVAL_SECS = 20,
95
96    /* number of pieces to remove from the bitfield when
97     * lazy bitfields are turned on */
98    LAZY_PIECE_COUNT = 26,
99
100    /* number of pieces we'll allow in our fast set */
101    MAX_FAST_SET_SIZE = 3
102};
103
104/**
105***  REQUEST MANAGEMENT
106**/
107
108enum
109{
110    AWAITING_BT_LENGTH,
111    AWAITING_BT_ID,
112    AWAITING_BT_MESSAGE,
113    AWAITING_BT_PIECE
114};
115
116struct peer_request
117{
118    uint32_t    index;
119    uint32_t    offset;
120    uint32_t    length;
121    time_t      time_requested;
122};
123
124static int
125compareRequest( const void * va, const void * vb )
126{
127    const struct peer_request * a = va;
128    const struct peer_request * b = vb;
129
130    if( a->index != b->index )
131        return a->index < b->index ? -1 : 1;
132
133    if( a->offset != b->offset )
134        return a->offset < b->offset ? -1 : 1;
135
136    if( a->length != b->length )
137        return a->length < b->length ? -1 : 1;
138
139    return 0;
140}
141
142struct request_list
143{
144    uint16_t               count;
145    uint16_t               max;
146    struct peer_request *  requests;
147};
148
149static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
150
151static void
152reqListReserve( struct request_list * list,
153                uint16_t              max )
154{
155    if( list->max < max )
156    {
157        list->max = max;
158        list->requests = tr_renew( struct peer_request,
159                                   list->requests,
160                                   list->max );
161    }
162}
163
164static void
165reqListClear( struct request_list * list )
166{
167    tr_free( list->requests );
168    *list = REQUEST_LIST_INIT;
169}
170
171static void
172reqListCopy( struct request_list * dest, const struct request_list * src )
173{
174    dest->count = dest->max = src->count;
175    dest->requests = tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
176}
177
178static void
179reqListRemoveOne( struct request_list * list,
180                  int                   i )
181{
182    assert( 0 <= i && i < list->count );
183
184    memmove( &list->requests[i],
185             &list->requests[i + 1],
186             sizeof( struct peer_request ) * ( --list->count - i ) );
187}
188
189static void
190reqListAppend( struct request_list *       list,
191               const struct peer_request * req )
192{
193    if( ++list->count >= list->max )
194        reqListReserve( list, list->max + 8 );
195
196    list->requests[list->count - 1] = *req;
197}
198
199static int
200reqListPop( struct request_list * list,
201            struct peer_request * setme )
202{
203    int success;
204
205    if( !list->count )
206        success = FALSE;
207    else {
208        *setme = list->requests[0];
209        reqListRemoveOne( list, 0 );
210        success = TRUE;
211    }
212
213    return success;
214}
215
216static int
217reqListFind( struct request_list *       list,
218             const struct peer_request * key )
219{
220    uint16_t i;
221
222    for( i = 0; i < list->count; ++i )
223        if( !compareRequest( key, list->requests + i ) )
224            return i;
225
226    return -1;
227}
228
229static int
230reqListRemove( struct request_list *       list,
231               const struct peer_request * key )
232{
233    int success;
234    const int i = reqListFind( list, key );
235
236    if( i < 0 )
237        success = FALSE;
238    else {
239        reqListRemoveOne( list, i );
240        success = TRUE;
241    }
242
243    return success;
244}
245
246/**
247***
248**/
249
250/* this is raw, unchanged data from the peer regarding
251 * the current message that it's sending us. */
252struct tr_incoming
253{
254    uint8_t                id;
255    uint32_t               length; /* includes the +1 for id length */
256    struct peer_request    blockReq; /* metadata for incoming blocks */
257    struct evbuffer *      block; /* piece data for incoming blocks */
258};
259
260struct tr_peermsgs
261{
262    tr_bool         peerSentBitfield;
263    tr_bool         peerSupportsPex;
264    tr_bool         clientSentLtepHandshake;
265    tr_bool         peerSentLtepHandshake;
266    tr_bool         haveFastSet;
267
268    uint8_t         state;
269    uint8_t         ut_pex_id;
270    uint16_t        pexCount;
271    uint16_t        minActiveRequests;
272    uint16_t        maxActiveRequests;
273
274    size_t                 fastsetSize;
275    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
276
277    /* how long the outMessages batch should be allowed to grow before
278     * it's flushed -- some messages (like requests >:) should be sent
279     * very quickly; others aren't as urgent. */
280    int                    outMessagesBatchPeriod;
281
282    tr_peer *              info;
283
284    tr_session *           session;
285    tr_torrent *           torrent;
286    tr_peerIo *            io;
287
288    tr_publisher_t *       publisher;
289
290    struct evbuffer *      outMessages; /* all the non-piece messages */
291
292    struct request_list    peerAskedFor;
293    struct request_list    clientAskedFor;
294    struct request_list    clientWillAskFor;
295
296    tr_timer *             pexTimer;
297
298    time_t                 clientSentPexAt;
299    time_t                 clientSentAnythingAt;
300
301    /* when we started batching the outMessages */
302    time_t                outMessagesBatchedAt;
303
304    tr_bitfield *         peerAllowedPieces;
305
306    struct tr_incoming    incoming;
307
308    tr_pex *              pex;
309
310    /* if the peer supports the Extension Protocol in BEP 10 and
311       supplied a reqq argument, it's stored here.  otherwise the
312       value is zero and should be ignored. */
313    int64_t               reqq;
314};
315
316/**
317***
318**/
319
320static void
321myDebug( const char * file, int line,
322         const struct tr_peermsgs * msgs,
323         const char * fmt, ... )
324{
325    FILE * fp = tr_getLog( );
326
327    if( fp )
328    {
329        va_list           args;
330        char              timestr[64];
331        struct evbuffer * buf = evbuffer_new( );
332        char *            base = tr_basename( file );
333
334        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
335                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
336                             msgs->torrent->info.name,
337                             tr_peerIoGetAddrStr( msgs->io ),
338                             msgs->info->client );
339        va_start( args, fmt );
340        evbuffer_add_vprintf( buf, fmt, args );
341        va_end( args );
342        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
343        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
344
345        tr_free( base );
346        evbuffer_free( buf );
347    }
348}
349
350#define dbgmsg( msgs, ... ) \
351    do { \
352        if( tr_deepLoggingIsActive( ) ) \
353            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
354    } while( 0 )
355
356/**
357***
358**/
359
360static void
361pokeBatchPeriod( tr_peermsgs * msgs,
362                 int           interval )
363{
364    if( msgs->outMessagesBatchPeriod > interval )
365    {
366        msgs->outMessagesBatchPeriod = interval;
367        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
368    }
369}
370
371static void
372dbgOutMessageLen( tr_peermsgs * msgs )
373{
374    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
375}
376
377static void
378protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
379{
380    tr_peerIo       * io  = msgs->io;
381    struct evbuffer * out = msgs->outMessages;
382
383    assert( tr_peerIoSupportsFEXT( msgs->io ) );
384
385    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
386    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
387    tr_peerIoWriteUint32( io, out, req->index );
388    tr_peerIoWriteUint32( io, out, req->offset );
389    tr_peerIoWriteUint32( io, out, req->length );
390
391    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
392    dbgOutMessageLen( msgs );
393}
394
395static void
396protocolSendRequest( tr_peermsgs *               msgs,
397                     const struct peer_request * req )
398{
399    tr_peerIo       * io  = msgs->io;
400    struct evbuffer * out = msgs->outMessages;
401
402    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
403    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
404    tr_peerIoWriteUint32( io, out, req->index );
405    tr_peerIoWriteUint32( io, out, req->offset );
406    tr_peerIoWriteUint32( io, out, req->length );
407
408    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
409    dbgOutMessageLen( msgs );
410    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
411}
412
413static void
414protocolSendCancel( tr_peermsgs *               msgs,
415                    const struct peer_request * req )
416{
417    tr_peerIo       * io  = msgs->io;
418    struct evbuffer * out = msgs->outMessages;
419
420    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
421    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
422    tr_peerIoWriteUint32( io, out, req->index );
423    tr_peerIoWriteUint32( io, out, req->offset );
424    tr_peerIoWriteUint32( io, out, req->length );
425
426    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
427    dbgOutMessageLen( msgs );
428    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
429}
430
431static void
432protocolSendHave( tr_peermsgs * msgs,
433                  uint32_t      index )
434{
435    tr_peerIo       * io  = msgs->io;
436    struct evbuffer * out = msgs->outMessages;
437
438    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
439    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
440    tr_peerIoWriteUint32( io, out, index );
441
442    dbgmsg( msgs, "sending Have %u...", index );
443    dbgOutMessageLen( msgs );
444    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
445}
446
447#if 0
448static void
449protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
450{
451    tr_peerIo       * io  = msgs->io;
452    struct evbuffer * out = msgs->outMessages;
453
454    assert( tr_peerIoSupportsFEXT( msgs->io ) );
455
456    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
457    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
458    tr_peerIoWriteUint32( io, out, pieceIndex );
459
460    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
461    dbgOutMessageLen( msgs );
462}
463#endif
464
465static void
466protocolSendChoke( tr_peermsgs * msgs,
467                   int           choke )
468{
469    tr_peerIo       * io  = msgs->io;
470    struct evbuffer * out = msgs->outMessages;
471
472    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
473    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
474
475    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
476    dbgOutMessageLen( msgs );
477    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
478}
479
480static void
481protocolSendHaveAll( tr_peermsgs * msgs )
482{
483    tr_peerIo       * io  = msgs->io;
484    struct evbuffer * out = msgs->outMessages;
485
486    assert( tr_peerIoSupportsFEXT( msgs->io ) );
487
488    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
489    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
490
491    dbgmsg( msgs, "sending HAVE_ALL..." );
492    dbgOutMessageLen( msgs );
493    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
494}
495
496static void
497protocolSendHaveNone( tr_peermsgs * msgs )
498{
499    tr_peerIo       * io  = msgs->io;
500    struct evbuffer * out = msgs->outMessages;
501
502    assert( tr_peerIoSupportsFEXT( msgs->io ) );
503
504    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
505    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
506
507    dbgmsg( msgs, "sending HAVE_NONE..." );
508    dbgOutMessageLen( msgs );
509    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
510}
511
512/**
513***  EVENTS
514**/
515
516static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0 };
517
518static void
519publish( tr_peermsgs * msgs, tr_peer_event * e )
520{
521    assert( msgs->info );
522    assert( msgs->info->msgs == msgs );
523
524    tr_publisherPublish( msgs->publisher, msgs->info, e );
525}
526
527static void
528fireError( tr_peermsgs * msgs, int err )
529{
530    tr_peer_event e = blankEvent;
531    e.eventType = TR_PEER_ERROR;
532    e.err = err;
533    publish( msgs, &e );
534}
535
536static void
537fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly )
538{
539    tr_peer_event e = blankEvent;
540    e.eventType = TR_PEER_UPLOAD_ONLY;
541    e.uploadOnly = uploadOnly;
542    publish( msgs, &e );
543}
544
545static void
546fireNeedReq( tr_peermsgs * msgs )
547{
548    tr_peer_event e = blankEvent;
549    e.eventType = TR_PEER_NEED_REQ;
550    publish( msgs, &e );
551}
552
553static void
554firePeerProgress( tr_peermsgs * msgs )
555{
556    tr_peer_event e = blankEvent;
557    e.eventType = TR_PEER_PEER_PROGRESS;
558    e.progress = msgs->info->progress;
559    publish( msgs, &e );
560}
561
562static void
563fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
564{
565    tr_peer_event e = blankEvent;
566    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
567    e.pieceIndex = req->index;
568    e.offset = req->offset;
569    e.length = req->length;
570    publish( msgs, &e );
571}
572
573static void
574fireClientGotData( tr_peermsgs * msgs,
575                   uint32_t      length,
576                   int           wasPieceData )
577{
578    tr_peer_event e = blankEvent;
579
580    e.length = length;
581    e.eventType = TR_PEER_CLIENT_GOT_DATA;
582    e.wasPieceData = wasPieceData;
583    publish( msgs, &e );
584}
585
586static void
587fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
588{
589    tr_peer_event e = blankEvent;
590    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
591    e.pieceIndex = pieceIndex;
592    publish( msgs, &e );
593}
594
595static void
596fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
597{
598    tr_peer_event e = blankEvent;
599    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
600    e.pieceIndex = pieceIndex;
601    publish( msgs, &e );
602}
603
604static void
605firePeerGotData( tr_peermsgs  * msgs,
606                 uint32_t       length,
607                 int            wasPieceData )
608{
609    tr_peer_event e = blankEvent;
610
611    e.length = length;
612    e.eventType = TR_PEER_PEER_GOT_DATA;
613    e.wasPieceData = wasPieceData;
614
615    publish( msgs, &e );
616}
617
618static void
619fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
620{
621    tr_peer_event e = blankEvent;
622    e.eventType = TR_PEER_CANCEL;
623    e.pieceIndex = req->index;
624    e.offset = req->offset;
625    e.length = req->length;
626    publish( msgs, &e );
627}
628
629/**
630***  ALLOWED FAST SET
631***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
632**/
633
634size_t
635tr_generateAllowedSet( tr_piece_index_t * setmePieces,
636                       size_t             desiredSetSize,
637                       size_t             pieceCount,
638                       const uint8_t    * infohash,
639                       const tr_address * addr )
640{
641    size_t setSize = 0;
642
643    assert( setmePieces );
644    assert( desiredSetSize <= pieceCount );
645    assert( desiredSetSize );
646    assert( pieceCount );
647    assert( infohash );
648    assert( addr );
649
650    if( addr->type == TR_AF_INET )
651    {
652        uint8_t w[SHA_DIGEST_LENGTH + 4];
653        uint8_t x[SHA_DIGEST_LENGTH];
654
655        *(uint32_t*)w = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
656        memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );                /* (2) */
657        tr_sha1( x, w, sizeof( w ), NULL );                          /* (3) */
658
659        while( setSize<desiredSetSize )
660        {
661            int i;
662            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
663            {
664                size_t k;
665                uint32_t j = i * 4;                                  /* (5) */
666                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
667                uint32_t index = y % pieceCount;                     /* (7) */
668
669                for( k=0; k<setSize; ++k )                           /* (8) */
670                    if( setmePieces[k] == index )
671                        break;
672
673                if( k == setSize )
674                    setmePieces[setSize++] = index;                  /* (9) */
675            }
676
677            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
678        }
679    }
680
681    return setSize;
682}
683
684static void
685updateFastSet( tr_peermsgs * msgs UNUSED )
686{
687#if 0
688    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->io );
689    const int peerIsNeedy = msgs->info->progress < 0.10;
690
691    if( fext && peerIsNeedy && !msgs->haveFastSet )
692    {
693        size_t i;
694        const struct tr_address * addr = tr_peerIoGetAddress( msgs->io, NULL );
695        const tr_info * inf = &msgs->torrent->info;
696        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
697
698        /* build the fast set */
699        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
700        msgs->haveFastSet = 1;
701
702        /* send it to the peer */
703        for( i=0; i<msgs->fastsetSize; ++i )
704            protocolSendAllowedFast( msgs, msgs->fastset[i] );
705    }
706#endif
707}
708
709/**
710***  INTEREST
711**/
712
713static int
714isPieceInteresting( const tr_peermsgs * peer,
715                    tr_piece_index_t    piece )
716{
717    const tr_torrent * torrent = peer->torrent;
718
719    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
720          && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have */
721          && ( tr_bitfieldHas( peer->info->have, piece ) );    /* peer has it */
722}
723
724/* "interested" means we'll ask for piece data if they unchoke us */
725static int
726isPeerInteresting( const tr_peermsgs * msgs )
727{
728    tr_piece_index_t    i;
729    const tr_torrent *  torrent;
730    const tr_bitfield * bitfield;
731    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
732
733    if( clientIsSeed )
734        return FALSE;
735
736    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
737        return FALSE;
738
739    torrent = msgs->torrent;
740    bitfield = tr_cpPieceBitfield( torrent->completion );
741
742    if( !msgs->info->have )
743        return TRUE;
744
745    assert( bitfield->byteCount == msgs->info->have->byteCount );
746
747    for( i = 0; i < torrent->info.pieceCount; ++i )
748        if( isPieceInteresting( msgs, i ) )
749            return TRUE;
750
751    return FALSE;
752}
753
754static void
755sendInterest( tr_peermsgs * msgs,
756              int           weAreInterested )
757{
758    struct evbuffer * out = msgs->outMessages;
759
760    assert( msgs );
761    assert( weAreInterested == 0 || weAreInterested == 1 );
762
763    msgs->info->clientIsInterested = weAreInterested;
764    dbgmsg( msgs, "Sending %s", weAreInterested ? "Interested" : "Not Interested" );
765    tr_peerIoWriteUint32( msgs->io, out, sizeof( uint8_t ) );
766    tr_peerIoWriteUint8 ( msgs->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
767    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
768    dbgOutMessageLen( msgs );
769}
770
771static void
772updateInterest( tr_peermsgs * msgs )
773{
774    const int i = isPeerInteresting( msgs );
775
776    if( i != msgs->info->clientIsInterested )
777        sendInterest( msgs, i );
778    if( i )
779        fireNeedReq( msgs );
780}
781
782static int
783popNextRequest( tr_peermsgs *         msgs,
784                struct peer_request * setme )
785{
786    return reqListPop( &msgs->peerAskedFor, setme );
787}
788
789static void
790cancelAllRequestsToClient( tr_peermsgs * msgs )
791{
792    struct peer_request req;
793    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->io );
794
795    while( popNextRequest( msgs, &req ) )
796        if( mustSendCancel )
797            protocolSendReject( msgs, &req );
798}
799
800void
801tr_peerMsgsSetChoke( tr_peermsgs * msgs,
802                     int           choke )
803{
804    const time_t now = time( NULL );
805    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
806
807    assert( msgs );
808    assert( msgs->info );
809    assert( choke == 0 || choke == 1 );
810
811    if( msgs->info->chokeChangedAt > fibrillationTime )
812    {
813        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
814    }
815    else if( msgs->info->peerIsChoked != choke )
816    {
817        msgs->info->peerIsChoked = choke;
818        if( choke )
819            cancelAllRequestsToClient( msgs );
820        protocolSendChoke( msgs, choke );
821        msgs->info->chokeChangedAt = now;
822    }
823}
824
825/**
826***
827**/
828
829void
830tr_peerMsgsHave( tr_peermsgs * msgs,
831                 uint32_t      index )
832{
833    protocolSendHave( msgs, index );
834
835    /* since we have more pieces now, we might not be interested in this peer */
836    updateInterest( msgs );
837}
838
839/**
840***
841**/
842
843static int
844reqIsValid( const tr_peermsgs * peer,
845            uint32_t            index,
846            uint32_t            offset,
847            uint32_t            length )
848{
849    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
850}
851
852static int
853requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
854{
855    return reqIsValid( msgs, req->index, req->offset, req->length );
856}
857
858static void
859expireOldRequests( tr_peermsgs * msgs, const time_t now  )
860{
861    int i;
862    time_t oldestAllowed;
863    struct request_list tmp = REQUEST_LIST_INIT;
864    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->io );
865
866    /* cancel requests that have been queued for too long */
867    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
868    reqListCopy( &tmp, &msgs->clientWillAskFor );
869    for( i = 0; i < tmp.count; ++i ) {
870        const struct peer_request * req = &tmp.requests[i];
871        if( req->time_requested < oldestAllowed )
872            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
873    }
874    reqListClear( &tmp );
875
876    /* if the peer doesn't support "Reject Request",
877     * cancel requests that were sent too long ago. */
878    if( !fext ) {
879        oldestAllowed = now - SENT_REQUEST_TTL_SECS;
880        reqListCopy( &tmp, &msgs->clientAskedFor );
881        for( i=0; i<tmp.count; ++i ) {
882            const struct peer_request * req = &tmp.requests[i];
883            if( req->time_requested < oldestAllowed )
884                tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
885        }
886        reqListClear( &tmp );
887    }
888}
889
890static void
891pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
892{
893    const int           max = msgs->maxActiveRequests;
894    const int           min = msgs->minActiveRequests;
895    int                 sent = 0;
896    int                 count = msgs->clientAskedFor.count;
897    struct peer_request req;
898
899    if( count > min )
900        return;
901    if( msgs->info->clientIsChoked )
902        return;
903    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
904        return;
905
906    while( ( count < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
907    {
908        const tr_block_index_t block = _tr_block( msgs->torrent, req.index, req.offset );
909
910        assert( requestIsValid( msgs, &req ) );
911        assert( tr_bitfieldHas( msgs->info->have, req.index ) );
912
913        /* don't ask for it if we've already got it... this block may have
914         * come in from a different peer after we cancelled a request for it */
915        if( !tr_cpBlockIsComplete( msgs->torrent->completion, block ) )
916        {
917            protocolSendRequest( msgs, &req );
918            req.time_requested = now;
919            reqListAppend( &msgs->clientAskedFor, &req );
920
921            ++count;
922            ++sent;
923        }
924    }
925
926    if( sent )
927        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
928                sent,
929                msgs->clientAskedFor.count,
930                msgs->clientWillAskFor.count );
931
932    if( count < max )
933        fireNeedReq( msgs );
934}
935
936static int
937requestQueueIsFull( const tr_peermsgs * msgs )
938{
939    const int req_max = msgs->maxActiveRequests;
940    return msgs->clientWillAskFor.count >= req_max;
941}
942
943tr_addreq_t
944tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
945                       uint32_t         index,
946                       uint32_t         offset,
947                       uint32_t         length,
948                       int              doForce )
949{
950    struct peer_request req;
951
952    assert( msgs );
953    assert( msgs->torrent );
954    assert( reqIsValid( msgs, index, offset, length ) );
955
956    /**
957    ***  Reasons to decline the request
958    **/
959
960    /* don't send requests to choked clients */
961    if( !doForce && msgs->info->clientIsChoked ) {
962        dbgmsg( msgs, "declining request because they're choking us" );
963        return TR_ADDREQ_CLIENT_CHOKED;
964    }
965
966    /* peer doesn't have this piece */
967    if( !doForce && !tr_bitfieldHas( msgs->info->have, index ) )
968        return TR_ADDREQ_MISSING;
969
970    /* peer's queue is full */
971    if( !doForce && requestQueueIsFull( msgs ) ) {
972        dbgmsg( msgs, "declining request because we're full" );
973        return TR_ADDREQ_FULL;
974    }
975
976    /* have we already asked for this piece? */
977    req.index = index;
978    req.offset = offset;
979    req.length = length;
980    if( !doForce ) {
981        if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
982            dbgmsg( msgs, "declining because it's a duplicate" );
983            return TR_ADDREQ_DUPLICATE;
984        }
985        if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
986            dbgmsg( msgs, "declining because it's a duplicate" );
987            return TR_ADDREQ_DUPLICATE;
988        }
989    }
990
991    /**
992    ***  Accept this request
993    **/
994
995    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
996            index, offset, length );
997    req.time_requested = time( NULL );
998    reqListAppend( &msgs->clientWillAskFor, &req );
999    return TR_ADDREQ_OK;
1000}
1001
1002static void
1003cancelAllRequestsToPeer( tr_peermsgs * msgs, tr_bool sendCancel )
1004{
1005    int i;
1006    struct request_list a = msgs->clientWillAskFor;
1007    struct request_list b = msgs->clientAskedFor;
1008    dbgmsg( msgs, "cancelling all requests to peer" );
1009
1010    msgs->clientAskedFor = REQUEST_LIST_INIT;
1011    msgs->clientWillAskFor = REQUEST_LIST_INIT;
1012
1013    for( i=0; i<a.count; ++i )
1014        fireCancelledReq( msgs, &a.requests[i] );
1015
1016    for( i = 0; i < b.count; ++i ) {
1017        fireCancelledReq( msgs, &b.requests[i] );
1018        if( sendCancel )
1019            protocolSendCancel( msgs, &b.requests[i] );
1020    }
1021
1022    reqListClear( &a );
1023    reqListClear( &b );
1024}
1025
1026void
1027tr_peerMsgsCancel( tr_peermsgs * msgs,
1028                   uint32_t      pieceIndex,
1029                   uint32_t      offset,
1030                   uint32_t      length )
1031{
1032    struct peer_request req;
1033
1034    assert( msgs != NULL );
1035    assert( length > 0 );
1036
1037
1038    /* have we asked the peer for this piece? */
1039    req.index = pieceIndex;
1040    req.offset = offset;
1041    req.length = length;
1042
1043    /* if it's only in the queue and hasn't been sent yet, free it */
1044    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
1045        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32"\n", pieceIndex, offset, length );
1046        fireCancelledReq( msgs, &req );
1047    }
1048
1049    /* if it's already been sent, send a cancel message too */
1050    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
1051        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32"\n", pieceIndex, offset, length );
1052        protocolSendCancel( msgs, &req );
1053        fireCancelledReq( msgs, &req );
1054    }
1055}
1056
1057
1058/**
1059***
1060**/
1061
1062static void
1063sendLtepHandshake( tr_peermsgs * msgs )
1064{
1065    tr_benc val, *m;
1066    char * buf;
1067    int len;
1068    int pex;
1069    struct evbuffer * out = msgs->outMessages;
1070
1071    if( msgs->clientSentLtepHandshake )
1072        return;
1073
1074    dbgmsg( msgs, "sending an ltep handshake" );
1075    msgs->clientSentLtepHandshake = 1;
1076
1077    /* decide if we want to advertise pex support */
1078    if( !tr_torrentAllowsPex( msgs->torrent ) )
1079        pex = 0;
1080    else if( msgs->peerSentLtepHandshake )
1081        pex = msgs->peerSupportsPex ? 1 : 0;
1082    else
1083        pex = 1;
1084
1085    tr_bencInitDict( &val, 5 );
1086    tr_bencDictAddInt( &val, "e", msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
1087    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
1088    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
1089    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
1090    m  = tr_bencDictAddDict( &val, "m", 1 );
1091    if( pex )
1092        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
1093    buf = tr_bencSave( &val, &len );
1094
1095    tr_peerIoWriteUint32( msgs->io, out, 2 * sizeof( uint8_t ) + len );
1096    tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
1097    tr_peerIoWriteUint8 ( msgs->io, out, LTEP_HANDSHAKE );
1098    tr_peerIoWriteBytes ( msgs->io, out, buf, len );
1099    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1100    dbgOutMessageLen( msgs );
1101
1102    /* cleanup */
1103    tr_bencFree( &val );
1104    tr_free( buf );
1105}
1106
1107static void
1108parseLtepHandshake( tr_peermsgs *     msgs,
1109                    int               len,
1110                    struct evbuffer * inbuf )
1111{
1112    int64_t   i;
1113    tr_benc   val, * sub;
1114    uint8_t * tmp = tr_new( uint8_t, len );
1115
1116    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
1117    msgs->peerSentLtepHandshake = 1;
1118
1119    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
1120    {
1121        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1122        tr_free( tmp );
1123        return;
1124    }
1125
1126    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
1127
1128    /* does the peer prefer encrypted connections? */
1129    if( tr_bencDictFindInt( &val, "e", &i ) )
1130        msgs->info->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1131                                              : ENCRYPTION_PREFERENCE_NO;
1132
1133    /* check supported messages for utorrent pex */
1134    msgs->peerSupportsPex = 0;
1135    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
1136        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
1137            msgs->ut_pex_id = (uint8_t) i;
1138            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1139            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1140        }
1141    }
1142
1143    /* look for upload_only (BEP 21) */
1144    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
1145        fireUploadOnly( msgs, i!=0 );
1146
1147    /* get peer's listening port */
1148    if( tr_bencDictFindInt( &val, "p", &i ) ) {
1149        msgs->info->port = htons( (uint16_t)i );
1150        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
1151    }
1152
1153    /* get peer's maximum request queue size */
1154    if( tr_bencDictFindInt( &val, "reqq", &i ) )
1155        msgs->reqq = i;
1156
1157    tr_bencFree( &val );
1158    tr_free( tmp );
1159}
1160
1161static void
1162parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1163{
1164    int loaded = 0;
1165    uint8_t * tmp = tr_new( uint8_t, msglen );
1166    tr_benc val;
1167    const tr_torrent * tor = msgs->torrent;
1168    const uint8_t * added;
1169    size_t added_len;
1170
1171    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
1172
1173    if( tr_torrentAllowsPex( tor )
1174      && (( loaded = !tr_bencLoad( tmp, msglen, &val, NULL )))
1175      && tr_bencDictFindRaw( &val, "added", &added, &added_len ))
1176    {
1177        const uint8_t * added_f = NULL;
1178        tr_pex *        pex;
1179        size_t          i, n;
1180        size_t          added_f_len = 0;
1181        tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1182        pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1183        for( i=0; i<n; ++i )
1184            tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1185                              TR_PEER_FROM_PEX, pex + i );
1186        tr_free( pex );
1187    }
1188
1189    if( loaded )
1190        tr_bencFree( &val );
1191    tr_free( tmp );
1192}
1193
1194static void sendPex( tr_peermsgs * msgs );
1195
1196static void
1197parseLtep( tr_peermsgs *     msgs,
1198           int               msglen,
1199           struct evbuffer * inbuf )
1200{
1201    uint8_t ltep_msgid;
1202
1203    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
1204    msglen--;
1205
1206    if( ltep_msgid == LTEP_HANDSHAKE )
1207    {
1208        dbgmsg( msgs, "got ltep handshake" );
1209        parseLtepHandshake( msgs, msglen, inbuf );
1210        if( tr_peerIoSupportsLTEP( msgs->io ) )
1211        {
1212            sendLtepHandshake( msgs );
1213            sendPex( msgs );
1214        }
1215    }
1216    else if( ltep_msgid == TR_LTEP_PEX )
1217    {
1218        dbgmsg( msgs, "got ut pex" );
1219        msgs->peerSupportsPex = 1;
1220        parseUtPex( msgs, msglen, inbuf );
1221    }
1222    else
1223    {
1224        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1225        evbuffer_drain( inbuf, msglen );
1226    }
1227}
1228
1229static int
1230readBtLength( tr_peermsgs *     msgs,
1231              struct evbuffer * inbuf,
1232              size_t            inlen )
1233{
1234    uint32_t len;
1235
1236    if( inlen < sizeof( len ) )
1237        return READ_LATER;
1238
1239    tr_peerIoReadUint32( msgs->io, inbuf, &len );
1240
1241    if( len == 0 ) /* peer sent us a keepalive message */
1242        dbgmsg( msgs, "got KeepAlive" );
1243    else
1244    {
1245        msgs->incoming.length = len;
1246        msgs->state = AWAITING_BT_ID;
1247    }
1248
1249    return READ_NOW;
1250}
1251
1252static int readBtMessage( tr_peermsgs *     msgs,
1253                          struct evbuffer * inbuf,
1254                          size_t            inlen );
1255
1256static int
1257readBtId( tr_peermsgs *     msgs,
1258          struct evbuffer * inbuf,
1259          size_t            inlen )
1260{
1261    uint8_t id;
1262
1263    if( inlen < sizeof( uint8_t ) )
1264        return READ_LATER;
1265
1266    tr_peerIoReadUint8( msgs->io, inbuf, &id );
1267    msgs->incoming.id = id;
1268
1269    if( id == BT_PIECE )
1270    {
1271        msgs->state = AWAITING_BT_PIECE;
1272        return READ_NOW;
1273    }
1274    else if( msgs->incoming.length != 1 )
1275    {
1276        msgs->state = AWAITING_BT_MESSAGE;
1277        return READ_NOW;
1278    }
1279    else return readBtMessage( msgs, inbuf, inlen - 1 );
1280}
1281
1282static void
1283updatePeerProgress( tr_peermsgs * msgs )
1284{
1285    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
1286    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
1287    updateFastSet( msgs );
1288    updateInterest( msgs );
1289    firePeerProgress( msgs );
1290}
1291
1292static void
1293peerMadeRequest( tr_peermsgs *               msgs,
1294                 const struct peer_request * req )
1295{
1296    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->io );
1297    const int reqIsValid = requestIsValid( msgs, req );
1298    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
1299    const int peerIsChoked = msgs->info->peerIsChoked;
1300
1301    int allow = FALSE;
1302
1303    if( !reqIsValid )
1304        dbgmsg( msgs, "rejecting an invalid request." );
1305    else if( !clientHasPiece )
1306        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1307    else if( peerIsChoked )
1308        dbgmsg( msgs, "rejecting request from choked peer" );
1309    else
1310        allow = TRUE;
1311
1312    if( allow )
1313        reqListAppend( &msgs->peerAskedFor, req );
1314    else if( fext )
1315        protocolSendReject( msgs, req );
1316}
1317
1318static int
1319messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1320{
1321    switch( id )
1322    {
1323        case BT_CHOKE:
1324        case BT_UNCHOKE:
1325        case BT_INTERESTED:
1326        case BT_NOT_INTERESTED:
1327        case BT_FEXT_HAVE_ALL:
1328        case BT_FEXT_HAVE_NONE:
1329            return len == 1;
1330
1331        case BT_HAVE:
1332        case BT_FEXT_SUGGEST:
1333        case BT_FEXT_ALLOWED_FAST:
1334            return len == 5;
1335
1336        case BT_BITFIELD:
1337            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1338
1339        case BT_REQUEST:
1340        case BT_CANCEL:
1341        case BT_FEXT_REJECT:
1342            return len == 13;
1343
1344        case BT_PIECE:
1345            return len > 9 && len <= 16393;
1346
1347        case BT_PORT:
1348            return len == 3;
1349
1350        case BT_LTEP:
1351            return len >= 2;
1352
1353        default:
1354            return FALSE;
1355    }
1356}
1357
1358static int clientGotBlock( tr_peermsgs *               msgs,
1359                           const uint8_t *             block,
1360                           const struct peer_request * req );
1361
1362static int
1363readBtPiece( tr_peermsgs      * msgs,
1364             struct evbuffer  * inbuf,
1365             size_t             inlen,
1366             size_t           * setme_piece_bytes_read )
1367{
1368    struct peer_request * req = &msgs->incoming.blockReq;
1369
1370    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1371    dbgmsg( msgs, "In readBtPiece" );
1372
1373    if( !req->length )
1374    {
1375        if( inlen < 8 )
1376            return READ_LATER;
1377
1378        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1379        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1380        req->length = msgs->incoming.length - 9;
1381        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1382        return READ_NOW;
1383    }
1384    else
1385    {
1386        int err;
1387
1388        /* read in another chunk of data */
1389        const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
1390        size_t n = MIN( nLeft, inlen );
1391        uint8_t * buf = tr_new( uint8_t, n );
1392        assert( EVBUFFER_LENGTH( inbuf ) >= n );
1393        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1394        evbuffer_add( msgs->incoming.block, buf, n );
1395        fireClientGotData( msgs, n, TRUE );
1396        *setme_piece_bytes_read += n;
1397        tr_free( buf );
1398        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1399               n, req->index, req->offset, req->length,
1400               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1401        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1402            return READ_LATER;
1403
1404        /* we've got the whole block ... process it */
1405        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );
1406
1407        /* cleanup */
1408        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH( msgs->incoming.block ) );
1409        req->length = 0;
1410        msgs->state = AWAITING_BT_LENGTH;
1411        if( !err )
1412            return READ_NOW;
1413        else {
1414            fireError( msgs, err );
1415            return READ_ERR;
1416        }
1417    }
1418}
1419
1420static int
1421readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1422{
1423    uint32_t      ui32;
1424    uint32_t      msglen = msgs->incoming.length;
1425    const uint8_t id = msgs->incoming.id;
1426    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1427    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->io );
1428
1429    --msglen; /* id length */
1430
1431    if( inlen < msglen )
1432        return READ_LATER;
1433
1434    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1435
1436    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1437    {
1438        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1439        fireError( msgs, EMSGSIZE );
1440        return READ_ERR;
1441    }
1442
1443    switch( id )
1444    {
1445        case BT_CHOKE:
1446            dbgmsg( msgs, "got Choke" );
1447            msgs->info->clientIsChoked = 1;
1448            if( !fext )
1449                cancelAllRequestsToPeer( msgs, FALSE );
1450            break;
1451
1452        case BT_UNCHOKE:
1453            dbgmsg( msgs, "got Unchoke" );
1454            msgs->info->clientIsChoked = 0;
1455            fireNeedReq( msgs );
1456            break;
1457
1458        case BT_INTERESTED:
1459            dbgmsg( msgs, "got Interested" );
1460            msgs->info->peerIsInterested = 1;
1461            break;
1462
1463        case BT_NOT_INTERESTED:
1464            dbgmsg( msgs, "got Not Interested" );
1465            msgs->info->peerIsInterested = 0;
1466            break;
1467
1468        case BT_HAVE:
1469            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1470            dbgmsg( msgs, "got Have: %u", ui32 );
1471            if( tr_bitfieldAdd( msgs->info->have, ui32 ) ) {
1472                fireError( msgs, ERANGE );
1473                return READ_ERR;
1474            }
1475            updatePeerProgress( msgs );
1476            tr_rcTransferred( msgs->torrent->swarmSpeed,
1477                              msgs->torrent->info.pieceSize );
1478            break;
1479
1480        case BT_BITFIELD:
1481        {
1482            dbgmsg( msgs, "got a bitfield" );
1483            msgs->peerSentBitfield = 1;
1484            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
1485            updatePeerProgress( msgs );
1486            fireNeedReq( msgs );
1487            break;
1488        }
1489
1490        case BT_REQUEST:
1491        {
1492            struct peer_request r;
1493            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1494            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1495            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1496            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1497            peerMadeRequest( msgs, &r );
1498            break;
1499        }
1500
1501        case BT_CANCEL:
1502        {
1503            struct peer_request r;
1504            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1505            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1506            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1507            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1508            if( reqListRemove( &msgs->peerAskedFor, &r ) && fext )
1509                protocolSendReject( msgs, &r );
1510            break;
1511        }
1512
1513        case BT_PIECE:
1514            assert( 0 ); /* handled elsewhere! */
1515            break;
1516
1517        case BT_PORT:
1518            dbgmsg( msgs, "Got a BT_PORT" );
1519            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1520            break;
1521
1522        case BT_FEXT_SUGGEST:
1523            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1524            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1525            if( fext )
1526                fireClientGotSuggest( msgs, ui32 );
1527            else {
1528                fireError( msgs, EMSGSIZE );
1529                return READ_ERR;
1530            }
1531            break;
1532
1533        case BT_FEXT_ALLOWED_FAST:
1534            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1535            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1536            if( fext )
1537                fireClientGotAllowedFast( msgs, ui32 );
1538            else {
1539                fireError( msgs, EMSGSIZE );
1540                return READ_ERR;
1541            }
1542            break;
1543
1544        case BT_FEXT_HAVE_ALL:
1545            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1546            if( fext ) {
1547                tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1548                updatePeerProgress( msgs );
1549            } else {
1550                fireError( msgs, EMSGSIZE );
1551                return READ_ERR;
1552            }
1553            break;
1554
1555
1556        case BT_FEXT_HAVE_NONE:
1557            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1558            if( fext ) {
1559                tr_bitfieldClear( msgs->info->have );
1560                updatePeerProgress( msgs );
1561            } else {
1562                fireError( msgs, EMSGSIZE );
1563                return READ_ERR;
1564            }
1565            break;
1566
1567        case BT_FEXT_REJECT:
1568        {
1569            struct peer_request r;
1570            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1571            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1572            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1573            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1574            if( fext )
1575                reqListRemove( &msgs->clientAskedFor, &r );
1576            else {
1577                fireError( msgs, EMSGSIZE );
1578                return READ_ERR;
1579            }
1580            break;
1581        }
1582
1583        case BT_LTEP:
1584            dbgmsg( msgs, "Got a BT_LTEP" );
1585            parseLtep( msgs, msglen, inbuf );
1586            break;
1587
1588        default:
1589            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1590            tr_peerIoDrain( msgs->io, inbuf, msglen );
1591            break;
1592    }
1593
1594    assert( msglen + 1 == msgs->incoming.length );
1595    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1596
1597    msgs->state = AWAITING_BT_LENGTH;
1598    return READ_NOW;
1599}
1600
1601static void
1602decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1603{
1604    tr_torrent * tor = msgs->torrent;
1605
1606    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1607}
1608
1609static void
1610clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1611{
1612    decrementDownloadedCount( msgs, req->length );
1613}
1614
1615static void
1616addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1617{
1618    if( !msgs->info->blame )
1619         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1620    tr_bitfieldAdd( msgs->info->blame, index );
1621}
1622
1623/* returns 0 on success, or an errno on failure */
1624static int
1625clientGotBlock( tr_peermsgs *               msgs,
1626                const uint8_t *             data,
1627                const struct peer_request * req )
1628{
1629    int err;
1630    tr_torrent * tor = msgs->torrent;
1631    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1632
1633    assert( msgs );
1634    assert( req );
1635
1636    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1637        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1638                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1639        return EMSGSIZE;
1640    }
1641
1642    /* save the block */
1643    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1644
1645    /**
1646    *** Remove the block from our `we asked for this' list
1647    **/
1648
1649    if( !reqListRemove( &msgs->clientAskedFor, req ) ) {
1650        clientGotUnwantedBlock( msgs, req );
1651        dbgmsg( msgs, "we didn't ask for this message..." );
1652        return 0;
1653    }
1654
1655    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1656            msgs->clientAskedFor.count );
1657
1658    /**
1659    *** Error checks
1660    **/
1661
1662    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1663        dbgmsg( msgs, "we have this block already..." );
1664        clientGotUnwantedBlock( msgs, req );
1665        return 0;
1666    }
1667
1668    /**
1669    ***  Save the block
1670    **/
1671
1672    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1673        return err;
1674
1675    addPeerToBlamefield( msgs, req->index );
1676    fireGotBlock( msgs, req );
1677    return 0;
1678}
1679
1680static int peerPulse( void * vmsgs );
1681
1682static void
1683didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1684{
1685    tr_peermsgs * msgs = vmsgs;
1686    firePeerGotData( msgs, bytesWritten, wasPieceData );
1687    peerPulse( msgs );
1688}
1689
1690static ReadState
1691canRead( struct tr_iobuf * iobuf, void * vmsgs, size_t * piece )
1692{
1693    ReadState         ret;
1694    tr_peermsgs *     msgs = vmsgs;
1695    struct evbuffer * in = tr_iobuf_input( iobuf );
1696    const size_t      inlen = EVBUFFER_LENGTH( in );
1697
1698    if( !inlen )
1699    {
1700        ret = READ_LATER;
1701    }
1702    else if( msgs->state == AWAITING_BT_PIECE )
1703    {
1704        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1705    }
1706    else switch( msgs->state )
1707    {
1708        case AWAITING_BT_LENGTH:
1709            ret = readBtLength ( msgs, in, inlen ); break;
1710
1711        case AWAITING_BT_ID:
1712            ret = readBtId     ( msgs, in, inlen ); break;
1713
1714        case AWAITING_BT_MESSAGE:
1715            ret = readBtMessage( msgs, in, inlen ); break;
1716
1717        default:
1718            assert( 0 );
1719    }
1720
1721    /* log the raw data that was read */
1722    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1723        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1724
1725    return ret;
1726}
1727
1728/**
1729***
1730**/
1731
1732static int
1733ratePulse( void * vmsgs )
1734{
1735    tr_peermsgs * msgs = vmsgs;
1736    const double rateToClient = tr_peerGetPieceSpeed( msgs->info, TR_PEER_TO_CLIENT );
1737    const int estimatedBlocksInNext30Seconds =
1738                  ( rateToClient * 30 * 1024 ) / msgs->torrent->blockSize;
1739    msgs->minActiveRequests = 8;
1740    msgs->maxActiveRequests = msgs->minActiveRequests + estimatedBlocksInNext30Seconds;
1741    if( msgs->reqq > 0 )
1742        msgs->maxActiveRequests = MIN( msgs->maxActiveRequests, msgs->reqq );
1743    return TRUE;
1744}
1745
1746static size_t
1747fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1748{
1749    size_t bytesWritten = 0;
1750    struct peer_request req;
1751    const int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1752    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->io );
1753
1754    /**
1755    ***  Protocol messages
1756    **/
1757
1758    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1759    {
1760        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1761        msgs->outMessagesBatchedAt = now;
1762    }
1763    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1764    {
1765        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1766        /* flush the protocol messages */
1767        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->io, len );
1768        tr_peerIoWriteBuf( msgs->io, msgs->outMessages, FALSE );
1769        msgs->clientSentAnythingAt = now;
1770        msgs->outMessagesBatchedAt = 0;
1771        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1772        bytesWritten +=  len;
1773    }
1774
1775    /**
1776    ***  Blocks
1777    **/
1778
1779    if( ( tr_peerIoGetWriteBufferSpace( msgs->io ) >= msgs->torrent->blockSize )
1780        && popNextRequest( msgs, &req ) )
1781    {
1782        if( requestIsValid( msgs, &req )
1783            && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1784        {
1785            /* send a block */
1786            uint8_t * buf = tr_new( uint8_t, req.length );
1787            const int err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf );
1788            if( err ) {
1789                fireError( msgs, err );
1790                bytesWritten = 0;
1791                msgs = NULL;
1792            } else {
1793                tr_peerIo * io = msgs->io;
1794                struct evbuffer * out = evbuffer_new( );
1795                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1796                tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1797                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1798                tr_peerIoWriteUint32( io, out, req.index );
1799                tr_peerIoWriteUint32( io, out, req.offset );
1800                tr_peerIoWriteBytes ( io, out, buf, req.length );
1801                tr_peerIoWriteBuf( io, out, TRUE );
1802                bytesWritten += EVBUFFER_LENGTH( out );
1803                evbuffer_free( out );
1804                msgs->clientSentAnythingAt = now;
1805            }
1806            tr_free( buf );
1807        }
1808        else if( fext ) /* peer needs a reject message */
1809        {
1810            protocolSendReject( msgs, &req );
1811        }
1812    }
1813
1814    /**
1815    ***  Keepalive
1816    **/
1817
1818    if( ( msgs != NULL )
1819        && ( msgs->clientSentAnythingAt != 0 )
1820        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1821    {
1822        dbgmsg( msgs, "sending a keepalive message" );
1823        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1824        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1825    }
1826
1827    return bytesWritten;
1828}
1829
1830static int
1831peerPulse( void * vmsgs )
1832{
1833    tr_peermsgs * msgs = vmsgs;
1834    const time_t  now = time( NULL );
1835
1836    ratePulse( msgs );
1837
1838    pumpRequestQueue( msgs, now );
1839    expireOldRequests( msgs, now );
1840
1841    for( ;; )
1842        if( fillOutputBuffer( msgs, now ) < 1 )
1843            break;
1844
1845    return TRUE; /* loop forever */
1846}
1847
1848void
1849tr_peerMsgsPulse( tr_peermsgs * msgs )
1850{
1851    if( msgs != NULL )
1852        peerPulse( msgs );
1853}
1854
1855static void
1856gotError( struct tr_iobuf  * iobuf UNUSED,
1857          short              what,
1858          void             * vmsgs )
1859{
1860    if( what & EVBUFFER_TIMEOUT )
1861        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1862    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1863        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1864               what, errno, tr_strerror( errno ) );
1865    fireError( vmsgs, ENOTCONN );
1866}
1867
1868static void
1869sendBitfield( tr_peermsgs * msgs )
1870{
1871    struct evbuffer * out = msgs->outMessages;
1872    tr_bitfield *     field;
1873    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1874    size_t            i;
1875    size_t            lazyCount = 0;
1876
1877    field = tr_bitfieldDup( tr_cpPieceBitfield( msgs->torrent->completion ) );
1878
1879    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1880    {
1881        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1882            speed over a truly random sample -- let's limit the pool size to
1883            the first 1000 pieces so large torrents don't bog things down */
1884        size_t poolSize;
1885        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1886        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1887
1888        /* build the pool */
1889        for( i=poolSize=0; i<maxPoolSize; ++i )
1890            if( tr_bitfieldHas( field, i ) )
1891                pool[poolSize++] = i;
1892
1893        /* pull random piece indices from the pool */
1894        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1895        {
1896            const int pos = tr_cryptoWeakRandInt( poolSize );
1897            const tr_piece_index_t piece = pool[pos];
1898            tr_bitfieldRem( field, piece );
1899            lazyPieces[lazyCount++] = piece;
1900            pool[pos] = pool[--poolSize];
1901        }
1902
1903        /* cleanup */
1904        tr_free( pool );
1905    }
1906
1907    tr_peerIoWriteUint32( msgs->io, out,
1908                          sizeof( uint8_t ) + field->byteCount );
1909    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1910    tr_peerIoWriteBytes ( msgs->io, out, field->bits, field->byteCount );
1911    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
1912            EVBUFFER_LENGTH( out ) );
1913    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1914
1915    for( i = 0; i < lazyCount; ++i )
1916        protocolSendHave( msgs, lazyPieces[i] );
1917
1918    tr_bitfieldFree( field );
1919}
1920
1921static void
1922tellPeerWhatWeHave( tr_peermsgs * msgs )
1923{
1924    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->io );
1925
1926    if( fext && ( tr_cpGetStatus( msgs->torrent->completion ) == TR_SEED ) )
1927    {
1928        protocolSendHaveAll( msgs );
1929    }
1930    else if( fext && ( tr_cpHaveValid( msgs->torrent->completion ) == 0 ) )
1931    {
1932        protocolSendHaveNone( msgs );
1933    }
1934    else
1935    {
1936        sendBitfield( msgs );
1937    }
1938}
1939
1940/**
1941***
1942**/
1943
1944/* some peers give us error messages if we send
1945   more than this many peers in a single pex message
1946   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1947#define MAX_PEX_ADDED 50
1948#define MAX_PEX_DROPPED 50
1949
1950typedef struct
1951{
1952    tr_pex *  added;
1953    tr_pex *  dropped;
1954    tr_pex *  elements;
1955    int       addedCount;
1956    int       droppedCount;
1957    int       elementCount;
1958}
1959PexDiffs;
1960
1961static void
1962pexAddedCb( void * vpex,
1963            void * userData )
1964{
1965    PexDiffs * diffs = userData;
1966    tr_pex *   pex = vpex;
1967
1968    if( diffs->addedCount < MAX_PEX_ADDED )
1969    {
1970        diffs->added[diffs->addedCount++] = *pex;
1971        diffs->elements[diffs->elementCount++] = *pex;
1972    }
1973}
1974
1975static void
1976pexDroppedCb( void * vpex,
1977              void * userData )
1978{
1979    PexDiffs * diffs = userData;
1980    tr_pex *   pex = vpex;
1981
1982    if( diffs->droppedCount < MAX_PEX_DROPPED )
1983    {
1984        diffs->dropped[diffs->droppedCount++] = *pex;
1985    }
1986}
1987
1988static void
1989pexElementCb( void * vpex,
1990              void * userData )
1991{
1992    PexDiffs * diffs = userData;
1993    tr_pex *   pex = vpex;
1994
1995    diffs->elements[diffs->elementCount++] = *pex;
1996}
1997
1998/* TODO: ipv6 pex */
1999static void
2000sendPex( tr_peermsgs * msgs )
2001{
2002    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2003    {
2004        PexDiffs diffs;
2005        tr_pex * newPex = NULL;
2006        const int newCount = tr_peerMgrGetPeers( msgs->session->peerMgr,
2007                                                 msgs->torrent->info.hash,
2008                                                 &newPex );
2009
2010        /* build the diffs */
2011        diffs.added = tr_new( tr_pex, newCount );
2012        diffs.addedCount = 0;
2013        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2014        diffs.droppedCount = 0;
2015        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2016        diffs.elementCount = 0;
2017        tr_set_compare( msgs->pex, msgs->pexCount,
2018                        newPex, newCount,
2019                        tr_pexCompare, sizeof( tr_pex ),
2020                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2021        dbgmsg(
2022            msgs,
2023            "pex: old peer count %d, new peer count %d, added %d, removed %d",
2024            msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
2025
2026        if( !diffs.addedCount && !diffs.droppedCount )
2027        {
2028            tr_free( diffs.elements );
2029        }
2030        else
2031        {
2032            int  i;
2033            tr_benc val;
2034            char * benc;
2035            int bencLen;
2036            uint8_t * tmp, *walk;
2037            struct evbuffer * out = msgs->outMessages;
2038
2039            /* update peer */
2040            tr_free( msgs->pex );
2041            msgs->pex = diffs.elements;
2042            msgs->pexCount = diffs.elementCount;
2043
2044            /* build the pex payload */
2045            tr_bencInitDict( &val, 3 );
2046
2047            /* "added" */
2048            tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2049            for( i = 0; i < diffs.addedCount; ++i ) {
2050                memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2051                memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2052            }
2053            assert( ( walk - tmp ) == diffs.addedCount * 6 );
2054            tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2055            tr_free( tmp );
2056
2057            /* "added.f" */
2058            tmp = walk = tr_new( uint8_t, diffs.addedCount );
2059            for( i = 0; i < diffs.addedCount; ++i )
2060                *walk++ = diffs.added[i].flags;
2061            assert( ( walk - tmp ) == diffs.addedCount );
2062            tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2063            tr_free( tmp );
2064
2065            /* "dropped" */
2066            tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2067            for( i = 0; i < diffs.droppedCount; ++i ) {
2068                memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2069                memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2070            }
2071            assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2072            tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2073            tr_free( tmp );
2074
2075            /* write the pex message */
2076            benc = tr_bencSave( &val, &bencLen );
2077            tr_peerIoWriteUint32( msgs->io, out, 2 * sizeof( uint8_t ) + bencLen );
2078            tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
2079            tr_peerIoWriteUint8 ( msgs->io, out, msgs->ut_pex_id );
2080            tr_peerIoWriteBytes ( msgs->io, out, benc, bencLen );
2081            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2082            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2083
2084            tr_free( benc );
2085            tr_bencFree( &val );
2086        }
2087
2088        /* cleanup */
2089        tr_free( diffs.added );
2090        tr_free( diffs.dropped );
2091        tr_free( newPex );
2092
2093        msgs->clientSentPexAt = time( NULL );
2094    }
2095}
2096
2097static int
2098pexPulse( void * vpeer )
2099{
2100    sendPex( vpeer );
2101    return TRUE;
2102}
2103
2104/**
2105***
2106**/
2107
2108tr_peermsgs*
2109tr_peerMsgsNew( struct tr_torrent * torrent,
2110                struct tr_peer *    peer,
2111                tr_delivery_func    func,
2112                void *              userData,
2113                tr_publisher_tag *  setme )
2114{
2115    tr_peermsgs * m;
2116
2117    assert( peer );
2118    assert( peer->io );
2119
2120    m = tr_new0( tr_peermsgs, 1 );
2121    m->publisher = tr_publisherNew( );
2122    m->info = peer;
2123    m->session = torrent->session;
2124    m->torrent = torrent;
2125    m->io = peer->io;
2126    m->info->clientIsChoked = 1;
2127    m->info->peerIsChoked = 1;
2128    m->info->clientIsInterested = 0;
2129    m->info->peerIsInterested = 0;
2130    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
2131    m->state = AWAITING_BT_LENGTH;
2132    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
2133    m->outMessages = evbuffer_new( );
2134    m->outMessagesBatchedAt = 0;
2135    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2136    m->incoming.block = evbuffer_new( );
2137    m->peerAllowedPieces = NULL;
2138    m->peerAskedFor = REQUEST_LIST_INIT;
2139    m->clientAskedFor = REQUEST_LIST_INIT;
2140    m->clientWillAskFor = REQUEST_LIST_INIT;
2141    peer->msgs = m;
2142
2143    *setme = tr_publisherSubscribe( m->publisher, func, userData );
2144
2145    if( tr_peerIoSupportsLTEP( m->io ) )
2146        sendLtepHandshake( m );
2147
2148    tellPeerWhatWeHave( m );
2149
2150    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */
2151    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
2152    ratePulse( m );
2153
2154    return m;
2155}
2156
2157void
2158tr_peerMsgsFree( tr_peermsgs* msgs )
2159{
2160    if( msgs )
2161    {
2162        tr_timerFree( &msgs->pexTimer );
2163        tr_publisherFree( &msgs->publisher );
2164        reqListClear( &msgs->clientWillAskFor );
2165        reqListClear( &msgs->clientAskedFor );
2166        reqListClear( &msgs->peerAskedFor );
2167
2168        tr_bitfieldFree( msgs->peerAllowedPieces );
2169        evbuffer_free( msgs->incoming.block );
2170        evbuffer_free( msgs->outMessages );
2171        tr_free( msgs->pex );
2172
2173        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2174        tr_free( msgs );
2175    }
2176}
2177
2178void
2179tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2180                        tr_publisher_tag tag )
2181{
2182    tr_publisherUnsubscribe( peer->publisher, tag );
2183}
2184
Note: See TracBrowser for help on using the repository browser.