source: trunk/libtransmission/peer-msgs.c @ 9466

Last change on this file since 9466 was 9466, checked in by charles, 12 years ago

(trunk libT) turn off a debugging message in the terminal

  • Property svn:keywords set to Date Rev Author Id
File size: 64.9 KB
Line 
1/*
2 * This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 9466 2009-11-01 03:56:10Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "crypto.h"
26#include "inout.h"
27#ifdef WIN32
28#include "net.h" /* for ECONN */
29#endif
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-msgs.h"
33#include "platform.h" /* MAX_STACK_ARRAY_SIZE */
34#include "ratecontrol.h"
35#include "request-list.h"
36#include "session.h"
37#include "stats.h"
38#include "torrent.h"
39#include "tr-dht.h"
40#include "utils.h"
41#include "version.h"
42
43// #ifdef TRACK_DUPES
44
45/**
46***
47**/
48
49enum
50{
51    BT_CHOKE                = 0,
52    BT_UNCHOKE              = 1,
53    BT_INTERESTED           = 2,
54    BT_NOT_INTERESTED       = 3,
55    BT_HAVE                 = 4,
56    BT_BITFIELD             = 5,
57    BT_REQUEST              = 6,
58    BT_PIECE                = 7,
59    BT_CANCEL               = 8,
60    BT_PORT                 = 9,
61
62    BT_FEXT_SUGGEST         = 13,
63    BT_FEXT_HAVE_ALL        = 14,
64    BT_FEXT_HAVE_NONE       = 15,
65    BT_FEXT_REJECT          = 16,
66    BT_FEXT_ALLOWED_FAST    = 17,
67
68    BT_LTEP                 = 20,
69
70    LTEP_HANDSHAKE          = 0,
71
72    TR_LTEP_PEX             = 1,
73
74    MAX_PEX_PEER_COUNT      = 50,
75
76    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
77
78    /* idle seconds before we send a keepalive */
79    KEEPALIVE_INTERVAL_SECS = 100,
80
81    PEX_INTERVAL_SECS       = 90, /* sec between sendPex() calls */
82
83
84    MAX_BLOCK_SIZE          = ( 1024 * 16 ),
85
86
87    /* how long an unsent request can stay queued before it's returned
88       back to the peer-mgr's pool of requests */
89    QUEUED_REQUEST_TTL_SECS = 20,
90
91    /* how long a sent request can stay queued before it's returned
92       back to the peer-mgr's pool of requests */
93    SENT_REQUEST_TTL_SECS = 240,
94
95    /* used in lowering the outMessages queue period */
96    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
97    HIGH_PRIORITY_INTERVAL_SECS = 2,
98    LOW_PRIORITY_INTERVAL_SECS = 20,
99
100    /* number of pieces to remove from the bitfield when
101     * lazy bitfields are turned on */
102    LAZY_PIECE_COUNT = 26,
103
104    /* number of pieces we'll allow in our fast set */
105    MAX_FAST_SET_SIZE = 3
106};
107
108enum
109{
110    AWAITING_BT_LENGTH,
111    AWAITING_BT_ID,
112    AWAITING_BT_MESSAGE,
113    AWAITING_BT_PIECE
114};
115
116/**
117***
118**/
119
120/* this is raw, unchanged data from the peer regarding
121 * the current message that it's sending us. */
122struct tr_incoming
123{
124    uint8_t                id;
125    uint32_t               length; /* includes the +1 for id length */
126    struct peer_request    blockReq; /* metadata for incoming blocks */
127    struct evbuffer *      block; /* piece data for incoming blocks */
128};
129
130/**
131 * Low-level communication state information about a connected peer.
132 *
133 * This structure remembers the low-level protocol states that we're
134 * in with this peer, such as active requests, pex messages, and so on.
135 * Its fields are all private to peer-msgs.c.
136 *
137 * Data not directly involved with sending & receiving messages is
138 * stored in tr_peer, where it can be accessed by both peermsgs and
139 * the peer manager.
140 *
141 * @see struct peer_atom
142 * @see tr_peer
143 */
144struct tr_peermsgs
145{
146    tr_bool         peerSupportsPex;
147    tr_bool         clientSentLtepHandshake;
148    tr_bool         peerSentLtepHandshake;
149    /*tr_bool         haveFastSet;*/
150
151    /* how long the outMessages batch should be allowed to grow before
152     * it's flushed -- some messages (like requests >:) should be sent
153     * very quickly; others aren't as urgent. */
154    int8_t          outMessagesBatchPeriod;
155
156    uint8_t         state;
157    uint8_t         ut_pex_id;
158    uint16_t        pexCount;
159    uint16_t        pexCount6;
160    uint16_t        maxActiveRequests;
161
162#if 0
163    size_t                 fastsetSize;
164    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
165#endif
166
167    tr_peer *              peer;
168
169    tr_torrent *           torrent;
170
171    tr_publisher           publisher;
172
173    struct evbuffer *      outMessages; /* all the non-piece messages */
174
175    struct request_list    peerAskedFor;
176    struct request_list    clientAskedFor;
177    struct request_list    clientWillAskFor;
178
179    tr_pex               * pex;
180    tr_pex               * pex6;
181
182    /*time_t                 clientSentPexAt;*/
183    time_t                 clientSentAnythingAt;
184
185    /* when we started batching the outMessages */
186    time_t                outMessagesBatchedAt;
187
188    struct tr_incoming    incoming;
189
190    /* if the peer supports the Extension Protocol in BEP 10 and
191       supplied a reqq argument, it's stored here.  otherwise the
192       value is zero and should be ignored. */
193    int64_t               reqq;
194
195    struct event          pexTimer;
196};
197
198static TR_INLINE tr_session*
199getSession( struct tr_peermsgs * msgs )
200{
201    return msgs->torrent->session;
202}
203
204/**
205***
206**/
207
208static void
209myDebug( const char * file, int line,
210         const struct tr_peermsgs * msgs,
211         const char * fmt, ... )
212{
213    FILE * fp = tr_getLog( );
214
215    if( fp )
216    {
217        va_list           args;
218        char              timestr[64];
219        struct evbuffer * buf = evbuffer_new( );
220        char *            base = tr_basename( file );
221
222        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
223                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
224                             tr_torrentName( msgs->torrent ),
225                             tr_peerIoGetAddrStr( msgs->peer->io ),
226                             msgs->peer->client );
227        va_start( args, fmt );
228        evbuffer_add_vprintf( buf, fmt, args );
229        va_end( args );
230        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
231        /* FIXME(libevent2) tr_getLog() should return an fd, then use evbuffer_write() here */
232        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
233
234        tr_free( base );
235        evbuffer_free( buf );
236    }
237}
238
239#define dbgmsg( msgs, ... ) \
240    do { \
241        if( tr_deepLoggingIsActive( ) ) \
242            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
243    } while( 0 )
244
245/**
246***
247**/
248
249static void
250pokeBatchPeriod( tr_peermsgs * msgs,
251                 int           interval )
252{
253    if( msgs->outMessagesBatchPeriod > interval )
254    {
255        msgs->outMessagesBatchPeriod = interval;
256        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
257    }
258}
259
260static TR_INLINE void
261dbgOutMessageLen( tr_peermsgs * msgs )
262{
263    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
264}
265
266static void
267protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
268{
269    tr_peerIo       * io  = msgs->peer->io;
270    struct evbuffer * out = msgs->outMessages;
271
272    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
273
274    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
275    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
276    tr_peerIoWriteUint32( io, out, req->index );
277    tr_peerIoWriteUint32( io, out, req->offset );
278    tr_peerIoWriteUint32( io, out, req->length );
279
280    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
281    dbgOutMessageLen( msgs );
282}
283
284static void
285protocolSendRequest( tr_peermsgs               * msgs,
286                     const struct peer_request * req )
287{
288    tr_peerIo       * io  = msgs->peer->io;
289    struct evbuffer * out = msgs->outMessages;
290
291    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
292    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
293    tr_peerIoWriteUint32( io, out, req->index );
294    tr_peerIoWriteUint32( io, out, req->offset );
295    tr_peerIoWriteUint32( io, out, req->length );
296
297    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
298    dbgOutMessageLen( msgs );
299    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
300}
301
302static void
303protocolSendCancel( tr_peermsgs               * msgs,
304                    const struct peer_request * req )
305{
306    tr_peerIo       * io  = msgs->peer->io;
307    struct evbuffer * out = msgs->outMessages;
308
309    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
310    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
311    tr_peerIoWriteUint32( io, out, req->index );
312    tr_peerIoWriteUint32( io, out, req->offset );
313    tr_peerIoWriteUint32( io, out, req->length );
314
315    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
316    dbgOutMessageLen( msgs );
317    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
318}
319
320static void
321protocolSendPort(tr_peermsgs *msgs, uint16_t port)
322{
323    tr_peerIo       * io  = msgs->peer->io;
324    struct evbuffer * out = msgs->outMessages;
325
326    dbgmsg( msgs, "sending Port %u", port);
327    tr_peerIoWriteUint32( io, out, 3 );
328    tr_peerIoWriteUint8 ( io, out, BT_PORT );
329    tr_peerIoWriteUint16( io, out, port);
330}
331
332static void
333protocolSendHave( tr_peermsgs * msgs,
334                  uint32_t      index )
335{
336    tr_peerIo       * io  = msgs->peer->io;
337    struct evbuffer * out = msgs->outMessages;
338
339    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
340    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
341    tr_peerIoWriteUint32( io, out, index );
342
343    dbgmsg( msgs, "sending Have %u", index );
344    dbgOutMessageLen( msgs );
345    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
346}
347
348#if 0
349static void
350protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
351{
352    tr_peerIo       * io  = msgs->peer->io;
353    struct evbuffer * out = msgs->outMessages;
354
355    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
356
357    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
358    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
359    tr_peerIoWriteUint32( io, out, pieceIndex );
360
361    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
362    dbgOutMessageLen( msgs );
363}
364#endif
365
366static void
367protocolSendChoke( tr_peermsgs * msgs,
368                   int           choke )
369{
370    tr_peerIo       * io  = msgs->peer->io;
371    struct evbuffer * out = msgs->outMessages;
372
373    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
374    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
375
376    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
377    dbgOutMessageLen( msgs );
378    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
379}
380
381static void
382protocolSendHaveAll( tr_peermsgs * msgs )
383{
384    tr_peerIo       * io  = msgs->peer->io;
385    struct evbuffer * out = msgs->outMessages;
386
387    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
388
389    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
390    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
391
392    dbgmsg( msgs, "sending HAVE_ALL..." );
393    dbgOutMessageLen( msgs );
394    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
395}
396
397static void
398protocolSendHaveNone( tr_peermsgs * msgs )
399{
400    tr_peerIo       * io  = msgs->peer->io;
401    struct evbuffer * out = msgs->outMessages;
402
403    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
404
405    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
406    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
407
408    dbgmsg( msgs, "sending HAVE_NONE..." );
409    dbgOutMessageLen( msgs );
410    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
411}
412
413/**
414***  EVENTS
415**/
416
417static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0, 0 };
418
419static void
420publish( tr_peermsgs * msgs, tr_peer_event * e )
421{
422    assert( msgs->peer );
423    assert( msgs->peer->msgs == msgs );
424
425    tr_publisherPublish( &msgs->publisher, msgs->peer, e );
426}
427
428static void
429fireError( tr_peermsgs * msgs, int err )
430{
431    tr_peer_event e = blankEvent;
432    e.eventType = TR_PEER_ERROR;
433    e.err = err;
434    publish( msgs, &e );
435}
436
437static void
438fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly )
439{
440    tr_peer_event e = blankEvent;
441    e.eventType = TR_PEER_UPLOAD_ONLY;
442    e.uploadOnly = uploadOnly;
443    publish( msgs, &e );
444}
445
446static void
447fireNeedReq( tr_peermsgs * msgs )
448{
449    tr_peer_event e = blankEvent;
450    e.eventType = TR_PEER_NEED_REQ;
451    publish( msgs, &e );
452}
453
454static void
455firePeerProgress( tr_peermsgs * msgs )
456{
457    tr_peer_event e = blankEvent;
458    e.eventType = TR_PEER_PEER_PROGRESS;
459    e.progress = msgs->peer->progress;
460    publish( msgs, &e );
461}
462
463#ifdef TRACK_DUPES
464static double blocksGotten = 0.0;
465#endif
466
467static void
468fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
469{
470    tr_peer_event e = blankEvent;
471#ifdef TRACK_DUPES
472++blocksGotten;
473#endif
474    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
475    e.pieceIndex = req->index;
476    e.offset = req->offset;
477    e.length = req->length;
478    publish( msgs, &e );
479}
480
481static void
482fireClientGotData( tr_peermsgs * msgs,
483                   uint32_t      length,
484                   int           wasPieceData )
485{
486    tr_peer_event e = blankEvent;
487
488    e.length = length;
489    e.eventType = TR_PEER_CLIENT_GOT_DATA;
490    e.wasPieceData = wasPieceData;
491    publish( msgs, &e );
492}
493
494static void
495fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
496{
497    tr_peer_event e = blankEvent;
498    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
499    e.pieceIndex = pieceIndex;
500    publish( msgs, &e );
501}
502
503static void
504fireClientGotPort( tr_peermsgs * msgs, tr_port port )
505{
506    tr_peer_event e = blankEvent;
507    e.eventType = TR_PEER_CLIENT_GOT_PORT;
508    e.port = port;
509    publish( msgs, &e );
510}
511
512static void
513fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
514{
515    tr_peer_event e = blankEvent;
516    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
517    e.pieceIndex = pieceIndex;
518    publish( msgs, &e );
519}
520
521static void
522firePeerGotData( tr_peermsgs  * msgs,
523                 uint32_t       length,
524                 int            wasPieceData )
525{
526    tr_peer_event e = blankEvent;
527
528    e.length = length;
529    e.eventType = TR_PEER_PEER_GOT_DATA;
530    e.wasPieceData = wasPieceData;
531
532    publish( msgs, &e );
533}
534
535static void
536fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
537{
538    tr_peer_event e = blankEvent;
539    e.eventType = TR_PEER_CANCEL;
540    e.pieceIndex = req->index;
541    e.offset = req->offset;
542    e.length = req->length;
543    publish( msgs, &e );
544}
545
546/**
547***  ALLOWED FAST SET
548***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
549**/
550
551size_t
552tr_generateAllowedSet( tr_piece_index_t * setmePieces,
553                       size_t             desiredSetSize,
554                       size_t             pieceCount,
555                       const uint8_t    * infohash,
556                       const tr_address * addr )
557{
558    size_t setSize = 0;
559
560    assert( setmePieces );
561    assert( desiredSetSize <= pieceCount );
562    assert( desiredSetSize );
563    assert( pieceCount );
564    assert( infohash );
565    assert( addr );
566
567    if( addr->type == TR_AF_INET )
568    {
569        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
570        uint8_t x[SHA_DIGEST_LENGTH];
571
572        uint32_t ui32 = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
573        memcpy( w, &ui32, sizeof( uint32_t ) );
574        walk += sizeof( uint32_t );
575        memcpy( walk, infohash, SHA_DIGEST_LENGTH );                 /* (2) */
576        walk += SHA_DIGEST_LENGTH;
577        tr_sha1( x, w, walk-w, NULL );                               /* (3) */
578        assert( sizeof( w ) == walk-w );
579
580        while( setSize<desiredSetSize )
581        {
582            int i;
583            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
584            {
585                size_t k;
586                uint32_t j = i * 4;                                  /* (5) */
587                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
588                uint32_t index = y % pieceCount;                     /* (7) */
589
590                for( k=0; k<setSize; ++k )                           /* (8) */
591                    if( setmePieces[k] == index )
592                        break;
593
594                if( k == setSize )
595                    setmePieces[setSize++] = index;                  /* (9) */
596            }
597
598            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
599        }
600    }
601
602    return setSize;
603}
604
605static void
606updateFastSet( tr_peermsgs * msgs UNUSED )
607{
608#if 0
609    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
610    const int peerIsNeedy = msgs->peer->progress < 0.10;
611
612    if( fext && peerIsNeedy && !msgs->haveFastSet )
613    {
614        size_t i;
615        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
616        const tr_info * inf = &msgs->torrent->info;
617        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
618
619        /* build the fast set */
620        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
621        msgs->haveFastSet = 1;
622
623        /* send it to the peer */
624        for( i=0; i<msgs->fastsetSize; ++i )
625            protocolSendAllowedFast( msgs, msgs->fastset[i] );
626    }
627#endif
628}
629
630/**
631***  INTEREST
632**/
633
634static tr_bool
635isPieceInteresting( const tr_peermsgs * msgs,
636                    tr_piece_index_t    piece )
637{
638    const tr_torrent * torrent = msgs->torrent;
639
640    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
641          && ( !tr_cpPieceIsComplete( &torrent->completion, piece ) ) /* !have */
642          && ( tr_bitfieldHas( msgs->peer->have, piece ) );    /* peer has it */
643}
644
645/* "interested" means we'll ask for piece data if they unchoke us */
646static tr_bool
647isPeerInteresting( const tr_peermsgs * msgs )
648{
649    tr_piece_index_t    i;
650    const tr_torrent *  torrent;
651    const tr_bitfield * bitfield;
652    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
653
654    if( clientIsSeed )
655        return FALSE;
656
657    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
658        return FALSE;
659
660    torrent = msgs->torrent;
661    bitfield = tr_cpPieceBitfield( &torrent->completion );
662
663    if( !msgs->peer->have )
664        return TRUE;
665
666    assert( bitfield->byteCount == msgs->peer->have->byteCount );
667
668    for( i = 0; i < torrent->info.pieceCount; ++i )
669        if( isPieceInteresting( msgs, i ) )
670            return TRUE;
671
672    return FALSE;
673}
674
675static void
676sendInterest( tr_peermsgs * msgs,
677              int           weAreInterested )
678{
679    struct evbuffer * out = msgs->outMessages;
680
681    assert( msgs );
682    assert( weAreInterested == 0 || weAreInterested == 1 );
683
684    msgs->peer->clientIsInterested = weAreInterested;
685    dbgmsg( msgs, "Sending %s", weAreInterested ? "Interested" : "Not Interested" );
686    tr_peerIoWriteUint32( msgs->peer->io, out, sizeof( uint8_t ) );
687    tr_peerIoWriteUint8 ( msgs->peer->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
688
689    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
690    dbgOutMessageLen( msgs );
691}
692
693static void
694updateInterest( tr_peermsgs * msgs )
695{
696    const int i = isPeerInteresting( msgs );
697
698    if( i != msgs->peer->clientIsInterested )
699        sendInterest( msgs, i );
700    if( i )
701        fireNeedReq( msgs );
702}
703
704static tr_bool
705popNextRequest( tr_peermsgs *         msgs,
706                struct peer_request * setme )
707{
708    return reqListPop( &msgs->peerAskedFor, setme );
709}
710
711static void
712cancelAllRequestsToClient( tr_peermsgs * msgs )
713{
714    struct peer_request req;
715    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
716
717    while( popNextRequest( msgs, &req ) )
718        if( mustSendCancel )
719            protocolSendReject( msgs, &req );
720}
721
722void
723tr_peerMsgsSetChoke( tr_peermsgs * msgs,
724                     int           choke )
725{
726    const time_t now = time( NULL );
727    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
728
729    assert( msgs );
730    assert( msgs->peer );
731    assert( choke == 0 || choke == 1 );
732
733    if( msgs->peer->chokeChangedAt > fibrillationTime )
734    {
735        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
736    }
737    else if( msgs->peer->peerIsChoked != choke )
738    {
739        msgs->peer->peerIsChoked = choke;
740        if( choke )
741            cancelAllRequestsToClient( msgs );
742        protocolSendChoke( msgs, choke );
743        msgs->peer->chokeChangedAt = now;
744    }
745}
746
747/**
748***
749**/
750
751void
752tr_peerMsgsHave( tr_peermsgs * msgs,
753                 uint32_t      index )
754{
755    protocolSendHave( msgs, index );
756
757    /* since we have more pieces now, we might not be interested in this peer */
758    updateInterest( msgs );
759}
760
761/**
762***
763**/
764
765static tr_bool
766reqIsValid( const tr_peermsgs * peer,
767            uint32_t            index,
768            uint32_t            offset,
769            uint32_t            length )
770{
771    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
772}
773
774static tr_bool
775requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
776{
777    return reqIsValid( msgs, req->index, req->offset, req->length );
778}
779
780static void
781expireFromList( tr_peermsgs          * msgs,
782                struct request_list  * list,
783                const time_t           oldestAllowed )
784{
785    size_t i;
786    struct request_list tmp = REQUEST_LIST_INIT;
787
788    /* since the fifo list is sorted by time, the oldest will be first */
789    if( !list->len || ( list->fifo[0].time_requested >= oldestAllowed ) )
790        return;
791
792    /* if we found one too old, start pruning them */
793    reqListCopy( &tmp, list );
794    for( i=0; i<tmp.len; ++i ) {
795        const struct peer_request * req = &tmp.fifo[i];
796        if( req->time_requested >= oldestAllowed )
797            break;
798        tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
799    }
800    reqListClear( &tmp );
801}
802
803static void
804expireOldRequests( tr_peermsgs * msgs, const time_t now  )
805{
806    time_t oldestAllowed;
807    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
808    dbgmsg( msgs, "entering `expire old requests' block" );
809
810    /* cancel requests that have been queued for too long */
811    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
812    expireFromList( msgs, &msgs->clientWillAskFor, oldestAllowed );
813
814    /* if the peer doesn't support "Reject Request",
815     * cancel requests that were sent too long ago. */
816    if( !fext ) {
817        oldestAllowed = now - SENT_REQUEST_TTL_SECS;
818        expireFromList( msgs, &msgs->clientAskedFor, oldestAllowed );
819    }
820
821    dbgmsg( msgs, "leaving `expire old requests' block" );
822}
823
824static void
825pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
826{
827    const int           max = msgs->maxActiveRequests;
828    int                 sent = 0;
829    int                 len = msgs->clientAskedFor.len;
830    struct peer_request req;
831
832    dbgmsg( msgs, "clientIsChoked %d, download allowed %d, len %d, max %d, msgs->clientWillAskFor.len %d",
833            (int)msgs->peer->clientIsChoked,
834            (int)tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ),
835            len, max, msgs->clientWillAskFor.len );
836
837    if( msgs->peer->clientIsChoked )
838        return;
839    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
840        return;
841
842    while( ( len < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
843    {
844        const tr_block_index_t block = _tr_block( msgs->torrent, req.index, req.offset );
845
846        assert( requestIsValid( msgs, &req ) );
847        assert( tr_bitfieldHas( msgs->peer->have, req.index ) );
848
849        /* don't ask for it if we've already got it... this block may have
850         * come in from a different peer after we cancelled a request for it */
851        if( !tr_cpBlockIsComplete( &msgs->torrent->completion, block ) )
852        {
853            protocolSendRequest( msgs, &req );
854            req.time_requested = now;
855            reqListAppend( &msgs->clientAskedFor, &req );
856
857            ++len;
858            ++sent;
859        }
860        else dbgmsg( msgs, "not asking for it because we've already got it..." );
861    }
862
863    if( sent )
864        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
865                sent, msgs->clientAskedFor.len, msgs->clientWillAskFor.len );
866
867    if( len < max )
868        fireNeedReq( msgs );
869}
870
871static TR_INLINE tr_bool
872requestQueueIsFull( const tr_peermsgs * msgs )
873{
874    const int req_max = msgs->maxActiveRequests;
875    return msgs->clientWillAskFor.len >= (size_t)req_max;
876}
877
878tr_addreq_t
879tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
880                       uint32_t         index,
881                       uint32_t         offset,
882                       uint32_t         length )
883{
884    struct peer_request req;
885
886    assert( msgs );
887    assert( msgs->torrent );
888
889    /**
890    ***  Reasons to decline the request
891    **/
892
893    /* don't send requests to choked clients */
894    if( msgs->peer->clientIsChoked ) {
895        dbgmsg( msgs, "declining request because they're choking us" );
896        return TR_ADDREQ_CLIENT_CHOKED;
897    }
898
899    /* peer's queue is full */
900    if( requestQueueIsFull( msgs ) ) {
901        dbgmsg( msgs, "declining request because we're full" );
902        return TR_ADDREQ_FULL;
903    }
904
905    /* peer doesn't have this piece */
906    if( !tr_bitfieldHas( msgs->peer->have, index ) )
907        return TR_ADDREQ_MISSING;
908
909    /* have we already asked for this piece? */
910    req.index = index;
911    req.offset = offset;
912    req.length = length;
913    if( reqListHas( &msgs->clientAskedFor, &req ) ) {
914        dbgmsg( msgs, "declining because it's a duplicate" );
915        return TR_ADDREQ_DUPLICATE;
916    }
917    if( reqListHas( &msgs->clientWillAskFor, &req ) ) {
918        dbgmsg( msgs, "declining because it's a duplicate" );
919        return TR_ADDREQ_DUPLICATE;
920    }
921
922    /**
923    ***  Accept this request
924    **/
925
926    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
927            index, offset, length );
928    req.time_requested = time( NULL );
929    reqListAppend( &msgs->clientWillAskFor, &req );
930    return TR_ADDREQ_OK;
931}
932
933static void
934cancelAllRequestsToPeer( tr_peermsgs * msgs, tr_bool sendCancel )
935{
936    size_t i;
937    struct request_list a = msgs->clientWillAskFor;
938    struct request_list b = msgs->clientAskedFor;
939    dbgmsg( msgs, "cancelling all requests to peer" );
940
941    msgs->clientAskedFor = REQUEST_LIST_INIT;
942    msgs->clientWillAskFor = REQUEST_LIST_INIT;
943
944    for( i=0; i<a.len; ++i )
945        fireCancelledReq( msgs, &a.fifo[i] );
946
947    for( i = 0; i < b.len; ++i ) {
948        fireCancelledReq( msgs, &b.fifo[i] );
949        if( sendCancel )
950            protocolSendCancel( msgs, &b.fifo[i] );
951    }
952
953    reqListClear( &a );
954    reqListClear( &b );
955}
956
957void
958tr_peerMsgsCancel( tr_peermsgs * msgs,
959                   uint32_t      pieceIndex,
960                   uint32_t      offset,
961                   uint32_t      length )
962{
963    struct peer_request req;
964
965    assert( msgs != NULL );
966    assert( length > 0 );
967
968
969    /* have we asked the peer for this piece? */
970    req.index = pieceIndex;
971    req.offset = offset;
972    req.length = length;
973
974    /* if it's only in the queue and hasn't been sent yet, free it */
975    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
976        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
977        fireCancelledReq( msgs, &req );
978    }
979
980    /* if it's already been sent, send a cancel message too */
981    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
982        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
983        protocolSendCancel( msgs, &req );
984        fireCancelledReq( msgs, &req );
985    }
986}
987
988
989/**
990***
991**/
992
993static void
994sendLtepHandshake( tr_peermsgs * msgs )
995{
996    tr_benc val, *m;
997    char * buf;
998    int len;
999    int pex;
1000    struct evbuffer * out = msgs->outMessages;
1001
1002    if( msgs->clientSentLtepHandshake )
1003        return;
1004
1005    dbgmsg( msgs, "sending an ltep handshake" );
1006    msgs->clientSentLtepHandshake = 1;
1007
1008    /* decide if we want to advertise pex support */
1009    if( !tr_torrentAllowsPex( msgs->torrent ) )
1010        pex = 0;
1011    else if( msgs->peerSentLtepHandshake )
1012        pex = msgs->peerSupportsPex ? 1 : 0;
1013    else
1014        pex = 1;
1015
1016    tr_bencInitDict( &val, 5 );
1017    tr_bencDictAddInt( &val, "e", getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED );
1018    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( getSession(msgs) ) );
1019    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
1020    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
1021    m  = tr_bencDictAddDict( &val, "m", 1 );
1022    if( pex )
1023        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
1024    buf = tr_bencToStr( &val, TR_FMT_BENC, &len );
1025
1026    tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + len );
1027    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
1028    tr_peerIoWriteUint8 ( msgs->peer->io, out, LTEP_HANDSHAKE );
1029    tr_peerIoWriteBytes ( msgs->peer->io, out, buf, len );
1030    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1031    dbgOutMessageLen( msgs );
1032
1033    /* cleanup */
1034    tr_bencFree( &val );
1035    tr_free( buf );
1036}
1037
1038static void
1039parseLtepHandshake( tr_peermsgs *     msgs,
1040                    int               len,
1041                    struct evbuffer * inbuf )
1042{
1043    int64_t   i;
1044    tr_benc   val, * sub;
1045    uint8_t * tmp = tr_new( uint8_t, len );
1046
1047    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
1048    msgs->peerSentLtepHandshake = 1;
1049
1050    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
1051    {
1052        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1053        tr_free( tmp );
1054        return;
1055    }
1056
1057    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
1058
1059    /* does the peer prefer encrypted connections? */
1060    if( tr_bencDictFindInt( &val, "e", &i ) )
1061        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1062                                              : ENCRYPTION_PREFERENCE_NO;
1063
1064    /* check supported messages for utorrent pex */
1065    msgs->peerSupportsPex = 0;
1066    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
1067        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
1068            msgs->ut_pex_id = (uint8_t) i;
1069            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1070            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1071        }
1072    }
1073
1074    /* look for upload_only (BEP 21) */
1075    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
1076        fireUploadOnly( msgs, i!=0 );
1077
1078    /* get peer's listening port */
1079    if( tr_bencDictFindInt( &val, "p", &i ) ) {
1080        fireClientGotPort( msgs, (tr_port)i );
1081        dbgmsg( msgs, "peer's port is now %d", (int)i );
1082    }
1083
1084    /* get peer's maximum request queue size */
1085    if( tr_bencDictFindInt( &val, "reqq", &i ) )
1086        msgs->reqq = i;
1087
1088    tr_bencFree( &val );
1089    tr_free( tmp );
1090}
1091
1092static void
1093parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1094{
1095    int loaded = 0;
1096    uint8_t * tmp = tr_new( uint8_t, msglen );
1097    tr_benc val;
1098    tr_torrent * tor = msgs->torrent;
1099    const uint8_t * added;
1100    size_t added_len;
1101
1102    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1103
1104    if( tr_torrentAllowsPex( tor )
1105      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1106    {
1107        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1108        {
1109            tr_pex * pex;
1110            size_t i, n;
1111            size_t added_f_len = 0;
1112            const uint8_t * added_f = NULL;
1113
1114            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1115            pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1116
1117            n = MIN( n, MAX_PEX_PEER_COUNT );
1118            for( i=0; i<n; ++i )
1119                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex + i );
1120
1121            tr_free( pex );
1122        }
1123
1124        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1125        {
1126            tr_pex * pex;
1127            size_t i, n;
1128            size_t added_f_len = 0;
1129            const uint8_t * added_f = NULL;
1130
1131            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1132            pex = tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len, &n );
1133
1134            n = MIN( n, MAX_PEX_PEER_COUNT );
1135            for( i=0; i<n; ++i )
1136                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex + i );
1137
1138            tr_free( pex );
1139        }
1140    }
1141
1142    if( loaded )
1143        tr_bencFree( &val );
1144    tr_free( tmp );
1145}
1146
1147static void sendPex( tr_peermsgs * msgs );
1148
1149static void
1150parseLtep( tr_peermsgs *     msgs,
1151           int               msglen,
1152           struct evbuffer * inbuf )
1153{
1154    uint8_t ltep_msgid;
1155
1156    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1157    msglen--;
1158
1159    if( ltep_msgid == LTEP_HANDSHAKE )
1160    {
1161        dbgmsg( msgs, "got ltep handshake" );
1162        parseLtepHandshake( msgs, msglen, inbuf );
1163        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1164        {
1165            sendLtepHandshake( msgs );
1166            sendPex( msgs );
1167        }
1168    }
1169    else if( ltep_msgid == TR_LTEP_PEX )
1170    {
1171        dbgmsg( msgs, "got ut pex" );
1172        msgs->peerSupportsPex = 1;
1173        parseUtPex( msgs, msglen, inbuf );
1174    }
1175    else
1176    {
1177        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1178        evbuffer_drain( inbuf, msglen );
1179    }
1180}
1181
1182static int
1183readBtLength( tr_peermsgs *     msgs,
1184              struct evbuffer * inbuf,
1185              size_t            inlen )
1186{
1187    uint32_t len;
1188
1189    if( inlen < sizeof( len ) )
1190        return READ_LATER;
1191
1192    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1193
1194    if( len == 0 ) /* peer sent us a keepalive message */
1195        dbgmsg( msgs, "got KeepAlive" );
1196    else
1197    {
1198        msgs->incoming.length = len;
1199        msgs->state = AWAITING_BT_ID;
1200    }
1201
1202    return READ_NOW;
1203}
1204
1205static int readBtMessage( tr_peermsgs *     msgs,
1206                          struct evbuffer * inbuf,
1207                          size_t            inlen );
1208
1209static int
1210readBtId( tr_peermsgs *     msgs,
1211          struct evbuffer * inbuf,
1212          size_t            inlen )
1213{
1214    uint8_t id;
1215
1216    if( inlen < sizeof( uint8_t ) )
1217        return READ_LATER;
1218
1219    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1220    msgs->incoming.id = id;
1221    dbgmsg( msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length );
1222
1223    if( id == BT_PIECE )
1224    {
1225        msgs->state = AWAITING_BT_PIECE;
1226        return READ_NOW;
1227    }
1228    else if( msgs->incoming.length != 1 )
1229    {
1230        msgs->state = AWAITING_BT_MESSAGE;
1231        return READ_NOW;
1232    }
1233    else return readBtMessage( msgs, inbuf, inlen - 1 );
1234}
1235
1236static void
1237updatePeerProgress( tr_peermsgs * msgs )
1238{
1239    msgs->peer->progress = tr_bitfieldCountTrueBits( msgs->peer->have ) / (float)msgs->torrent->info.pieceCount;
1240    dbgmsg( msgs, "peer progress is %f", msgs->peer->progress );
1241    updateFastSet( msgs );
1242    updateInterest( msgs );
1243    firePeerProgress( msgs );
1244}
1245
1246static void
1247peerMadeRequest( tr_peermsgs *               msgs,
1248                 const struct peer_request * req )
1249{
1250    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1251    const int reqIsValid = requestIsValid( msgs, req );
1252    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1253    const int peerIsChoked = msgs->peer->peerIsChoked;
1254
1255    int allow = FALSE;
1256
1257    if( !reqIsValid )
1258        dbgmsg( msgs, "rejecting an invalid request." );
1259    else if( !clientHasPiece )
1260        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1261    else if( peerIsChoked )
1262        dbgmsg( msgs, "rejecting request from choked peer" );
1263    else
1264        allow = TRUE;
1265
1266    if( allow )
1267        reqListAppend( &msgs->peerAskedFor, req );
1268    else if( fext )
1269        protocolSendReject( msgs, req );
1270}
1271
1272static tr_bool
1273messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1274{
1275    switch( id )
1276    {
1277        case BT_CHOKE:
1278        case BT_UNCHOKE:
1279        case BT_INTERESTED:
1280        case BT_NOT_INTERESTED:
1281        case BT_FEXT_HAVE_ALL:
1282        case BT_FEXT_HAVE_NONE:
1283            return len == 1;
1284
1285        case BT_HAVE:
1286        case BT_FEXT_SUGGEST:
1287        case BT_FEXT_ALLOWED_FAST:
1288            return len == 5;
1289
1290        case BT_BITFIELD:
1291            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1292
1293        case BT_REQUEST:
1294        case BT_CANCEL:
1295        case BT_FEXT_REJECT:
1296            return len == 13;
1297
1298        case BT_PIECE:
1299            return len > 9 && len <= 16393;
1300
1301        case BT_PORT:
1302            return len == 3;
1303
1304        case BT_LTEP:
1305            return len >= 2;
1306
1307        default:
1308            return FALSE;
1309    }
1310}
1311
1312static int clientGotBlock( tr_peermsgs *               msgs,
1313                           const uint8_t *             block,
1314                           const struct peer_request * req );
1315
1316static int
1317readBtPiece( tr_peermsgs      * msgs,
1318             struct evbuffer  * inbuf,
1319             size_t             inlen,
1320             size_t           * setme_piece_bytes_read )
1321{
1322    struct peer_request * req = &msgs->incoming.blockReq;
1323
1324    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1325    dbgmsg( msgs, "In readBtPiece" );
1326
1327    if( !req->length )
1328    {
1329        if( inlen < 8 )
1330            return READ_LATER;
1331
1332        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1333        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1334        req->length = msgs->incoming.length - 9;
1335        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1336        return READ_NOW;
1337    }
1338    else
1339    {
1340        int err;
1341
1342        /* read in another chunk of data */
1343        const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
1344        size_t n = MIN( nLeft, inlen );
1345        size_t i = n;
1346
1347        while( i > 0 )
1348        {
1349            uint8_t buf[MAX_STACK_ARRAY_SIZE];
1350            const size_t thisPass = MIN( i, sizeof( buf ) );
1351            tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass );
1352            evbuffer_add( msgs->incoming.block, buf, thisPass );
1353            i -= thisPass;
1354        }
1355
1356        fireClientGotData( msgs, n, TRUE );
1357        *setme_piece_bytes_read += n;
1358        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1359               n, req->index, req->offset, req->length,
1360               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1361        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1362            return READ_LATER;
1363
1364        /* we've got the whole block ... process it */
1365        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );
1366
1367        /* cleanup */
1368        evbuffer_free( msgs->incoming.block );
1369        msgs->incoming.block = evbuffer_new( );
1370        req->length = 0;
1371        msgs->state = AWAITING_BT_LENGTH;
1372        return err ? READ_ERR : READ_NOW;
1373    }
1374}
1375
1376static int
1377readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1378{
1379    uint32_t      ui32;
1380    uint32_t      msglen = msgs->incoming.length;
1381    const uint8_t id = msgs->incoming.id;
1382    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1383    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1384
1385    --msglen; /* id length */
1386
1387    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1388
1389    if( inlen < msglen )
1390        return READ_LATER;
1391
1392    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1393    {
1394        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1395        fireError( msgs, EMSGSIZE );
1396        return READ_ERR;
1397    }
1398
1399    switch( id )
1400    {
1401        case BT_CHOKE:
1402            dbgmsg( msgs, "got Choke" );
1403            msgs->peer->clientIsChoked = 1;
1404            if( !fext )
1405                cancelAllRequestsToPeer( msgs, FALSE );
1406            break;
1407
1408        case BT_UNCHOKE:
1409            dbgmsg( msgs, "got Unchoke" );
1410            msgs->peer->clientIsChoked = 0;
1411            fireNeedReq( msgs );
1412            break;
1413
1414        case BT_INTERESTED:
1415            dbgmsg( msgs, "got Interested" );
1416            msgs->peer->peerIsInterested = 1;
1417            break;
1418
1419        case BT_NOT_INTERESTED:
1420            dbgmsg( msgs, "got Not Interested" );
1421            msgs->peer->peerIsInterested = 0;
1422            break;
1423
1424        case BT_HAVE:
1425            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1426            dbgmsg( msgs, "got Have: %u", ui32 );
1427            if( tr_bitfieldAdd( msgs->peer->have, ui32 ) ) {
1428                fireError( msgs, ERANGE );
1429                return READ_ERR;
1430            }
1431            updatePeerProgress( msgs );
1432            tr_rcTransferred( &msgs->torrent->swarmSpeed,
1433                              msgs->torrent->info.pieceSize );
1434            break;
1435
1436        case BT_BITFIELD:
1437        {
1438            dbgmsg( msgs, "got a bitfield" );
1439            tr_peerIoReadBytes( msgs->peer->io, inbuf, msgs->peer->have->bits, msglen );
1440            updatePeerProgress( msgs );
1441            fireNeedReq( msgs );
1442            break;
1443        }
1444
1445        case BT_REQUEST:
1446        {
1447            struct peer_request r;
1448            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1449            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1450            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1451            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1452            peerMadeRequest( msgs, &r );
1453            break;
1454        }
1455
1456        case BT_CANCEL:
1457        {
1458            struct peer_request r;
1459            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1460            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1461            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1462            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1463            if( reqListRemove( &msgs->peerAskedFor, &r ) && fext )
1464                protocolSendReject( msgs, &r );
1465            break;
1466        }
1467
1468        case BT_PIECE:
1469            assert( 0 ); /* handled elsewhere! */
1470            break;
1471
1472        case BT_PORT:
1473            dbgmsg( msgs, "Got a BT_PORT" );
1474            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->dht_port );
1475            if( msgs->peer->dht_port > 0 )
1476                tr_dhtAddNode( getSession(msgs),
1477                               tr_peerAddress( msgs->peer ),
1478                               msgs->peer->dht_port, 0 );
1479            break;
1480
1481        case BT_FEXT_SUGGEST:
1482            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1483            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1484            if( fext )
1485                fireClientGotSuggest( msgs, ui32 );
1486            else {
1487                fireError( msgs, EMSGSIZE );
1488                return READ_ERR;
1489            }
1490            break;
1491
1492        case BT_FEXT_ALLOWED_FAST:
1493            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1494            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1495            if( fext )
1496                fireClientGotAllowedFast( msgs, ui32 );
1497            else {
1498                fireError( msgs, EMSGSIZE );
1499                return READ_ERR;
1500            }
1501            break;
1502
1503        case BT_FEXT_HAVE_ALL:
1504            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1505            if( fext ) {
1506                tr_bitfieldAddRange( msgs->peer->have, 0, msgs->torrent->info.pieceCount );
1507                updatePeerProgress( msgs );
1508            } else {
1509                fireError( msgs, EMSGSIZE );
1510                return READ_ERR;
1511            }
1512            break;
1513
1514        case BT_FEXT_HAVE_NONE:
1515            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1516            if( fext ) {
1517                tr_bitfieldClear( msgs->peer->have );
1518                updatePeerProgress( msgs );
1519            } else {
1520                fireError( msgs, EMSGSIZE );
1521                return READ_ERR;
1522            }
1523            break;
1524
1525        case BT_FEXT_REJECT:
1526        {
1527            struct peer_request r;
1528            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1529            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1530            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1531            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1532            if( fext )
1533                reqListRemove( &msgs->clientAskedFor, &r );
1534            else {
1535                fireError( msgs, EMSGSIZE );
1536                return READ_ERR;
1537            }
1538            break;
1539        }
1540
1541        case BT_LTEP:
1542            dbgmsg( msgs, "Got a BT_LTEP" );
1543            parseLtep( msgs, msglen, inbuf );
1544            break;
1545
1546        default:
1547            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1548            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1549            break;
1550    }
1551
1552    assert( msglen + 1 == msgs->incoming.length );
1553    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1554
1555    msgs->state = AWAITING_BT_LENGTH;
1556    return READ_NOW;
1557}
1558
1559static TR_INLINE void
1560decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1561{
1562    tr_torrent * tor = msgs->torrent;
1563
1564    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1565}
1566
1567static TR_INLINE void
1568clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1569{
1570#ifdef TRACK_DUPES
1571static double unwantedGotten = 0.0;
1572fprintf( stderr, "dupe ratio: %f\n", ++unwantedGotten / blocksGotten );
1573#endif
1574    decrementDownloadedCount( msgs, req->length );
1575}
1576
1577static void
1578addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1579{
1580    if( !msgs->peer->blame )
1581         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1582    tr_bitfieldAdd( msgs->peer->blame, index );
1583}
1584
1585/* returns 0 on success, or an errno on failure */
1586static int
1587clientGotBlock( tr_peermsgs *               msgs,
1588                const uint8_t *             data,
1589                const struct peer_request * req )
1590{
1591    int err;
1592    tr_torrent * tor = msgs->torrent;
1593    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1594
1595    assert( msgs );
1596    assert( req );
1597
1598    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1599        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1600                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1601        return EMSGSIZE;
1602    }
1603
1604    /* save the block */
1605    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1606
1607    /**
1608    *** Remove the block from our `we asked for this' list
1609    **/
1610
1611    if( !reqListRemove( &msgs->clientAskedFor, req ) ) {
1612        clientGotUnwantedBlock( msgs, req );
1613        dbgmsg( msgs, "we didn't ask for this message..." );
1614        return 0;
1615    }
1616
1617    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1618            msgs->clientAskedFor.len );
1619
1620    /**
1621    *** Error checks
1622    **/
1623
1624    if( tr_cpBlockIsComplete( &tor->completion, block ) ) {
1625        dbgmsg( msgs, "we have this block already..." );
1626        clientGotUnwantedBlock( msgs, req );
1627        return 0;
1628    }
1629
1630    /**
1631    ***  Save the block
1632    **/
1633
1634    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1635        return err;
1636
1637    addPeerToBlamefield( msgs, req->index );
1638    fireGotBlock( msgs, req );
1639    return 0;
1640}
1641
1642static int peerPulse( void * vmsgs );
1643
1644static void
1645didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1646{
1647    tr_peermsgs * msgs = vmsgs;
1648    firePeerGotData( msgs, bytesWritten, wasPieceData );
1649
1650    if ( tr_isPeerIo( io ) && io->userData )
1651        peerPulse( msgs );
1652}
1653
1654static ReadState
1655canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1656{
1657    ReadState         ret;
1658    tr_peermsgs *     msgs = vmsgs;
1659    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1660    const size_t      inlen = EVBUFFER_LENGTH( in );
1661
1662    dbgmsg( msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state );
1663
1664    if( !inlen )
1665    {
1666        ret = READ_LATER;
1667    }
1668    else if( msgs->state == AWAITING_BT_PIECE )
1669    {
1670        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1671    }
1672    else switch( msgs->state )
1673    {
1674        case AWAITING_BT_LENGTH:
1675            ret = readBtLength ( msgs, in, inlen ); break;
1676
1677        case AWAITING_BT_ID:
1678            ret = readBtId     ( msgs, in, inlen ); break;
1679
1680        case AWAITING_BT_MESSAGE:
1681            ret = readBtMessage( msgs, in, inlen ); break;
1682
1683        default:
1684            ret = READ_ERR;
1685            assert( 0 );
1686    }
1687
1688    dbgmsg( msgs, "canRead: ret is %d", (int)ret );
1689
1690    /* log the raw data that was read */
1691    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1692        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1693
1694    return ret;
1695}
1696
1697/**
1698***
1699**/
1700
1701static int
1702ratePulse( tr_peermsgs * msgs, uint64_t now )
1703{
1704    int irate;
1705    const int floor = 8;
1706    const int seconds = 10;
1707    double rate;
1708    int estimatedBlocksInPeriod;
1709    const tr_torrent * const torrent = msgs->torrent;
1710
1711    /* Get the rate limit we should use.
1712     * FIXME: this needs to consider all the other peers as well... */
1713    rate = tr_peerGetPieceSpeed( msgs->peer, now, TR_PEER_TO_CLIENT );
1714    if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) )
1715        rate = MIN( rate, tr_torrentGetSpeedLimit( torrent, TR_PEER_TO_CLIENT ) );
1716    if( tr_torrentUsesSessionLimits( torrent ) )
1717        if( tr_sessionGetActiveSpeedLimit( torrent->session, TR_PEER_TO_CLIENT, &irate ) )
1718            rate = MIN( rate, irate );
1719
1720    estimatedBlocksInPeriod = ( rate * seconds * 1024 ) / torrent->blockSize;
1721    msgs->maxActiveRequests = MAX( floor, estimatedBlocksInPeriod );
1722
1723    if( msgs->reqq > 0 )
1724        msgs->maxActiveRequests = MIN( msgs->maxActiveRequests, msgs->reqq );
1725
1726    return TRUE;
1727}
1728
1729static size_t
1730fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1731{
1732    size_t bytesWritten = 0;
1733    struct peer_request req;
1734    const tr_bool haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1735    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1736
1737    /**
1738    ***  Protocol messages
1739    **/
1740
1741    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1742    {
1743        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1744        msgs->outMessagesBatchedAt = now;
1745    }
1746    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1747    {
1748        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1749        /* flush the protocol messages */
1750        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1751        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1752        msgs->clientSentAnythingAt = now;
1753        msgs->outMessagesBatchedAt = 0;
1754        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1755        bytesWritten +=  len;
1756    }
1757
1758    /**
1759    ***  Blocks
1760    **/
1761
1762    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= msgs->torrent->blockSize )
1763        && popNextRequest( msgs, &req ) )
1764    {
1765        if( requestIsValid( msgs, &req )
1766            && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1767        {
1768            /* FIXME(libevent2) use evbuffer_reserve_space() + evbuffer_commit_space() */
1769            int err;
1770            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
1771            struct evbuffer * out;
1772            tr_peerIo * io = msgs->peer->io;
1773
1774            out = evbuffer_new( );
1775            evbuffer_expand( out, msglen );
1776
1777            tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1778            tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1779            tr_peerIoWriteUint32( io, out, req.index );
1780            tr_peerIoWriteUint32( io, out, req.offset );
1781
1782            err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, EVBUFFER_DATA(out)+EVBUFFER_LENGTH(out) );
1783            if( err )
1784            {
1785                if( fext )
1786                    protocolSendReject( msgs, &req );
1787            }
1788            else
1789            {
1790                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1791                EVBUFFER_LENGTH(out) += req.length;
1792                assert( EVBUFFER_LENGTH( out ) == msglen );
1793                tr_peerIoWriteBuf( io, out, TRUE );
1794                bytesWritten += EVBUFFER_LENGTH( out );
1795                msgs->clientSentAnythingAt = now;
1796            }
1797
1798            evbuffer_free( out );
1799
1800            if( err )
1801            {
1802                bytesWritten = 0;
1803                msgs = NULL;
1804            }
1805        }
1806        else if( fext ) /* peer needs a reject message */
1807        {
1808            protocolSendReject( msgs, &req );
1809        }
1810    }
1811
1812    /**
1813    ***  Keepalive
1814    **/
1815
1816    if( ( msgs != NULL )
1817        && ( msgs->clientSentAnythingAt != 0 )
1818        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1819    {
1820        dbgmsg( msgs, "sending a keepalive message" );
1821        tr_peerIoWriteUint32( msgs->peer->io, msgs->outMessages, 0 );
1822        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1823    }
1824
1825    return bytesWritten;
1826}
1827
1828static int
1829peerPulse( void * vmsgs )
1830{
1831    tr_peermsgs * msgs = vmsgs;
1832    const time_t  now = time( NULL );
1833
1834    ratePulse( msgs, now );
1835
1836    pumpRequestQueue( msgs, now );
1837    expireOldRequests( msgs, now );
1838
1839    for( ;; )
1840        if( fillOutputBuffer( msgs, now ) < 1 )
1841            break;
1842
1843    return TRUE; /* loop forever */
1844}
1845
1846void
1847tr_peerMsgsPulse( tr_peermsgs * msgs )
1848{
1849    if( msgs != NULL )
1850        peerPulse( msgs );
1851}
1852
1853static void
1854gotError( tr_peerIo  * io UNUSED,
1855          short        what,
1856          void       * vmsgs )
1857{
1858    if( what & EVBUFFER_TIMEOUT )
1859        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1860    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1861        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1862               what, errno, tr_strerror( errno ) );
1863    fireError( vmsgs, ENOTCONN );
1864}
1865
1866static void
1867sendBitfield( tr_peermsgs * msgs )
1868{
1869    struct evbuffer * out = msgs->outMessages;
1870    tr_bitfield *     field;
1871    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1872    size_t            i;
1873    size_t            lazyCount = 0;
1874
1875    field = tr_bitfieldDup( tr_cpPieceBitfield( &msgs->torrent->completion ) );
1876
1877    if( tr_sessionIsLazyBitfieldEnabled( getSession( msgs ) ) )
1878    {
1879        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1880            speed over a truly random sample -- let's limit the pool size to
1881            the first 1000 pieces so large torrents don't bog things down */
1882        size_t poolSize;
1883        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1884        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1885
1886        /* build the pool */
1887        for( i=poolSize=0; i<maxPoolSize; ++i )
1888            if( tr_bitfieldHas( field, i ) )
1889                pool[poolSize++] = i;
1890
1891        /* pull random piece indices from the pool */
1892        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1893        {
1894            const int pos = tr_cryptoWeakRandInt( poolSize );
1895            const tr_piece_index_t piece = pool[pos];
1896            tr_bitfieldRem( field, piece );
1897            lazyPieces[lazyCount++] = piece;
1898            pool[pos] = pool[--poolSize];
1899        }
1900
1901        /* cleanup */
1902        tr_free( pool );
1903    }
1904
1905    tr_peerIoWriteUint32( msgs->peer->io, out,
1906                          sizeof( uint8_t ) + field->byteCount );
1907    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_BITFIELD );
1908    /* FIXME(libevent2): use evbuffer_add_reference() */
1909    tr_peerIoWriteBytes ( msgs->peer->io, out, field->bits, field->byteCount );
1910    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
1911            EVBUFFER_LENGTH( out ) );
1912    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1913
1914    for( i = 0; i < lazyCount; ++i )
1915        protocolSendHave( msgs, lazyPieces[i] );
1916
1917    tr_bitfieldFree( field );
1918}
1919
1920static void
1921tellPeerWhatWeHave( tr_peermsgs * msgs )
1922{
1923    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1924
1925    if( fext && ( tr_cpGetStatus( &msgs->torrent->completion ) == TR_SEED ) )
1926    {
1927        protocolSendHaveAll( msgs );
1928    }
1929    else if( fext && ( tr_cpHaveValid( &msgs->torrent->completion ) == 0 ) )
1930    {
1931        protocolSendHaveNone( msgs );
1932    }
1933    else
1934    {
1935        sendBitfield( msgs );
1936    }
1937}
1938
1939/**
1940***
1941**/
1942
1943/* some peers give us error messages if we send
1944   more than this many peers in a single pex message
1945   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1946#define MAX_PEX_ADDED 50
1947#define MAX_PEX_DROPPED 50
1948
1949typedef struct
1950{
1951    tr_pex *  added;
1952    tr_pex *  dropped;
1953    tr_pex *  elements;
1954    int       addedCount;
1955    int       droppedCount;
1956    int       elementCount;
1957}
1958PexDiffs;
1959
1960static void
1961pexAddedCb( void * vpex,
1962            void * userData )
1963{
1964    PexDiffs * diffs = userData;
1965    tr_pex *   pex = vpex;
1966
1967    if( diffs->addedCount < MAX_PEX_ADDED )
1968    {
1969        diffs->added[diffs->addedCount++] = *pex;
1970        diffs->elements[diffs->elementCount++] = *pex;
1971    }
1972}
1973
1974static TR_INLINE void
1975pexDroppedCb( void * vpex,
1976              void * userData )
1977{
1978    PexDiffs * diffs = userData;
1979    tr_pex *   pex = vpex;
1980
1981    if( diffs->droppedCount < MAX_PEX_DROPPED )
1982    {
1983        diffs->dropped[diffs->droppedCount++] = *pex;
1984    }
1985}
1986
1987static TR_INLINE void
1988pexElementCb( void * vpex,
1989              void * userData )
1990{
1991    PexDiffs * diffs = userData;
1992    tr_pex * pex = vpex;
1993
1994    diffs->elements[diffs->elementCount++] = *pex;
1995}
1996
1997static void
1998sendPex( tr_peermsgs * msgs )
1999{
2000    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2001    {
2002        PexDiffs diffs;
2003        PexDiffs diffs6;
2004        tr_pex * newPex = NULL;
2005        tr_pex * newPex6 = NULL;
2006        const int newCount = tr_peerMgrGetPeers( msgs->torrent, &newPex, TR_AF_INET, MAX_PEX_PEER_COUNT );
2007        const int newCount6 = tr_peerMgrGetPeers( msgs->torrent, &newPex6, TR_AF_INET6, MAX_PEX_PEER_COUNT );
2008
2009        /* build the diffs */
2010        diffs.added = tr_new( tr_pex, newCount );
2011        diffs.addedCount = 0;
2012        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2013        diffs.droppedCount = 0;
2014        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2015        diffs.elementCount = 0;
2016        tr_set_compare( msgs->pex, msgs->pexCount,
2017                        newPex, newCount,
2018                        tr_pexCompare, sizeof( tr_pex ),
2019                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2020        diffs6.added = tr_new( tr_pex, newCount6 );
2021        diffs6.addedCount = 0;
2022        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2023        diffs6.droppedCount = 0;
2024        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2025        diffs6.elementCount = 0;
2026        tr_set_compare( msgs->pex6, msgs->pexCount6,
2027                        newPex6, newCount6,
2028                        tr_pexCompare, sizeof( tr_pex ),
2029                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2030        dbgmsg(
2031            msgs,
2032            "pex: old peer count %d, new peer count %d, added %d, removed %d",
2033            msgs->pexCount, newCount + newCount6,
2034            diffs.addedCount + diffs6.addedCount,
2035            diffs.droppedCount + diffs6.droppedCount );
2036
2037        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2038            !diffs6.droppedCount )
2039        {
2040            tr_free( diffs.elements );
2041            tr_free( diffs6.elements );
2042        }
2043        else
2044        {
2045            int  i;
2046            tr_benc val;
2047            char * benc;
2048            int bencLen;
2049            uint8_t * tmp, *walk;
2050            tr_peerIo       * io  = msgs->peer->io;
2051            struct evbuffer * out = msgs->outMessages;
2052
2053            /* update peer */
2054            tr_free( msgs->pex );
2055            msgs->pex = diffs.elements;
2056            msgs->pexCount = diffs.elementCount;
2057            tr_free( msgs->pex6 );
2058            msgs->pex6 = diffs6.elements;
2059            msgs->pexCount6 = diffs6.elementCount;
2060
2061            /* build the pex payload */
2062            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2063                                         * speed vs. likelihood? */
2064
2065            /* "added" */
2066            tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2067            for( i = 0; i < diffs.addedCount; ++i )
2068            {
2069                memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2070                memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2071            }
2072            assert( ( walk - tmp ) == diffs.addedCount * 6 );
2073            tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2074            tr_free( tmp );
2075
2076            /* "added.f" */
2077            tmp = walk = tr_new( uint8_t, diffs.addedCount );
2078            for( i = 0; i < diffs.addedCount; ++i )
2079                *walk++ = diffs.added[i].flags;
2080            assert( ( walk - tmp ) == diffs.addedCount );
2081            tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2082            tr_free( tmp );
2083
2084            /* "dropped" */
2085            tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2086            for( i = 0; i < diffs.droppedCount; ++i )
2087            {
2088                memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2089                memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2090            }
2091            assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2092            tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2093            tr_free( tmp );
2094
2095            /* "added6" */
2096            tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2097            for( i = 0; i < diffs6.addedCount; ++i )
2098            {
2099                memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2100                walk += 16;
2101                memcpy( walk, &diffs6.added[i].port, 2 );
2102                walk += 2;
2103            }
2104            assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2105            tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2106            tr_free( tmp );
2107
2108            /* "added6.f" */
2109            tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2110            for( i = 0; i < diffs6.addedCount; ++i )
2111                *walk++ = diffs6.added[i].flags;
2112            assert( ( walk - tmp ) == diffs6.addedCount );
2113            tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2114            tr_free( tmp );
2115
2116            /* "dropped6" */
2117            tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2118            for( i = 0; i < diffs6.droppedCount; ++i )
2119            {
2120                memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2121                walk += 16;
2122                memcpy( walk, &diffs6.dropped[i].port, 2 );
2123                walk += 2;
2124            }
2125            assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2126            tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2127            tr_free( tmp );
2128
2129            /* write the pex message */
2130            benc = tr_bencToStr( &val, TR_FMT_BENC, &bencLen );
2131            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + bencLen );
2132            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
2133            tr_peerIoWriteUint8 ( io, out, msgs->ut_pex_id );
2134            tr_peerIoWriteBytes ( io, out, benc, bencLen );
2135            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2136            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2137            dbgOutMessageLen( msgs );
2138
2139            tr_free( benc );
2140            tr_bencFree( &val );
2141        }
2142
2143        /* cleanup */
2144        tr_free( diffs.added );
2145        tr_free( diffs.dropped );
2146        tr_free( newPex );
2147        tr_free( diffs6.added );
2148        tr_free( diffs6.dropped );
2149        tr_free( newPex6 );
2150
2151        /*msgs->clientSentPexAt = time( NULL );*/
2152    }
2153}
2154
2155static void
2156pexPulse( int foo UNUSED, short bar UNUSED, void * vmsgs )
2157{
2158    struct tr_peermsgs * msgs = vmsgs;
2159
2160    sendPex( msgs );
2161
2162    tr_timerAdd( &msgs->pexTimer, PEX_INTERVAL_SECS, 0 );
2163}
2164
2165/**
2166***
2167**/
2168
2169tr_peermsgs*
2170tr_peerMsgsNew( struct tr_torrent * torrent,
2171                struct tr_peer    * peer,
2172                tr_delivery_func    func,
2173                void              * userData,
2174                tr_publisher_tag  * setme )
2175{
2176    tr_peermsgs * m;
2177
2178    assert( peer );
2179    assert( peer->io );
2180
2181    m = tr_new0( tr_peermsgs, 1 );
2182    m->publisher = TR_PUBLISHER_INIT;
2183    m->peer = peer;
2184    m->torrent = torrent;
2185    m->peer->clientIsChoked = 1;
2186    m->peer->peerIsChoked = 1;
2187    m->peer->clientIsInterested = 0;
2188    m->peer->peerIsInterested = 0;
2189    m->peer->have = tr_bitfieldNew( torrent->info.pieceCount );
2190    m->state = AWAITING_BT_LENGTH;
2191    m->outMessages = evbuffer_new( );
2192    m->outMessagesBatchedAt = 0;
2193    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2194    m->incoming.block = evbuffer_new( );
2195    m->peerAskedFor = REQUEST_LIST_INIT;
2196    m->clientAskedFor = REQUEST_LIST_INIT;
2197    m->clientWillAskFor = REQUEST_LIST_INIT;
2198    evtimer_set( &m->pexTimer, pexPulse, m );
2199    tr_timerAdd( &m->pexTimer, PEX_INTERVAL_SECS, 0 );
2200    peer->msgs = m;
2201
2202    *setme = tr_publisherSubscribe( &m->publisher, func, userData );
2203
2204    if( tr_peerIoSupportsLTEP( peer->io ) )
2205        sendLtepHandshake( m );
2206
2207    if(tr_peerIoSupportsDHT(peer->io))
2208        protocolSendPort(m, tr_dhtPort(torrent->session));
2209
2210    tellPeerWhatWeHave( m );
2211
2212    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2213    ratePulse( m, tr_date() );
2214
2215    return m;
2216}
2217
2218void
2219tr_peerMsgsFree( tr_peermsgs* msgs )
2220{
2221    if( msgs )
2222    {
2223        evtimer_del( &msgs->pexTimer );
2224        tr_publisherDestruct( &msgs->publisher );
2225        reqListClear( &msgs->clientWillAskFor );
2226        reqListClear( &msgs->clientAskedFor );
2227        reqListClear( &msgs->peerAskedFor );
2228
2229        evbuffer_free( msgs->incoming.block );
2230        evbuffer_free( msgs->outMessages );
2231        tr_free( msgs->pex6 );
2232        tr_free( msgs->pex );
2233
2234        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2235        tr_free( msgs );
2236    }
2237}
2238
2239void
2240tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2241                        tr_publisher_tag tag )
2242{
2243    tr_publisherUnsubscribe( &peer->publisher, tag );
2244}
2245
Note: See TracBrowser for help on using the repository browser.