source: trunk/libtransmission/peer-msgs.c @ 7361

Last change on this file since 7361 was 7361, checked in by charles, 12 years ago

(trunk libT) remove a couple of redundant and/or unused fields from struct peer_atom and tr_peermsgs.

  • Property svn:keywords set to Date Rev Author Id
File size: 61.7 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 7361 2008-12-12 02:44:21Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "crypto.h"
26#include "inout.h"
27#include "iobuf.h"
28#ifdef WIN32
29#include "net.h" /* for ECONN */
30#endif
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "ratecontrol.h"
36#include "stats.h"
37#include "torrent.h"
38#include "trevent.h"
39#include "utils.h"
40
41/**
42***
43**/
44
45enum
46{
47    BT_CHOKE                = 0,
48    BT_UNCHOKE              = 1,
49    BT_INTERESTED           = 2,
50    BT_NOT_INTERESTED       = 3,
51    BT_HAVE                 = 4,
52    BT_BITFIELD             = 5,
53    BT_REQUEST              = 6,
54    BT_PIECE                = 7,
55    BT_CANCEL               = 8,
56    BT_PORT                 = 9,
57
58    BT_FEXT_SUGGEST         = 13,
59    BT_FEXT_HAVE_ALL        = 14,
60    BT_FEXT_HAVE_NONE       = 15,
61    BT_FEXT_REJECT          = 16,
62    BT_FEXT_ALLOWED_FAST    = 17,
63
64    BT_LTEP                 = 20,
65
66    LTEP_HANDSHAKE          = 0,
67
68    TR_LTEP_PEX             = 1,
69
70
71
72    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
73
74    /* idle seconds before we send a keepalive */
75    KEEPALIVE_INTERVAL_SECS = 100,
76
77    PEX_INTERVAL            = ( 90 * 1000 ), /* msec between sendPex() calls */
78    PEER_PULSE_INTERVAL     = ( 250 ),       /* msec between peerPulse() calls
79                                               */
80
81    MAX_QUEUE_SIZE          = ( 100 ),
82
83    /* how long an unsent request can stay queued before it's returned
84       back to the peer-mgr's pool of requests */
85    QUEUED_REQUEST_TTL_SECS = 20,
86
87    /* how long a sent request can stay queued before it's returned
88       back to the peer-mgr's pool of requests */
89    SENT_REQUEST_TTL_SECS = 240,
90
91    /* used in lowering the outMessages queue period */
92    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
93    HIGH_PRIORITY_INTERVAL_SECS = 2,
94    LOW_PRIORITY_INTERVAL_SECS = 20,
95
96    /* number of pieces to remove from the bitfield when
97     * lazy bitfields are turned on */
98    LAZY_PIECE_COUNT = 26,
99
100    /* number of pieces we'll allow in our fast set */
101    MAX_FAST_SET_SIZE = 3
102};
103
104/**
105***  REQUEST MANAGEMENT
106**/
107
108enum
109{
110    AWAITING_BT_LENGTH,
111    AWAITING_BT_ID,
112    AWAITING_BT_MESSAGE,
113    AWAITING_BT_PIECE
114};
115
116struct peer_request
117{
118    uint32_t    index;
119    uint32_t    offset;
120    uint32_t    length;
121    time_t      time_requested;
122};
123
124static int
125compareRequest( const void * va, const void * vb )
126{
127    const struct peer_request * a = va;
128    const struct peer_request * b = vb;
129
130    if( a->index != b->index )
131        return a->index < b->index ? -1 : 1;
132
133    if( a->offset != b->offset )
134        return a->offset < b->offset ? -1 : 1;
135
136    if( a->length != b->length )
137        return a->length < b->length ? -1 : 1;
138
139    return 0;
140}
141
142struct request_list
143{
144    uint16_t               count;
145    uint16_t               max;
146    struct peer_request *  requests;
147};
148
149static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
150
151static void
152reqListReserve( struct request_list * list,
153                uint16_t              max )
154{
155    if( list->max < max )
156    {
157        list->max = max;
158        list->requests = tr_renew( struct peer_request,
159                                   list->requests,
160                                   list->max );
161    }
162}
163
164static void
165reqListClear( struct request_list * list )
166{
167    tr_free( list->requests );
168    *list = REQUEST_LIST_INIT;
169}
170
171static void
172reqListCopy( struct request_list * dest, const struct request_list * src )
173{
174    dest->count = dest->max = src->count;
175    dest->requests = tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
176}
177
178static void
179reqListRemoveOne( struct request_list * list,
180                  int                   i )
181{
182    assert( 0 <= i && i < list->count );
183
184    memmove( &list->requests[i],
185             &list->requests[i + 1],
186             sizeof( struct peer_request ) * ( --list->count - i ) );
187}
188
189static void
190reqListAppend( struct request_list *       list,
191               const struct peer_request * req )
192{
193    if( ++list->count >= list->max )
194        reqListReserve( list, list->max + 8 );
195
196    list->requests[list->count - 1] = *req;
197}
198
199static int
200reqListPop( struct request_list * list,
201            struct peer_request * setme )
202{
203    int success;
204
205    if( !list->count )
206        success = FALSE;
207    else {
208        *setme = list->requests[0];
209        reqListRemoveOne( list, 0 );
210        success = TRUE;
211    }
212
213    return success;
214}
215
216static int
217reqListFind( struct request_list *       list,
218             const struct peer_request * key )
219{
220    uint16_t i;
221
222    for( i = 0; i < list->count; ++i )
223        if( !compareRequest( key, list->requests + i ) )
224            return i;
225
226    return -1;
227}
228
229static int
230reqListRemove( struct request_list *       list,
231               const struct peer_request * key )
232{
233    int success;
234    const int i = reqListFind( list, key );
235
236    if( i < 0 )
237        success = FALSE;
238    else {
239        reqListRemoveOne( list, i );
240        success = TRUE;
241    }
242
243    return success;
244}
245
246/**
247***
248**/
249
250/* this is raw, unchanged data from the peer regarding
251 * the current message that it's sending us. */
252struct tr_incoming
253{
254    uint8_t                id;
255    uint32_t               length; /* includes the +1 for id length */
256    struct peer_request    blockReq; /* metadata for incoming blocks */
257    struct evbuffer *      block; /* piece data for incoming blocks */
258};
259
260/**
261 * Low-level communication state information about a connected peer.
262 *
263 * This structure remembers the low-level protocol states that we're
264 * in with this peer, such as active requests, pex messages, and so on.
265 * Its fields are all private to peer-msgs.c.
266 *
267 * Data not directly involved with sending & receiving messages is
268 * stored in tr_peer, where it can be accessed by both peermsgs and
269 * the peer manager.
270 *
271 * @see struct peer_atom
272 * @see tr_peer
273 */
274struct tr_peermsgs
275{
276    tr_bool         peerSentBitfield;
277    tr_bool         peerSupportsPex;
278    tr_bool         clientSentLtepHandshake;
279    tr_bool         peerSentLtepHandshake;
280    tr_bool         haveFastSet;
281
282    uint8_t         state;
283    uint8_t         ut_pex_id;
284    uint16_t        pexCount;
285    uint16_t        minActiveRequests;
286    uint16_t        maxActiveRequests;
287
288    size_t                 fastsetSize;
289    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
290
291    /* how long the outMessages batch should be allowed to grow before
292     * it's flushed -- some messages (like requests >:) should be sent
293     * very quickly; others aren't as urgent. */
294    int                    outMessagesBatchPeriod;
295
296    tr_peer *              peer;
297
298    tr_session *           session;
299    tr_torrent *           torrent;
300
301    tr_publisher_t *       publisher;
302
303    struct evbuffer *      outMessages; /* all the non-piece messages */
304
305    struct request_list    peerAskedFor;
306    struct request_list    clientAskedFor;
307    struct request_list    clientWillAskFor;
308
309    tr_timer             * pexTimer;
310    tr_pex               * pex;
311
312    time_t                 clientSentPexAt;
313    time_t                 clientSentAnythingAt;
314
315    /* when we started batching the outMessages */
316    time_t                outMessagesBatchedAt;
317
318    struct tr_incoming    incoming;
319
320    /* if the peer supports the Extension Protocol in BEP 10 and
321       supplied a reqq argument, it's stored here.  otherwise the
322       value is zero and should be ignored. */
323    int64_t               reqq;
324};
325
326/**
327***
328**/
329
330static void
331myDebug( const char * file, int line,
332         const struct tr_peermsgs * msgs,
333         const char * fmt, ... )
334{
335    FILE * fp = tr_getLog( );
336
337    if( fp )
338    {
339        va_list           args;
340        char              timestr[64];
341        struct evbuffer * buf = evbuffer_new( );
342        char *            base = tr_basename( file );
343
344        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
345                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
346                             msgs->torrent->info.name,
347                             tr_peerIoGetAddrStr( msgs->peer->io ),
348                             msgs->peer->client );
349        va_start( args, fmt );
350        evbuffer_add_vprintf( buf, fmt, args );
351        va_end( args );
352        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
353        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
354
355        tr_free( base );
356        evbuffer_free( buf );
357    }
358}
359
360#define dbgmsg( msgs, ... ) \
361    do { \
362        if( tr_deepLoggingIsActive( ) ) \
363            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
364    } while( 0 )
365
366/**
367***
368**/
369
370static void
371pokeBatchPeriod( tr_peermsgs * msgs,
372                 int           interval )
373{
374    if( msgs->outMessagesBatchPeriod > interval )
375    {
376        msgs->outMessagesBatchPeriod = interval;
377        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
378    }
379}
380
381static void
382dbgOutMessageLen( tr_peermsgs * msgs )
383{
384    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
385}
386
387static void
388protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
389{
390    tr_peerIo       * io  = msgs->peer->io;
391    struct evbuffer * out = msgs->outMessages;
392
393    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
394
395    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
396    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
397    tr_peerIoWriteUint32( io, out, req->index );
398    tr_peerIoWriteUint32( io, out, req->offset );
399    tr_peerIoWriteUint32( io, out, req->length );
400
401    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
402    dbgOutMessageLen( msgs );
403}
404
405static void
406protocolSendRequest( tr_peermsgs *               msgs,
407                     const struct peer_request * req )
408{
409    tr_peerIo       * io  = msgs->peer->io;
410    struct evbuffer * out = msgs->outMessages;
411
412    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
413    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
414    tr_peerIoWriteUint32( io, out, req->index );
415    tr_peerIoWriteUint32( io, out, req->offset );
416    tr_peerIoWriteUint32( io, out, req->length );
417
418    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
419    dbgOutMessageLen( msgs );
420    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
421}
422
423static void
424protocolSendCancel( tr_peermsgs *               msgs,
425                    const struct peer_request * req )
426{
427    tr_peerIo       * io  = msgs->peer->io;
428    struct evbuffer * out = msgs->outMessages;
429
430    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
431    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
432    tr_peerIoWriteUint32( io, out, req->index );
433    tr_peerIoWriteUint32( io, out, req->offset );
434    tr_peerIoWriteUint32( io, out, req->length );
435
436    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
437    dbgOutMessageLen( msgs );
438    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
439}
440
441static void
442protocolSendHave( tr_peermsgs * msgs,
443                  uint32_t      index )
444{
445    tr_peerIo       * io  = msgs->peer->io;
446    struct evbuffer * out = msgs->outMessages;
447
448    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
449    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
450    tr_peerIoWriteUint32( io, out, index );
451
452    dbgmsg( msgs, "sending Have %u...", index );
453    dbgOutMessageLen( msgs );
454    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
455}
456
457#if 0
458static void
459protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
460{
461    tr_peerIo       * io  = msgs->peer->io;
462    struct evbuffer * out = msgs->outMessages;
463
464    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
465
466    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
467    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
468    tr_peerIoWriteUint32( io, out, pieceIndex );
469
470    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
471    dbgOutMessageLen( msgs );
472}
473#endif
474
475static void
476protocolSendChoke( tr_peermsgs * msgs,
477                   int           choke )
478{
479    tr_peerIo       * io  = msgs->peer->io;
480    struct evbuffer * out = msgs->outMessages;
481
482    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
483    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
484
485    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
486    dbgOutMessageLen( msgs );
487    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
488}
489
490static void
491protocolSendHaveAll( tr_peermsgs * msgs )
492{
493    tr_peerIo       * io  = msgs->peer->io;
494    struct evbuffer * out = msgs->outMessages;
495
496    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
497
498    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
499    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
500
501    dbgmsg( msgs, "sending HAVE_ALL..." );
502    dbgOutMessageLen( msgs );
503    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
504}
505
506static void
507protocolSendHaveNone( tr_peermsgs * msgs )
508{
509    tr_peerIo       * io  = msgs->peer->io;
510    struct evbuffer * out = msgs->outMessages;
511
512    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
513
514    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
515    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
516
517    dbgmsg( msgs, "sending HAVE_NONE..." );
518    dbgOutMessageLen( msgs );
519    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
520}
521
522/**
523***  EVENTS
524**/
525
526static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0 };
527
528static void
529publish( tr_peermsgs * msgs, tr_peer_event * e )
530{
531    assert( msgs->peer );
532    assert( msgs->peer->msgs == msgs );
533
534    tr_publisherPublish( msgs->publisher, msgs->peer, e );
535}
536
537static void
538fireError( tr_peermsgs * msgs, int err )
539{
540    tr_peer_event e = blankEvent;
541    e.eventType = TR_PEER_ERROR;
542    e.err = err;
543    publish( msgs, &e );
544}
545
546static void
547fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly )
548{
549    tr_peer_event e = blankEvent;
550    e.eventType = TR_PEER_UPLOAD_ONLY;
551    e.uploadOnly = uploadOnly;
552    publish( msgs, &e );
553}
554
555static void
556fireNeedReq( tr_peermsgs * msgs )
557{
558    tr_peer_event e = blankEvent;
559    e.eventType = TR_PEER_NEED_REQ;
560    publish( msgs, &e );
561}
562
563static void
564firePeerProgress( tr_peermsgs * msgs )
565{
566    tr_peer_event e = blankEvent;
567    e.eventType = TR_PEER_PEER_PROGRESS;
568    e.progress = msgs->peer->progress;
569    publish( msgs, &e );
570}
571
572static void
573fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
574{
575    tr_peer_event e = blankEvent;
576    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
577    e.pieceIndex = req->index;
578    e.offset = req->offset;
579    e.length = req->length;
580    publish( msgs, &e );
581}
582
583static void
584fireClientGotData( tr_peermsgs * msgs,
585                   uint32_t      length,
586                   int           wasPieceData )
587{
588    tr_peer_event e = blankEvent;
589
590    e.length = length;
591    e.eventType = TR_PEER_CLIENT_GOT_DATA;
592    e.wasPieceData = wasPieceData;
593    publish( msgs, &e );
594}
595
596static void
597fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
598{
599    tr_peer_event e = blankEvent;
600    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
601    e.pieceIndex = pieceIndex;
602    publish( msgs, &e );
603}
604
605static void
606fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
607{
608    tr_peer_event e = blankEvent;
609    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
610    e.pieceIndex = pieceIndex;
611    publish( msgs, &e );
612}
613
614static void
615firePeerGotData( tr_peermsgs  * msgs,
616                 uint32_t       length,
617                 int            wasPieceData )
618{
619    tr_peer_event e = blankEvent;
620
621    e.length = length;
622    e.eventType = TR_PEER_PEER_GOT_DATA;
623    e.wasPieceData = wasPieceData;
624
625    publish( msgs, &e );
626}
627
628static void
629fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
630{
631    tr_peer_event e = blankEvent;
632    e.eventType = TR_PEER_CANCEL;
633    e.pieceIndex = req->index;
634    e.offset = req->offset;
635    e.length = req->length;
636    publish( msgs, &e );
637}
638
639/**
640***  ALLOWED FAST SET
641***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
642**/
643
644size_t
645tr_generateAllowedSet( tr_piece_index_t * setmePieces,
646                       size_t             desiredSetSize,
647                       size_t             pieceCount,
648                       const uint8_t    * infohash,
649                       const tr_address * addr )
650{
651    size_t setSize = 0;
652
653    assert( setmePieces );
654    assert( desiredSetSize <= pieceCount );
655    assert( desiredSetSize );
656    assert( pieceCount );
657    assert( infohash );
658    assert( addr );
659
660    if( addr->type == TR_AF_INET )
661    {
662        uint8_t w[SHA_DIGEST_LENGTH + 4];
663        uint8_t x[SHA_DIGEST_LENGTH];
664
665        *(uint32_t*)w = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
666        memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );                /* (2) */
667        tr_sha1( x, w, sizeof( w ), NULL );                          /* (3) */
668
669        while( setSize<desiredSetSize )
670        {
671            int i;
672            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
673            {
674                size_t k;
675                uint32_t j = i * 4;                                  /* (5) */
676                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
677                uint32_t index = y % pieceCount;                     /* (7) */
678
679                for( k=0; k<setSize; ++k )                           /* (8) */
680                    if( setmePieces[k] == index )
681                        break;
682
683                if( k == setSize )
684                    setmePieces[setSize++] = index;                  /* (9) */
685            }
686
687            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
688        }
689    }
690
691    return setSize;
692}
693
694static void
695updateFastSet( tr_peermsgs * msgs UNUSED )
696{
697#if 0
698    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
699    const int peerIsNeedy = msgs->peer->progress < 0.10;
700
701    if( fext && peerIsNeedy && !msgs->haveFastSet )
702    {
703        size_t i;
704        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
705        const tr_info * inf = &msgs->torrent->info;
706        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
707
708        /* build the fast set */
709        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
710        msgs->haveFastSet = 1;
711
712        /* send it to the peer */
713        for( i=0; i<msgs->fastsetSize; ++i )
714            protocolSendAllowedFast( msgs, msgs->fastset[i] );
715    }
716#endif
717}
718
719/**
720***  INTEREST
721**/
722
723static int
724isPieceInteresting( const tr_peermsgs * msgs,
725                    tr_piece_index_t    piece )
726{
727    const tr_torrent * torrent = msgs->torrent;
728
729    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
730          && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have */
731          && ( tr_bitfieldHas( msgs->peer->have, piece ) );    /* peer has it */
732}
733
734/* "interested" means we'll ask for piece data if they unchoke us */
735static int
736isPeerInteresting( const tr_peermsgs * msgs )
737{
738    tr_piece_index_t    i;
739    const tr_torrent *  torrent;
740    const tr_bitfield * bitfield;
741    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
742
743    if( clientIsSeed )
744        return FALSE;
745
746    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
747        return FALSE;
748
749    torrent = msgs->torrent;
750    bitfield = tr_cpPieceBitfield( torrent->completion );
751
752    if( !msgs->peer->have )
753        return TRUE;
754
755    assert( bitfield->byteCount == msgs->peer->have->byteCount );
756
757    for( i = 0; i < torrent->info.pieceCount; ++i )
758        if( isPieceInteresting( msgs, i ) )
759            return TRUE;
760
761    return FALSE;
762}
763
764static void
765sendInterest( tr_peermsgs * msgs,
766              int           weAreInterested )
767{
768    struct evbuffer * out = msgs->outMessages;
769
770    assert( msgs );
771    assert( weAreInterested == 0 || weAreInterested == 1 );
772
773    msgs->peer->clientIsInterested = weAreInterested;
774    dbgmsg( msgs, "Sending %s", weAreInterested ? "Interested" : "Not Interested" );
775    tr_peerIoWriteUint32( msgs->peer->io, out, sizeof( uint8_t ) );
776    tr_peerIoWriteUint8 ( msgs->peer->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
777    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
778    dbgOutMessageLen( msgs );
779}
780
781static void
782updateInterest( tr_peermsgs * msgs )
783{
784    const int i = isPeerInteresting( msgs );
785
786    if( i != msgs->peer->clientIsInterested )
787        sendInterest( msgs, i );
788    if( i )
789        fireNeedReq( msgs );
790}
791
792static int
793popNextRequest( tr_peermsgs *         msgs,
794                struct peer_request * setme )
795{
796    return reqListPop( &msgs->peerAskedFor, setme );
797}
798
799static void
800cancelAllRequestsToClient( tr_peermsgs * msgs )
801{
802    struct peer_request req;
803    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
804
805    while( popNextRequest( msgs, &req ) )
806        if( mustSendCancel )
807            protocolSendReject( msgs, &req );
808}
809
810void
811tr_peerMsgsSetChoke( tr_peermsgs * msgs,
812                     int           choke )
813{
814    const time_t now = time( NULL );
815    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
816
817    assert( msgs );
818    assert( msgs->peer );
819    assert( choke == 0 || choke == 1 );
820
821    if( msgs->peer->chokeChangedAt > fibrillationTime )
822    {
823        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
824    }
825    else if( msgs->peer->peerIsChoked != choke )
826    {
827        msgs->peer->peerIsChoked = choke;
828        if( choke )
829            cancelAllRequestsToClient( msgs );
830        protocolSendChoke( msgs, choke );
831        msgs->peer->chokeChangedAt = now;
832    }
833}
834
835/**
836***
837**/
838
839void
840tr_peerMsgsHave( tr_peermsgs * msgs,
841                 uint32_t      index )
842{
843    protocolSendHave( msgs, index );
844
845    /* since we have more pieces now, we might not be interested in this peer */
846    updateInterest( msgs );
847}
848
849/**
850***
851**/
852
853static int
854reqIsValid( const tr_peermsgs * peer,
855            uint32_t            index,
856            uint32_t            offset,
857            uint32_t            length )
858{
859    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
860}
861
862static int
863requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
864{
865    return reqIsValid( msgs, req->index, req->offset, req->length );
866}
867
868static void
869expireOldRequests( tr_peermsgs * msgs, const time_t now  )
870{
871    int i;
872    time_t oldestAllowed;
873    struct request_list tmp = REQUEST_LIST_INIT;
874    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
875
876    /* cancel requests that have been queued for too long */
877    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
878    reqListCopy( &tmp, &msgs->clientWillAskFor );
879    for( i = 0; i < tmp.count; ++i ) {
880        const struct peer_request * req = &tmp.requests[i];
881        if( req->time_requested < oldestAllowed )
882            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
883    }
884    reqListClear( &tmp );
885
886    /* if the peer doesn't support "Reject Request",
887     * cancel requests that were sent too long ago. */
888    if( !fext ) {
889        oldestAllowed = now - SENT_REQUEST_TTL_SECS;
890        reqListCopy( &tmp, &msgs->clientAskedFor );
891        for( i=0; i<tmp.count; ++i ) {
892            const struct peer_request * req = &tmp.requests[i];
893            if( req->time_requested < oldestAllowed )
894                tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
895        }
896        reqListClear( &tmp );
897    }
898}
899
900static void
901pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
902{
903    const int           max = msgs->maxActiveRequests;
904    const int           min = msgs->minActiveRequests;
905    int                 sent = 0;
906    int                 count = msgs->clientAskedFor.count;
907    struct peer_request req;
908
909    if( count > min )
910        return;
911    if( msgs->peer->clientIsChoked )
912        return;
913    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
914        return;
915
916    while( ( count < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
917    {
918        const tr_block_index_t block = _tr_block( msgs->torrent, req.index, req.offset );
919
920        assert( requestIsValid( msgs, &req ) );
921        assert( tr_bitfieldHas( msgs->peer->have, req.index ) );
922
923        /* don't ask for it if we've already got it... this block may have
924         * come in from a different peer after we cancelled a request for it */
925        if( !tr_cpBlockIsComplete( msgs->torrent->completion, block ) )
926        {
927            protocolSendRequest( msgs, &req );
928            req.time_requested = now;
929            reqListAppend( &msgs->clientAskedFor, &req );
930
931            ++count;
932            ++sent;
933        }
934    }
935
936    if( sent )
937        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
938                sent,
939                msgs->clientAskedFor.count,
940                msgs->clientWillAskFor.count );
941
942    if( count < max )
943        fireNeedReq( msgs );
944}
945
946static int
947requestQueueIsFull( const tr_peermsgs * msgs )
948{
949    const int req_max = msgs->maxActiveRequests;
950    return msgs->clientWillAskFor.count >= req_max;
951}
952
953tr_addreq_t
954tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
955                       uint32_t         index,
956                       uint32_t         offset,
957                       uint32_t         length,
958                       int              doForce )
959{
960    struct peer_request req;
961
962    assert( msgs );
963    assert( msgs->torrent );
964    assert( reqIsValid( msgs, index, offset, length ) );
965
966    /**
967    ***  Reasons to decline the request
968    **/
969
970    /* don't send requests to choked clients */
971    if( !doForce && msgs->peer->clientIsChoked ) {
972        dbgmsg( msgs, "declining request because they're choking us" );
973        return TR_ADDREQ_CLIENT_CHOKED;
974    }
975
976    /* peer doesn't have this piece */
977    if( !doForce && !tr_bitfieldHas( msgs->peer->have, index ) )
978        return TR_ADDREQ_MISSING;
979
980    /* peer's queue is full */
981    if( !doForce && requestQueueIsFull( msgs ) ) {
982        dbgmsg( msgs, "declining request because we're full" );
983        return TR_ADDREQ_FULL;
984    }
985
986    /* have we already asked for this piece? */
987    req.index = index;
988    req.offset = offset;
989    req.length = length;
990    if( !doForce ) {
991        if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
992            dbgmsg( msgs, "declining because it's a duplicate" );
993            return TR_ADDREQ_DUPLICATE;
994        }
995        if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
996            dbgmsg( msgs, "declining because it's a duplicate" );
997            return TR_ADDREQ_DUPLICATE;
998        }
999    }
1000
1001    /**
1002    ***  Accept this request
1003    **/
1004
1005    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
1006            index, offset, length );
1007    req.time_requested = time( NULL );
1008    reqListAppend( &msgs->clientWillAskFor, &req );
1009    return TR_ADDREQ_OK;
1010}
1011
1012static void
1013cancelAllRequestsToPeer( tr_peermsgs * msgs, tr_bool sendCancel )
1014{
1015    int i;
1016    struct request_list a = msgs->clientWillAskFor;
1017    struct request_list b = msgs->clientAskedFor;
1018    dbgmsg( msgs, "cancelling all requests to peer" );
1019
1020    msgs->clientAskedFor = REQUEST_LIST_INIT;
1021    msgs->clientWillAskFor = REQUEST_LIST_INIT;
1022
1023    for( i=0; i<a.count; ++i )
1024        fireCancelledReq( msgs, &a.requests[i] );
1025
1026    for( i = 0; i < b.count; ++i ) {
1027        fireCancelledReq( msgs, &b.requests[i] );
1028        if( sendCancel )
1029            protocolSendCancel( msgs, &b.requests[i] );
1030    }
1031
1032    reqListClear( &a );
1033    reqListClear( &b );
1034}
1035
1036void
1037tr_peerMsgsCancel( tr_peermsgs * msgs,
1038                   uint32_t      pieceIndex,
1039                   uint32_t      offset,
1040                   uint32_t      length )
1041{
1042    struct peer_request req;
1043
1044    assert( msgs != NULL );
1045    assert( length > 0 );
1046
1047
1048    /* have we asked the peer for this piece? */
1049    req.index = pieceIndex;
1050    req.offset = offset;
1051    req.length = length;
1052
1053    /* if it's only in the queue and hasn't been sent yet, free it */
1054    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
1055        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32"\n", pieceIndex, offset, length );
1056        fireCancelledReq( msgs, &req );
1057    }
1058
1059    /* if it's already been sent, send a cancel message too */
1060    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
1061        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32"\n", pieceIndex, offset, length );
1062        protocolSendCancel( msgs, &req );
1063        fireCancelledReq( msgs, &req );
1064    }
1065}
1066
1067
1068/**
1069***
1070**/
1071
1072static void
1073sendLtepHandshake( tr_peermsgs * msgs )
1074{
1075    tr_benc val, *m;
1076    char * buf;
1077    int len;
1078    int pex;
1079    struct evbuffer * out = msgs->outMessages;
1080
1081    if( msgs->clientSentLtepHandshake )
1082        return;
1083
1084    dbgmsg( msgs, "sending an ltep handshake" );
1085    msgs->clientSentLtepHandshake = 1;
1086
1087    /* decide if we want to advertise pex support */
1088    if( !tr_torrentAllowsPex( msgs->torrent ) )
1089        pex = 0;
1090    else if( msgs->peerSentLtepHandshake )
1091        pex = msgs->peerSupportsPex ? 1 : 0;
1092    else
1093        pex = 1;
1094
1095    tr_bencInitDict( &val, 5 );
1096    tr_bencDictAddInt( &val, "e", msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
1097    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
1098    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
1099    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
1100    m  = tr_bencDictAddDict( &val, "m", 1 );
1101    if( pex )
1102        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
1103    buf = tr_bencSave( &val, &len );
1104
1105    tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + len );
1106    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
1107    tr_peerIoWriteUint8 ( msgs->peer->io, out, LTEP_HANDSHAKE );
1108    tr_peerIoWriteBytes ( msgs->peer->io, out, buf, len );
1109    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1110    dbgOutMessageLen( msgs );
1111
1112    /* cleanup */
1113    tr_bencFree( &val );
1114    tr_free( buf );
1115}
1116
1117static void
1118parseLtepHandshake( tr_peermsgs *     msgs,
1119                    int               len,
1120                    struct evbuffer * inbuf )
1121{
1122    int64_t   i;
1123    tr_benc   val, * sub;
1124    uint8_t * tmp = tr_new( uint8_t, len );
1125
1126    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
1127    msgs->peerSentLtepHandshake = 1;
1128
1129    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
1130    {
1131        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1132        tr_free( tmp );
1133        return;
1134    }
1135
1136    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
1137
1138    /* does the peer prefer encrypted connections? */
1139    if( tr_bencDictFindInt( &val, "e", &i ) )
1140        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1141                                              : ENCRYPTION_PREFERENCE_NO;
1142
1143    /* check supported messages for utorrent pex */
1144    msgs->peerSupportsPex = 0;
1145    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
1146        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
1147            msgs->ut_pex_id = (uint8_t) i;
1148            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1149            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1150        }
1151    }
1152
1153    /* look for upload_only (BEP 21) */
1154    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
1155        fireUploadOnly( msgs, i!=0 );
1156
1157    /* get peer's listening port */
1158    if( tr_bencDictFindInt( &val, "p", &i ) ) {
1159        msgs->peer->port = htons( (uint16_t)i );
1160        dbgmsg( msgs, "msgs->port is now %hu", msgs->peer->port );
1161    }
1162
1163    /* get peer's maximum request queue size */
1164    if( tr_bencDictFindInt( &val, "reqq", &i ) )
1165        msgs->reqq = i;
1166
1167    tr_bencFree( &val );
1168    tr_free( tmp );
1169}
1170
1171static void
1172parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1173{
1174    int loaded = 0;
1175    uint8_t * tmp = tr_new( uint8_t, msglen );
1176    tr_benc val;
1177    const tr_torrent * tor = msgs->torrent;
1178    const uint8_t * added;
1179    size_t added_len;
1180
1181    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1182
1183    if( tr_torrentAllowsPex( tor )
1184      && (( loaded = !tr_bencLoad( tmp, msglen, &val, NULL )))
1185      && tr_bencDictFindRaw( &val, "added", &added, &added_len ))
1186    {
1187        const uint8_t * added_f = NULL;
1188        tr_pex *        pex;
1189        size_t          i, n;
1190        size_t          added_f_len = 0;
1191        tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1192        pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1193        for( i=0; i<n; ++i )
1194            tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1195                              TR_PEER_FROM_PEX, pex + i );
1196        tr_free( pex );
1197    }
1198
1199    if( loaded )
1200        tr_bencFree( &val );
1201    tr_free( tmp );
1202}
1203
1204static void sendPex( tr_peermsgs * msgs );
1205
1206static void
1207parseLtep( tr_peermsgs *     msgs,
1208           int               msglen,
1209           struct evbuffer * inbuf )
1210{
1211    uint8_t ltep_msgid;
1212
1213    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1214    msglen--;
1215
1216    if( ltep_msgid == LTEP_HANDSHAKE )
1217    {
1218        dbgmsg( msgs, "got ltep handshake" );
1219        parseLtepHandshake( msgs, msglen, inbuf );
1220        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1221        {
1222            sendLtepHandshake( msgs );
1223            sendPex( msgs );
1224        }
1225    }
1226    else if( ltep_msgid == TR_LTEP_PEX )
1227    {
1228        dbgmsg( msgs, "got ut pex" );
1229        msgs->peerSupportsPex = 1;
1230        parseUtPex( msgs, msglen, inbuf );
1231    }
1232    else
1233    {
1234        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1235        evbuffer_drain( inbuf, msglen );
1236    }
1237}
1238
1239static int
1240readBtLength( tr_peermsgs *     msgs,
1241              struct evbuffer * inbuf,
1242              size_t            inlen )
1243{
1244    uint32_t len;
1245
1246    if( inlen < sizeof( len ) )
1247        return READ_LATER;
1248
1249    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1250
1251    if( len == 0 ) /* peer sent us a keepalive message */
1252        dbgmsg( msgs, "got KeepAlive" );
1253    else
1254    {
1255        msgs->incoming.length = len;
1256        msgs->state = AWAITING_BT_ID;
1257    }
1258
1259    return READ_NOW;
1260}
1261
1262static int readBtMessage( tr_peermsgs *     msgs,
1263                          struct evbuffer * inbuf,
1264                          size_t            inlen );
1265
1266static int
1267readBtId( tr_peermsgs *     msgs,
1268          struct evbuffer * inbuf,
1269          size_t            inlen )
1270{
1271    uint8_t id;
1272
1273    if( inlen < sizeof( uint8_t ) )
1274        return READ_LATER;
1275
1276    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1277    msgs->incoming.id = id;
1278
1279    if( id == BT_PIECE )
1280    {
1281        msgs->state = AWAITING_BT_PIECE;
1282        return READ_NOW;
1283    }
1284    else if( msgs->incoming.length != 1 )
1285    {
1286        msgs->state = AWAITING_BT_MESSAGE;
1287        return READ_NOW;
1288    }
1289    else return readBtMessage( msgs, inbuf, inlen - 1 );
1290}
1291
1292static void
1293updatePeerProgress( tr_peermsgs * msgs )
1294{
1295    msgs->peer->progress = tr_bitfieldCountTrueBits( msgs->peer->have ) / (float)msgs->torrent->info.pieceCount;
1296    dbgmsg( msgs, "peer progress is %f", msgs->peer->progress );
1297    updateFastSet( msgs );
1298    updateInterest( msgs );
1299    firePeerProgress( msgs );
1300}
1301
1302static void
1303peerMadeRequest( tr_peermsgs *               msgs,
1304                 const struct peer_request * req )
1305{
1306    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1307    const int reqIsValid = requestIsValid( msgs, req );
1308    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
1309    const int peerIsChoked = msgs->peer->peerIsChoked;
1310
1311    int allow = FALSE;
1312
1313    if( !reqIsValid )
1314        dbgmsg( msgs, "rejecting an invalid request." );
1315    else if( !clientHasPiece )
1316        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1317    else if( peerIsChoked )
1318        dbgmsg( msgs, "rejecting request from choked peer" );
1319    else
1320        allow = TRUE;
1321
1322    if( allow )
1323        reqListAppend( &msgs->peerAskedFor, req );
1324    else if( fext )
1325        protocolSendReject( msgs, req );
1326}
1327
1328static int
1329messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1330{
1331    switch( id )
1332    {
1333        case BT_CHOKE:
1334        case BT_UNCHOKE:
1335        case BT_INTERESTED:
1336        case BT_NOT_INTERESTED:
1337        case BT_FEXT_HAVE_ALL:
1338        case BT_FEXT_HAVE_NONE:
1339            return len == 1;
1340
1341        case BT_HAVE:
1342        case BT_FEXT_SUGGEST:
1343        case BT_FEXT_ALLOWED_FAST:
1344            return len == 5;
1345
1346        case BT_BITFIELD:
1347            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1348
1349        case BT_REQUEST:
1350        case BT_CANCEL:
1351        case BT_FEXT_REJECT:
1352            return len == 13;
1353
1354        case BT_PIECE:
1355            return len > 9 && len <= 16393;
1356
1357        case BT_PORT:
1358            return len == 3;
1359
1360        case BT_LTEP:
1361            return len >= 2;
1362
1363        default:
1364            return FALSE;
1365    }
1366}
1367
1368static int clientGotBlock( tr_peermsgs *               msgs,
1369                           const uint8_t *             block,
1370                           const struct peer_request * req );
1371
1372static int
1373readBtPiece( tr_peermsgs      * msgs,
1374             struct evbuffer  * inbuf,
1375             size_t             inlen,
1376             size_t           * setme_piece_bytes_read )
1377{
1378    struct peer_request * req = &msgs->incoming.blockReq;
1379
1380    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1381    dbgmsg( msgs, "In readBtPiece" );
1382
1383    if( !req->length )
1384    {
1385        if( inlen < 8 )
1386            return READ_LATER;
1387
1388        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1389        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1390        req->length = msgs->incoming.length - 9;
1391        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1392        return READ_NOW;
1393    }
1394    else
1395    {
1396        int err;
1397
1398        /* read in another chunk of data */
1399        const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
1400        size_t n = MIN( nLeft, inlen );
1401        uint8_t * buf = tr_new( uint8_t, n );
1402        assert( EVBUFFER_LENGTH( inbuf ) >= n );
1403        tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, n );
1404        evbuffer_add( msgs->incoming.block, buf, n );
1405        fireClientGotData( msgs, n, TRUE );
1406        *setme_piece_bytes_read += n;
1407        tr_free( buf );
1408        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1409               n, req->index, req->offset, req->length,
1410               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1411        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1412            return READ_LATER;
1413
1414        /* we've got the whole block ... process it */
1415        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );
1416
1417        /* cleanup */
1418        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH( msgs->incoming.block ) );
1419        req->length = 0;
1420        msgs->state = AWAITING_BT_LENGTH;
1421        if( !err )
1422            return READ_NOW;
1423        else {
1424            fireError( msgs, err );
1425            return READ_ERR;
1426        }
1427    }
1428}
1429
1430static int
1431readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1432{
1433    uint32_t      ui32;
1434    uint32_t      msglen = msgs->incoming.length;
1435    const uint8_t id = msgs->incoming.id;
1436    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1437    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1438
1439    --msglen; /* id length */
1440
1441    if( inlen < msglen )
1442        return READ_LATER;
1443
1444    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1445
1446    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1447    {
1448        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1449        fireError( msgs, EMSGSIZE );
1450        return READ_ERR;
1451    }
1452
1453    switch( id )
1454    {
1455        case BT_CHOKE:
1456            dbgmsg( msgs, "got Choke" );
1457            msgs->peer->clientIsChoked = 1;
1458            if( !fext )
1459                cancelAllRequestsToPeer( msgs, FALSE );
1460            break;
1461
1462        case BT_UNCHOKE:
1463            dbgmsg( msgs, "got Unchoke" );
1464            msgs->peer->clientIsChoked = 0;
1465            fireNeedReq( msgs );
1466            break;
1467
1468        case BT_INTERESTED:
1469            dbgmsg( msgs, "got Interested" );
1470            msgs->peer->peerIsInterested = 1;
1471            break;
1472
1473        case BT_NOT_INTERESTED:
1474            dbgmsg( msgs, "got Not Interested" );
1475            msgs->peer->peerIsInterested = 0;
1476            break;
1477
1478        case BT_HAVE:
1479            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1480            dbgmsg( msgs, "got Have: %u", ui32 );
1481            if( tr_bitfieldAdd( msgs->peer->have, ui32 ) ) {
1482                fireError( msgs, ERANGE );
1483                return READ_ERR;
1484            }
1485            updatePeerProgress( msgs );
1486            tr_rcTransferred( msgs->torrent->swarmSpeed,
1487                              msgs->torrent->info.pieceSize );
1488            break;
1489
1490        case BT_BITFIELD:
1491        {
1492            dbgmsg( msgs, "got a bitfield" );
1493            msgs->peerSentBitfield = 1;
1494            tr_peerIoReadBytes( msgs->peer->io, inbuf, msgs->peer->have->bits, msglen );
1495            updatePeerProgress( msgs );
1496            fireNeedReq( msgs );
1497            break;
1498        }
1499
1500        case BT_REQUEST:
1501        {
1502            struct peer_request r;
1503            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1504            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1505            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1506            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1507            peerMadeRequest( msgs, &r );
1508            break;
1509        }
1510
1511        case BT_CANCEL:
1512        {
1513            struct peer_request r;
1514            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1515            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1516            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1517            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1518            if( reqListRemove( &msgs->peerAskedFor, &r ) && fext )
1519                protocolSendReject( msgs, &r );
1520            break;
1521        }
1522
1523        case BT_PIECE:
1524            assert( 0 ); /* handled elsewhere! */
1525            break;
1526
1527        case BT_PORT:
1528            dbgmsg( msgs, "Got a BT_PORT" );
1529            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->port );
1530            break;
1531
1532        case BT_FEXT_SUGGEST:
1533            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1534            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1535            if( fext )
1536                fireClientGotSuggest( msgs, ui32 );
1537            else {
1538                fireError( msgs, EMSGSIZE );
1539                return READ_ERR;
1540            }
1541            break;
1542
1543        case BT_FEXT_ALLOWED_FAST:
1544            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1545            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1546            if( fext )
1547                fireClientGotAllowedFast( msgs, ui32 );
1548            else {
1549                fireError( msgs, EMSGSIZE );
1550                return READ_ERR;
1551            }
1552            break;
1553
1554        case BT_FEXT_HAVE_ALL:
1555            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1556            if( fext ) {
1557                tr_bitfieldAddRange( msgs->peer->have, 0, msgs->torrent->info.pieceCount );
1558                updatePeerProgress( msgs );
1559            } else {
1560                fireError( msgs, EMSGSIZE );
1561                return READ_ERR;
1562            }
1563            break;
1564
1565
1566        case BT_FEXT_HAVE_NONE:
1567            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1568            if( fext ) {
1569                tr_bitfieldClear( msgs->peer->have );
1570                updatePeerProgress( msgs );
1571            } else {
1572                fireError( msgs, EMSGSIZE );
1573                return READ_ERR;
1574            }
1575            break;
1576
1577        case BT_FEXT_REJECT:
1578        {
1579            struct peer_request r;
1580            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1581            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1582            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1583            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1584            if( fext )
1585                reqListRemove( &msgs->clientAskedFor, &r );
1586            else {
1587                fireError( msgs, EMSGSIZE );
1588                return READ_ERR;
1589            }
1590            break;
1591        }
1592
1593        case BT_LTEP:
1594            dbgmsg( msgs, "Got a BT_LTEP" );
1595            parseLtep( msgs, msglen, inbuf );
1596            break;
1597
1598        default:
1599            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1600            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1601            break;
1602    }
1603
1604    assert( msglen + 1 == msgs->incoming.length );
1605    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1606
1607    msgs->state = AWAITING_BT_LENGTH;
1608    return READ_NOW;
1609}
1610
1611static void
1612decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1613{
1614    tr_torrent * tor = msgs->torrent;
1615
1616    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1617}
1618
1619static void
1620clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1621{
1622    decrementDownloadedCount( msgs, req->length );
1623}
1624
1625static void
1626addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1627{
1628    if( !msgs->peer->blame )
1629         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1630    tr_bitfieldAdd( msgs->peer->blame, index );
1631}
1632
1633/* returns 0 on success, or an errno on failure */
1634static int
1635clientGotBlock( tr_peermsgs *               msgs,
1636                const uint8_t *             data,
1637                const struct peer_request * req )
1638{
1639    int err;
1640    tr_torrent * tor = msgs->torrent;
1641    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1642
1643    assert( msgs );
1644    assert( req );
1645
1646    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1647        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1648                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1649        return EMSGSIZE;
1650    }
1651
1652    /* save the block */
1653    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1654
1655    /**
1656    *** Remove the block from our `we asked for this' list
1657    **/
1658
1659    if( !reqListRemove( &msgs->clientAskedFor, req ) ) {
1660        clientGotUnwantedBlock( msgs, req );
1661        dbgmsg( msgs, "we didn't ask for this message..." );
1662        return 0;
1663    }
1664
1665    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1666            msgs->clientAskedFor.count );
1667
1668    /**
1669    *** Error checks
1670    **/
1671
1672    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1673        dbgmsg( msgs, "we have this block already..." );
1674        clientGotUnwantedBlock( msgs, req );
1675        return 0;
1676    }
1677
1678    /**
1679    ***  Save the block
1680    **/
1681
1682    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1683        return err;
1684
1685    addPeerToBlamefield( msgs, req->index );
1686    fireGotBlock( msgs, req );
1687    return 0;
1688}
1689
1690static int peerPulse( void * vmsgs );
1691
1692static void
1693didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1694{
1695    tr_peermsgs * msgs = vmsgs;
1696    firePeerGotData( msgs, bytesWritten, wasPieceData );
1697    peerPulse( msgs );
1698}
1699
1700static ReadState
1701canRead( struct tr_iobuf * iobuf, void * vmsgs, size_t * piece )
1702{
1703    ReadState         ret;
1704    tr_peermsgs *     msgs = vmsgs;
1705    struct evbuffer * in = tr_iobuf_input( iobuf );
1706    const size_t      inlen = EVBUFFER_LENGTH( in );
1707
1708    if( !inlen )
1709    {
1710        ret = READ_LATER;
1711    }
1712    else if( msgs->state == AWAITING_BT_PIECE )
1713    {
1714        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1715    }
1716    else switch( msgs->state )
1717    {
1718        case AWAITING_BT_LENGTH:
1719            ret = readBtLength ( msgs, in, inlen ); break;
1720
1721        case AWAITING_BT_ID:
1722            ret = readBtId     ( msgs, in, inlen ); break;
1723
1724        case AWAITING_BT_MESSAGE:
1725            ret = readBtMessage( msgs, in, inlen ); break;
1726
1727        default:
1728            assert( 0 );
1729    }
1730
1731    /* log the raw data that was read */
1732    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1733        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1734
1735    return ret;
1736}
1737
1738/**
1739***
1740**/
1741
1742static int
1743ratePulse( void * vmsgs )
1744{
1745    tr_peermsgs * msgs = vmsgs;
1746    const double rateToClient = tr_peerGetPieceSpeed( msgs->peer, TR_PEER_TO_CLIENT );
1747    const int estimatedBlocksInNext30Seconds =
1748                  ( rateToClient * 30 * 1024 ) / msgs->torrent->blockSize;
1749    msgs->minActiveRequests = 8;
1750    msgs->maxActiveRequests = msgs->minActiveRequests + estimatedBlocksInNext30Seconds;
1751    if( msgs->reqq > 0 )
1752        msgs->maxActiveRequests = MIN( msgs->maxActiveRequests, msgs->reqq );
1753    return TRUE;
1754}
1755
1756static size_t
1757fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1758{
1759    size_t bytesWritten = 0;
1760    struct peer_request req;
1761    const int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1762    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1763
1764    /**
1765    ***  Protocol messages
1766    **/
1767
1768    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1769    {
1770        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1771        msgs->outMessagesBatchedAt = now;
1772    }
1773    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1774    {
1775        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1776        /* flush the protocol messages */
1777        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1778        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1779        msgs->clientSentAnythingAt = now;
1780        msgs->outMessagesBatchedAt = 0;
1781        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1782        bytesWritten +=  len;
1783    }
1784
1785    /**
1786    ***  Blocks
1787    **/
1788
1789    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io ) >= msgs->torrent->blockSize )
1790        && popNextRequest( msgs, &req ) )
1791    {
1792        if( requestIsValid( msgs, &req )
1793            && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1794        {
1795            /* send a block */
1796            uint8_t * buf = tr_new( uint8_t, req.length );
1797            const int err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf );
1798            if( err ) {
1799                fireError( msgs, err );
1800                bytesWritten = 0;
1801                msgs = NULL;
1802            } else {
1803                tr_peerIo * io = msgs->peer->io;
1804                struct evbuffer * out = evbuffer_new( );
1805                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1806                tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1807                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1808                tr_peerIoWriteUint32( io, out, req.index );
1809                tr_peerIoWriteUint32( io, out, req.offset );
1810                tr_peerIoWriteBytes ( io, out, buf, req.length );
1811                tr_peerIoWriteBuf( io, out, TRUE );
1812                bytesWritten += EVBUFFER_LENGTH( out );
1813                evbuffer_free( out );
1814                msgs->clientSentAnythingAt = now;
1815            }
1816            tr_free( buf );
1817        }
1818        else if( fext ) /* peer needs a reject message */
1819        {
1820            protocolSendReject( msgs, &req );
1821        }
1822    }
1823
1824    /**
1825    ***  Keepalive
1826    **/
1827
1828    if( ( msgs != NULL )
1829        && ( msgs->clientSentAnythingAt != 0 )
1830        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1831    {
1832        dbgmsg( msgs, "sending a keepalive message" );
1833        tr_peerIoWriteUint32( msgs->peer->io, msgs->outMessages, 0 );
1834        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1835    }
1836
1837    return bytesWritten;
1838}
1839
1840static int
1841peerPulse( void * vmsgs )
1842{
1843    tr_peermsgs * msgs = vmsgs;
1844    const time_t  now = time( NULL );
1845
1846    ratePulse( msgs );
1847
1848    pumpRequestQueue( msgs, now );
1849    expireOldRequests( msgs, now );
1850
1851    for( ;; )
1852        if( fillOutputBuffer( msgs, now ) < 1 )
1853            break;
1854
1855    return TRUE; /* loop forever */
1856}
1857
1858void
1859tr_peerMsgsPulse( tr_peermsgs * msgs )
1860{
1861    if( msgs != NULL )
1862        peerPulse( msgs );
1863}
1864
1865static void
1866gotError( struct tr_iobuf  * iobuf UNUSED,
1867          short              what,
1868          void             * vmsgs )
1869{
1870    if( what & EVBUFFER_TIMEOUT )
1871        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1872    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1873        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1874               what, errno, tr_strerror( errno ) );
1875    fireError( vmsgs, ENOTCONN );
1876}
1877
1878static void
1879sendBitfield( tr_peermsgs * msgs )
1880{
1881    struct evbuffer * out = msgs->outMessages;
1882    tr_bitfield *     field;
1883    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1884    size_t            i;
1885    size_t            lazyCount = 0;
1886
1887    field = tr_bitfieldDup( tr_cpPieceBitfield( msgs->torrent->completion ) );
1888
1889    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1890    {
1891        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1892            speed over a truly random sample -- let's limit the pool size to
1893            the first 1000 pieces so large torrents don't bog things down */
1894        size_t poolSize;
1895        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1896        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1897
1898        /* build the pool */
1899        for( i=poolSize=0; i<maxPoolSize; ++i )
1900            if( tr_bitfieldHas( field, i ) )
1901                pool[poolSize++] = i;
1902
1903        /* pull random piece indices from the pool */
1904        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1905        {
1906            const int pos = tr_cryptoWeakRandInt( poolSize );
1907            const tr_piece_index_t piece = pool[pos];
1908            tr_bitfieldRem( field, piece );
1909            lazyPieces[lazyCount++] = piece;
1910            pool[pos] = pool[--poolSize];
1911        }
1912
1913        /* cleanup */
1914        tr_free( pool );
1915    }
1916
1917    tr_peerIoWriteUint32( msgs->peer->io, out,
1918                          sizeof( uint8_t ) + field->byteCount );
1919    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_BITFIELD );
1920    tr_peerIoWriteBytes ( msgs->peer->io, out, field->bits, field->byteCount );
1921    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
1922            EVBUFFER_LENGTH( out ) );
1923    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1924
1925    for( i = 0; i < lazyCount; ++i )
1926        protocolSendHave( msgs, lazyPieces[i] );
1927
1928    tr_bitfieldFree( field );
1929}
1930
1931static void
1932tellPeerWhatWeHave( tr_peermsgs * msgs )
1933{
1934    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1935
1936    if( fext && ( tr_cpGetStatus( msgs->torrent->completion ) == TR_SEED ) )
1937    {
1938        protocolSendHaveAll( msgs );
1939    }
1940    else if( fext && ( tr_cpHaveValid( msgs->torrent->completion ) == 0 ) )
1941    {
1942        protocolSendHaveNone( msgs );
1943    }
1944    else
1945    {
1946        sendBitfield( msgs );
1947    }
1948}
1949
1950/**
1951***
1952**/
1953
1954/* some peers give us error messages if we send
1955   more than this many peers in a single pex message
1956   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1957#define MAX_PEX_ADDED 50
1958#define MAX_PEX_DROPPED 50
1959
1960typedef struct
1961{
1962    tr_pex *  added;
1963    tr_pex *  dropped;
1964    tr_pex *  elements;
1965    int       addedCount;
1966    int       droppedCount;
1967    int       elementCount;
1968}
1969PexDiffs;
1970
1971static void
1972pexAddedCb( void * vpex,
1973            void * userData )
1974{
1975    PexDiffs * diffs = userData;
1976    tr_pex *   pex = vpex;
1977
1978    if( diffs->addedCount < MAX_PEX_ADDED )
1979    {
1980        diffs->added[diffs->addedCount++] = *pex;
1981        diffs->elements[diffs->elementCount++] = *pex;
1982    }
1983}
1984
1985static void
1986pexDroppedCb( void * vpex,
1987              void * userData )
1988{
1989    PexDiffs * diffs = userData;
1990    tr_pex *   pex = vpex;
1991
1992    if( diffs->droppedCount < MAX_PEX_DROPPED )
1993    {
1994        diffs->dropped[diffs->droppedCount++] = *pex;
1995    }
1996}
1997
1998static void
1999pexElementCb( void * vpex,
2000              void * userData )
2001{
2002    PexDiffs * diffs = userData;
2003    tr_pex *   pex = vpex;
2004
2005    diffs->elements[diffs->elementCount++] = *pex;
2006}
2007
2008/* TODO: ipv6 pex */
2009static void
2010sendPex( tr_peermsgs * msgs )
2011{
2012    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2013    {
2014        PexDiffs diffs;
2015        tr_pex * newPex = NULL;
2016        const int newCount = tr_peerMgrGetPeers( msgs->session->peerMgr,
2017                                                 msgs->torrent->info.hash,
2018                                                 &newPex );
2019
2020        /* build the diffs */
2021        diffs.added = tr_new( tr_pex, newCount );
2022        diffs.addedCount = 0;
2023        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2024        diffs.droppedCount = 0;
2025        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2026        diffs.elementCount = 0;
2027        tr_set_compare( msgs->pex, msgs->pexCount,
2028                        newPex, newCount,
2029                        tr_pexCompare, sizeof( tr_pex ),
2030                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2031        dbgmsg(
2032            msgs,
2033            "pex: old peer count %d, new peer count %d, added %d, removed %d",
2034            msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
2035
2036        if( !diffs.addedCount && !diffs.droppedCount )
2037        {
2038            tr_free( diffs.elements );
2039        }
2040        else
2041        {
2042            int  i;
2043            tr_benc val;
2044            char * benc;
2045            int bencLen;
2046            uint8_t * tmp, *walk;
2047            struct evbuffer * out = msgs->outMessages;
2048
2049            /* update peer */
2050            tr_free( msgs->pex );
2051            msgs->pex = diffs.elements;
2052            msgs->pexCount = diffs.elementCount;
2053
2054            /* build the pex payload */
2055            tr_bencInitDict( &val, 3 );
2056
2057            /* "added" */
2058            tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2059            for( i = 0; i < diffs.addedCount; ++i ) {
2060                memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2061                memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2062            }
2063            assert( ( walk - tmp ) == diffs.addedCount * 6 );
2064            tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2065            tr_free( tmp );
2066
2067            /* "added.f" */
2068            tmp = walk = tr_new( uint8_t, diffs.addedCount );
2069            for( i = 0; i < diffs.addedCount; ++i )
2070                *walk++ = diffs.added[i].flags;
2071            assert( ( walk - tmp ) == diffs.addedCount );
2072            tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2073            tr_free( tmp );
2074
2075            /* "dropped" */
2076            tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2077            for( i = 0; i < diffs.droppedCount; ++i ) {
2078                memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2079                memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2080            }
2081            assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2082            tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2083            tr_free( tmp );
2084
2085            /* write the pex message */
2086            benc = tr_bencSave( &val, &bencLen );
2087            tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + bencLen );
2088            tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
2089            tr_peerIoWriteUint8 ( msgs->peer->io, out, msgs->ut_pex_id );
2090            tr_peerIoWriteBytes ( msgs->peer->io, out, benc, bencLen );
2091            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2092            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2093
2094            tr_free( benc );
2095            tr_bencFree( &val );
2096        }
2097
2098        /* cleanup */
2099        tr_free( diffs.added );
2100        tr_free( diffs.dropped );
2101        tr_free( newPex );
2102
2103        msgs->clientSentPexAt = time( NULL );
2104    }
2105}
2106
2107static int
2108pexPulse( void * vpeer )
2109{
2110    sendPex( vpeer );
2111    return TRUE;
2112}
2113
2114/**
2115***
2116**/
2117
2118tr_peermsgs*
2119tr_peerMsgsNew( struct tr_torrent * torrent,
2120                struct tr_peer *    peer,
2121                tr_delivery_func    func,
2122                void *              userData,
2123                tr_publisher_tag *  setme )
2124{
2125    tr_peermsgs * m;
2126
2127    assert( peer );
2128    assert( peer->io );
2129
2130    m = tr_new0( tr_peermsgs, 1 );
2131    m->publisher = tr_publisherNew( );
2132    m->peer = peer;
2133    m->session = torrent->session;
2134    m->torrent = torrent;
2135    m->peer->clientIsChoked = 1;
2136    m->peer->peerIsChoked = 1;
2137    m->peer->clientIsInterested = 0;
2138    m->peer->peerIsInterested = 0;
2139    m->peer->have = tr_bitfieldNew( torrent->info.pieceCount );
2140    m->state = AWAITING_BT_LENGTH;
2141    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
2142    m->outMessages = evbuffer_new( );
2143    m->outMessagesBatchedAt = 0;
2144    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2145    m->incoming.block = evbuffer_new( );
2146    m->peerAskedFor = REQUEST_LIST_INIT;
2147    m->clientAskedFor = REQUEST_LIST_INIT;
2148    m->clientWillAskFor = REQUEST_LIST_INIT;
2149    peer->msgs = m;
2150
2151    *setme = tr_publisherSubscribe( m->publisher, func, userData );
2152
2153    if( tr_peerIoSupportsLTEP( peer->io ) )
2154        sendLtepHandshake( m );
2155
2156    tellPeerWhatWeHave( m );
2157
2158    tr_peerIoSetTimeoutSecs( m->peer->io, 150 ); /* timeout after N seconds of inactivity */
2159    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2160    ratePulse( m );
2161
2162    return m;
2163}
2164
2165void
2166tr_peerMsgsFree( tr_peermsgs* msgs )
2167{
2168    if( msgs )
2169    {
2170        tr_timerFree( &msgs->pexTimer );
2171        tr_publisherFree( &msgs->publisher );
2172        reqListClear( &msgs->clientWillAskFor );
2173        reqListClear( &msgs->clientAskedFor );
2174        reqListClear( &msgs->peerAskedFor );
2175
2176        evbuffer_free( msgs->incoming.block );
2177        evbuffer_free( msgs->outMessages );
2178        tr_free( msgs->pex );
2179
2180        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2181        tr_free( msgs );
2182    }
2183}
2184
2185void
2186tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2187                        tr_publisher_tag tag )
2188{
2189    tr_publisherUnsubscribe( peer->publisher, tag );
2190}
2191
Note: See TracBrowser for help on using the repository browser.