source: trunk/libtransmission/peer-msgs.c @ 7234

Last change on this file since 7234 was 7234, checked in by charles, 12 years ago

(libT) #1549: support fast exensions' "reject" and "have all/none" messages

  • Property svn:keywords set to Date Rev Author Id
File size: 61.2 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 7234 2008-12-02 17:10:54Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "crypto.h"
26#include "inout.h"
27#include "iobuf.h"
28#ifdef WIN32
29#include "net.h" /* for ECONN */
30#endif
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "ratecontrol.h"
36#include "stats.h"
37#include "torrent.h"
38#include "trevent.h"
39#include "utils.h"
40
41/**
42***
43**/
44
45enum
46{
47    BT_CHOKE                = 0,
48    BT_UNCHOKE              = 1,
49    BT_INTERESTED           = 2,
50    BT_NOT_INTERESTED       = 3,
51    BT_HAVE                 = 4,
52    BT_BITFIELD             = 5,
53    BT_REQUEST              = 6,
54    BT_PIECE                = 7,
55    BT_CANCEL               = 8,
56    BT_PORT                 = 9,
57
58    BT_FEXT_SUGGEST         = 13,
59    BT_FEXT_HAVE_ALL        = 14,
60    BT_FEXT_HAVE_NONE       = 15,
61    BT_FEXT_REJECT          = 16,
62    BT_FEXT_ALLOWED_FAST    = 17,
63
64    BT_LTEP                 = 20,
65
66    LTEP_HANDSHAKE          = 0,
67
68    TR_LTEP_PEX             = 1,
69
70
71
72    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
73
74    /* idle seconds before we send a keepalive */
75    KEEPALIVE_INTERVAL_SECS = 100,
76
77    PEX_INTERVAL            = ( 90 * 1000 ), /* msec between sendPex() calls */
78    PEER_PULSE_INTERVAL     = ( 250 ),       /* msec between peerPulse() calls
79                                               */
80
81    MAX_QUEUE_SIZE          = ( 100 ),
82
83    /* how long an unsent request can stay queued before it's returned
84       back to the peer-mgr's pool of requests */
85    QUEUED_REQUEST_TTL_SECS = 20,
86
87    /* how long a sent request can stay queued before it's returned
88       back to the peer-mgr's pool of requests */
89    SENT_REQUEST_TTL_SECS = 240,
90
91    /* used in lowering the outMessages queue period */
92    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
93    HIGH_PRIORITY_INTERVAL_SECS = 2,
94    LOW_PRIORITY_INTERVAL_SECS = 20,
95
96    /* number of pieces to remove from the bitfield when
97     * lazy bitfields are turned on */
98    LAZY_PIECE_COUNT = 26,
99
100    /* number of pieces we'll allow in our fast set */
101    MAX_FAST_SET_SIZE = 3
102};
103
104/**
105***  REQUEST MANAGEMENT
106**/
107
108enum
109{
110    AWAITING_BT_LENGTH,
111    AWAITING_BT_ID,
112    AWAITING_BT_MESSAGE,
113    AWAITING_BT_PIECE
114};
115
116struct peer_request
117{
118    uint32_t    index;
119    uint32_t    offset;
120    uint32_t    length;
121    time_t      time_requested;
122};
123
124static int
125compareRequest( const void * va,
126                const void * vb )
127{
128    const struct peer_request * a = va;
129    const struct peer_request * b = vb;
130
131    if( a->index != b->index )
132        return a->index < b->index ? -1 : 1;
133
134    if( a->offset != b->offset )
135        return a->offset < b->offset ? -1 : 1;
136
137    if( a->length != b->length )
138        return a->length < b->length ? -1 : 1;
139
140    return 0;
141}
142
143struct request_list
144{
145    uint16_t               count;
146    uint16_t               max;
147    struct peer_request *  requests;
148};
149
150static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
151
152static void
153reqListReserve( struct request_list * list,
154                uint16_t              max )
155{
156    if( list->max < max )
157    {
158        list->max = max;
159        list->requests = tr_renew( struct peer_request,
160                                   list->requests,
161                                   list->max );
162    }
163}
164
165static void
166reqListClear( struct request_list * list )
167{
168    tr_free( list->requests );
169    *list = REQUEST_LIST_INIT;
170}
171
172static void
173reqListCopy( struct request_list *       dest,
174             const struct request_list * src )
175{
176    dest->count = dest->max = src->count;
177    dest->requests =
178        tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
179}
180
181static void
182reqListRemoveOne( struct request_list * list,
183                  int                   i )
184{
185    assert( 0 <= i && i < list->count );
186
187    memmove( &list->requests[i],
188            &list->requests[i + 1],
189            sizeof( struct peer_request ) * ( --list->count - i ) );
190}
191
192static void
193reqListAppend( struct request_list *       list,
194               const struct peer_request * req )
195{
196    if( ++list->count >= list->max )
197        reqListReserve( list, list->max + 8 );
198
199    list->requests[list->count - 1] = *req;
200}
201
202static int
203reqListPop( struct request_list * list,
204            struct peer_request * setme )
205{
206    int success;
207
208    if( !list->count )
209        success = FALSE;
210    else {
211        *setme = list->requests[0];
212        reqListRemoveOne( list, 0 );
213        success = TRUE;
214    }
215
216    return success;
217}
218
219static int
220reqListFind( struct request_list *       list,
221             const struct peer_request * key )
222{
223    uint16_t i;
224
225    for( i = 0; i < list->count; ++i )
226        if( !compareRequest( key, list->requests + i ) )
227            return i;
228
229    return -1;
230}
231
232static int
233reqListRemove( struct request_list *       list,
234               const struct peer_request * key )
235{
236    int success;
237    const int i = reqListFind( list, key );
238
239    if( i < 0 )
240        success = FALSE;
241    else {
242        reqListRemoveOne( list, i );
243        success = TRUE;
244    }
245
246    return success;
247}
248
249/**
250***
251**/
252
253/* this is raw, unchanged data from the peer regarding
254 * the current message that it's sending us. */
255struct tr_incoming
256{
257    uint8_t                id;
258    uint32_t               length; /* includes the +1 for id length */
259    struct peer_request    blockReq; /* metadata for incoming blocks */
260    struct evbuffer *      block; /* piece data for incoming blocks */
261};
262
263struct tr_peermsgs
264{
265    tr_bool         peerSentBitfield;
266    tr_bool         peerSupportsPex;
267    tr_bool         clientSentLtepHandshake;
268    tr_bool         peerSentLtepHandshake;
269    tr_bool         haveFastSet;
270
271    uint8_t         state;
272    uint8_t         ut_pex_id;
273    uint16_t        pexCount;
274    uint16_t        minActiveRequests;
275    uint16_t        maxActiveRequests;
276
277    size_t                 fastsetSize;
278    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
279
280    /* how long the outMessages batch should be allowed to grow before
281     * it's flushed -- some messages (like requests >:) should be sent
282     * very quickly; others aren't as urgent. */
283    int                    outMessagesBatchPeriod;
284
285    tr_peer *              info;
286
287    tr_session *           session;
288    tr_torrent *           torrent;
289    tr_peerIo *            io;
290
291    tr_publisher_t *       publisher;
292
293    struct evbuffer *      outMessages; /* all the non-piece messages */
294
295    struct request_list    peerAskedFor;
296    struct request_list    clientAskedFor;
297    struct request_list    clientWillAskFor;
298
299    tr_timer *             pexTimer;
300
301    time_t                 clientSentPexAt;
302    time_t                 clientSentAnythingAt;
303
304    /* when we started batching the outMessages */
305    time_t                outMessagesBatchedAt;
306
307    tr_bitfield *         peerAllowedPieces;
308
309    struct tr_incoming    incoming;
310
311    tr_pex *              pex;
312};
313
314/**
315***
316**/
317
318static void
319myDebug( const char *               file,
320         int                        line,
321         const struct tr_peermsgs * msgs,
322         const char *               fmt,
323         ... )
324{
325    FILE * fp = tr_getLog( );
326
327    if( fp )
328    {
329        va_list           args;
330        char              timestr[64];
331        struct evbuffer * buf = evbuffer_new( );
332        char *            base = tr_basename( file );
333
334        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
335                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
336                             msgs->torrent->info.name,
337                             tr_peerIoGetAddrStr( msgs->io ),
338                             msgs->info->client );
339        va_start( args, fmt );
340        evbuffer_add_vprintf( buf, fmt, args );
341        va_end( args );
342        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
343        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
344
345        tr_free( base );
346        evbuffer_free( buf );
347    }
348}
349
350#define dbgmsg( msgs, ... ) \
351    do { \
352        if( tr_deepLoggingIsActive( ) ) \
353            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
354    } while( 0 )
355
356/**
357***
358**/
359
360static void
361pokeBatchPeriod( tr_peermsgs * msgs,
362                 int           interval )
363{
364    if( msgs->outMessagesBatchPeriod > interval )
365    {
366        msgs->outMessagesBatchPeriod = interval;
367        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
368    }
369}
370
371static void
372dbgOutMessageLen( tr_peermsgs * msgs )
373{
374    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
375}
376
377static void
378protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
379{
380    tr_peerIo       * io  = msgs->io;
381    struct evbuffer * out = msgs->outMessages;
382
383    assert( tr_peerIoSupportsFEXT( msgs->io ) );
384
385    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
386    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
387    tr_peerIoWriteUint32( io, out, req->index );
388    tr_peerIoWriteUint32( io, out, req->offset );
389    tr_peerIoWriteUint32( io, out, req->length );
390
391    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
392    dbgOutMessageLen( msgs );
393}
394
395static void
396protocolSendRequest( tr_peermsgs *               msgs,
397                     const struct peer_request * req )
398{
399    tr_peerIo       * io  = msgs->io;
400    struct evbuffer * out = msgs->outMessages;
401
402    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
403    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
404    tr_peerIoWriteUint32( io, out, req->index );
405    tr_peerIoWriteUint32( io, out, req->offset );
406    tr_peerIoWriteUint32( io, out, req->length );
407
408    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
409    dbgOutMessageLen( msgs );
410    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
411}
412
413static void
414protocolSendCancel( tr_peermsgs *               msgs,
415                    const struct peer_request * req )
416{
417    tr_peerIo       * io  = msgs->io;
418    struct evbuffer * out = msgs->outMessages;
419
420    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
421    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
422    tr_peerIoWriteUint32( io, out, req->index );
423    tr_peerIoWriteUint32( io, out, req->offset );
424    tr_peerIoWriteUint32( io, out, req->length );
425
426    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
427    dbgOutMessageLen( msgs );
428    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
429}
430
431static void
432protocolSendHave( tr_peermsgs * msgs,
433                  uint32_t      index )
434{
435    tr_peerIo       * io  = msgs->io;
436    struct evbuffer * out = msgs->outMessages;
437
438    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
439    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
440    tr_peerIoWriteUint32( io, out, index );
441
442    dbgmsg( msgs, "sending Have %u...", index );
443    dbgOutMessageLen( msgs );
444    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
445}
446
447static void
448protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
449{
450    tr_peerIo       * io  = msgs->io;
451    struct evbuffer * out = msgs->outMessages;
452
453    assert( tr_peerIoSupportsFEXT( msgs->io ) );
454
455    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
456    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
457    tr_peerIoWriteUint32( io, out, pieceIndex );
458
459    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
460    dbgOutMessageLen( msgs );
461}
462
463static void
464protocolSendChoke( tr_peermsgs * msgs,
465                   int           choke )
466{
467    tr_peerIo       * io  = msgs->io;
468    struct evbuffer * out = msgs->outMessages;
469
470    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
471    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
472
473    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
474    dbgOutMessageLen( msgs );
475    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
476}
477
478static void
479protocolSendHaveAll( tr_peermsgs * msgs )
480{
481    tr_peerIo       * io  = msgs->io;
482    struct evbuffer * out = msgs->outMessages;
483
484    assert( tr_peerIoSupportsFEXT( msgs->io ) );
485
486    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
487    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
488
489    dbgmsg( msgs, "sending HAVE_ALL..." );
490    dbgOutMessageLen( msgs );
491    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
492}
493
494static void
495protocolSendHaveNone( tr_peermsgs * msgs )
496{
497    tr_peerIo       * io  = msgs->io;
498    struct evbuffer * out = msgs->outMessages;
499
500    assert( tr_peerIoSupportsFEXT( msgs->io ) );
501
502    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
503    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
504
505    dbgmsg( msgs, "sending HAVE_NONE..." );
506    dbgOutMessageLen( msgs );
507    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
508}
509
510/**
511***  EVENTS
512**/
513
514static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0 };
515
516static void
517publish( tr_peermsgs *   msgs,
518         tr_peer_event * e )
519{
520    tr_publisherPublish( msgs->publisher, msgs->info, e );
521}
522
523static void
524fireError( tr_peermsgs * msgs,
525           int           err )
526{
527    tr_peer_event e = blankEvent;
528
529    e.eventType = TR_PEER_ERROR;
530    e.err = err;
531    publish( msgs, &e );
532}
533
534static void
535fireNeedReq( tr_peermsgs * msgs )
536{
537    tr_peer_event e = blankEvent;
538
539    e.eventType = TR_PEER_NEED_REQ;
540    publish( msgs, &e );
541}
542
543static void
544firePeerProgress( tr_peermsgs * msgs )
545{
546    tr_peer_event e = blankEvent;
547
548    e.eventType = TR_PEER_PEER_PROGRESS;
549    e.progress = msgs->info->progress;
550    publish( msgs, &e );
551}
552
553static void
554fireGotBlock( tr_peermsgs *               msgs,
555              const struct peer_request * req )
556{
557    tr_peer_event e = blankEvent;
558
559    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
560    e.pieceIndex = req->index;
561    e.offset = req->offset;
562    e.length = req->length;
563    publish( msgs, &e );
564}
565
566static void
567fireClientGotData( tr_peermsgs * msgs,
568                   uint32_t      length,
569                   int           wasPieceData )
570{
571    tr_peer_event e = blankEvent;
572
573    e.length = length;
574    e.eventType = TR_PEER_CLIENT_GOT_DATA;
575    e.wasPieceData = wasPieceData;
576    publish( msgs, &e );
577}
578
579static void
580fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
581{
582    tr_peer_event e = blankEvent;
583    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
584    e.pieceIndex = pieceIndex;
585    publish( msgs, &e );
586}
587
588static void
589fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
590{
591    tr_peer_event e = blankEvent;
592    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
593    e.pieceIndex = pieceIndex;
594    publish( msgs, &e );
595}
596
597static void
598firePeerGotData( tr_peermsgs  * msgs,
599                 uint32_t       length,
600                 int            wasPieceData )
601{
602    tr_peer_event e = blankEvent;
603
604    e.length = length;
605    e.eventType = TR_PEER_PEER_GOT_DATA;
606    e.wasPieceData = wasPieceData;
607
608    publish( msgs, &e );
609}
610
611static void
612fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
613{
614    tr_peer_event e = blankEvent;
615    e.eventType = TR_PEER_CANCEL;
616    e.pieceIndex = req->index;
617    e.offset = req->offset;
618    e.length = req->length;
619    publish( msgs, &e );
620}
621
622/**
623***  ALLOWED FAST SET
624***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
625**/
626
627size_t
628tr_generateAllowedSet( tr_piece_index_t * setmePieces,
629                       size_t             desiredSetSize,
630                       size_t             pieceCount,
631                       const uint8_t    * infohash,
632                       const tr_address * addr )
633{
634    size_t setSize = 0;
635
636    assert( setmePieces );
637    assert( desiredSetSize <= pieceCount );
638    assert( desiredSetSize );
639    assert( pieceCount );
640    assert( infohash );
641    assert( addr );
642
643    if( addr->type == TR_AF_INET )
644    {
645        uint8_t w[SHA_DIGEST_LENGTH + 4];
646        uint8_t x[SHA_DIGEST_LENGTH];
647
648        *(uint32_t*)w = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
649        memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );                /* (2) */
650        tr_sha1( x, w, sizeof( w ), NULL );                          /* (3) */
651
652        while( setSize<desiredSetSize )
653        {
654            int i;
655            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
656            {
657                size_t k;
658                uint32_t j = i * 4;                                  /* (5) */
659                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
660                uint32_t index = y % pieceCount;                     /* (7) */
661
662                for( k=0; k<setSize; ++k )                           /* (8) */
663                    if( setmePieces[k] == index )
664                        break;
665
666                if( k == setSize )
667                    setmePieces[setSize++] = index;                  /* (9) */
668            }
669
670            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
671        }
672    }
673
674    return setSize;
675}
676
677static void
678updateFastSet( tr_peermsgs * msgs UNUSED )
679{
680#if 0
681    const int fext = tr_peerIoSupportsFEXT( msgs->io );
682    const int peerIsNeedy = msgs->info->progress < 0.10;
683
684    if( fext && peerIsNeedy && !msgs->haveFastSet )
685    {
686        size_t i;
687        const struct tr_address * addr = tr_peerIoGetAddress( msgs->io, NULL );
688        const tr_info * inf = &msgs->torrent->info;
689        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
690
691        /* build the fast set */
692        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
693        msgs->haveFastSet = 1;
694
695        /* send it to the peer */
696        for( i=0; i<msgs->fastsetSize; ++i )
697            protocolSendAllowedFast( msgs, msgs->fastset[i] );
698    }
699#endif
700}
701
702/**
703***  INTEREST
704**/
705
706static int
707isPieceInteresting( const tr_peermsgs * peer,
708                    tr_piece_index_t    piece )
709{
710    const tr_torrent * torrent = peer->torrent;
711
712    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
713           && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have
714                                                                        */
715           && ( tr_bitfieldHas( peer->info->have, piece ) );   /* peer has it */
716}
717
718/* "interested" means we'll ask for piece data if they unchoke us */
719static int
720isPeerInteresting( const tr_peermsgs * msgs )
721{
722    tr_piece_index_t    i;
723    const tr_torrent *  torrent;
724    const tr_bitfield * bitfield;
725    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
726
727    if( clientIsSeed )
728        return FALSE;
729
730    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
731        return FALSE;
732
733    torrent = msgs->torrent;
734    bitfield = tr_cpPieceBitfield( torrent->completion );
735
736    if( !msgs->info->have )
737        return TRUE;
738
739    assert( bitfield->byteCount == msgs->info->have->byteCount );
740
741    for( i = 0; i < torrent->info.pieceCount; ++i )
742        if( isPieceInteresting( msgs, i ) )
743            return TRUE;
744
745    return FALSE;
746}
747
748static void
749sendInterest( tr_peermsgs * msgs,
750              int           weAreInterested )
751{
752    struct evbuffer * out = msgs->outMessages;
753
754    assert( msgs );
755    assert( weAreInterested == 0 || weAreInterested == 1 );
756
757    msgs->info->clientIsInterested = weAreInterested;
758    dbgmsg( msgs, "Sending %s",
759            weAreInterested ? "Interested" : "Not Interested" );
760    tr_peerIoWriteUint32( msgs->io, out, sizeof( uint8_t ) );
761    tr_peerIoWriteUint8 (
762        msgs->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
763    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
764    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH( out ) );
765}
766
767static void
768updateInterest( tr_peermsgs * msgs )
769{
770    const int i = isPeerInteresting( msgs );
771
772    if( i != msgs->info->clientIsInterested )
773        sendInterest( msgs, i );
774    if( i )
775        fireNeedReq( msgs );
776}
777
778static int
779popNextRequest( tr_peermsgs *         msgs,
780                struct peer_request * setme )
781{
782    return reqListPop( &msgs->peerAskedFor, setme );
783}
784
785static void
786cancelAllRequestsToClient( tr_peermsgs * msgs )
787{
788    struct peer_request req;
789
790    while( popNextRequest( msgs, &req ) )
791        protocolSendReject( msgs, &req );
792}
793
794void
795tr_peerMsgsSetChoke( tr_peermsgs * msgs,
796                     int           choke )
797{
798    const time_t now = time( NULL );
799    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
800
801    assert( msgs );
802    assert( msgs->info );
803    assert( choke == 0 || choke == 1 );
804
805    if( msgs->info->chokeChangedAt > fibrillationTime )
806    {
807        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation",
808                choke );
809    }
810    else if( msgs->info->peerIsChoked != choke )
811    {
812        msgs->info->peerIsChoked = choke;
813        if( choke )
814            cancelAllRequestsToClient( msgs );
815        protocolSendChoke( msgs, choke );
816        msgs->info->chokeChangedAt = now;
817    }
818}
819
820/**
821***
822**/
823
824void
825tr_peerMsgsHave( tr_peermsgs * msgs,
826                 uint32_t      index )
827{
828    protocolSendHave( msgs, index );
829
830    /* since we have more pieces now, we might not be interested in this peer */
831    updateInterest( msgs );
832}
833
834/**
835***
836**/
837
838static int
839reqIsValid( const tr_peermsgs * peer,
840            uint32_t            index,
841            uint32_t            offset,
842            uint32_t            length )
843{
844    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
845}
846
847static int
848requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
849{
850    return reqIsValid( msgs, req->index, req->offset, req->length );
851}
852
853static void
854expireOldRequests( tr_peermsgs * msgs, const time_t now  )
855{
856    int                 i;
857    time_t              oldestAllowed;
858    struct request_list tmp = REQUEST_LIST_INIT;
859    const int fext = tr_peerIoSupportsFEXT( msgs->io );
860
861    /* cancel requests that have been queued for too long */
862    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
863    reqListCopy( &tmp, &msgs->clientWillAskFor );
864    for( i = 0; i < tmp.count; ++i ) {
865        const struct peer_request * req = &tmp.requests[i];
866        if( req->time_requested < oldestAllowed )
867            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
868    }
869    reqListClear( &tmp );
870
871    /* if the peer doesn't support "Reject Request",
872     * cancel requests that were sent too long ago. */
873    if( !fext ) {
874        oldestAllowed = now - SENT_REQUEST_TTL_SECS;
875        reqListCopy( &tmp, &msgs->clientAskedFor );
876        for( i=0; i<tmp.count; ++i ) {
877            const struct peer_request * req = &tmp.requests[i];
878            if( req->time_requested < oldestAllowed )
879                tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
880        }
881        reqListClear( &tmp );
882    }
883}
884
885static void
886pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
887{
888    const int           max = msgs->maxActiveRequests;
889    const int           min = msgs->minActiveRequests;
890    int                 sent = 0;
891    int                 count = msgs->clientAskedFor.count;
892    struct peer_request req;
893
894    if( count > min )
895        return;
896    if( msgs->info->clientIsChoked )
897        return;
898    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
899        return;
900
901    while( ( count < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
902    {
903        const tr_block_index_t block =
904            _tr_block( msgs->torrent, req.index, req.offset );
905
906        assert( requestIsValid( msgs, &req ) );
907        assert( tr_bitfieldHas( msgs->info->have, req.index ) );
908
909        /* don't ask for it if we've already got it... this block may have
910         * come in from a different peer after we cancelled a request for it */
911        if( !tr_cpBlockIsComplete( msgs->torrent->completion, block ) )
912        {
913            protocolSendRequest( msgs, &req );
914            req.time_requested = now;
915            reqListAppend( &msgs->clientAskedFor, &req );
916
917            ++count;
918            ++sent;
919        }
920    }
921
922    if( sent )
923        dbgmsg( msgs,
924                "pump sent %d requests, now have %d active and %d queued",
925                sent,
926                msgs->clientAskedFor.count,
927                msgs->clientWillAskFor.count );
928
929    if( count < max )
930        fireNeedReq( msgs );
931}
932
933static int
934requestQueueIsFull( const tr_peermsgs * msgs )
935{
936    const int req_max = msgs->maxActiveRequests;
937    return msgs->clientWillAskFor.count >= req_max;
938}
939
940tr_addreq_t
941tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
942                       uint32_t         index,
943                       uint32_t         offset,
944                       uint32_t         length,
945                       int              doForce )
946{
947    struct peer_request req;
948
949    assert( msgs );
950    assert( msgs->torrent );
951    assert( reqIsValid( msgs, index, offset, length ) );
952
953    /**
954    ***  Reasons to decline the request
955    **/
956
957    /* don't send requests to choked clients */
958    if( !doForce && msgs->info->clientIsChoked ) {
959        dbgmsg( msgs, "declining request because they're choking us" );
960        return TR_ADDREQ_CLIENT_CHOKED;
961    }
962
963    /* peer doesn't have this piece */
964    if( !doForce && !tr_bitfieldHas( msgs->info->have, index ) )
965        return TR_ADDREQ_MISSING;
966
967    /* peer's queue is full */
968    if( !doForce && requestQueueIsFull( msgs ) ) {
969        dbgmsg( msgs, "declining request because we're full" );
970        return TR_ADDREQ_FULL;
971    }
972
973    /* have we already asked for this piece? */
974    req.index = index;
975    req.offset = offset;
976    req.length = length;
977    if( !doForce ) {
978        if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
979            dbgmsg( msgs, "declining because it's a duplicate" );
980            return TR_ADDREQ_DUPLICATE;
981        }
982        if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
983            dbgmsg( msgs, "declining because it's a duplicate" );
984            return TR_ADDREQ_DUPLICATE;
985        }
986    }
987
988    /**
989    ***  Accept this request
990    **/
991
992    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
993            index, offset, length );
994    req.time_requested = time( NULL );
995    reqListAppend( &msgs->clientWillAskFor, &req );
996    return TR_ADDREQ_OK;
997}
998
999static void
1000cancelAllRequestsToPeer( tr_peermsgs * msgs )
1001{
1002    int                 i;
1003    struct request_list a = msgs->clientWillAskFor;
1004    struct request_list b = msgs->clientAskedFor;
1005    dbgmsg( msgs, "cancelling all requests to peer" );
1006
1007    msgs->clientAskedFor = REQUEST_LIST_INIT;
1008    msgs->clientWillAskFor = REQUEST_LIST_INIT;
1009
1010    for( i=0; i<a.count; ++i )
1011        fireCancelledReq( msgs, &a.requests[i] );
1012
1013    for( i = 0; i < b.count; ++i ) {
1014        fireCancelledReq( msgs, &b.requests[i] );
1015        protocolSendCancel( msgs, &b.requests[i] );
1016    }
1017
1018    reqListClear( &a );
1019    reqListClear( &b );
1020}
1021
1022void
1023tr_peerMsgsCancel( tr_peermsgs * msgs,
1024                   uint32_t      pieceIndex,
1025                   uint32_t      offset,
1026                   uint32_t      length )
1027{
1028    struct peer_request req;
1029
1030    assert( msgs != NULL );
1031    assert( length > 0 );
1032
1033
1034    /* have we asked the peer for this piece? */
1035    req.index = pieceIndex;
1036    req.offset = offset;
1037    req.length = length;
1038
1039    /* if it's only in the queue and hasn't been sent yet, free it */
1040    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
1041        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32"\n", pieceIndex, offset, length );
1042        fireCancelledReq( msgs, &req );
1043    }
1044
1045    /* if it's already been sent, send a cancel message too */
1046    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
1047        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32"\n", pieceIndex, offset, length );
1048        protocolSendCancel( msgs, &req );
1049        fireCancelledReq( msgs, &req );
1050    }
1051}
1052
1053
1054/**
1055***
1056**/
1057
1058static void
1059sendLtepHandshake( tr_peermsgs * msgs )
1060{
1061    tr_benc           val, *m;
1062    char *            buf;
1063    int               len;
1064    int               pex;
1065    struct evbuffer * out = msgs->outMessages;
1066
1067    if( msgs->clientSentLtepHandshake )
1068        return;
1069
1070    dbgmsg( msgs, "sending an ltep handshake" );
1071    msgs->clientSentLtepHandshake = 1;
1072
1073    /* decide if we want to advertise pex support */
1074    if( !tr_torrentAllowsPex( msgs->torrent ) )
1075        pex = 0;
1076    else if( msgs->peerSentLtepHandshake )
1077        pex = msgs->peerSupportsPex ? 1 : 0;
1078    else
1079        pex = 1;
1080
1081    tr_bencInitDict( &val, 4 );
1082    tr_bencDictAddInt( &val, "e",
1083                       msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
1084    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
1085    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
1086    m  = tr_bencDictAddDict( &val, "m", 1 );
1087    if( pex )
1088        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
1089    buf = tr_bencSave( &val, &len );
1090
1091    tr_peerIoWriteUint32( msgs->io, out, 2 * sizeof( uint8_t ) + len );
1092    tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
1093    tr_peerIoWriteUint8 ( msgs->io, out, LTEP_HANDSHAKE );
1094    tr_peerIoWriteBytes ( msgs->io, out, buf, len );
1095    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1096    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH( out ) );
1097
1098    /* cleanup */
1099    tr_bencFree( &val );
1100    tr_free( buf );
1101}
1102
1103static void
1104parseLtepHandshake( tr_peermsgs *     msgs,
1105                    int               len,
1106                    struct evbuffer * inbuf )
1107{
1108    int64_t   i;
1109    tr_benc   val, * sub;
1110    uint8_t * tmp = tr_new( uint8_t, len );
1111
1112    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
1113    msgs->peerSentLtepHandshake = 1;
1114
1115    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type != TYPE_DICT )
1116    {
1117        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1118        tr_free( tmp );
1119        return;
1120    }
1121
1122    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
1123
1124    /* does the peer prefer encrypted connections? */
1125    if( tr_bencDictFindInt( &val, "e", &i ) )
1126        msgs->info->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1127                                            : ENCRYPTION_PREFERENCE_NO;
1128
1129    /* check supported messages for utorrent pex */
1130    msgs->peerSupportsPex = 0;
1131    if( tr_bencDictFindDict( &val, "m", &sub ) )
1132    {
1133        if( tr_bencDictFindInt( sub, "ut_pex", &i ) )
1134        {
1135            msgs->ut_pex_id = (uint8_t) i;
1136            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1137            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1138        }
1139    }
1140
1141    /* get peer's listening port */
1142    if( tr_bencDictFindInt( &val, "p", &i ) )
1143    {
1144        msgs->info->port = htons( (uint16_t)i );
1145        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
1146    }
1147
1148    tr_bencFree( &val );
1149    tr_free( tmp );
1150}
1151
1152static void
1153parseUtPex( tr_peermsgs *     msgs,
1154            int               msglen,
1155            struct evbuffer * inbuf )
1156{
1157    int                loaded = 0;
1158    uint8_t *          tmp = tr_new( uint8_t, msglen );
1159    tr_benc            val;
1160    const tr_torrent * tor = msgs->torrent;
1161    const uint8_t *    added;
1162    size_t             added_len;
1163
1164    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
1165
1166    if( tr_torrentAllowsPex( tor )
1167      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) )
1168      && tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1169    {
1170        const uint8_t * added_f = NULL;
1171        tr_pex *        pex;
1172        size_t          i, n;
1173        size_t          added_f_len = 0;
1174        tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1175        pex =
1176            tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len,
1177                                    &n );
1178        for( i = 0; i < n; ++i )
1179            tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1180                              TR_PEER_FROM_PEX, pex + i );
1181        tr_free( pex );
1182    }
1183
1184    if( loaded )
1185        tr_bencFree( &val );
1186    tr_free( tmp );
1187}
1188
1189static void sendPex( tr_peermsgs * msgs );
1190
1191static void
1192parseLtep( tr_peermsgs *     msgs,
1193           int               msglen,
1194           struct evbuffer * inbuf )
1195{
1196    uint8_t ltep_msgid;
1197
1198    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
1199    msglen--;
1200
1201    if( ltep_msgid == LTEP_HANDSHAKE )
1202    {
1203        dbgmsg( msgs, "got ltep handshake" );
1204        parseLtepHandshake( msgs, msglen, inbuf );
1205        if( tr_peerIoSupportsLTEP( msgs->io ) )
1206        {
1207            sendLtepHandshake( msgs );
1208            sendPex( msgs );
1209        }
1210    }
1211    else if( ltep_msgid == TR_LTEP_PEX )
1212    {
1213        dbgmsg( msgs, "got ut pex" );
1214        msgs->peerSupportsPex = 1;
1215        parseUtPex( msgs, msglen, inbuf );
1216    }
1217    else
1218    {
1219        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1220        evbuffer_drain( inbuf, msglen );
1221    }
1222}
1223
1224static int
1225readBtLength( tr_peermsgs *     msgs,
1226              struct evbuffer * inbuf,
1227              size_t            inlen )
1228{
1229    uint32_t len;
1230
1231    if( inlen < sizeof( len ) )
1232        return READ_LATER;
1233
1234    tr_peerIoReadUint32( msgs->io, inbuf, &len );
1235
1236    if( len == 0 ) /* peer sent us a keepalive message */
1237        dbgmsg( msgs, "got KeepAlive" );
1238    else
1239    {
1240        msgs->incoming.length = len;
1241        msgs->state = AWAITING_BT_ID;
1242    }
1243
1244    return READ_NOW;
1245}
1246
1247static int readBtMessage( tr_peermsgs *     msgs,
1248                          struct evbuffer * inbuf,
1249                          size_t            inlen );
1250
1251static int
1252readBtId( tr_peermsgs *     msgs,
1253          struct evbuffer * inbuf,
1254          size_t            inlen )
1255{
1256    uint8_t id;
1257
1258    if( inlen < sizeof( uint8_t ) )
1259        return READ_LATER;
1260
1261    tr_peerIoReadUint8( msgs->io, inbuf, &id );
1262    msgs->incoming.id = id;
1263
1264    if( id == BT_PIECE )
1265    {
1266        msgs->state = AWAITING_BT_PIECE;
1267        return READ_NOW;
1268    }
1269    else if( msgs->incoming.length != 1 )
1270    {
1271        msgs->state = AWAITING_BT_MESSAGE;
1272        return READ_NOW;
1273    }
1274    else return readBtMessage( msgs, inbuf, inlen - 1 );
1275}
1276
1277static void
1278updatePeerProgress( tr_peermsgs * msgs )
1279{
1280    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
1281    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
1282    updateFastSet( msgs );
1283    updateInterest( msgs );
1284    firePeerProgress( msgs );
1285}
1286
1287static void
1288peerMadeRequest( tr_peermsgs *               msgs,
1289                 const struct peer_request * req )
1290{
1291    const int fext = tr_peerIoSupportsFEXT( msgs->io );
1292    const int reqIsValid = requestIsValid( msgs, req );
1293    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
1294    const int peerIsChoked = msgs->info->peerIsChoked;
1295
1296    int allow = FALSE;
1297
1298    if( !reqIsValid )
1299        dbgmsg( msgs, "rejecting an invalid request." );
1300    else if( !clientHasPiece )
1301        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1302    else if( peerIsChoked )
1303        dbgmsg( msgs, "rejecting request from choked peer" );
1304    else
1305        allow = TRUE;
1306
1307    if( allow )
1308        reqListAppend( &msgs->peerAskedFor, req );
1309    else if( fext )
1310        protocolSendReject( msgs, req );
1311}
1312
1313static int
1314messageLengthIsCorrect( const tr_peermsgs * msg,
1315                        uint8_t             id,
1316                        uint32_t            len )
1317{
1318    switch( id )
1319    {
1320        case BT_CHOKE:
1321        case BT_UNCHOKE:
1322        case BT_INTERESTED:
1323        case BT_NOT_INTERESTED:
1324        case BT_FEXT_HAVE_ALL:
1325        case BT_FEXT_HAVE_NONE:
1326            return len == 1;
1327
1328        case BT_HAVE:
1329        case BT_FEXT_SUGGEST:
1330        case BT_FEXT_ALLOWED_FAST:
1331            return len == 5;
1332
1333        case BT_BITFIELD:
1334            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1335
1336        case BT_REQUEST:
1337        case BT_CANCEL:
1338        case BT_FEXT_REJECT:
1339            return len == 13;
1340
1341        case BT_PIECE:
1342            return len > 9 && len <= 16393;
1343
1344        case BT_PORT:
1345            return len == 3;
1346
1347        case BT_LTEP:
1348            return len >= 2;
1349
1350        default:
1351            return FALSE;
1352    }
1353}
1354
1355static int clientGotBlock( tr_peermsgs *               msgs,
1356                           const uint8_t *             block,
1357                           const struct peer_request * req );
1358
1359static int
1360readBtPiece( tr_peermsgs      * msgs,
1361             struct evbuffer  * inbuf,
1362             size_t             inlen,
1363             size_t           * setme_piece_bytes_read )
1364{
1365    struct peer_request * req = &msgs->incoming.blockReq;
1366
1367    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1368    dbgmsg( msgs, "In readBtPiece" );
1369
1370    if( !req->length )
1371    {
1372        if( inlen < 8 )
1373            return READ_LATER;
1374
1375        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1376        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1377        req->length = msgs->incoming.length - 9;
1378        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index,
1379                req->offset,
1380                req->length );
1381        return READ_NOW;
1382    }
1383    else
1384    {
1385        int          err;
1386
1387        /* read in another chunk of data */
1388        const size_t nLeft = req->length - EVBUFFER_LENGTH(
1389            msgs->incoming.block );
1390        size_t       n = MIN( nLeft, inlen );
1391        uint8_t *    buf = tr_new( uint8_t, n );
1392        assert( EVBUFFER_LENGTH( inbuf ) >= n );
1393        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1394        evbuffer_add( msgs->incoming.block, buf, n );
1395        fireClientGotData( msgs, n, TRUE );
1396        *setme_piece_bytes_read += n;
1397        tr_free( buf );
1398        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1399               (int)n, req->index, req->offset, req->length,
1400               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1401        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1402            return READ_LATER;
1403
1404        /* we've got the whole block ... process it */
1405        err = clientGotBlock( msgs, EVBUFFER_DATA(
1406                                  msgs->incoming.block ), req );
1407
1408        /* cleanup */
1409        evbuffer_drain( msgs->incoming.block,
1410                       EVBUFFER_LENGTH( msgs->incoming.block ) );
1411        req->length = 0;
1412        msgs->state = AWAITING_BT_LENGTH;
1413        if( !err )
1414            return READ_NOW;
1415        else
1416        {
1417            fireError( msgs, err );
1418            return READ_ERR;
1419        }
1420    }
1421}
1422
1423static int
1424readBtMessage( tr_peermsgs *     msgs,
1425               struct evbuffer * inbuf,
1426               size_t            inlen )
1427{
1428    uint32_t      ui32;
1429    uint32_t      msglen = msgs->incoming.length;
1430    const uint8_t id = msgs->incoming.id;
1431    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1432    const int     fext = tr_peerIoSupportsFEXT( msgs->io );
1433
1434    --msglen; /* id length */
1435
1436    if( inlen < msglen )
1437        return READ_LATER;
1438
1439    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id,
1440            (int)msglen,
1441            (int)inlen );
1442
1443    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1444    {
1445        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d",
1446                (int)id, (int)msglen );
1447        fireError( msgs, EMSGSIZE );
1448        return READ_ERR;
1449    }
1450
1451    switch( id )
1452    {
1453        case BT_CHOKE:
1454            dbgmsg( msgs, "got Choke" );
1455            msgs->info->clientIsChoked = 1;
1456            if( !fext )
1457                cancelAllRequestsToPeer( msgs );
1458            cancelAllRequestsToClient( msgs );
1459            break;
1460
1461        case BT_UNCHOKE:
1462            dbgmsg( msgs, "got Unchoke" );
1463            msgs->info->clientIsChoked = 0;
1464            fireNeedReq( msgs );
1465            break;
1466
1467        case BT_INTERESTED:
1468            dbgmsg( msgs, "got Interested" );
1469            msgs->info->peerIsInterested = 1;
1470            break;
1471
1472        case BT_NOT_INTERESTED:
1473            dbgmsg( msgs, "got Not Interested" );
1474            msgs->info->peerIsInterested = 0;
1475            break;
1476
1477        case BT_HAVE:
1478            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1479            dbgmsg( msgs, "got Have: %u", ui32 );
1480            if( tr_bitfieldAdd( msgs->info->have, ui32 ) )
1481            {
1482                fireError( msgs, ERANGE );
1483                return READ_ERR;
1484            }
1485            updatePeerProgress( msgs );
1486            tr_rcTransferred( msgs->torrent->swarmSpeed,
1487                              msgs->torrent->info.pieceSize );
1488            break;
1489
1490        case BT_BITFIELD:
1491        {
1492            dbgmsg( msgs, "got a bitfield" );
1493            msgs->peerSentBitfield = 1;
1494            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits,
1495                                msglen );
1496            updatePeerProgress( msgs );
1497            fireNeedReq( msgs );
1498            break;
1499        }
1500
1501        case BT_REQUEST:
1502        {
1503            struct peer_request r;
1504            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1505            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1506            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1507            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1508            peerMadeRequest( msgs, &r );
1509            break;
1510        }
1511
1512        case BT_CANCEL:
1513        {
1514            struct peer_request r;
1515            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1516            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1517            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1518            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1519            if( reqListRemove( &msgs->peerAskedFor, &r ) && fext )
1520                protocolSendReject( msgs, &r );
1521            break;
1522        }
1523
1524        case BT_PIECE:
1525            assert( 0 ); /* handled elsewhere! */
1526            break;
1527
1528        case BT_PORT:
1529            dbgmsg( msgs, "Got a BT_PORT" );
1530            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1531            break;
1532
1533        case BT_FEXT_SUGGEST:
1534            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1535            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1536            if( fext )
1537                fireClientGotSuggest( msgs, ui32 );
1538            else {
1539                fireError( msgs, EPROTO );
1540                return READ_ERR;
1541            }
1542            break;
1543
1544        case BT_FEXT_ALLOWED_FAST:
1545            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1546            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1547            if( fext )
1548                fireClientGotAllowedFast( msgs, ui32 );
1549            else {
1550                fireError( msgs, EPROTO );
1551                return READ_ERR;
1552            }
1553            break;
1554
1555        case BT_FEXT_HAVE_ALL:
1556            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1557            if( fext ) {
1558                tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1559                updatePeerProgress( msgs );
1560            } else {
1561                fireError( msgs, EPROTO );
1562                return READ_ERR;
1563            }
1564            break;
1565
1566
1567        case BT_FEXT_HAVE_NONE:
1568            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1569            if( fext ) {
1570                tr_bitfieldClear( msgs->info->have );
1571                updatePeerProgress( msgs );
1572            } else {
1573                fireError( msgs, EPROTO );
1574                return READ_ERR;
1575            }
1576            break;
1577
1578        case BT_FEXT_REJECT:
1579        {
1580            struct peer_request r;
1581            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1582            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1583            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1584            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1585            if( fext )
1586                reqListRemove( &msgs->clientAskedFor, &r );
1587            else {
1588                fireError( msgs, EPROTO );
1589                return READ_ERR;
1590            }
1591            break;
1592        }
1593
1594        case BT_LTEP:
1595            dbgmsg( msgs, "Got a BT_LTEP" );
1596            parseLtep( msgs, msglen, inbuf );
1597            break;
1598
1599        default:
1600            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1601            tr_peerIoDrain( msgs->io, inbuf, msglen );
1602            break;
1603    }
1604
1605    assert( msglen + 1 == msgs->incoming.length );
1606    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1607
1608    msgs->state = AWAITING_BT_LENGTH;
1609    return READ_NOW;
1610}
1611
1612static void
1613decrementDownloadedCount( tr_peermsgs * msgs,
1614                          uint32_t      byteCount )
1615{
1616    tr_torrent * tor = msgs->torrent;
1617
1618    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1619}
1620
1621static void
1622clientGotUnwantedBlock( tr_peermsgs *               msgs,
1623                        const struct peer_request * req )
1624{
1625    decrementDownloadedCount( msgs, req->length );
1626}
1627
1628static void
1629addPeerToBlamefield( tr_peermsgs * msgs,
1630                     uint32_t      index )
1631{
1632    if( !msgs->info->blame )
1633        msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1634    tr_bitfieldAdd( msgs->info->blame, index );
1635}
1636
1637/* returns 0 on success, or an errno on failure */
1638static int
1639clientGotBlock( tr_peermsgs *               msgs,
1640                const uint8_t *             data,
1641                const struct peer_request * req )
1642{
1643    int                    err;
1644    tr_torrent *           tor = msgs->torrent;
1645    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1646
1647    assert( msgs );
1648    assert( req );
1649
1650    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) )
1651    {
1652        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1653                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1654        return EMSGSIZE;
1655    }
1656
1657    /* save the block */
1658    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset,
1659            req->length );
1660
1661    /**
1662    *** Remove the block from our `we asked for this' list
1663    **/
1664
1665    if( !reqListRemove( &msgs->clientAskedFor, req ) )
1666    {
1667        clientGotUnwantedBlock( msgs, req );
1668        dbgmsg( msgs, "we didn't ask for this message..." );
1669        return 0;
1670    }
1671
1672    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1673            msgs->clientAskedFor.count );
1674
1675    /**
1676    *** Error checks
1677    **/
1678
1679    if( tr_cpBlockIsComplete( tor->completion, block ) )
1680    {
1681        dbgmsg( msgs, "we have this block already..." );
1682        clientGotUnwantedBlock( msgs, req );
1683        return 0;
1684    }
1685
1686    /**
1687    ***  Save the block
1688    **/
1689
1690    msgs->info->peerSentPieceDataAt = time( NULL );
1691    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1692        return err;
1693
1694    addPeerToBlamefield( msgs, req->index );
1695    fireGotBlock( msgs, req );
1696    return 0;
1697}
1698
1699static int peerPulse( void * vmsgs );
1700
1701static void
1702didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1703{
1704    tr_peermsgs * msgs = vmsgs;
1705    firePeerGotData( msgs, bytesWritten, wasPieceData );
1706    peerPulse( msgs );
1707}
1708
1709static ReadState
1710canRead( struct tr_iobuf * iobuf, void * vmsgs, size_t * piece )
1711{
1712    ReadState         ret;
1713    tr_peermsgs *     msgs = vmsgs;
1714    struct evbuffer * in = tr_iobuf_input( iobuf );
1715    const size_t      inlen = EVBUFFER_LENGTH( in );
1716
1717    if( !inlen )
1718    {
1719        ret = READ_LATER;
1720    }
1721    else if( msgs->state == AWAITING_BT_PIECE )
1722    {
1723        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1724    }
1725    else switch( msgs->state )
1726    {
1727        case AWAITING_BT_LENGTH:
1728            ret = readBtLength ( msgs, in, inlen ); break;
1729
1730        case AWAITING_BT_ID:
1731            ret = readBtId     ( msgs, in, inlen ); break;
1732
1733        case AWAITING_BT_MESSAGE:
1734            ret = readBtMessage( msgs, in, inlen ); break;
1735
1736        default:
1737            assert( 0 );
1738    }
1739
1740    /* log the raw data that was read */
1741    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1742        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1743
1744    return ret;
1745}
1746
1747/**
1748***
1749**/
1750
1751static int
1752ratePulse( void * vpeer )
1753{
1754    tr_peermsgs * peer = vpeer;
1755    const double rateToClient = tr_peerGetPieceSpeed( peer->info,
1756                                                      TR_PEER_TO_CLIENT );
1757    const int estimatedBlocksInNext30Seconds =
1758                  ( rateToClient * 30 * 1024 ) / peer->torrent->blockSize;
1759
1760    peer->minActiveRequests = 8;
1761    peer->maxActiveRequests = peer->minActiveRequests + estimatedBlocksInNext30Seconds;
1762    return TRUE;
1763}
1764
1765static size_t
1766fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1767{
1768    size_t bytesWritten = 0;
1769    struct peer_request req;
1770    const int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1771    const int fext = tr_peerIoSupportsFEXT( msgs->io );
1772
1773    /**
1774    ***  Protocol messages
1775    **/
1776
1777    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1778    {
1779        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1780        msgs->outMessagesBatchedAt = now;
1781    }
1782    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1783    {
1784        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1785        /* flush the protocol messages */
1786        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->io, len );
1787        tr_peerIoWriteBuf( msgs->io, msgs->outMessages, FALSE );
1788        msgs->clientSentAnythingAt = now;
1789        msgs->outMessagesBatchedAt = 0;
1790        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1791        bytesWritten +=  len;
1792    }
1793
1794    /**
1795    ***  Blocks
1796    **/
1797
1798    if( ( tr_peerIoGetWriteBufferSpace( msgs->io ) >= msgs->torrent->blockSize )
1799        && popNextRequest( msgs, &req ) )
1800    {
1801        if( requestIsValid( msgs, &req )
1802            && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1803        {
1804            /* send a block */
1805            uint8_t * buf = tr_new( uint8_t, req.length );
1806            const int err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf );
1807            if( err ) {
1808                fireError( msgs, err );
1809                bytesWritten = 0;
1810                msgs = NULL;
1811            } else {
1812                tr_peerIo * io = msgs->io;
1813                struct evbuffer * out = evbuffer_new( );
1814                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1815                tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1816                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1817                tr_peerIoWriteUint32( io, out, req.index );
1818                tr_peerIoWriteUint32( io, out, req.offset );
1819                tr_peerIoWriteBytes ( io, out, buf, req.length );
1820                tr_peerIoWriteBuf( io, out, TRUE );
1821                bytesWritten += EVBUFFER_LENGTH( out );
1822                evbuffer_free( out );
1823                msgs->clientSentAnythingAt = now;
1824            }
1825            tr_free( buf );
1826        }
1827        else if( fext ) /* peer needs a reject message */
1828        {
1829            protocolSendReject( msgs, &req );
1830        }
1831    }
1832
1833    /**
1834    ***  Keepalive
1835    **/
1836
1837    if( ( msgs != NULL )
1838        && ( msgs->clientSentAnythingAt != 0 )
1839        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1840    {
1841        dbgmsg( msgs, "sending a keepalive message" );
1842        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1843        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1844    }
1845
1846    return bytesWritten;
1847}
1848
1849static int
1850peerPulse( void * vmsgs )
1851{
1852    tr_peermsgs * msgs = vmsgs;
1853    const time_t  now = time( NULL );
1854
1855    ratePulse( msgs );
1856
1857    pumpRequestQueue( msgs, now );
1858    expireOldRequests( msgs, now );
1859
1860    for( ;; )
1861        if( fillOutputBuffer( msgs, now ) < 1 )
1862            break;
1863
1864    return TRUE; /* loop forever */
1865}
1866
1867void
1868tr_peerMsgsPulse( tr_peermsgs * msgs )
1869{
1870    if( msgs != NULL )
1871        peerPulse( msgs );
1872}
1873
1874static void
1875gotError( struct tr_iobuf  * iobuf UNUSED,
1876          short              what,
1877          void             * vmsgs )
1878{
1879    if( what & EVBUFFER_TIMEOUT )
1880        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1881    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1882        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1883               what, errno, tr_strerror( errno ) );
1884    fireError( vmsgs, ENOTCONN );
1885}
1886
1887static void
1888sendBitfield( tr_peermsgs * msgs )
1889{
1890    struct evbuffer * out = msgs->outMessages;
1891    tr_bitfield *     field;
1892    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1893    size_t            i;
1894    size_t            lazyCount = 0;
1895
1896    field = tr_bitfieldDup( tr_cpPieceBitfield( msgs->torrent->completion ) );
1897
1898    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1899    {
1900        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1901            speed over a truly random sample -- let's limit the pool size to
1902            the first 1000 pieces so large torrents don't bog things down */
1903        size_t poolSize;
1904        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1905        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1906
1907        /* build the pool */
1908        for( i=poolSize=0; i<maxPoolSize; ++i )
1909            if( tr_bitfieldHas( field, i ) )
1910                pool[poolSize++] = i;
1911
1912        /* pull random piece indices from the pool */
1913        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1914        {
1915            const int pos = tr_cryptoWeakRandInt( poolSize );
1916            const tr_piece_index_t piece = pool[pos];
1917            tr_bitfieldRem( field, piece );
1918            lazyPieces[lazyCount++] = piece;
1919            pool[pos] = pool[--poolSize];
1920        }
1921
1922        /* cleanup */
1923        tr_free( pool );
1924    }
1925
1926    tr_peerIoWriteUint32( msgs->io, out,
1927                          sizeof( uint8_t ) + field->byteCount );
1928    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1929    tr_peerIoWriteBytes ( msgs->io, out, field->bits, field->byteCount );
1930    dbgmsg( msgs, "sending bitfield... outMessage size is now %d",
1931           (int)EVBUFFER_LENGTH( out ) );
1932    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1933
1934    for( i = 0; i < lazyCount; ++i )
1935        protocolSendHave( msgs, lazyPieces[i] );
1936
1937    tr_bitfieldFree( field );
1938}
1939
1940static void
1941tellPeerWhatWeHave( tr_peermsgs * msgs )
1942{
1943    const int fext = tr_peerIoSupportsFEXT( msgs->io );
1944
1945    if( fext && ( tr_cpGetStatus( msgs->torrent->completion ) == TR_SEED ) )
1946    {
1947        protocolSendHaveAll( msgs );
1948    }
1949    else if( fext && ( tr_cpHaveValid( msgs->torrent->completion ) == 0 ) )
1950    {
1951        protocolSendHaveNone( msgs );
1952    }
1953    else
1954    {
1955        sendBitfield( msgs );
1956    }
1957}
1958
1959/**
1960***
1961**/
1962
1963/* some peers give us error messages if we send
1964   more than this many peers in a single pex message
1965   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1966#define MAX_PEX_ADDED 50
1967#define MAX_PEX_DROPPED 50
1968
1969typedef struct
1970{
1971    tr_pex *  added;
1972    tr_pex *  dropped;
1973    tr_pex *  elements;
1974    int       addedCount;
1975    int       droppedCount;
1976    int       elementCount;
1977}
1978PexDiffs;
1979
1980static void
1981pexAddedCb( void * vpex,
1982            void * userData )
1983{
1984    PexDiffs * diffs = userData;
1985    tr_pex *   pex = vpex;
1986
1987    if( diffs->addedCount < MAX_PEX_ADDED )
1988    {
1989        diffs->added[diffs->addedCount++] = *pex;
1990        diffs->elements[diffs->elementCount++] = *pex;
1991    }
1992}
1993
1994static void
1995pexDroppedCb( void * vpex,
1996              void * userData )
1997{
1998    PexDiffs * diffs = userData;
1999    tr_pex *   pex = vpex;
2000
2001    if( diffs->droppedCount < MAX_PEX_DROPPED )
2002    {
2003        diffs->dropped[diffs->droppedCount++] = *pex;
2004    }
2005}
2006
2007static void
2008pexElementCb( void * vpex,
2009              void * userData )
2010{
2011    PexDiffs * diffs = userData;
2012    tr_pex *   pex = vpex;
2013
2014    diffs->elements[diffs->elementCount++] = *pex;
2015}
2016
2017/* TODO: ipv6 pex */
2018static void
2019sendPex( tr_peermsgs * msgs )
2020{
2021    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2022    {
2023        PexDiffs diffs;
2024        tr_pex * newPex = NULL;
2025        const int newCount = tr_peerMgrGetPeers( msgs->session->peerMgr,
2026                                                 msgs->torrent->info.hash,
2027                                                 &newPex );
2028
2029        /* build the diffs */
2030        diffs.added = tr_new( tr_pex, newCount );
2031        diffs.addedCount = 0;
2032        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2033        diffs.droppedCount = 0;
2034        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2035        diffs.elementCount = 0;
2036        tr_set_compare( msgs->pex, msgs->pexCount,
2037                        newPex, newCount,
2038                        tr_pexCompare, sizeof( tr_pex ),
2039                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2040        dbgmsg(
2041            msgs,
2042            "pex: old peer count %d, new peer count %d, added %d, removed %d",
2043            msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
2044
2045        if( !diffs.addedCount && !diffs.droppedCount )
2046        {
2047            tr_free( diffs.elements );
2048        }
2049        else
2050        {
2051            int  i;
2052            tr_benc val;
2053            char * benc;
2054            int bencLen;
2055            uint8_t * tmp, *walk;
2056            struct evbuffer * out = msgs->outMessages;
2057
2058            /* update peer */
2059            tr_free( msgs->pex );
2060            msgs->pex = diffs.elements;
2061            msgs->pexCount = diffs.elementCount;
2062
2063            /* build the pex payload */
2064            tr_bencInitDict( &val, 3 );
2065
2066            /* "added" */
2067            tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2068            for( i = 0; i < diffs.addedCount; ++i ) {
2069                memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2070                memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2071            }
2072            assert( ( walk - tmp ) == diffs.addedCount * 6 );
2073            tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2074            tr_free( tmp );
2075
2076            /* "added.f" */
2077            tmp = walk = tr_new( uint8_t, diffs.addedCount );
2078            for( i = 0; i < diffs.addedCount; ++i )
2079                *walk++ = diffs.added[i].flags;
2080            assert( ( walk - tmp ) == diffs.addedCount );
2081            tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2082            tr_free( tmp );
2083
2084            /* "dropped" */
2085            tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2086            for( i = 0; i < diffs.droppedCount; ++i ) {
2087                memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2088                memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2089            }
2090            assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2091            tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2092            tr_free( tmp );
2093
2094            /* write the pex message */
2095            benc = tr_bencSave( &val, &bencLen );
2096            tr_peerIoWriteUint32( msgs->io, out, 2 * sizeof( uint8_t ) + bencLen );
2097            tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
2098            tr_peerIoWriteUint8 ( msgs->io, out, msgs->ut_pex_id );
2099            tr_peerIoWriteBytes ( msgs->io, out, benc, bencLen );
2100            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2101            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2102
2103            tr_free( benc );
2104            tr_bencFree( &val );
2105        }
2106
2107        /* cleanup */
2108        tr_free( diffs.added );
2109        tr_free( diffs.dropped );
2110        tr_free( newPex );
2111
2112        msgs->clientSentPexAt = time( NULL );
2113    }
2114}
2115
2116static int
2117pexPulse( void * vpeer )
2118{
2119    sendPex( vpeer );
2120    return TRUE;
2121}
2122
2123/**
2124***
2125**/
2126
2127tr_peermsgs*
2128tr_peerMsgsNew( struct tr_torrent * torrent,
2129                struct tr_peer *    info,
2130                tr_delivery_func    func,
2131                void *              userData,
2132                tr_publisher_tag *  setme )
2133{
2134    tr_peermsgs * m;
2135
2136    assert( info );
2137    assert( info->io );
2138
2139    m = tr_new0( tr_peermsgs, 1 );
2140    m->publisher = tr_publisherNew( );
2141    m->info = info;
2142    m->session = torrent->session;
2143    m->torrent = torrent;
2144    m->io = info->io;
2145    m->info->clientIsChoked = 1;
2146    m->info->peerIsChoked = 1;
2147    m->info->clientIsInterested = 0;
2148    m->info->peerIsInterested = 0;
2149    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
2150    m->state = AWAITING_BT_LENGTH;
2151    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
2152    m->outMessages = evbuffer_new( );
2153    m->outMessagesBatchedAt = 0;
2154    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2155    m->incoming.block = evbuffer_new( );
2156    m->peerAllowedPieces = NULL;
2157    m->peerAskedFor = REQUEST_LIST_INIT;
2158    m->clientAskedFor = REQUEST_LIST_INIT;
2159    m->clientWillAskFor = REQUEST_LIST_INIT;
2160    *setme = tr_publisherSubscribe( m->publisher, func, userData );
2161
2162    if( tr_peerIoSupportsLTEP( m->io ) )
2163        sendLtepHandshake( m );
2164
2165    tellPeerWhatWeHave( m );
2166
2167    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of
2168                                             inactivity */
2169    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
2170    ratePulse( m );
2171
2172    return m;
2173}
2174
2175void
2176tr_peerMsgsFree( tr_peermsgs* msgs )
2177{
2178    if( msgs )
2179    {
2180        tr_timerFree( &msgs->pexTimer );
2181        tr_publisherFree( &msgs->publisher );
2182        reqListClear( &msgs->clientWillAskFor );
2183        reqListClear( &msgs->clientAskedFor );
2184        reqListClear( &msgs->peerAskedFor );
2185
2186        tr_bitfieldFree( msgs->peerAllowedPieces );
2187        evbuffer_free( msgs->incoming.block );
2188        evbuffer_free( msgs->outMessages );
2189        tr_free( msgs->pex );
2190
2191        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2192        tr_free( msgs );
2193    }
2194}
2195
2196void
2197tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2198                        tr_publisher_tag tag )
2199{
2200    tr_publisherUnsubscribe( peer->publisher, tag );
2201}
2202
Note: See TracBrowser for help on using the repository browser.