source: trunk/libtransmission/peer-msgs.c @ 7323

Last change on this file since 7323 was 7323, checked in by charles, 12 years ago

(trunk libT) #1565: give the peer requests a higher send priority. thanks to jusid for testing and sleuthing above & beyond the call of duty

  • Property svn:keywords set to Date Rev Author Id
File size: 61.0 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 7323 2008-12-08 20:36:36Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "crypto.h"
26#include "inout.h"
27#include "iobuf.h"
28#ifdef WIN32
29#include "net.h" /* for ECONN */
30#endif
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "ratecontrol.h"
36#include "stats.h"
37#include "torrent.h"
38#include "trevent.h"
39#include "utils.h"
40
41/**
42***
43**/
44
45enum
46{
47    BT_CHOKE                = 0,
48    BT_UNCHOKE              = 1,
49    BT_INTERESTED           = 2,
50    BT_NOT_INTERESTED       = 3,
51    BT_HAVE                 = 4,
52    BT_BITFIELD             = 5,
53    BT_REQUEST              = 6,
54    BT_PIECE                = 7,
55    BT_CANCEL               = 8,
56    BT_PORT                 = 9,
57
58    BT_FEXT_SUGGEST         = 13,
59    BT_FEXT_HAVE_ALL        = 14,
60    BT_FEXT_HAVE_NONE       = 15,
61    BT_FEXT_REJECT          = 16,
62    BT_FEXT_ALLOWED_FAST    = 17,
63
64    BT_LTEP                 = 20,
65
66    LTEP_HANDSHAKE          = 0,
67
68    TR_LTEP_PEX             = 1,
69
70
71
72    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
73
74    /* idle seconds before we send a keepalive */
75    KEEPALIVE_INTERVAL_SECS = 100,
76
77    PEX_INTERVAL            = ( 90 * 1000 ), /* msec between sendPex() calls */
78    PEER_PULSE_INTERVAL     = ( 250 ),       /* msec between peerPulse() calls
79                                               */
80
81    MAX_QUEUE_SIZE          = ( 100 ),
82
83    /* how long an unsent request can stay queued before it's returned
84       back to the peer-mgr's pool of requests */
85    QUEUED_REQUEST_TTL_SECS = 20,
86
87    /* how long a sent request can stay queued before it's returned
88       back to the peer-mgr's pool of requests */
89    SENT_REQUEST_TTL_SECS = 240,
90
91    /* used in lowering the outMessages queue period */
92    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
93    HIGH_PRIORITY_INTERVAL_SECS = 2,
94    LOW_PRIORITY_INTERVAL_SECS = 20,
95
96    /* number of pieces to remove from the bitfield when
97     * lazy bitfields are turned on */
98    LAZY_PIECE_COUNT = 26,
99
100    /* number of pieces we'll allow in our fast set */
101    MAX_FAST_SET_SIZE = 3
102};
103
104/**
105***  REQUEST MANAGEMENT
106**/
107
108enum
109{
110    AWAITING_BT_LENGTH,
111    AWAITING_BT_ID,
112    AWAITING_BT_MESSAGE,
113    AWAITING_BT_PIECE
114};
115
116struct peer_request
117{
118    uint32_t    index;
119    uint32_t    offset;
120    uint32_t    length;
121    time_t      time_requested;
122};
123
124static int
125compareRequest( const void * va, const void * vb )
126{
127    const struct peer_request * a = va;
128    const struct peer_request * b = vb;
129
130    if( a->index != b->index )
131        return a->index < b->index ? -1 : 1;
132
133    if( a->offset != b->offset )
134        return a->offset < b->offset ? -1 : 1;
135
136    if( a->length != b->length )
137        return a->length < b->length ? -1 : 1;
138
139    return 0;
140}
141
142struct request_list
143{
144    uint16_t               count;
145    uint16_t               max;
146    struct peer_request *  requests;
147};
148
149static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
150
151static void
152reqListReserve( struct request_list * list,
153                uint16_t              max )
154{
155    if( list->max < max )
156    {
157        list->max = max;
158        list->requests = tr_renew( struct peer_request,
159                                   list->requests,
160                                   list->max );
161    }
162}
163
164static void
165reqListClear( struct request_list * list )
166{
167    tr_free( list->requests );
168    *list = REQUEST_LIST_INIT;
169}
170
171static void
172reqListCopy( struct request_list * dest, const struct request_list * src )
173{
174    dest->count = dest->max = src->count;
175    dest->requests = tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
176}
177
178static void
179reqListRemoveOne( struct request_list * list,
180                  int                   i )
181{
182    assert( 0 <= i && i < list->count );
183
184    memmove( &list->requests[i],
185             &list->requests[i + 1],
186             sizeof( struct peer_request ) * ( --list->count - i ) );
187}
188
189static void
190reqListAppend( struct request_list *       list,
191               const struct peer_request * req )
192{
193    if( ++list->count >= list->max )
194        reqListReserve( list, list->max + 8 );
195
196    list->requests[list->count - 1] = *req;
197}
198
199static int
200reqListPop( struct request_list * list,
201            struct peer_request * setme )
202{
203    int success;
204
205    if( !list->count )
206        success = FALSE;
207    else {
208        *setme = list->requests[0];
209        reqListRemoveOne( list, 0 );
210        success = TRUE;
211    }
212
213    return success;
214}
215
216static int
217reqListFind( struct request_list *       list,
218             const struct peer_request * key )
219{
220    uint16_t i;
221
222    for( i = 0; i < list->count; ++i )
223        if( !compareRequest( key, list->requests + i ) )
224            return i;
225
226    return -1;
227}
228
229static int
230reqListRemove( struct request_list *       list,
231               const struct peer_request * key )
232{
233    int success;
234    const int i = reqListFind( list, key );
235
236    if( i < 0 )
237        success = FALSE;
238    else {
239        reqListRemoveOne( list, i );
240        success = TRUE;
241    }
242
243    return success;
244}
245
246/**
247***
248**/
249
250/* this is raw, unchanged data from the peer regarding
251 * the current message that it's sending us. */
252struct tr_incoming
253{
254    uint8_t                id;
255    uint32_t               length; /* includes the +1 for id length */
256    struct peer_request    blockReq; /* metadata for incoming blocks */
257    struct evbuffer *      block; /* piece data for incoming blocks */
258};
259
260struct tr_peermsgs
261{
262    tr_bool         peerSentBitfield;
263    tr_bool         peerSupportsPex;
264    tr_bool         clientSentLtepHandshake;
265    tr_bool         peerSentLtepHandshake;
266    tr_bool         haveFastSet;
267
268    uint8_t         state;
269    uint8_t         ut_pex_id;
270    uint16_t        pexCount;
271    uint16_t        minActiveRequests;
272    uint16_t        maxActiveRequests;
273
274    size_t                 fastsetSize;
275    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
276
277    /* how long the outMessages batch should be allowed to grow before
278     * it's flushed -- some messages (like requests >:) should be sent
279     * very quickly; others aren't as urgent. */
280    int                    outMessagesBatchPeriod;
281
282    tr_peer *              info;
283
284    tr_session *           session;
285    tr_torrent *           torrent;
286    tr_peerIo *            io;
287
288    tr_publisher_t *       publisher;
289
290    struct evbuffer *      outMessages; /* all the non-piece messages */
291
292    struct request_list    peerAskedFor;
293    struct request_list    clientAskedFor;
294    struct request_list    clientWillAskFor;
295
296    tr_timer *             pexTimer;
297
298    time_t                 clientSentPexAt;
299    time_t                 clientSentAnythingAt;
300
301    /* when we started batching the outMessages */
302    time_t                outMessagesBatchedAt;
303
304    tr_bitfield *         peerAllowedPieces;
305
306    struct tr_incoming    incoming;
307
308    tr_pex *              pex;
309
310    /* if the peer supports the Extension Protocol in BEP 10 and
311       supplied a reqq argument, it's stored here.  otherwise the
312       value is zero and should be ignored. */
313    int64_t               reqq;
314};
315
316/**
317***
318**/
319
320static void
321myDebug( const char * file, int line,
322         const struct tr_peermsgs * msgs,
323         const char * fmt, ... )
324{
325    FILE * fp = tr_getLog( );
326
327    if( fp )
328    {
329        va_list           args;
330        char              timestr[64];
331        struct evbuffer * buf = evbuffer_new( );
332        char *            base = tr_basename( file );
333
334        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
335                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
336                             msgs->torrent->info.name,
337                             tr_peerIoGetAddrStr( msgs->io ),
338                             msgs->info->client );
339        va_start( args, fmt );
340        evbuffer_add_vprintf( buf, fmt, args );
341        va_end( args );
342        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
343        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
344
345        tr_free( base );
346        evbuffer_free( buf );
347    }
348}
349
350#define dbgmsg( msgs, ... ) \
351    do { \
352        if( tr_deepLoggingIsActive( ) ) \
353            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
354    } while( 0 )
355
356/**
357***
358**/
359
360static void
361pokeBatchPeriod( tr_peermsgs * msgs,
362                 int           interval )
363{
364    if( msgs->outMessagesBatchPeriod > interval )
365    {
366        msgs->outMessagesBatchPeriod = interval;
367        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
368    }
369}
370
371static void
372dbgOutMessageLen( tr_peermsgs * msgs )
373{
374    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
375}
376
377static void
378protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
379{
380    tr_peerIo       * io  = msgs->io;
381    struct evbuffer * out = msgs->outMessages;
382
383    assert( tr_peerIoSupportsFEXT( msgs->io ) );
384
385    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
386    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
387    tr_peerIoWriteUint32( io, out, req->index );
388    tr_peerIoWriteUint32( io, out, req->offset );
389    tr_peerIoWriteUint32( io, out, req->length );
390
391    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
392    dbgOutMessageLen( msgs );
393}
394
395static void
396protocolSendRequest( tr_peermsgs *               msgs,
397                     const struct peer_request * req )
398{
399    tr_peerIo       * io  = msgs->io;
400    struct evbuffer * out = msgs->outMessages;
401
402    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
403    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
404    tr_peerIoWriteUint32( io, out, req->index );
405    tr_peerIoWriteUint32( io, out, req->offset );
406    tr_peerIoWriteUint32( io, out, req->length );
407
408    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
409    dbgOutMessageLen( msgs );
410    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
411}
412
413static void
414protocolSendCancel( tr_peermsgs *               msgs,
415                    const struct peer_request * req )
416{
417    tr_peerIo       * io  = msgs->io;
418    struct evbuffer * out = msgs->outMessages;
419
420    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
421    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
422    tr_peerIoWriteUint32( io, out, req->index );
423    tr_peerIoWriteUint32( io, out, req->offset );
424    tr_peerIoWriteUint32( io, out, req->length );
425
426    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
427    dbgOutMessageLen( msgs );
428    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
429}
430
431static void
432protocolSendHave( tr_peermsgs * msgs,
433                  uint32_t      index )
434{
435    tr_peerIo       * io  = msgs->io;
436    struct evbuffer * out = msgs->outMessages;
437
438    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
439    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
440    tr_peerIoWriteUint32( io, out, index );
441
442    dbgmsg( msgs, "sending Have %u...", index );
443    dbgOutMessageLen( msgs );
444    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
445}
446
447#if 0
448static void
449protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
450{
451    tr_peerIo       * io  = msgs->io;
452    struct evbuffer * out = msgs->outMessages;
453
454    assert( tr_peerIoSupportsFEXT( msgs->io ) );
455
456    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
457    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
458    tr_peerIoWriteUint32( io, out, pieceIndex );
459
460    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
461    dbgOutMessageLen( msgs );
462}
463#endif
464
465static void
466protocolSendChoke( tr_peermsgs * msgs,
467                   int           choke )
468{
469    tr_peerIo       * io  = msgs->io;
470    struct evbuffer * out = msgs->outMessages;
471
472    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
473    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
474
475    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
476    dbgOutMessageLen( msgs );
477    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
478}
479
480static void
481protocolSendHaveAll( tr_peermsgs * msgs )
482{
483    tr_peerIo       * io  = msgs->io;
484    struct evbuffer * out = msgs->outMessages;
485
486    assert( tr_peerIoSupportsFEXT( msgs->io ) );
487
488    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
489    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
490
491    dbgmsg( msgs, "sending HAVE_ALL..." );
492    dbgOutMessageLen( msgs );
493    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
494}
495
496static void
497protocolSendHaveNone( tr_peermsgs * msgs )
498{
499    tr_peerIo       * io  = msgs->io;
500    struct evbuffer * out = msgs->outMessages;
501
502    assert( tr_peerIoSupportsFEXT( msgs->io ) );
503
504    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
505    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
506
507    dbgmsg( msgs, "sending HAVE_NONE..." );
508    dbgOutMessageLen( msgs );
509    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
510}
511
512/**
513***  EVENTS
514**/
515
516static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0 };
517
518static void
519publish( tr_peermsgs * msgs, tr_peer_event * e )
520{
521    assert( msgs->info );
522    assert( msgs->info->msgs == msgs );
523
524    tr_publisherPublish( msgs->publisher, msgs->info, e );
525}
526
527static void
528fireError( tr_peermsgs * msgs, int err )
529{
530    tr_peer_event e = blankEvent;
531    e.eventType = TR_PEER_ERROR;
532    e.err = err;
533    publish( msgs, &e );
534}
535
536static void
537fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly )
538{
539    tr_peer_event e = blankEvent;
540    e.eventType = TR_PEER_UPLOAD_ONLY;
541    e.uploadOnly = uploadOnly;
542    publish( msgs, &e );
543}
544
545static void
546fireNeedReq( tr_peermsgs * msgs )
547{
548    tr_peer_event e = blankEvent;
549    e.eventType = TR_PEER_NEED_REQ;
550    publish( msgs, &e );
551}
552
553static void
554firePeerProgress( tr_peermsgs * msgs )
555{
556    tr_peer_event e = blankEvent;
557    e.eventType = TR_PEER_PEER_PROGRESS;
558    e.progress = msgs->info->progress;
559    publish( msgs, &e );
560}
561
562static void
563fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
564{
565    tr_peer_event e = blankEvent;
566    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
567    e.pieceIndex = req->index;
568    e.offset = req->offset;
569    e.length = req->length;
570    publish( msgs, &e );
571}
572
573static void
574fireClientGotData( tr_peermsgs * msgs,
575                   uint32_t      length,
576                   int           wasPieceData )
577{
578    tr_peer_event e = blankEvent;
579
580    e.length = length;
581    e.eventType = TR_PEER_CLIENT_GOT_DATA;
582    e.wasPieceData = wasPieceData;
583    publish( msgs, &e );
584}
585
586static void
587fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
588{
589    tr_peer_event e = blankEvent;
590    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
591    e.pieceIndex = pieceIndex;
592    publish( msgs, &e );
593}
594
595static void
596fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
597{
598    tr_peer_event e = blankEvent;
599    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
600    e.pieceIndex = pieceIndex;
601    publish( msgs, &e );
602}
603
604static void
605firePeerGotData( tr_peermsgs  * msgs,
606                 uint32_t       length,
607                 int            wasPieceData )
608{
609    tr_peer_event e = blankEvent;
610
611    e.length = length;
612    e.eventType = TR_PEER_PEER_GOT_DATA;
613    e.wasPieceData = wasPieceData;
614
615    publish( msgs, &e );
616}
617
618static void
619fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
620{
621    tr_peer_event e = blankEvent;
622    e.eventType = TR_PEER_CANCEL;
623    e.pieceIndex = req->index;
624    e.offset = req->offset;
625    e.length = req->length;
626    publish( msgs, &e );
627}
628
629/**
630***  ALLOWED FAST SET
631***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
632**/
633
634size_t
635tr_generateAllowedSet( tr_piece_index_t * setmePieces,
636                       size_t             desiredSetSize,
637                       size_t             pieceCount,
638                       const uint8_t    * infohash,
639                       const tr_address * addr )
640{
641    size_t setSize = 0;
642
643    assert( setmePieces );
644    assert( desiredSetSize <= pieceCount );
645    assert( desiredSetSize );
646    assert( pieceCount );
647    assert( infohash );
648    assert( addr );
649
650    if( addr->type == TR_AF_INET )
651    {
652        uint8_t w[SHA_DIGEST_LENGTH + 4];
653        uint8_t x[SHA_DIGEST_LENGTH];
654
655        *(uint32_t*)w = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
656        memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );                /* (2) */
657        tr_sha1( x, w, sizeof( w ), NULL );                          /* (3) */
658
659        while( setSize<desiredSetSize )
660        {
661            int i;
662            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
663            {
664                size_t k;
665                uint32_t j = i * 4;                                  /* (5) */
666                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
667                uint32_t index = y % pieceCount;                     /* (7) */
668
669                for( k=0; k<setSize; ++k )                           /* (8) */
670                    if( setmePieces[k] == index )
671                        break;
672
673                if( k == setSize )
674                    setmePieces[setSize++] = index;                  /* (9) */
675            }
676
677            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
678        }
679    }
680
681    return setSize;
682}
683
684static void
685updateFastSet( tr_peermsgs * msgs UNUSED )
686{
687#if 0
688    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->io );
689    const int peerIsNeedy = msgs->info->progress < 0.10;
690
691    if( fext && peerIsNeedy && !msgs->haveFastSet )
692    {
693        size_t i;
694        const struct tr_address * addr = tr_peerIoGetAddress( msgs->io, NULL );
695        const tr_info * inf = &msgs->torrent->info;
696        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
697
698        /* build the fast set */
699        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
700        msgs->haveFastSet = 1;
701
702        /* send it to the peer */
703        for( i=0; i<msgs->fastsetSize; ++i )
704            protocolSendAllowedFast( msgs, msgs->fastset[i] );
705    }
706#endif
707}
708
709/**
710***  INTEREST
711**/
712
713static int
714isPieceInteresting( const tr_peermsgs * peer,
715                    tr_piece_index_t    piece )
716{
717    const tr_torrent * torrent = peer->torrent;
718
719    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
720          && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have */
721          && ( tr_bitfieldHas( peer->info->have, piece ) );    /* peer has it */
722}
723
724/* "interested" means we'll ask for piece data if they unchoke us */
725static int
726isPeerInteresting( const tr_peermsgs * msgs )
727{
728    tr_piece_index_t    i;
729    const tr_torrent *  torrent;
730    const tr_bitfield * bitfield;
731    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
732
733    if( clientIsSeed )
734        return FALSE;
735
736    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
737        return FALSE;
738
739    torrent = msgs->torrent;
740    bitfield = tr_cpPieceBitfield( torrent->completion );
741
742    if( !msgs->info->have )
743        return TRUE;
744
745    assert( bitfield->byteCount == msgs->info->have->byteCount );
746
747    for( i = 0; i < torrent->info.pieceCount; ++i )
748        if( isPieceInteresting( msgs, i ) )
749            return TRUE;
750
751    return FALSE;
752}
753
754static void
755sendInterest( tr_peermsgs * msgs,
756              int           weAreInterested )
757{
758    struct evbuffer * out = msgs->outMessages;
759
760    assert( msgs );
761    assert( weAreInterested == 0 || weAreInterested == 1 );
762
763    msgs->info->clientIsInterested = weAreInterested;
764    dbgmsg( msgs, "Sending %s", weAreInterested ? "Interested" : "Not Interested" );
765    tr_peerIoWriteUint32( msgs->io, out, sizeof( uint8_t ) );
766    tr_peerIoWriteUint8 ( msgs->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
767    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
768    dbgOutMessageLen( msgs );
769}
770
771static void
772updateInterest( tr_peermsgs * msgs )
773{
774    const int i = isPeerInteresting( msgs );
775
776    if( i != msgs->info->clientIsInterested )
777        sendInterest( msgs, i );
778    if( i )
779        fireNeedReq( msgs );
780}
781
782static int
783popNextRequest( tr_peermsgs *         msgs,
784                struct peer_request * setme )
785{
786    return reqListPop( &msgs->peerAskedFor, setme );
787}
788
789static void
790cancelAllRequestsToClient( tr_peermsgs * msgs )
791{
792    struct peer_request req;
793    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->io );
794
795    while( popNextRequest( msgs, &req ) )
796        if( mustSendCancel )
797            protocolSendReject( msgs, &req );
798}
799
800void
801tr_peerMsgsSetChoke( tr_peermsgs * msgs,
802                     int           choke )
803{
804    const time_t now = time( NULL );
805    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
806
807    assert( msgs );
808    assert( msgs->info );
809    assert( choke == 0 || choke == 1 );
810
811    if( msgs->info->chokeChangedAt > fibrillationTime )
812    {
813        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
814    }
815    else if( msgs->info->peerIsChoked != choke )
816    {
817        msgs->info->peerIsChoked = choke;
818        if( choke )
819            cancelAllRequestsToClient( msgs );
820        protocolSendChoke( msgs, choke );
821        msgs->info->chokeChangedAt = now;
822    }
823}
824
825/**
826***
827**/
828
829void
830tr_peerMsgsHave( tr_peermsgs * msgs,
831                 uint32_t      index )
832{
833    protocolSendHave( msgs, index );
834
835    /* since we have more pieces now, we might not be interested in this peer */
836    updateInterest( msgs );
837}
838
839/**
840***
841**/
842
843static int
844reqIsValid( const tr_peermsgs * peer,
845            uint32_t            index,
846            uint32_t            offset,
847            uint32_t            length )
848{
849    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
850}
851
852static int
853requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
854{
855    return reqIsValid( msgs, req->index, req->offset, req->length );
856}
857
858static void
859expireOldRequests( tr_peermsgs * msgs, const time_t now  )
860{
861    int i;
862    time_t oldestAllowed;
863    struct request_list tmp = REQUEST_LIST_INIT;
864    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->io );
865
866    /* cancel requests that have been queued for too long */
867    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
868    reqListCopy( &tmp, &msgs->clientWillAskFor );
869    for( i = 0; i < tmp.count; ++i ) {
870        const struct peer_request * req = &tmp.requests[i];
871        if( req->time_requested < oldestAllowed )
872            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
873    }
874    reqListClear( &tmp );
875
876    /* if the peer doesn't support "Reject Request",
877     * cancel requests that were sent too long ago. */
878    if( !fext ) {
879        oldestAllowed = now - SENT_REQUEST_TTL_SECS;
880        reqListCopy( &tmp, &msgs->clientAskedFor );
881        for( i=0; i<tmp.count; ++i ) {
882            const struct peer_request * req = &tmp.requests[i];
883            if( req->time_requested < oldestAllowed )
884                tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
885        }
886        reqListClear( &tmp );
887    }
888}
889
890static void
891pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
892{
893    const int           max = msgs->maxActiveRequests;
894    const int           min = msgs->minActiveRequests;
895    int                 sent = 0;
896    int                 count = msgs->clientAskedFor.count;
897    struct peer_request req;
898
899    if( count > min )
900        return;
901    if( msgs->info->clientIsChoked )
902        return;
903    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
904        return;
905
906    while( ( count < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
907    {
908        const tr_block_index_t block = _tr_block( msgs->torrent, req.index, req.offset );
909
910        assert( requestIsValid( msgs, &req ) );
911        assert( tr_bitfieldHas( msgs->info->have, req.index ) );
912
913        /* don't ask for it if we've already got it... this block may have
914         * come in from a different peer after we cancelled a request for it */
915        if( !tr_cpBlockIsComplete( msgs->torrent->completion, block ) )
916        {
917            protocolSendRequest( msgs, &req );
918            req.time_requested = now;
919            reqListAppend( &msgs->clientAskedFor, &req );
920
921            ++count;
922            ++sent;
923        }
924    }
925
926    if( sent )
927        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
928                sent,
929                msgs->clientAskedFor.count,
930                msgs->clientWillAskFor.count );
931
932    if( count < max )
933        fireNeedReq( msgs );
934}
935
936static int
937requestQueueIsFull( const tr_peermsgs * msgs )
938{
939    const int req_max = msgs->maxActiveRequests;
940    return msgs->clientWillAskFor.count >= req_max;
941}
942
943tr_addreq_t
944tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
945                       uint32_t         index,
946                       uint32_t         offset,
947                       uint32_t         length,
948                       int              doForce )
949{
950    struct peer_request req;
951
952    assert( msgs );
953    assert( msgs->torrent );
954    assert( reqIsValid( msgs, index, offset, length ) );
955
956    /**
957    ***  Reasons to decline the request
958    **/
959
960    /* don't send requests to choked clients */
961    if( !doForce && msgs->info->clientIsChoked ) {
962        dbgmsg( msgs, "declining request because they're choking us" );
963        return TR_ADDREQ_CLIENT_CHOKED;
964    }
965
966    /* peer doesn't have this piece */
967    if( !doForce && !tr_bitfieldHas( msgs->info->have, index ) )
968        return TR_ADDREQ_MISSING;
969
970    /* peer's queue is full */
971    if( !doForce && requestQueueIsFull( msgs ) ) {
972        dbgmsg( msgs, "declining request because we're full" );
973        return TR_ADDREQ_FULL;
974    }
975
976    /* have we already asked for this piece? */
977    req.index = index;
978    req.offset = offset;
979    req.length = length;
980    if( !doForce ) {
981        if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
982            dbgmsg( msgs, "declining because it's a duplicate" );
983            return TR_ADDREQ_DUPLICATE;
984        }
985        if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
986            dbgmsg( msgs, "declining because it's a duplicate" );
987            return TR_ADDREQ_DUPLICATE;
988        }
989    }
990
991    /**
992    ***  Accept this request
993    **/
994
995    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
996            index, offset, length );
997    req.time_requested = time( NULL );
998    reqListAppend( &msgs->clientWillAskFor, &req );
999    return TR_ADDREQ_OK;
1000}
1001
1002static void
1003cancelAllRequestsToPeer( tr_peermsgs * msgs, tr_bool sendCancel )
1004{
1005    int i;
1006    struct request_list a = msgs->clientWillAskFor;
1007    struct request_list b = msgs->clientAskedFor;
1008    dbgmsg( msgs, "cancelling all requests to peer" );
1009
1010    msgs->clientAskedFor = REQUEST_LIST_INIT;
1011    msgs->clientWillAskFor = REQUEST_LIST_INIT;
1012
1013    for( i=0; i<a.count; ++i )
1014        fireCancelledReq( msgs, &a.requests[i] );
1015
1016    for( i = 0; i < b.count; ++i ) {
1017        fireCancelledReq( msgs, &b.requests[i] );
1018        if( sendCancel )
1019            protocolSendCancel( msgs, &b.requests[i] );
1020    }
1021
1022    reqListClear( &a );
1023    reqListClear( &b );
1024}
1025
1026void
1027tr_peerMsgsCancel( tr_peermsgs * msgs,
1028                   uint32_t      pieceIndex,
1029                   uint32_t      offset,
1030                   uint32_t      length )
1031{
1032    struct peer_request req;
1033
1034    assert( msgs != NULL );
1035    assert( length > 0 );
1036
1037
1038    /* have we asked the peer for this piece? */
1039    req.index = pieceIndex;
1040    req.offset = offset;
1041    req.length = length;
1042
1043    /* if it's only in the queue and hasn't been sent yet, free it */
1044    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
1045        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32"\n", pieceIndex, offset, length );
1046        fireCancelledReq( msgs, &req );
1047    }
1048
1049    /* if it's already been sent, send a cancel message too */
1050    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
1051        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32"\n", pieceIndex, offset, length );
1052        protocolSendCancel( msgs, &req );
1053        fireCancelledReq( msgs, &req );
1054    }
1055}
1056
1057
1058/**
1059***
1060**/
1061
1062static void
1063sendLtepHandshake( tr_peermsgs * msgs )
1064{
1065    tr_benc val, *m;
1066    char * buf;
1067    int len;
1068    int pex;
1069    struct evbuffer * out = msgs->outMessages;
1070
1071    if( msgs->clientSentLtepHandshake )
1072        return;
1073
1074    dbgmsg( msgs, "sending an ltep handshake" );
1075    msgs->clientSentLtepHandshake = 1;
1076
1077    /* decide if we want to advertise pex support */
1078    if( !tr_torrentAllowsPex( msgs->torrent ) )
1079        pex = 0;
1080    else if( msgs->peerSentLtepHandshake )
1081        pex = msgs->peerSupportsPex ? 1 : 0;
1082    else
1083        pex = 1;
1084
1085    tr_bencInitDict( &val, 5 );
1086    tr_bencDictAddInt( &val, "e", msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
1087    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
1088    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
1089    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
1090    m  = tr_bencDictAddDict( &val, "m", 1 );
1091    if( pex )
1092        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
1093    buf = tr_bencSave( &val, &len );
1094
1095    tr_peerIoWriteUint32( msgs->io, out, 2 * sizeof( uint8_t ) + len );
1096    tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
1097    tr_peerIoWriteUint8 ( msgs->io, out, LTEP_HANDSHAKE );
1098    tr_peerIoWriteBytes ( msgs->io, out, buf, len );
1099    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1100    dbgOutMessageLen( msgs );
1101
1102    /* cleanup */
1103    tr_bencFree( &val );
1104    tr_free( buf );
1105}
1106
1107static void
1108parseLtepHandshake( tr_peermsgs *     msgs,
1109                    int               len,
1110                    struct evbuffer * inbuf )
1111{
1112    int64_t   i;
1113    tr_benc   val, * sub;
1114    uint8_t * tmp = tr_new( uint8_t, len );
1115
1116    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
1117    msgs->peerSentLtepHandshake = 1;
1118
1119    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
1120    {
1121        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1122        tr_free( tmp );
1123        return;
1124    }
1125
1126    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
1127
1128    /* does the peer prefer encrypted connections? */
1129    if( tr_bencDictFindInt( &val, "e", &i ) )
1130        msgs->info->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1131                                              : ENCRYPTION_PREFERENCE_NO;
1132
1133    /* check supported messages for utorrent pex */
1134    msgs->peerSupportsPex = 0;
1135    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
1136        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
1137            msgs->ut_pex_id = (uint8_t) i;
1138            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1139            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1140        }
1141    }
1142
1143    /* look for upload_only (BEP 21) */
1144    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
1145        fireUploadOnly( msgs, i!=0 );
1146
1147    /* get peer's listening port */
1148    if( tr_bencDictFindInt( &val, "p", &i ) ) {
1149        msgs->info->port = htons( (uint16_t)i );
1150        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
1151    }
1152
1153    /* get peer's maximum request queue size */
1154    if( tr_bencDictFindInt( &val, "reqq", &i ) )
1155        msgs->reqq = i;
1156
1157    tr_bencFree( &val );
1158    tr_free( tmp );
1159}
1160
1161static void
1162parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1163{
1164    int loaded = 0;
1165    uint8_t * tmp = tr_new( uint8_t, msglen );
1166    tr_benc val;
1167    const tr_torrent * tor = msgs->torrent;
1168    const uint8_t * added;
1169    size_t added_len;
1170
1171    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
1172
1173    if( tr_torrentAllowsPex( tor )
1174      && (( loaded = !tr_bencLoad( tmp, msglen, &val, NULL )))
1175      && tr_bencDictFindRaw( &val, "added", &added, &added_len ))
1176    {
1177        const uint8_t * added_f = NULL;
1178        tr_pex *        pex;
1179        size_t          i, n;
1180        size_t          added_f_len = 0;
1181        tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1182        pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1183        for( i=0; i<n; ++i )
1184            tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1185                              TR_PEER_FROM_PEX, pex + i );
1186        tr_free( pex );
1187    }
1188
1189    if( loaded )
1190        tr_bencFree( &val );
1191    tr_free( tmp );
1192}
1193
1194static void sendPex( tr_peermsgs * msgs );
1195
1196static void
1197parseLtep( tr_peermsgs *     msgs,
1198           int               msglen,
1199           struct evbuffer * inbuf )
1200{
1201    uint8_t ltep_msgid;
1202
1203    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
1204    msglen--;
1205
1206    if( ltep_msgid == LTEP_HANDSHAKE )
1207    {
1208        dbgmsg( msgs, "got ltep handshake" );
1209        parseLtepHandshake( msgs, msglen, inbuf );
1210        if( tr_peerIoSupportsLTEP( msgs->io ) )
1211        {
1212            sendLtepHandshake( msgs );
1213            sendPex( msgs );
1214        }
1215    }
1216    else if( ltep_msgid == TR_LTEP_PEX )
1217    {
1218        dbgmsg( msgs, "got ut pex" );
1219        msgs->peerSupportsPex = 1;
1220        parseUtPex( msgs, msglen, inbuf );
1221    }
1222    else
1223    {
1224        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1225        evbuffer_drain( inbuf, msglen );
1226    }
1227}
1228
1229static int
1230readBtLength( tr_peermsgs *     msgs,
1231              struct evbuffer * inbuf,
1232              size_t            inlen )
1233{
1234    uint32_t len;
1235
1236    if( inlen < sizeof( len ) )
1237        return READ_LATER;
1238
1239    tr_peerIoReadUint32( msgs->io, inbuf, &len );
1240
1241    if( len == 0 ) /* peer sent us a keepalive message */
1242        dbgmsg( msgs, "got KeepAlive" );
1243    else
1244    {
1245        msgs->incoming.length = len;
1246        msgs->state = AWAITING_BT_ID;
1247    }
1248
1249    return READ_NOW;
1250}
1251
1252static int readBtMessage( tr_peermsgs *     msgs,
1253                          struct evbuffer * inbuf,
1254                          size_t            inlen );
1255
1256static int
1257readBtId( tr_peermsgs *     msgs,
1258          struct evbuffer * inbuf,
1259          size_t            inlen )
1260{
1261    uint8_t id;
1262
1263    if( inlen < sizeof( uint8_t ) )
1264        return READ_LATER;
1265
1266    tr_peerIoReadUint8( msgs->io, inbuf, &id );
1267    msgs->incoming.id = id;
1268
1269    if( id == BT_PIECE )
1270    {
1271        msgs->state = AWAITING_BT_PIECE;
1272        return READ_NOW;
1273    }
1274    else if( msgs->incoming.length != 1 )
1275    {
1276        msgs->state = AWAITING_BT_MESSAGE;
1277        return READ_NOW;
1278    }
1279    else return readBtMessage( msgs, inbuf, inlen - 1 );
1280}
1281
1282static void
1283updatePeerProgress( tr_peermsgs * msgs )
1284{
1285    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
1286    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
1287    updateFastSet( msgs );
1288    updateInterest( msgs );
1289    firePeerProgress( msgs );
1290}
1291
1292static void
1293peerMadeRequest( tr_peermsgs *               msgs,
1294                 const struct peer_request * req )
1295{
1296    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->io );
1297    const int reqIsValid = requestIsValid( msgs, req );
1298    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
1299    const int peerIsChoked = msgs->info->peerIsChoked;
1300
1301    int allow = FALSE;
1302
1303    if( !reqIsValid )
1304        dbgmsg( msgs, "rejecting an invalid request." );
1305    else if( !clientHasPiece )
1306        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1307    else if( peerIsChoked )
1308        dbgmsg( msgs, "rejecting request from choked peer" );
1309    else
1310        allow = TRUE;
1311
1312    if( allow )
1313        reqListAppend( &msgs->peerAskedFor, req );
1314    else if( fext )
1315        protocolSendReject( msgs, req );
1316}
1317
1318static int
1319messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1320{
1321    switch( id )
1322    {
1323        case BT_CHOKE:
1324        case BT_UNCHOKE:
1325        case BT_INTERESTED:
1326        case BT_NOT_INTERESTED:
1327        case BT_FEXT_HAVE_ALL:
1328        case BT_FEXT_HAVE_NONE:
1329            return len == 1;
1330
1331        case BT_HAVE:
1332        case BT_FEXT_SUGGEST:
1333        case BT_FEXT_ALLOWED_FAST:
1334            return len == 5;
1335
1336        case BT_BITFIELD:
1337            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1338
1339        case BT_REQUEST:
1340        case BT_CANCEL:
1341        case BT_FEXT_REJECT:
1342            return len == 13;
1343
1344        case BT_PIECE:
1345            return len > 9 && len <= 16393;
1346
1347        case BT_PORT:
1348            return len == 3;
1349
1350        case BT_LTEP:
1351            return len >= 2;
1352
1353        default:
1354            return FALSE;
1355    }
1356}
1357
1358static int clientGotBlock( tr_peermsgs *               msgs,
1359                           const uint8_t *             block,
1360                           const struct peer_request * req );
1361
1362static int
1363readBtPiece( tr_peermsgs      * msgs,
1364             struct evbuffer  * inbuf,
1365             size_t             inlen,
1366             size_t           * setme_piece_bytes_read )
1367{
1368    struct peer_request * req = &msgs->incoming.blockReq;
1369
1370    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1371    dbgmsg( msgs, "In readBtPiece" );
1372
1373    if( !req->length )
1374    {
1375        if( inlen < 8 )
1376            return READ_LATER;
1377
1378        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1379        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1380        req->length = msgs->incoming.length - 9;
1381        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1382        return READ_NOW;
1383    }
1384    else
1385    {
1386        int err;
1387
1388        /* read in another chunk of data */
1389        const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
1390        size_t n = MIN( nLeft, inlen );
1391        uint8_t * buf = tr_new( uint8_t, n );
1392        assert( EVBUFFER_LENGTH( inbuf ) >= n );
1393        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1394        evbuffer_add( msgs->incoming.block, buf, n );
1395        fireClientGotData( msgs, n, TRUE );
1396        *setme_piece_bytes_read += n;
1397        tr_free( buf );
1398        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1399               n, req->index, req->offset, req->length,
1400               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1401        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1402            return READ_LATER;
1403
1404        /* we've got the whole block ... process it */
1405        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );
1406
1407        /* cleanup */
1408        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH( msgs->incoming.block ) );
1409        req->length = 0;
1410        msgs->state = AWAITING_BT_LENGTH;
1411        if( !err )
1412            return READ_NOW;
1413        else {
1414            fireError( msgs, err );
1415            return READ_ERR;
1416        }
1417    }
1418}
1419
1420static int
1421readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1422{
1423    uint32_t      ui32;
1424    uint32_t      msglen = msgs->incoming.length;
1425    const uint8_t id = msgs->incoming.id;
1426    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1427    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->io );
1428
1429    --msglen; /* id length */
1430
1431    if( inlen < msglen )
1432        return READ_LATER;
1433
1434    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1435
1436    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1437    {
1438        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1439        fireError( msgs, EMSGSIZE );
1440        return READ_ERR;
1441    }
1442
1443    switch( id )
1444    {
1445        case BT_CHOKE:
1446            dbgmsg( msgs, "got Choke" );
1447            msgs->info->clientIsChoked = 1;
1448            if( !fext )
1449                cancelAllRequestsToPeer( msgs, FALSE );
1450            break;
1451
1452        case BT_UNCHOKE:
1453            dbgmsg( msgs, "got Unchoke" );
1454            msgs->info->clientIsChoked = 0;
1455            fireNeedReq( msgs );
1456            break;
1457
1458        case BT_INTERESTED:
1459            dbgmsg( msgs, "got Interested" );
1460            msgs->info->peerIsInterested = 1;
1461            break;
1462
1463        case BT_NOT_INTERESTED:
1464            dbgmsg( msgs, "got Not Interested" );
1465            msgs->info->peerIsInterested = 0;
1466            break;
1467
1468        case BT_HAVE:
1469            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1470            dbgmsg( msgs, "got Have: %u", ui32 );
1471            if( tr_bitfieldAdd( msgs->info->have, ui32 ) ) {
1472                fireError( msgs, ERANGE );
1473                return READ_ERR;
1474            }
1475            updatePeerProgress( msgs );
1476            tr_rcTransferred( msgs->torrent->swarmSpeed,
1477                              msgs->torrent->info.pieceSize );
1478            break;
1479
1480        case BT_BITFIELD:
1481        {
1482            dbgmsg( msgs, "got a bitfield" );
1483            msgs->peerSentBitfield = 1;
1484            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
1485            updatePeerProgress( msgs );
1486            fireNeedReq( msgs );
1487            break;
1488        }
1489
1490        case BT_REQUEST:
1491        {
1492            struct peer_request r;
1493            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1494            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1495            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1496            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1497            peerMadeRequest( msgs, &r );
1498            break;
1499        }
1500
1501        case BT_CANCEL:
1502        {
1503            struct peer_request r;
1504            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1505            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1506            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1507            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1508            if( reqListRemove( &msgs->peerAskedFor, &r ) && fext )
1509                protocolSendReject( msgs, &r );
1510            break;
1511        }
1512
1513        case BT_PIECE:
1514            assert( 0 ); /* handled elsewhere! */
1515            break;
1516
1517        case BT_PORT:
1518            dbgmsg( msgs, "Got a BT_PORT" );
1519            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1520            break;
1521
1522        case BT_FEXT_SUGGEST:
1523            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1524            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1525            if( fext )
1526                fireClientGotSuggest( msgs, ui32 );
1527            else {
1528                fireError( msgs, EMSGSIZE );
1529                return READ_ERR;
1530            }
1531            break;
1532
1533        case BT_FEXT_ALLOWED_FAST:
1534            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1535            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1536            if( fext )
1537                fireClientGotAllowedFast( msgs, ui32 );
1538            else {
1539                fireError( msgs, EMSGSIZE );
1540                return READ_ERR;
1541            }
1542            break;
1543
1544        case BT_FEXT_HAVE_ALL:
1545            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1546            if( fext ) {
1547                tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1548                updatePeerProgress( msgs );
1549            } else {
1550                fireError( msgs, EMSGSIZE );
1551                return READ_ERR;
1552            }
1553            break;
1554
1555
1556        case BT_FEXT_HAVE_NONE:
1557            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1558            if( fext ) {
1559                tr_bitfieldClear( msgs->info->have );
1560                updatePeerProgress( msgs );
1561            } else {
1562                fireError( msgs, EMSGSIZE );
1563                return READ_ERR;
1564            }
1565            break;
1566
1567        case BT_FEXT_REJECT:
1568        {
1569            struct peer_request r;
1570            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1571            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1572            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1573            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1574            if( fext )
1575                reqListRemove( &msgs->clientAskedFor, &r );
1576            else {
1577                fireError( msgs, EMSGSIZE );
1578                return READ_ERR;
1579            }
1580            break;
1581        }
1582
1583        case BT_LTEP:
1584            dbgmsg( msgs, "Got a BT_LTEP" );
1585            parseLtep( msgs, msglen, inbuf );
1586            break;
1587
1588        default:
1589            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1590            tr_peerIoDrain( msgs->io, inbuf, msglen );
1591            break;
1592    }
1593
1594    assert( msglen + 1 == msgs->incoming.length );
1595    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1596
1597    msgs->state = AWAITING_BT_LENGTH;
1598    return READ_NOW;
1599}
1600
1601static void
1602decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1603{
1604    tr_torrent * tor = msgs->torrent;
1605
1606    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1607}
1608
1609static void
1610clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1611{
1612    decrementDownloadedCount( msgs, req->length );
1613}
1614
1615static void
1616addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1617{
1618    if( !msgs->info->blame )
1619         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1620    tr_bitfieldAdd( msgs->info->blame, index );
1621}
1622
1623/* returns 0 on success, or an errno on failure */
1624static int
1625clientGotBlock( tr_peermsgs *               msgs,
1626                const uint8_t *             data,
1627                const struct peer_request * req )
1628{
1629    int err;
1630    tr_torrent * tor = msgs->torrent;
1631    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1632
1633    assert( msgs );
1634    assert( req );
1635
1636    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1637        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1638                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1639        return EMSGSIZE;
1640    }
1641
1642    /* save the block */
1643    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1644
1645    /**
1646    *** Remove the block from our `we asked for this' list
1647    **/
1648
1649    if( !reqListRemove( &msgs->clientAskedFor, req ) ) {
1650        clientGotUnwantedBlock( msgs, req );
1651        dbgmsg( msgs, "we didn't ask for this message..." );
1652        return 0;
1653    }
1654
1655    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1656            msgs->clientAskedFor.count );
1657
1658    /**
1659    *** Error checks
1660    **/
1661
1662    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1663        dbgmsg( msgs, "we have this block already..." );
1664        clientGotUnwantedBlock( msgs, req );
1665        return 0;
1666    }
1667
1668    /**
1669    ***  Save the block
1670    **/
1671
1672    msgs->info->peerSentPieceDataAt = time( NULL );
1673    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1674        return err;
1675
1676    addPeerToBlamefield( msgs, req->index );
1677    fireGotBlock( msgs, req );
1678    return 0;
1679}
1680
1681static int peerPulse( void * vmsgs );
1682
1683static void
1684didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1685{
1686    tr_peermsgs * msgs = vmsgs;
1687    firePeerGotData( msgs, bytesWritten, wasPieceData );
1688    peerPulse( msgs );
1689}
1690
1691static ReadState
1692canRead( struct tr_iobuf * iobuf, void * vmsgs, size_t * piece )
1693{
1694    ReadState         ret;
1695    tr_peermsgs *     msgs = vmsgs;
1696    struct evbuffer * in = tr_iobuf_input( iobuf );
1697    const size_t      inlen = EVBUFFER_LENGTH( in );
1698
1699    if( !inlen )
1700    {
1701        ret = READ_LATER;
1702    }
1703    else if( msgs->state == AWAITING_BT_PIECE )
1704    {
1705        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1706    }
1707    else switch( msgs->state )
1708    {
1709        case AWAITING_BT_LENGTH:
1710            ret = readBtLength ( msgs, in, inlen ); break;
1711
1712        case AWAITING_BT_ID:
1713            ret = readBtId     ( msgs, in, inlen ); break;
1714
1715        case AWAITING_BT_MESSAGE:
1716            ret = readBtMessage( msgs, in, inlen ); break;
1717
1718        default:
1719            assert( 0 );
1720    }
1721
1722    /* log the raw data that was read */
1723    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1724        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1725
1726    return ret;
1727}
1728
1729/**
1730***
1731**/
1732
1733static int
1734ratePulse( void * vmsgs )
1735{
1736    tr_peermsgs * msgs = vmsgs;
1737    const double rateToClient = tr_peerGetPieceSpeed( msgs->info, TR_PEER_TO_CLIENT );
1738    const int estimatedBlocksInNext30Seconds =
1739                  ( rateToClient * 30 * 1024 ) / msgs->torrent->blockSize;
1740    msgs->minActiveRequests = 8;
1741    msgs->maxActiveRequests = msgs->minActiveRequests + estimatedBlocksInNext30Seconds;
1742    if( msgs->reqq > 0 )
1743        msgs->maxActiveRequests = MIN( msgs->maxActiveRequests, msgs->reqq );
1744    return TRUE;
1745}
1746
1747static size_t
1748fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1749{
1750    size_t bytesWritten = 0;
1751    struct peer_request req;
1752    const int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1753    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->io );
1754
1755    /**
1756    ***  Protocol messages
1757    **/
1758
1759    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1760    {
1761        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1762        msgs->outMessagesBatchedAt = now;
1763    }
1764    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1765    {
1766        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1767        /* flush the protocol messages */
1768        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->io, len );
1769        tr_peerIoWriteBuf( msgs->io, msgs->outMessages, FALSE );
1770        msgs->clientSentAnythingAt = now;
1771        msgs->outMessagesBatchedAt = 0;
1772        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1773        bytesWritten +=  len;
1774    }
1775
1776    /**
1777    ***  Blocks
1778    **/
1779
1780    if( ( tr_peerIoGetWriteBufferSpace( msgs->io ) >= msgs->torrent->blockSize )
1781        && popNextRequest( msgs, &req ) )
1782    {
1783        if( requestIsValid( msgs, &req )
1784            && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1785        {
1786            /* send a block */
1787            uint8_t * buf = tr_new( uint8_t, req.length );
1788            const int err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf );
1789            if( err ) {
1790                fireError( msgs, err );
1791                bytesWritten = 0;
1792                msgs = NULL;
1793            } else {
1794                tr_peerIo * io = msgs->io;
1795                struct evbuffer * out = evbuffer_new( );
1796                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1797                tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1798                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1799                tr_peerIoWriteUint32( io, out, req.index );
1800                tr_peerIoWriteUint32( io, out, req.offset );
1801                tr_peerIoWriteBytes ( io, out, buf, req.length );
1802                tr_peerIoWriteBuf( io, out, TRUE );
1803                bytesWritten += EVBUFFER_LENGTH( out );
1804                evbuffer_free( out );
1805                msgs->clientSentAnythingAt = now;
1806            }
1807            tr_free( buf );
1808        }
1809        else if( fext ) /* peer needs a reject message */
1810        {
1811            protocolSendReject( msgs, &req );
1812        }
1813    }
1814
1815    /**
1816    ***  Keepalive
1817    **/
1818
1819    if( ( msgs != NULL )
1820        && ( msgs->clientSentAnythingAt != 0 )
1821        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1822    {
1823        dbgmsg( msgs, "sending a keepalive message" );
1824        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1825        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1826    }
1827
1828    return bytesWritten;
1829}
1830
1831static int
1832peerPulse( void * vmsgs )
1833{
1834    tr_peermsgs * msgs = vmsgs;
1835    const time_t  now = time( NULL );
1836
1837    ratePulse( msgs );
1838
1839    pumpRequestQueue( msgs, now );
1840    expireOldRequests( msgs, now );
1841
1842    for( ;; )
1843        if( fillOutputBuffer( msgs, now ) < 1 )
1844            break;
1845
1846    return TRUE; /* loop forever */
1847}
1848
1849void
1850tr_peerMsgsPulse( tr_peermsgs * msgs )
1851{
1852    if( msgs != NULL )
1853        peerPulse( msgs );
1854}
1855
1856static void
1857gotError( struct tr_iobuf  * iobuf UNUSED,
1858          short              what,
1859          void             * vmsgs )
1860{
1861    if( what & EVBUFFER_TIMEOUT )
1862        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1863    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1864        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1865               what, errno, tr_strerror( errno ) );
1866    fireError( vmsgs, ENOTCONN );
1867}
1868
1869static void
1870sendBitfield( tr_peermsgs * msgs )
1871{
1872    struct evbuffer * out = msgs->outMessages;
1873    tr_bitfield *     field;
1874    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1875    size_t            i;
1876    size_t            lazyCount = 0;
1877
1878    field = tr_bitfieldDup( tr_cpPieceBitfield( msgs->torrent->completion ) );
1879
1880    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1881    {
1882        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1883            speed over a truly random sample -- let's limit the pool size to
1884            the first 1000 pieces so large torrents don't bog things down */
1885        size_t poolSize;
1886        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1887        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1888
1889        /* build the pool */
1890        for( i=poolSize=0; i<maxPoolSize; ++i )
1891            if( tr_bitfieldHas( field, i ) )
1892                pool[poolSize++] = i;
1893
1894        /* pull random piece indices from the pool */
1895        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1896        {
1897            const int pos = tr_cryptoWeakRandInt( poolSize );
1898            const tr_piece_index_t piece = pool[pos];
1899            tr_bitfieldRem( field, piece );
1900            lazyPieces[lazyCount++] = piece;
1901            pool[pos] = pool[--poolSize];
1902        }
1903
1904        /* cleanup */
1905        tr_free( pool );
1906    }
1907
1908    tr_peerIoWriteUint32( msgs->io, out,
1909                          sizeof( uint8_t ) + field->byteCount );
1910    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1911    tr_peerIoWriteBytes ( msgs->io, out, field->bits, field->byteCount );
1912    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
1913            EVBUFFER_LENGTH( out ) );
1914    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1915
1916    for( i = 0; i < lazyCount; ++i )
1917        protocolSendHave( msgs, lazyPieces[i] );
1918
1919    tr_bitfieldFree( field );
1920}
1921
1922static void
1923tellPeerWhatWeHave( tr_peermsgs * msgs )
1924{
1925    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->io );
1926
1927    if( fext && ( tr_cpGetStatus( msgs->torrent->completion ) == TR_SEED ) )
1928    {
1929        protocolSendHaveAll( msgs );
1930    }
1931    else if( fext && ( tr_cpHaveValid( msgs->torrent->completion ) == 0 ) )
1932    {
1933        protocolSendHaveNone( msgs );
1934    }
1935    else
1936    {
1937        sendBitfield( msgs );
1938    }
1939}
1940
1941/**
1942***
1943**/
1944
1945/* some peers give us error messages if we send
1946   more than this many peers in a single pex message
1947   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1948#define MAX_PEX_ADDED 50
1949#define MAX_PEX_DROPPED 50
1950
1951typedef struct
1952{
1953    tr_pex *  added;
1954    tr_pex *  dropped;
1955    tr_pex *  elements;
1956    int       addedCount;
1957    int       droppedCount;
1958    int       elementCount;
1959}
1960PexDiffs;
1961
1962static void
1963pexAddedCb( void * vpex,
1964            void * userData )
1965{
1966    PexDiffs * diffs = userData;
1967    tr_pex *   pex = vpex;
1968
1969    if( diffs->addedCount < MAX_PEX_ADDED )
1970    {
1971        diffs->added[diffs->addedCount++] = *pex;
1972        diffs->elements[diffs->elementCount++] = *pex;
1973    }
1974}
1975
1976static void
1977pexDroppedCb( void * vpex,
1978              void * userData )
1979{
1980    PexDiffs * diffs = userData;
1981    tr_pex *   pex = vpex;
1982
1983    if( diffs->droppedCount < MAX_PEX_DROPPED )
1984    {
1985        diffs->dropped[diffs->droppedCount++] = *pex;
1986    }
1987}
1988
1989static void
1990pexElementCb( void * vpex,
1991              void * userData )
1992{
1993    PexDiffs * diffs = userData;
1994    tr_pex *   pex = vpex;
1995
1996    diffs->elements[diffs->elementCount++] = *pex;
1997}
1998
1999/* TODO: ipv6 pex */
2000static void
2001sendPex( tr_peermsgs * msgs )
2002{
2003    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2004    {
2005        PexDiffs diffs;
2006        tr_pex * newPex = NULL;
2007        const int newCount = tr_peerMgrGetPeers( msgs->session->peerMgr,
2008                                                 msgs->torrent->info.hash,
2009                                                 &newPex );
2010
2011        /* build the diffs */
2012        diffs.added = tr_new( tr_pex, newCount );
2013        diffs.addedCount = 0;
2014        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2015        diffs.droppedCount = 0;
2016        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2017        diffs.elementCount = 0;
2018        tr_set_compare( msgs->pex, msgs->pexCount,
2019                        newPex, newCount,
2020                        tr_pexCompare, sizeof( tr_pex ),
2021                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2022        dbgmsg(
2023            msgs,
2024            "pex: old peer count %d, new peer count %d, added %d, removed %d",
2025            msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
2026
2027        if( !diffs.addedCount && !diffs.droppedCount )
2028        {
2029            tr_free( diffs.elements );
2030        }
2031        else
2032        {
2033            int  i;
2034            tr_benc val;
2035            char * benc;
2036            int bencLen;
2037            uint8_t * tmp, *walk;
2038            struct evbuffer * out = msgs->outMessages;
2039
2040            /* update peer */
2041            tr_free( msgs->pex );
2042            msgs->pex = diffs.elements;
2043            msgs->pexCount = diffs.elementCount;
2044
2045            /* build the pex payload */
2046            tr_bencInitDict( &val, 3 );
2047
2048            /* "added" */
2049            tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2050            for( i = 0; i < diffs.addedCount; ++i ) {
2051                memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2052                memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2053            }
2054            assert( ( walk - tmp ) == diffs.addedCount * 6 );
2055            tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2056            tr_free( tmp );
2057
2058            /* "added.f" */
2059            tmp = walk = tr_new( uint8_t, diffs.addedCount );
2060            for( i = 0; i < diffs.addedCount; ++i )
2061                *walk++ = diffs.added[i].flags;
2062            assert( ( walk - tmp ) == diffs.addedCount );
2063            tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2064            tr_free( tmp );
2065
2066            /* "dropped" */
2067            tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2068            for( i = 0; i < diffs.droppedCount; ++i ) {
2069                memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2070                memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2071            }
2072            assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2073            tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2074            tr_free( tmp );
2075
2076            /* write the pex message */
2077            benc = tr_bencSave( &val, &bencLen );
2078            tr_peerIoWriteUint32( msgs->io, out, 2 * sizeof( uint8_t ) + bencLen );
2079            tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
2080            tr_peerIoWriteUint8 ( msgs->io, out, msgs->ut_pex_id );
2081            tr_peerIoWriteBytes ( msgs->io, out, benc, bencLen );
2082            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2083            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2084
2085            tr_free( benc );
2086            tr_bencFree( &val );
2087        }
2088
2089        /* cleanup */
2090        tr_free( diffs.added );
2091        tr_free( diffs.dropped );
2092        tr_free( newPex );
2093
2094        msgs->clientSentPexAt = time( NULL );
2095    }
2096}
2097
2098static int
2099pexPulse( void * vpeer )
2100{
2101    sendPex( vpeer );
2102    return TRUE;
2103}
2104
2105/**
2106***
2107**/
2108
2109tr_peermsgs*
2110tr_peerMsgsNew( struct tr_torrent * torrent,
2111                struct tr_peer *    peer,
2112                tr_delivery_func    func,
2113                void *              userData,
2114                tr_publisher_tag *  setme )
2115{
2116    tr_peermsgs * m;
2117
2118    assert( peer );
2119    assert( peer->io );
2120
2121    m = tr_new0( tr_peermsgs, 1 );
2122    m->publisher = tr_publisherNew( );
2123    m->info = peer;
2124    m->session = torrent->session;
2125    m->torrent = torrent;
2126    m->io = peer->io;
2127    m->info->clientIsChoked = 1;
2128    m->info->peerIsChoked = 1;
2129    m->info->clientIsInterested = 0;
2130    m->info->peerIsInterested = 0;
2131    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
2132    m->state = AWAITING_BT_LENGTH;
2133    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
2134    m->outMessages = evbuffer_new( );
2135    m->outMessagesBatchedAt = 0;
2136    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2137    m->incoming.block = evbuffer_new( );
2138    m->peerAllowedPieces = NULL;
2139    m->peerAskedFor = REQUEST_LIST_INIT;
2140    m->clientAskedFor = REQUEST_LIST_INIT;
2141    m->clientWillAskFor = REQUEST_LIST_INIT;
2142    peer->msgs = m;
2143
2144    *setme = tr_publisherSubscribe( m->publisher, func, userData );
2145
2146    if( tr_peerIoSupportsLTEP( m->io ) )
2147        sendLtepHandshake( m );
2148
2149    tellPeerWhatWeHave( m );
2150
2151    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */
2152    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
2153    ratePulse( m );
2154
2155    return m;
2156}
2157
2158void
2159tr_peerMsgsFree( tr_peermsgs* msgs )
2160{
2161    if( msgs )
2162    {
2163        tr_timerFree( &msgs->pexTimer );
2164        tr_publisherFree( &msgs->publisher );
2165        reqListClear( &msgs->clientWillAskFor );
2166        reqListClear( &msgs->clientAskedFor );
2167        reqListClear( &msgs->peerAskedFor );
2168
2169        tr_bitfieldFree( msgs->peerAllowedPieces );
2170        evbuffer_free( msgs->incoming.block );
2171        evbuffer_free( msgs->outMessages );
2172        tr_free( msgs->pex );
2173
2174        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2175        tr_free( msgs );
2176    }
2177}
2178
2179void
2180tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2181                        tr_publisher_tag tag )
2182{
2183    tr_publisherUnsubscribe( peer->publisher, tag );
2184}
2185
Note: See TracBrowser for help on using the repository browser.