source: trunk/libtransmission/peer-msgs.c @ 12304

Last change on this file since 12304 was 12304, checked in by jordan, 11 years ago

(trunk libT) avoid an unnecessary malloc() + free() call when we finish downloading a block.

Pre-libevent2, this free() was useful in helping keep the peer's incoming piece data buffer from growing too large because that could be a significant amount of wasted space given enough peers. However now that we're using the libevent2 code, that piece data buffer gets handed off to the block cache, so most of the time we're freeing an evbuffer that doesn't have any inernal chains allocated anyway.

  • Property svn:keywords set to Date Rev Author Id
File size: 70.9 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 12304 2011-04-04 05:15:54Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <stdarg.h>
16#include <stdlib.h>
17#include <string.h>
18
19#include <event2/buffer.h>
20#include <event2/bufferevent.h>
21#include <event2/event.h>
22
23#include "transmission.h"
24#include "bencode.h"
25#include "cache.h"
26#include "completion.h"
27#include "crypto.h" /* tr_sha1() */
28#include "peer-io.h"
29#include "peer-mgr.h"
30#include "peer-msgs.h"
31#include "session.h"
32#include "torrent.h"
33#include "torrent-magnet.h"
34#include "tr-dht.h"
35#include "utils.h"
36#include "version.h"
37
38/**
39***
40**/
41
42enum
43{
44    BT_CHOKE                = 0,
45    BT_UNCHOKE              = 1,
46    BT_INTERESTED           = 2,
47    BT_NOT_INTERESTED       = 3,
48    BT_HAVE                 = 4,
49    BT_BITFIELD             = 5,
50    BT_REQUEST              = 6,
51    BT_PIECE                = 7,
52    BT_CANCEL               = 8,
53    BT_PORT                 = 9,
54
55    BT_FEXT_SUGGEST         = 13,
56    BT_FEXT_HAVE_ALL        = 14,
57    BT_FEXT_HAVE_NONE       = 15,
58    BT_FEXT_REJECT          = 16,
59    BT_FEXT_ALLOWED_FAST    = 17,
60
61    BT_LTEP                 = 20,
62
63    LTEP_HANDSHAKE          = 0,
64
65    UT_PEX_ID               = 1,
66    UT_METADATA_ID          = 3,
67
68    MAX_PEX_PEER_COUNT      = 50,
69
70    MIN_CHOKE_PERIOD_SEC    = 10,
71
72    /* idle seconds before we send a keepalive */
73    KEEPALIVE_INTERVAL_SECS = 100,
74
75    PEX_INTERVAL_SECS       = 90, /* sec between sendPex() calls */
76
77    REQQ                    = 512,
78
79    METADATA_REQQ           = 64,
80
81    /* used in lowering the outMessages queue period */
82    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
83    HIGH_PRIORITY_INTERVAL_SECS = 2,
84    LOW_PRIORITY_INTERVAL_SECS = 10,
85
86    /* number of pieces we'll allow in our fast set */
87    MAX_FAST_SET_SIZE = 3,
88
89    /* defined in BEP #9 */
90    METADATA_MSG_TYPE_REQUEST = 0,
91    METADATA_MSG_TYPE_DATA = 1,
92    METADATA_MSG_TYPE_REJECT = 2
93};
94
95enum
96{
97    AWAITING_BT_LENGTH,
98    AWAITING_BT_ID,
99    AWAITING_BT_MESSAGE,
100    AWAITING_BT_PIECE
101};
102
103/**
104***
105**/
106
107struct peer_request
108{
109    uint32_t    index;
110    uint32_t    offset;
111    uint32_t    length;
112};
113
114static void
115blockToReq( const tr_torrent     * tor,
116            tr_block_index_t       block,
117            struct peer_request  * setme )
118{
119    tr_torrentGetBlockLocation( tor, block, &setme->index,
120                                            &setme->offset,
121                                            &setme->length );
122}
123
124/**
125***
126**/
127
128/* this is raw, unchanged data from the peer regarding
129 * the current message that it's sending us. */
130struct tr_incoming
131{
132    uint8_t                id;
133    uint32_t               length; /* includes the +1 for id length */
134    struct peer_request    blockReq; /* metadata for incoming blocks */
135    struct evbuffer *      block; /* piece data for incoming blocks */
136};
137
138/**
139 * Low-level communication state information about a connected peer.
140 *
141 * This structure remembers the low-level protocol states that we're
142 * in with this peer, such as active requests, pex messages, and so on.
143 * Its fields are all private to peer-msgs.c.
144 *
145 * Data not directly involved with sending & receiving messages is
146 * stored in tr_peer, where it can be accessed by both peermsgs and
147 * the peer manager.
148 *
149 * @see struct peer_atom
150 * @see tr_peer
151 */
152struct tr_peermsgs
153{
154    bool            peerSupportsPex;
155    bool            peerSupportsMetadataXfer;
156    bool            clientSentLtepHandshake;
157    bool            peerSentLtepHandshake;
158
159    /*bool          haveFastSet;*/
160
161    int             desiredRequestCount;
162
163    int             prefetchCount;
164
165    /* how long the outMessages batch should be allowed to grow before
166     * it's flushed -- some messages (like requests >:) should be sent
167     * very quickly; others aren't as urgent. */
168    int8_t          outMessagesBatchPeriod;
169
170    uint8_t         state;
171    uint8_t         ut_pex_id;
172    uint8_t         ut_metadata_id;
173    uint16_t        pexCount;
174    uint16_t        pexCount6;
175
176    size_t          metadata_size_hint;
177#if 0
178    size_t                 fastsetSize;
179    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
180#endif
181
182    tr_peer *              peer;
183
184    tr_torrent *           torrent;
185
186    tr_peer_callback      * callback;
187    void                  * callbackData;
188
189    struct evbuffer *      outMessages; /* all the non-piece messages */
190
191    struct peer_request    peerAskedFor[REQQ];
192
193    int                    peerAskedForMetadata[METADATA_REQQ];
194    int                    peerAskedForMetadataCount;
195
196    tr_pex               * pex;
197    tr_pex               * pex6;
198
199    /*time_t                 clientSentPexAt;*/
200    time_t                 clientSentAnythingAt;
201
202    /* when we started batching the outMessages */
203    time_t                outMessagesBatchedAt;
204
205    struct tr_incoming    incoming;
206
207    /* if the peer supports the Extension Protocol in BEP 10 and
208       supplied a reqq argument, it's stored here. Otherwise, the
209       value is zero and should be ignored. */
210    int64_t               reqq;
211
212    struct event        * pexTimer;
213};
214
215/**
216***
217**/
218
219static inline tr_session*
220getSession( struct tr_peermsgs * msgs )
221{
222    return msgs->torrent->session;
223}
224
225/**
226***
227**/
228
229static void
230myDebug( const char * file, int line,
231         const struct tr_peermsgs * msgs,
232         const char * fmt, ... )
233{
234    FILE * fp = tr_getLog( );
235
236    if( fp )
237    {
238        va_list           args;
239        char              timestr[64];
240        struct evbuffer * buf = evbuffer_new( );
241        char *            base = tr_basename( file );
242
243        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
244                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
245                             tr_torrentName( msgs->torrent ),
246                             tr_peerIoGetAddrStr( msgs->peer->io ),
247                             msgs->peer->client );
248        va_start( args, fmt );
249        evbuffer_add_vprintf( buf, fmt, args );
250        va_end( args );
251        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
252        fputs( (const char*)evbuffer_pullup( buf, -1 ), fp );
253
254        tr_free( base );
255        evbuffer_free( buf );
256    }
257}
258
259#define dbgmsg( msgs, ... ) \
260    do { \
261        if( tr_deepLoggingIsActive( ) ) \
262            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
263    } while( 0 )
264
265/**
266***
267**/
268
269static void
270pokeBatchPeriod( tr_peermsgs * msgs, int interval )
271{
272    if( msgs->outMessagesBatchPeriod > interval )
273    {
274        msgs->outMessagesBatchPeriod = interval;
275        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
276    }
277}
278
279static void
280dbgOutMessageLen( tr_peermsgs * msgs )
281{
282    dbgmsg( msgs, "outMessage size is now %zu", evbuffer_get_length( msgs->outMessages ) );
283}
284
285static void
286protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
287{
288    struct evbuffer * out = msgs->outMessages;
289
290    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
291
292    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
293    evbuffer_add_uint8 ( out, BT_FEXT_REJECT );
294    evbuffer_add_uint32( out, req->index );
295    evbuffer_add_uint32( out, req->offset );
296    evbuffer_add_uint32( out, req->length );
297
298    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
299    dbgOutMessageLen( msgs );
300}
301
302static void
303protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
304{
305    struct evbuffer * out = msgs->outMessages;
306
307    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
308    evbuffer_add_uint8 ( out, BT_REQUEST );
309    evbuffer_add_uint32( out, req->index );
310    evbuffer_add_uint32( out, req->offset );
311    evbuffer_add_uint32( out, req->length );
312
313    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
314    dbgOutMessageLen( msgs );
315    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
316}
317
318static void
319protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
320{
321    struct evbuffer * out = msgs->outMessages;
322
323    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
324    evbuffer_add_uint8 ( out, BT_CANCEL );
325    evbuffer_add_uint32( out, req->index );
326    evbuffer_add_uint32( out, req->offset );
327    evbuffer_add_uint32( out, req->length );
328
329    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
330    dbgOutMessageLen( msgs );
331    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
332}
333
334static void
335protocolSendPort(tr_peermsgs *msgs, uint16_t port)
336{
337    struct evbuffer * out = msgs->outMessages;
338
339    dbgmsg( msgs, "sending Port %u", port);
340    evbuffer_add_uint32( out, 3 );
341    evbuffer_add_uint8 ( out, BT_PORT );
342    evbuffer_add_uint16( out, port);
343}
344
345static void
346protocolSendHave( tr_peermsgs * msgs, uint32_t index )
347{
348    struct evbuffer * out = msgs->outMessages;
349
350    evbuffer_add_uint32( out, sizeof(uint8_t) + sizeof(uint32_t) );
351    evbuffer_add_uint8 ( out, BT_HAVE );
352    evbuffer_add_uint32( out, index );
353
354    dbgmsg( msgs, "sending Have %u", index );
355    dbgOutMessageLen( msgs );
356    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
357}
358
359#if 0
360static void
361protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
362{
363    tr_peerIo       * io  = msgs->peer->io;
364    struct evbuffer * out = msgs->outMessages;
365
366    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
367
368    evbuffer_add_uint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
369    evbuffer_add_uint8 ( io, out, BT_FEXT_ALLOWED_FAST );
370    evbuffer_add_uint32( io, out, pieceIndex );
371
372    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
373    dbgOutMessageLen( msgs );
374}
375#endif
376
377static void
378protocolSendChoke( tr_peermsgs * msgs, int choke )
379{
380    struct evbuffer * out = msgs->outMessages;
381
382    evbuffer_add_uint32( out, sizeof( uint8_t ) );
383    evbuffer_add_uint8 ( out, choke ? BT_CHOKE : BT_UNCHOKE );
384
385    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
386    dbgOutMessageLen( msgs );
387    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
388}
389
390static void
391protocolSendHaveAll( tr_peermsgs * msgs )
392{
393    struct evbuffer * out = msgs->outMessages;
394
395    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
396
397    evbuffer_add_uint32( out, sizeof( uint8_t ) );
398    evbuffer_add_uint8 ( out, BT_FEXT_HAVE_ALL );
399
400    dbgmsg( msgs, "sending HAVE_ALL..." );
401    dbgOutMessageLen( msgs );
402    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
403}
404
405static void
406protocolSendHaveNone( tr_peermsgs * msgs )
407{
408    struct evbuffer * out = msgs->outMessages;
409
410    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
411
412    evbuffer_add_uint32( out, sizeof( uint8_t ) );
413    evbuffer_add_uint8 ( out, BT_FEXT_HAVE_NONE );
414
415    dbgmsg( msgs, "sending HAVE_NONE..." );
416    dbgOutMessageLen( msgs );
417    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
418}
419
420/**
421***  EVENTS
422**/
423
424static void
425publish( tr_peermsgs * msgs, tr_peer_event * e )
426{
427    assert( msgs->peer );
428    assert( msgs->peer->msgs == msgs );
429
430    if( msgs->callback != NULL )
431        msgs->callback( msgs->peer, e, msgs->callbackData );
432}
433
434static void
435fireError( tr_peermsgs * msgs, int err )
436{
437    tr_peer_event e = TR_PEER_EVENT_INIT;
438    e.eventType = TR_PEER_ERROR;
439    e.err = err;
440    publish( msgs, &e );
441}
442
443static void
444fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
445{
446    tr_peer_event e = TR_PEER_EVENT_INIT;
447    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
448    e.pieceIndex = req->index;
449    e.offset = req->offset;
450    e.length = req->length;
451    publish( msgs, &e );
452}
453
454static void
455fireGotRej( tr_peermsgs * msgs, const struct peer_request * req )
456{
457    tr_peer_event e = TR_PEER_EVENT_INIT;
458    e.eventType = TR_PEER_CLIENT_GOT_REJ;
459    e.pieceIndex = req->index;
460    e.offset = req->offset;
461    e.length = req->length;
462    publish( msgs, &e );
463}
464
465static void
466fireGotChoke( tr_peermsgs * msgs )
467{
468    tr_peer_event e = TR_PEER_EVENT_INIT;
469    e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
470    publish( msgs, &e );
471}
472
473static void
474fireClientGotHaveAll( tr_peermsgs * msgs )
475{
476    tr_peer_event e = TR_PEER_EVENT_INIT;
477    e.eventType = TR_PEER_CLIENT_GOT_HAVE_ALL;
478    publish( msgs, &e );
479}
480
481static void
482fireClientGotHaveNone( tr_peermsgs * msgs )
483{
484    tr_peer_event e = TR_PEER_EVENT_INIT;
485    e.eventType = TR_PEER_CLIENT_GOT_HAVE_NONE;
486    publish( msgs, &e );
487}
488
489static void
490fireClientGotData( tr_peermsgs * msgs, uint32_t length, int wasPieceData )
491{
492    tr_peer_event e = TR_PEER_EVENT_INIT;
493
494    e.length = length;
495    e.eventType = TR_PEER_CLIENT_GOT_DATA;
496    e.wasPieceData = wasPieceData;
497    publish( msgs, &e );
498}
499
500static void
501fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
502{
503    tr_peer_event e = TR_PEER_EVENT_INIT;
504    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
505    e.pieceIndex = pieceIndex;
506    publish( msgs, &e );
507}
508
509static void
510fireClientGotPort( tr_peermsgs * msgs, tr_port port )
511{
512    tr_peer_event e = TR_PEER_EVENT_INIT;
513    e.eventType = TR_PEER_CLIENT_GOT_PORT;
514    e.port = port;
515    publish( msgs, &e );
516}
517
518static void
519fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
520{
521    tr_peer_event e = TR_PEER_EVENT_INIT;
522    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
523    e.pieceIndex = pieceIndex;
524    publish( msgs, &e );
525}
526
527static void
528fireClientGotBitfield( tr_peermsgs * msgs, tr_bitfield * bitfield )
529{
530    tr_peer_event e = TR_PEER_EVENT_INIT;
531    e.eventType = TR_PEER_CLIENT_GOT_BITFIELD;
532    e.bitfield = bitfield;
533    publish( msgs, &e );
534}
535
536static void
537fireClientGotHave( tr_peermsgs * msgs, tr_piece_index_t index )
538{
539    tr_peer_event e = TR_PEER_EVENT_INIT;
540    e.eventType = TR_PEER_CLIENT_GOT_HAVE;
541    e.pieceIndex = index;
542    publish( msgs, &e );
543}
544
545static void
546firePeerGotData( tr_peermsgs * msgs, uint32_t length, bool wasPieceData )
547{
548    tr_peer_event e = TR_PEER_EVENT_INIT;
549
550    e.length = length;
551    e.eventType = TR_PEER_PEER_GOT_DATA;
552    e.wasPieceData = wasPieceData;
553
554    publish( msgs, &e );
555}
556
557/**
558***  ALLOWED FAST SET
559***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
560**/
561
562#if 0
563size_t
564tr_generateAllowedSet( tr_piece_index_t * setmePieces,
565                       size_t             desiredSetSize,
566                       size_t             pieceCount,
567                       const uint8_t    * infohash,
568                       const tr_address * addr )
569{
570    size_t setSize = 0;
571
572    assert( setmePieces );
573    assert( desiredSetSize <= pieceCount );
574    assert( desiredSetSize );
575    assert( pieceCount );
576    assert( infohash );
577    assert( addr );
578
579    if( addr->type == TR_AF_INET )
580    {
581        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
582        uint8_t x[SHA_DIGEST_LENGTH];
583
584        uint32_t ui32 = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
585        memcpy( w, &ui32, sizeof( uint32_t ) );
586        walk += sizeof( uint32_t );
587        memcpy( walk, infohash, SHA_DIGEST_LENGTH );                 /* (2) */
588        walk += SHA_DIGEST_LENGTH;
589        tr_sha1( x, w, walk-w, NULL );                               /* (3) */
590        assert( sizeof( w ) == walk-w );
591
592        while( setSize<desiredSetSize )
593        {
594            int i;
595            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
596            {
597                size_t k;
598                uint32_t j = i * 4;                                  /* (5) */
599                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
600                uint32_t index = y % pieceCount;                     /* (7) */
601
602                for( k=0; k<setSize; ++k )                           /* (8) */
603                    if( setmePieces[k] == index )
604                        break;
605
606                if( k == setSize )
607                    setmePieces[setSize++] = index;                  /* (9) */
608            }
609
610            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
611        }
612    }
613
614    return setSize;
615}
616
617static void
618updateFastSet( tr_peermsgs * msgs UNUSED )
619{
620    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
621    const int peerIsNeedy = msgs->peer->progress < 0.10;
622
623    if( fext && peerIsNeedy && !msgs->haveFastSet )
624    {
625        size_t i;
626        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
627        const tr_info * inf = &msgs->torrent->info;
628        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
629
630        /* build the fast set */
631        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
632        msgs->haveFastSet = 1;
633
634        /* send it to the peer */
635        for( i=0; i<msgs->fastsetSize; ++i )
636            protocolSendAllowedFast( msgs, msgs->fastset[i] );
637    }
638}
639#endif
640
641/**
642***  INTEREST
643**/
644
645static void
646sendInterest( tr_peermsgs * msgs, bool clientIsInterested )
647{
648    struct evbuffer * out = msgs->outMessages;
649
650    assert( msgs );
651    assert( tr_isBool( clientIsInterested ) );
652
653    msgs->peer->clientIsInterested = clientIsInterested;
654    dbgmsg( msgs, "Sending %s", clientIsInterested ? "Interested" : "Not Interested" );
655    evbuffer_add_uint32( out, sizeof( uint8_t ) );
656    evbuffer_add_uint8 ( out, clientIsInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
657
658    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
659    dbgOutMessageLen( msgs );
660}
661
662static void
663updateInterest( tr_peermsgs * msgs UNUSED )
664{
665    /* FIXME -- might need to poke the mgr on startup */
666}
667
668void
669tr_peerMsgsSetInterested( tr_peermsgs * msgs, int isInterested )
670{
671    assert( tr_isBool( isInterested ) );
672
673    if( isInterested != msgs->peer->clientIsInterested )
674        sendInterest( msgs, isInterested );
675}
676
677static bool
678popNextMetadataRequest( tr_peermsgs * msgs, int * piece )
679{
680    if( msgs->peerAskedForMetadataCount == 0 )
681        return false;
682
683    *piece = msgs->peerAskedForMetadata[0];
684
685    tr_removeElementFromArray( msgs->peerAskedForMetadata, 0, sizeof( int ),
686                               msgs->peerAskedForMetadataCount-- );
687
688    return true;
689}
690
691static bool
692popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
693{
694    if( msgs->peer->pendingReqsToClient == 0 )
695        return false;
696
697    *setme = msgs->peerAskedFor[0];
698
699    tr_removeElementFromArray( msgs->peerAskedFor, 0, sizeof( struct peer_request ),
700                               msgs->peer->pendingReqsToClient-- );
701
702    return true;
703}
704
705static void
706cancelAllRequestsToClient( tr_peermsgs * msgs )
707{
708    struct peer_request req;
709    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
710
711    while( popNextRequest( msgs, &req ))
712        if( mustSendCancel )
713            protocolSendReject( msgs, &req );
714}
715
716void
717tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
718{
719    const time_t now = tr_time( );
720    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
721
722    assert( msgs );
723    assert( msgs->peer );
724    assert( choke == 0 || choke == 1 );
725
726    if( msgs->peer->chokeChangedAt > fibrillationTime )
727    {
728        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
729    }
730    else if( msgs->peer->peerIsChoked != choke )
731    {
732        msgs->peer->peerIsChoked = choke;
733        if( choke )
734            cancelAllRequestsToClient( msgs );
735        protocolSendChoke( msgs, choke );
736        msgs->peer->chokeChangedAt = now;
737    }
738}
739
740/**
741***
742**/
743
744void
745tr_peerMsgsHave( tr_peermsgs * msgs, uint32_t index )
746{
747    protocolSendHave( msgs, index );
748
749    /* since we have more pieces now, we might not be interested in this peer */
750    updateInterest( msgs );
751}
752
753/**
754***
755**/
756
757static bool
758reqIsValid( const tr_peermsgs * peer,
759            uint32_t            index,
760            uint32_t            offset,
761            uint32_t            length )
762{
763    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
764}
765
766static bool
767requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
768{
769    return reqIsValid( msgs, req->index, req->offset, req->length );
770}
771
772void
773tr_peerMsgsCancel( tr_peermsgs * msgs, tr_block_index_t block )
774{
775    struct peer_request req;
776/*fprintf( stderr, "SENDING CANCEL MESSAGE FOR BLOCK %zu\n\t\tFROM PEER %p ------------------------------------\n", (size_t)block, msgs->peer );*/
777    blockToReq( msgs->torrent, block, &req );
778    protocolSendCancel( msgs, &req );
779}
780
781/**
782***
783**/
784
785static void
786sendLtepHandshake( tr_peermsgs * msgs )
787{
788    tr_benc val, *m;
789    char * buf;
790    int len;
791    bool allow_pex;
792    bool allow_metadata_xfer;
793    struct evbuffer * out = msgs->outMessages;
794    const unsigned char * ipv6 = tr_globalIPv6();
795
796    if( msgs->clientSentLtepHandshake )
797        return;
798
799    dbgmsg( msgs, "sending an ltep handshake" );
800    msgs->clientSentLtepHandshake = 1;
801
802    /* decide if we want to advertise metadata xfer support (BEP 9) */
803    if( tr_torrentIsPrivate( msgs->torrent ) )
804        allow_metadata_xfer = 0;
805    else
806        allow_metadata_xfer = 1;
807
808    /* decide if we want to advertise pex support */
809    if( !tr_torrentAllowsPex( msgs->torrent ) )
810        allow_pex = 0;
811    else if( msgs->peerSentLtepHandshake )
812        allow_pex = msgs->peerSupportsPex ? 1 : 0;
813    else
814        allow_pex = 1;
815
816    tr_bencInitDict( &val, 8 );
817    tr_bencDictAddInt( &val, "e", getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED );
818    if( ipv6 != NULL )
819        tr_bencDictAddRaw( &val, "ipv6", ipv6, 16 );
820    if( allow_metadata_xfer && tr_torrentHasMetadata( msgs->torrent )
821                            && ( msgs->torrent->infoDictLength > 0 ) )
822        tr_bencDictAddInt( &val, "metadata_size", msgs->torrent->infoDictLength );
823    tr_bencDictAddInt( &val, "p", tr_sessionGetPublicPeerPort( getSession(msgs) ) );
824    tr_bencDictAddInt( &val, "reqq", REQQ );
825    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
826    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
827    m  = tr_bencDictAddDict( &val, "m", 2 );
828    if( allow_metadata_xfer )
829        tr_bencDictAddInt( m, "ut_metadata", UT_METADATA_ID );
830    if( allow_pex )
831        tr_bencDictAddInt( m, "ut_pex", UT_PEX_ID );
832
833    buf = tr_bencToStr( &val, TR_FMT_BENC, &len );
834
835    evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + len );
836    evbuffer_add_uint8 ( out, BT_LTEP );
837    evbuffer_add_uint8 ( out, LTEP_HANDSHAKE );
838    evbuffer_add       ( out, buf, len );
839    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
840    dbgOutMessageLen( msgs );
841
842    /* cleanup */
843    tr_bencFree( &val );
844    tr_free( buf );
845}
846
847static void
848parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
849{
850    int64_t   i;
851    tr_benc   val, * sub;
852    uint8_t * tmp = tr_new( uint8_t, len );
853    const uint8_t *addr;
854    size_t addr_len;
855    tr_pex pex;
856    int8_t seedProbability = -1;
857
858    memset( &pex, 0, sizeof( tr_pex ) );
859
860    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
861    msgs->peerSentLtepHandshake = 1;
862
863    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
864    {
865        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
866        tr_free( tmp );
867        return;
868    }
869
870    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
871
872    /* does the peer prefer encrypted connections? */
873    if( tr_bencDictFindInt( &val, "e", &i ) ) {
874        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
875                                              : ENCRYPTION_PREFERENCE_NO;
876        if( i )
877            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
878    }
879
880    /* check supported messages for utorrent pex */
881    msgs->peerSupportsPex = 0;
882    msgs->peerSupportsMetadataXfer = 0;
883
884    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
885        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
886            msgs->peerSupportsPex = i != 0;
887            msgs->ut_pex_id = (uint8_t) i;
888            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
889        }
890        if( tr_bencDictFindInt( sub, "ut_metadata", &i ) ) {
891            msgs->peerSupportsMetadataXfer = i != 0;
892            msgs->ut_metadata_id = (uint8_t) i;
893            dbgmsg( msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id );
894        }
895        if( tr_bencDictFindInt( sub, "ut_holepunch", &i ) ) {
896            /* Mysterious µTorrent extension that we don't grok.  However,
897               it implies support for µTP, so use it to indicate that. */
898            tr_peerMgrSetUtpFailed( msgs->torrent,
899                                    tr_peerIoGetAddress( msgs->peer->io, NULL ),
900                                    false );
901        }
902    }
903
904    /* look for metainfo size (BEP 9) */
905    if( tr_bencDictFindInt( &val, "metadata_size", &i ) ) {
906        tr_torrentSetMetadataSizeHint( msgs->torrent, i );
907        msgs->metadata_size_hint = (size_t) i;
908    }
909
910    /* look for upload_only (BEP 21) */
911    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
912        seedProbability = i==0 ? 0 : 100;
913
914    /* get peer's listening port */
915    if( tr_bencDictFindInt( &val, "p", &i ) ) {
916        pex.port = htons( (uint16_t)i );
917        fireClientGotPort( msgs, pex.port );
918        dbgmsg( msgs, "peer's port is now %d", (int)i );
919    }
920
921    if( tr_peerIoIsIncoming( msgs->peer->io )
922        && tr_bencDictFindRaw( &val, "ipv4", &addr, &addr_len )
923        && ( addr_len == 4 ) )
924    {
925        pex.addr.type = TR_AF_INET;
926        memcpy( &pex.addr.addr.addr4, addr, 4 );
927        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
928    }
929
930    if( tr_peerIoIsIncoming( msgs->peer->io )
931        && tr_bencDictFindRaw( &val, "ipv6", &addr, &addr_len )
932        && ( addr_len == 16 ) )
933    {
934        pex.addr.type = TR_AF_INET6;
935        memcpy( &pex.addr.addr.addr6, addr, 16 );
936        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
937    }
938
939    /* get peer's maximum request queue size */
940    if( tr_bencDictFindInt( &val, "reqq", &i ) )
941        msgs->reqq = i;
942
943    tr_bencFree( &val );
944    tr_free( tmp );
945}
946
947static void
948parseUtMetadata( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
949{
950    tr_benc dict;
951    char * msg_end;
952    char * benc_end;
953    int64_t msg_type = -1;
954    int64_t piece = -1;
955    int64_t total_size = 0;
956    uint8_t * tmp = tr_new( uint8_t, msglen );
957
958    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
959    msg_end = (char*)tmp + msglen;
960
961    if( !tr_bencLoad( tmp, msglen, &dict, &benc_end ) )
962    {
963        tr_bencDictFindInt( &dict, "msg_type", &msg_type );
964        tr_bencDictFindInt( &dict, "piece", &piece );
965        tr_bencDictFindInt( &dict, "total_size", &total_size );
966        tr_bencFree( &dict );
967    }
968
969    dbgmsg( msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
970            (int)msg_type, (int)piece, (int)total_size );
971
972    if( msg_type == METADATA_MSG_TYPE_REJECT )
973    {
974        /* NOOP */
975    }
976
977    if( ( msg_type == METADATA_MSG_TYPE_DATA )
978        && ( !tr_torrentHasMetadata( msgs->torrent ) )
979        && ( msg_end - benc_end <= METADATA_PIECE_SIZE )
980        && ( piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size ) )
981    {
982        const int pieceLen = msg_end - benc_end;
983        tr_torrentSetMetadataPiece( msgs->torrent, piece, benc_end, pieceLen );
984    }
985
986    if( msg_type == METADATA_MSG_TYPE_REQUEST )
987    {
988        if( ( piece >= 0 )
989            && tr_torrentHasMetadata( msgs->torrent )
990            && !tr_torrentIsPrivate( msgs->torrent )
991            && ( msgs->peerAskedForMetadataCount < METADATA_REQQ ) )
992        {
993            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
994        }
995        else
996        {
997            tr_benc tmp;
998            int payloadLen;
999            char * payload;
1000            struct evbuffer * out = msgs->outMessages;
1001
1002            /* build the rejection message */
1003            tr_bencInitDict( &tmp, 2 );
1004            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1005            tr_bencDictAddInt( &tmp, "piece", piece );
1006            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1007            tr_bencFree( &tmp );
1008
1009            /* write it out as a LTEP message to our outMessages buffer */
1010            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1011            evbuffer_add_uint8 ( out, BT_LTEP );
1012            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1013            evbuffer_add       ( out, payload, payloadLen );
1014            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1015            dbgOutMessageLen( msgs );
1016
1017            tr_free( payload );
1018        }
1019    }
1020
1021    tr_free( tmp );
1022}
1023
1024static void
1025parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1026{
1027    int loaded = 0;
1028    uint8_t * tmp = tr_new( uint8_t, msglen );
1029    tr_benc val;
1030    tr_torrent * tor = msgs->torrent;
1031    const uint8_t * added;
1032    size_t added_len;
1033
1034    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1035
1036    if( tr_torrentAllowsPex( tor )
1037      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1038    {
1039        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1040        {
1041            tr_pex * pex;
1042            size_t i, n;
1043            size_t added_f_len = 0;
1044            const uint8_t * added_f = NULL;
1045
1046            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1047            pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1048
1049            n = MIN( n, MAX_PEX_PEER_COUNT );
1050            for( i=0; i<n; ++i )
1051            {
1052                int seedProbability = -1;
1053                if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1054                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1055            }
1056
1057            tr_free( pex );
1058        }
1059
1060        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1061        {
1062            tr_pex * pex;
1063            size_t i, n;
1064            size_t added_f_len = 0;
1065            const uint8_t * added_f = NULL;
1066
1067            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1068            pex = tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len, &n );
1069
1070            n = MIN( n, MAX_PEX_PEER_COUNT );
1071            for( i=0; i<n; ++i )
1072            {
1073                int seedProbability = -1;
1074                if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1075                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1076            }
1077
1078            tr_free( pex );
1079        }
1080    }
1081
1082    if( loaded )
1083        tr_bencFree( &val );
1084    tr_free( tmp );
1085}
1086
1087static void sendPex( tr_peermsgs * msgs );
1088
1089static void
1090parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer  * inbuf )
1091{
1092    uint8_t ltep_msgid;
1093
1094    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1095    msglen--;
1096
1097    if( ltep_msgid == LTEP_HANDSHAKE )
1098    {
1099        dbgmsg( msgs, "got ltep handshake" );
1100        parseLtepHandshake( msgs, msglen, inbuf );
1101        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1102        {
1103            sendLtepHandshake( msgs );
1104            sendPex( msgs );
1105        }
1106    }
1107    else if( ltep_msgid == UT_PEX_ID )
1108    {
1109        dbgmsg( msgs, "got ut pex" );
1110        msgs->peerSupportsPex = 1;
1111        parseUtPex( msgs, msglen, inbuf );
1112    }
1113    else if( ltep_msgid == UT_METADATA_ID )
1114    {
1115        dbgmsg( msgs, "got ut metadata" );
1116        msgs->peerSupportsMetadataXfer = 1;
1117        parseUtMetadata( msgs, msglen, inbuf );
1118    }
1119    else
1120    {
1121        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1122        evbuffer_drain( inbuf, msglen );
1123    }
1124}
1125
1126static int
1127readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1128{
1129    uint32_t len;
1130
1131    if( inlen < sizeof( len ) )
1132        return READ_LATER;
1133
1134    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1135
1136    if( len == 0 ) /* peer sent us a keepalive message */
1137        dbgmsg( msgs, "got KeepAlive" );
1138    else
1139    {
1140        msgs->incoming.length = len;
1141        msgs->state = AWAITING_BT_ID;
1142    }
1143
1144    return READ_NOW;
1145}
1146
1147static int readBtMessage( tr_peermsgs *, struct evbuffer *, size_t );
1148
1149static int
1150readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1151{
1152    uint8_t id;
1153
1154    if( inlen < sizeof( uint8_t ) )
1155        return READ_LATER;
1156
1157    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1158    msgs->incoming.id = id;
1159    dbgmsg( msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length );
1160
1161    if( id == BT_PIECE )
1162    {
1163        msgs->state = AWAITING_BT_PIECE;
1164        return READ_NOW;
1165    }
1166    else if( msgs->incoming.length != 1 )
1167    {
1168        msgs->state = AWAITING_BT_MESSAGE;
1169        return READ_NOW;
1170    }
1171    else return readBtMessage( msgs, inbuf, inlen - 1 );
1172}
1173
1174static void
1175updatePeerProgress( tr_peermsgs * msgs )
1176{
1177    tr_peerUpdateProgress( msgs->torrent, msgs->peer );
1178
1179    /*updateFastSet( msgs );*/
1180    updateInterest( msgs );
1181}
1182
1183static void
1184prefetchPieces( tr_peermsgs *msgs )
1185{
1186    int i;
1187
1188    if( !getSession(msgs)->isPrefetchEnabled )
1189        return;
1190
1191    /* Maintain 12 prefetched blocks per unchoked peer */
1192    for( i=msgs->prefetchCount; i<msgs->peer->pendingReqsToClient && i<12; ++i )
1193    {
1194        const struct peer_request * req = msgs->peerAskedFor + i;
1195        if( requestIsValid( msgs, req ) )
1196        {
1197            tr_cachePrefetchBlock( getSession(msgs)->cache, msgs->torrent, req->index, req->offset, req->length );
1198            ++msgs->prefetchCount;
1199        }
1200    }
1201}
1202
1203static void
1204peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
1205{
1206    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1207    const int reqIsValid = requestIsValid( msgs, req );
1208    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1209    const int peerIsChoked = msgs->peer->peerIsChoked;
1210
1211    int allow = false;
1212
1213    if( !reqIsValid )
1214        dbgmsg( msgs, "rejecting an invalid request." );
1215    else if( !clientHasPiece )
1216        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1217    else if( peerIsChoked )
1218        dbgmsg( msgs, "rejecting request from choked peer" );
1219    else if( msgs->peer->pendingReqsToClient + 1 >= REQQ )
1220        dbgmsg( msgs, "rejecting request ... reqq is full" );
1221    else
1222        allow = true;
1223
1224    if( allow ) {
1225        msgs->peerAskedFor[msgs->peer->pendingReqsToClient++] = *req;
1226        prefetchPieces( msgs );
1227    } else if( fext ) {
1228        protocolSendReject( msgs, req );
1229    }
1230}
1231
1232static bool
1233messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1234{
1235    switch( id )
1236    {
1237        case BT_CHOKE:
1238        case BT_UNCHOKE:
1239        case BT_INTERESTED:
1240        case BT_NOT_INTERESTED:
1241        case BT_FEXT_HAVE_ALL:
1242        case BT_FEXT_HAVE_NONE:
1243            return len == 1;
1244
1245        case BT_HAVE:
1246        case BT_FEXT_SUGGEST:
1247        case BT_FEXT_ALLOWED_FAST:
1248            return len == 5;
1249
1250        case BT_BITFIELD:
1251            if( tr_torrentHasMetadata( msg->torrent ) )
1252                return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1253            /* we don't know the piece count yet,
1254               so we can only guess whether to send true or false */
1255            if( msg->metadata_size_hint > 0 )
1256                return len <= msg->metadata_size_hint;
1257            return true;
1258
1259        case BT_REQUEST:
1260        case BT_CANCEL:
1261        case BT_FEXT_REJECT:
1262            return len == 13;
1263
1264        case BT_PIECE:
1265            return len > 9 && len <= 16393;
1266
1267        case BT_PORT:
1268            return len == 3;
1269
1270        case BT_LTEP:
1271            return len >= 2;
1272
1273        default:
1274            return false;
1275    }
1276}
1277
1278static int clientGotBlock( tr_peermsgs *               msgs,
1279                           struct evbuffer *           block,
1280                           const struct peer_request * req );
1281
1282static int
1283readBtPiece( tr_peermsgs      * msgs,
1284             struct evbuffer  * inbuf,
1285             size_t             inlen,
1286             size_t           * setme_piece_bytes_read )
1287{
1288    struct peer_request * req = &msgs->incoming.blockReq;
1289
1290    assert( evbuffer_get_length( inbuf ) >= inlen );
1291    dbgmsg( msgs, "In readBtPiece" );
1292
1293    if( !req->length )
1294    {
1295        if( inlen < 8 )
1296            return READ_LATER;
1297
1298        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1299        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1300        req->length = msgs->incoming.length - 9;
1301        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1302        return READ_NOW;
1303    }
1304    else
1305    {
1306        int err;
1307
1308        /* read in another chunk of data */
1309        const size_t nLeft = req->length - evbuffer_get_length( msgs->incoming.block );
1310        size_t n = MIN( nLeft, inlen );
1311
1312        tr_peerIoReadBytesToBuf( msgs->peer->io, inbuf, msgs->incoming.block, n );
1313
1314        fireClientGotData( msgs, n, true );
1315        *setme_piece_bytes_read += n;
1316        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1317               n, req->index, req->offset, req->length,
1318               (int)( req->length - evbuffer_get_length( msgs->incoming.block ) ) );
1319        if( evbuffer_get_length( msgs->incoming.block ) < req->length )
1320            return READ_LATER;
1321
1322        /* pass the block along... */
1323        err = clientGotBlock( msgs, msgs->incoming.block, req );
1324        evbuffer_drain( msgs->incoming.block, evbuffer_get_length( msgs->incoming.block ) );
1325
1326        /* cleanup */
1327        req->length = 0;
1328        msgs->state = AWAITING_BT_LENGTH;
1329        return err ? READ_ERR : READ_NOW;
1330    }
1331}
1332
1333static void updateDesiredRequestCount( tr_peermsgs * msgs );
1334
1335static int
1336readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1337{
1338    uint32_t      ui32;
1339    uint32_t      msglen = msgs->incoming.length;
1340    const uint8_t id = msgs->incoming.id;
1341#ifndef NDEBUG
1342    const size_t  startBufLen = evbuffer_get_length( inbuf );
1343#endif
1344    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1345
1346    --msglen; /* id length */
1347
1348    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1349
1350    if( inlen < msglen )
1351        return READ_LATER;
1352
1353    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1354    {
1355        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1356        fireError( msgs, EMSGSIZE );
1357        return READ_ERR;
1358    }
1359
1360    switch( id )
1361    {
1362        case BT_CHOKE:
1363            dbgmsg( msgs, "got Choke" );
1364            msgs->peer->clientIsChoked = 1;
1365            if( !fext )
1366                fireGotChoke( msgs );
1367            break;
1368
1369        case BT_UNCHOKE:
1370            dbgmsg( msgs, "got Unchoke" );
1371            msgs->peer->clientIsChoked = 0;
1372            updateDesiredRequestCount( msgs );
1373            break;
1374
1375        case BT_INTERESTED:
1376            dbgmsg( msgs, "got Interested" );
1377            msgs->peer->peerIsInterested = 1;
1378            break;
1379
1380        case BT_NOT_INTERESTED:
1381            dbgmsg( msgs, "got Not Interested" );
1382            msgs->peer->peerIsInterested = 0;
1383            break;
1384
1385        case BT_HAVE:
1386            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1387            dbgmsg( msgs, "got Have: %u", ui32 );
1388            if( tr_torrentHasMetadata( msgs->torrent )
1389                    && ( ui32 >= msgs->torrent->info.pieceCount ) )
1390            {
1391                fireError( msgs, ERANGE );
1392                return READ_ERR;
1393            }
1394
1395            /* a peer can send the same HAVE message twice... */
1396            if( !tr_bitfieldHas( &msgs->peer->have, ui32 ) ) {
1397                tr_bitfieldAdd( &msgs->peer->have, ui32 );
1398                fireClientGotHave( msgs, ui32 );
1399            }
1400            updatePeerProgress( msgs );
1401            break;
1402
1403        case BT_BITFIELD: {
1404            uint8_t * tmp = tr_new( uint8_t, msglen );
1405            dbgmsg( msgs, "got a bitfield" );
1406            tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1407            tr_bitfieldSetRaw( &msgs->peer->have, tmp, msglen );
1408            fireClientGotBitfield( msgs, &msgs->peer->have );
1409            updatePeerProgress( msgs );
1410            tr_free( tmp );
1411            break;
1412        }
1413
1414        case BT_REQUEST:
1415        {
1416            struct peer_request r;
1417            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1418            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1419            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1420            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1421            peerMadeRequest( msgs, &r );
1422            break;
1423        }
1424
1425        case BT_CANCEL:
1426        {
1427            int i;
1428            struct peer_request r;
1429            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1430            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1431            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1432            tr_historyAdd( &msgs->peer->cancelsSentToClient, tr_time( ), 1 );
1433            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1434
1435            for( i=0; i<msgs->peer->pendingReqsToClient; ++i ) {
1436                const struct peer_request * req = msgs->peerAskedFor + i;
1437                if( ( req->index == r.index ) && ( req->offset == r.offset ) && ( req->length == r.length ) )
1438                    break;
1439            }
1440
1441            if( i < msgs->peer->pendingReqsToClient )
1442                tr_removeElementFromArray( msgs->peerAskedFor, i, sizeof( struct peer_request ),
1443                                           msgs->peer->pendingReqsToClient-- );
1444            break;
1445        }
1446
1447        case BT_PIECE:
1448            assert( 0 ); /* handled elsewhere! */
1449            break;
1450
1451        case BT_PORT:
1452            dbgmsg( msgs, "Got a BT_PORT" );
1453            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->dht_port );
1454            if( msgs->peer->dht_port > 0 )
1455                tr_dhtAddNode( getSession(msgs),
1456                               tr_peerAddress( msgs->peer ),
1457                               msgs->peer->dht_port, 0 );
1458            break;
1459
1460        case BT_FEXT_SUGGEST:
1461            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1462            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1463            if( fext )
1464                fireClientGotSuggest( msgs, ui32 );
1465            else {
1466                fireError( msgs, EMSGSIZE );
1467                return READ_ERR;
1468            }
1469            break;
1470
1471        case BT_FEXT_ALLOWED_FAST:
1472            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1473            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1474            if( fext )
1475                fireClientGotAllowedFast( msgs, ui32 );
1476            else {
1477                fireError( msgs, EMSGSIZE );
1478                return READ_ERR;
1479            }
1480            break;
1481
1482        case BT_FEXT_HAVE_ALL:
1483            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1484            if( fext ) {
1485                tr_bitfieldSetHasAll( &msgs->peer->have );
1486assert( tr_bitfieldHasAll( &msgs->peer->have ) );
1487                fireClientGotHaveAll( msgs );
1488                updatePeerProgress( msgs );
1489            } else {
1490                fireError( msgs, EMSGSIZE );
1491                return READ_ERR;
1492            }
1493            break;
1494
1495        case BT_FEXT_HAVE_NONE:
1496            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1497            if( fext ) {
1498                tr_bitfieldSetHasNone( &msgs->peer->have );
1499                fireClientGotHaveNone( msgs );
1500                updatePeerProgress( msgs );
1501            } else {
1502                fireError( msgs, EMSGSIZE );
1503                return READ_ERR;
1504            }
1505            break;
1506
1507        case BT_FEXT_REJECT:
1508        {
1509            struct peer_request r;
1510            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1511            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1512            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1513            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1514            if( fext )
1515                fireGotRej( msgs, &r );
1516            else {
1517                fireError( msgs, EMSGSIZE );
1518                return READ_ERR;
1519            }
1520            break;
1521        }
1522
1523        case BT_LTEP:
1524            dbgmsg( msgs, "Got a BT_LTEP" );
1525            parseLtep( msgs, msglen, inbuf );
1526            break;
1527
1528        default:
1529            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1530            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1531            break;
1532    }
1533
1534    assert( msglen + 1 == msgs->incoming.length );
1535    assert( evbuffer_get_length( inbuf ) == startBufLen - msglen );
1536
1537    msgs->state = AWAITING_BT_LENGTH;
1538    return READ_NOW;
1539}
1540
1541/* returns 0 on success, or an errno on failure */
1542static int
1543clientGotBlock( tr_peermsgs                * msgs,
1544                struct evbuffer            * data,
1545                const struct peer_request  * req )
1546{
1547    int err;
1548    tr_torrent * tor = msgs->torrent;
1549    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1550
1551    assert( msgs );
1552    assert( req );
1553
1554    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1555        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1556                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1557        return EMSGSIZE;
1558    }
1559
1560    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1561
1562    if( !tr_peerMgrDidPeerRequest( msgs->torrent, msgs->peer, block ) ) {
1563        dbgmsg( msgs, "we didn't ask for this message..." );
1564        return 0;
1565    }
1566    if( tr_cpPieceIsComplete( &msgs->torrent->completion, req->index ) ) {
1567        dbgmsg( msgs, "we did ask for this message, but the piece is already complete..." );
1568        return 0;
1569    }
1570
1571    /**
1572    ***  Save the block
1573    **/
1574
1575    if(( err = tr_cacheWriteBlock( getSession(msgs)->cache, tor, req->index, req->offset, req->length, data )))
1576        return err;
1577
1578    tr_bitfieldAdd( &msgs->peer->blame, req->index );
1579    fireGotBlock( msgs, req );
1580    return 0;
1581}
1582
1583static int peerPulse( void * vmsgs );
1584
1585static void
1586didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1587{
1588    tr_peermsgs * msgs = vmsgs;
1589    firePeerGotData( msgs, bytesWritten, wasPieceData );
1590
1591    if ( tr_isPeerIo( io ) && io->userData )
1592        peerPulse( msgs );
1593}
1594
1595static ReadState
1596canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1597{
1598    ReadState         ret;
1599    tr_peermsgs *     msgs = vmsgs;
1600    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1601    const size_t      inlen = evbuffer_get_length( in );
1602
1603    dbgmsg( msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state );
1604
1605    if( !inlen )
1606    {
1607        ret = READ_LATER;
1608    }
1609    else if( msgs->state == AWAITING_BT_PIECE )
1610    {
1611        ret = readBtPiece( msgs, in, inlen, piece );
1612    }
1613    else switch( msgs->state )
1614    {
1615        case AWAITING_BT_LENGTH:
1616            ret = readBtLength ( msgs, in, inlen ); break;
1617
1618        case AWAITING_BT_ID:
1619            ret = readBtId     ( msgs, in, inlen ); break;
1620
1621        case AWAITING_BT_MESSAGE:
1622            ret = readBtMessage( msgs, in, inlen ); break;
1623
1624        default:
1625            ret = READ_ERR;
1626            assert( 0 );
1627    }
1628
1629    dbgmsg( msgs, "canRead: ret is %d", (int)ret );
1630
1631    /* log the raw data that was read */
1632    if( ( ret != READ_ERR ) && ( evbuffer_get_length( in ) != inlen ) )
1633        fireClientGotData( msgs, inlen - evbuffer_get_length( in ), false );
1634
1635    return ret;
1636}
1637
1638int
1639tr_peerMsgsIsReadingBlock( const tr_peermsgs * msgs, tr_block_index_t block )
1640{
1641    if( msgs->state != AWAITING_BT_PIECE )
1642        return false;
1643
1644    return block == _tr_block( msgs->torrent,
1645                               msgs->incoming.blockReq.index,
1646                               msgs->incoming.blockReq.offset );
1647}
1648
1649/**
1650***
1651**/
1652
1653static void
1654updateDesiredRequestCount( tr_peermsgs * msgs )
1655{
1656    const tr_torrent * const torrent = msgs->torrent;
1657
1658    if( tr_torrentIsSeed( msgs->torrent ) )
1659    {
1660        msgs->desiredRequestCount = 0;
1661    }
1662    else if( msgs->peer->clientIsChoked )
1663    {
1664        msgs->desiredRequestCount = 0;
1665    }
1666    else if( !msgs->peer->clientIsInterested )
1667    {
1668        msgs->desiredRequestCount = 0;
1669    }
1670    else
1671    {
1672        int estimatedBlocksInPeriod;
1673        int rate_Bps;
1674        int irate_Bps;
1675        const int floor = 4;
1676        const int seconds = REQUEST_BUF_SECS;
1677        const uint64_t now = tr_time_msec( );
1678
1679        /* Get the rate limit we should use.
1680         * FIXME: this needs to consider all the other peers as well... */
1681        rate_Bps = tr_peerGetPieceSpeed_Bps( msgs->peer, now, TR_PEER_TO_CLIENT );
1682        if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) )
1683            rate_Bps = MIN( rate_Bps, tr_torrentGetSpeedLimit_Bps( torrent, TR_PEER_TO_CLIENT ) );
1684
1685        /* honor the session limits, if enabled */
1686        if( tr_torrentUsesSessionLimits( torrent ) )
1687            if( tr_sessionGetActiveSpeedLimit_Bps( torrent->session, TR_PEER_TO_CLIENT, &irate_Bps ) )
1688                rate_Bps = MIN( rate_Bps, irate_Bps );
1689
1690        /* use this desired rate to figure out how
1691         * many requests we should send to this peer */
1692        estimatedBlocksInPeriod = ( rate_Bps * seconds ) / torrent->blockSize;
1693        msgs->desiredRequestCount = MAX( floor, estimatedBlocksInPeriod );
1694
1695        /* honor the peer's maximum request count, if specified */
1696        if( msgs->reqq > 0 )
1697            if( msgs->desiredRequestCount > msgs->reqq )
1698                msgs->desiredRequestCount = msgs->reqq;
1699    }
1700}
1701
1702static void
1703updateMetadataRequests( tr_peermsgs * msgs, time_t now )
1704{
1705    int piece;
1706
1707    if( msgs->peerSupportsMetadataXfer
1708        && tr_torrentGetNextMetadataRequest( msgs->torrent, now, &piece ) )
1709    {
1710        tr_benc tmp;
1711        int payloadLen;
1712        char * payload;
1713        struct evbuffer * out = msgs->outMessages;
1714
1715        /* build the data message */
1716        tr_bencInitDict( &tmp, 3 );
1717        tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REQUEST );
1718        tr_bencDictAddInt( &tmp, "piece", piece );
1719        payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1720        tr_bencFree( &tmp );
1721
1722        dbgmsg( msgs, "requesting metadata piece #%d", piece );
1723
1724        /* write it out as a LTEP message to our outMessages buffer */
1725        evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1726        evbuffer_add_uint8 ( out, BT_LTEP );
1727        evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1728        evbuffer_add       ( out, payload, payloadLen );
1729        pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1730        dbgOutMessageLen( msgs );
1731
1732        tr_free( payload );
1733    }
1734}
1735
1736static void
1737updateBlockRequests( tr_peermsgs * msgs )
1738{
1739    if( tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT )
1740        && ( msgs->desiredRequestCount > 0 )
1741        && ( msgs->peer->pendingReqsToPeer <= ( msgs->desiredRequestCount * 0.66 ) ) )
1742    {
1743        int i;
1744        int n;
1745        const int numwant = msgs->desiredRequestCount - msgs->peer->pendingReqsToPeer;
1746        tr_block_index_t * blocks = tr_new( tr_block_index_t, numwant );
1747
1748        tr_peerMgrGetNextRequests( msgs->torrent, msgs->peer, numwant, blocks, &n );
1749
1750        for( i=0; i<n; ++i )
1751        {
1752            struct peer_request req;
1753            blockToReq( msgs->torrent, blocks[i], &req );
1754            protocolSendRequest( msgs, &req );
1755        }
1756
1757        tr_free( blocks );
1758    }
1759}
1760
1761static size_t
1762fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1763{
1764    int piece;
1765    size_t bytesWritten = 0;
1766    struct peer_request req;
1767    const bool haveMessages = evbuffer_get_length( msgs->outMessages ) != 0;
1768    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1769
1770    /**
1771    ***  Protocol messages
1772    **/
1773
1774    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1775    {
1776        dbgmsg( msgs, "started an outMessages batch (length is %zu)", evbuffer_get_length( msgs->outMessages ) );
1777        msgs->outMessagesBatchedAt = now;
1778    }
1779    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1780    {
1781        const size_t len = evbuffer_get_length( msgs->outMessages );
1782        /* flush the protocol messages */
1783        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1784        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, false );
1785        msgs->clientSentAnythingAt = now;
1786        msgs->outMessagesBatchedAt = 0;
1787        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1788        bytesWritten +=  len;
1789    }
1790
1791    /**
1792    ***  Metadata Pieces
1793    **/
1794
1795    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= METADATA_PIECE_SIZE )
1796        && popNextMetadataRequest( msgs, &piece ) )
1797    {
1798        char * data;
1799        int dataLen;
1800        bool ok = false;
1801
1802        data = tr_torrentGetMetadataPiece( msgs->torrent, piece, &dataLen );
1803        if( ( dataLen > 0 ) && ( data != NULL ) )
1804        {
1805            tr_benc tmp;
1806            int payloadLen;
1807            char * payload;
1808            struct evbuffer * out = msgs->outMessages;
1809
1810            /* build the data message */
1811            tr_bencInitDict( &tmp, 3 );
1812            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_DATA );
1813            tr_bencDictAddInt( &tmp, "piece", piece );
1814            tr_bencDictAddInt( &tmp, "total_size", msgs->torrent->infoDictLength );
1815            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1816            tr_bencFree( &tmp );
1817
1818            /* write it out as a LTEP message to our outMessages buffer */
1819            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen + dataLen );
1820            evbuffer_add_uint8 ( out, BT_LTEP );
1821            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1822            evbuffer_add       ( out, payload, payloadLen );
1823            evbuffer_add       ( out, data, dataLen );
1824            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1825            dbgOutMessageLen( msgs );
1826
1827            tr_free( payload );
1828            tr_free( data );
1829
1830            ok = true;
1831        }
1832
1833        if( !ok ) /* send a rejection message */
1834        {
1835            tr_benc tmp;
1836            int payloadLen;
1837            char * payload;
1838            struct evbuffer * out = msgs->outMessages;
1839
1840            /* build the rejection message */
1841            tr_bencInitDict( &tmp, 2 );
1842            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1843            tr_bencDictAddInt( &tmp, "piece", piece );
1844            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1845            tr_bencFree( &tmp );
1846
1847            /* write it out as a LTEP message to our outMessages buffer */
1848            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1849            evbuffer_add_uint8 ( out, BT_LTEP );
1850            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1851            evbuffer_add       ( out, payload, payloadLen );
1852            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1853            dbgOutMessageLen( msgs );
1854
1855            tr_free( payload );
1856        }
1857    }
1858
1859    /**
1860    ***  Data Blocks
1861    **/
1862
1863    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= msgs->torrent->blockSize )
1864        && popNextRequest( msgs, &req ) )
1865    {
1866        --msgs->prefetchCount;
1867
1868        if( requestIsValid( msgs, &req )
1869            && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1870        {
1871            int err;
1872            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
1873            struct evbuffer * out;
1874            struct evbuffer_iovec iovec[1];
1875
1876            out = evbuffer_new( );
1877            evbuffer_expand( out, msglen );
1878
1879            evbuffer_add_uint32( out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1880            evbuffer_add_uint8 ( out, BT_PIECE );
1881            evbuffer_add_uint32( out, req.index );
1882            evbuffer_add_uint32( out, req.offset );
1883
1884            evbuffer_reserve_space( out, req.length, iovec, 1 );
1885            err = tr_cacheReadBlock( getSession(msgs)->cache, msgs->torrent, req.index, req.offset, req.length, iovec[0].iov_base );
1886            iovec[0].iov_len = req.length;
1887            evbuffer_commit_space( out, iovec, 1 );
1888
1889            /* check the piece if it needs checking... */
1890            if( !err && tr_torrentPieceNeedsCheck( msgs->torrent, req.index ) )
1891                if(( err = !tr_torrentCheckPiece( msgs->torrent, req.index )))
1892                    tr_torrentSetLocalError( msgs->torrent, _( "Please Verify Local Data! Piece #%zu is corrupt." ), (size_t)req.index );
1893
1894            if( err )
1895            {
1896                if( fext )
1897                    protocolSendReject( msgs, &req );
1898            }
1899            else
1900            {
1901                const size_t n = evbuffer_get_length( out );
1902                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1903                assert( n == msglen );
1904                tr_peerIoWriteBuf( msgs->peer->io, out, true );
1905                bytesWritten += n;
1906                msgs->clientSentAnythingAt = now;
1907                tr_historyAdd( &msgs->peer->blocksSentToPeer, tr_time( ), 1 );
1908            }
1909
1910            evbuffer_free( out );
1911
1912            if( err )
1913            {
1914                bytesWritten = 0;
1915                msgs = NULL;
1916            }
1917        }
1918        else if( fext ) /* peer needs a reject message */
1919        {
1920            protocolSendReject( msgs, &req );
1921        }
1922
1923        if( msgs != NULL )
1924            prefetchPieces( msgs );
1925    }
1926
1927    /**
1928    ***  Keepalive
1929    **/
1930
1931    if( ( msgs != NULL )
1932        && ( msgs->clientSentAnythingAt != 0 )
1933        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1934    {
1935        dbgmsg( msgs, "sending a keepalive message" );
1936        evbuffer_add_uint32( msgs->outMessages, 0 );
1937        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1938    }
1939
1940    return bytesWritten;
1941}
1942
1943static int
1944peerPulse( void * vmsgs )
1945{
1946    tr_peermsgs * msgs = vmsgs;
1947    const time_t  now = tr_time( );
1948
1949    if ( tr_isPeerIo( msgs->peer->io ) ) {
1950        updateDesiredRequestCount( msgs );
1951        updateBlockRequests( msgs );
1952        updateMetadataRequests( msgs, now );
1953    }
1954
1955    for( ;; )
1956        if( fillOutputBuffer( msgs, now ) < 1 )
1957            break;
1958
1959    return true; /* loop forever */
1960}
1961
1962void
1963tr_peerMsgsPulse( tr_peermsgs * msgs )
1964{
1965    if( msgs != NULL )
1966        peerPulse( msgs );
1967}
1968
1969static void
1970gotError( tr_peerIo * io UNUSED, short what, void * vmsgs )
1971{
1972    if( what & BEV_EVENT_TIMEOUT )
1973        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1974    if( what & ( BEV_EVENT_EOF | BEV_EVENT_ERROR ) )
1975        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1976               what, errno, tr_strerror( errno ) );
1977    fireError( vmsgs, ENOTCONN );
1978}
1979
1980static void
1981sendBitfield( tr_peermsgs * msgs )
1982{
1983    size_t byte_count = 0;
1984    struct evbuffer * out = msgs->outMessages;
1985    void * bytes = tr_cpCreatePieceBitfield( &msgs->torrent->completion, &byte_count );
1986
1987    evbuffer_add_uint32( out, sizeof( uint8_t ) + byte_count );
1988    evbuffer_add_uint8 ( out, BT_BITFIELD );
1989    evbuffer_add       ( out, bytes, byte_count );
1990    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu", evbuffer_get_length( out ) );
1991    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1992
1993    tr_free( bytes );
1994}
1995
1996static void
1997tellPeerWhatWeHave( tr_peermsgs * msgs )
1998{
1999    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
2000
2001    if( fext && tr_cpHasAll( &msgs->torrent->completion ) )
2002    {
2003        protocolSendHaveAll( msgs );
2004    }
2005    else if( fext && tr_cpHasNone( &msgs->torrent->completion ) )
2006    {
2007        protocolSendHaveNone( msgs );
2008    }
2009    else if( !tr_cpHasNone( &msgs->torrent->completion ) )
2010    {
2011        sendBitfield( msgs );
2012    }
2013}
2014
2015/**
2016***
2017**/
2018
2019/* some peers give us error messages if we send
2020   more than this many peers in a single pex message
2021   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2022#define MAX_PEX_ADDED 50
2023#define MAX_PEX_DROPPED 50
2024
2025typedef struct
2026{
2027    tr_pex *  added;
2028    tr_pex *  dropped;
2029    tr_pex *  elements;
2030    int       addedCount;
2031    int       droppedCount;
2032    int       elementCount;
2033}
2034PexDiffs;
2035
2036static void
2037pexAddedCb( void * vpex, void * userData )
2038{
2039    PexDiffs * diffs = userData;
2040    tr_pex *   pex = vpex;
2041
2042    if( diffs->addedCount < MAX_PEX_ADDED )
2043    {
2044        diffs->added[diffs->addedCount++] = *pex;
2045        diffs->elements[diffs->elementCount++] = *pex;
2046    }
2047}
2048
2049static inline void
2050pexDroppedCb( void * vpex, void * userData )
2051{
2052    PexDiffs * diffs = userData;
2053    tr_pex *   pex = vpex;
2054
2055    if( diffs->droppedCount < MAX_PEX_DROPPED )
2056    {
2057        diffs->dropped[diffs->droppedCount++] = *pex;
2058    }
2059}
2060
2061static inline void
2062pexElementCb( void * vpex, void * userData )
2063{
2064    PexDiffs * diffs = userData;
2065    tr_pex * pex = vpex;
2066
2067    diffs->elements[diffs->elementCount++] = *pex;
2068}
2069
2070typedef void ( tr_set_func )( void * element, void * userData );
2071
2072/**
2073 * @brief find the differences and commonalities in two sorted sets
2074 * @param a the first set
2075 * @param aCount the number of elements in the set 'a'
2076 * @param b the second set
2077 * @param bCount the number of elements in the set 'b'
2078 * @param compare the sorting method for both sets
2079 * @param elementSize the sizeof the element in the two sorted sets
2080 * @param in_a called for items in set 'a' but not set 'b'
2081 * @param in_b called for items in set 'b' but not set 'a'
2082 * @param in_both called for items that are in both sets
2083 * @param userData user data passed along to in_a, in_b, and in_both
2084 */
2085static void
2086tr_set_compare( const void * va, size_t aCount,
2087                const void * vb, size_t bCount,
2088                int compare( const void * a, const void * b ),
2089                size_t elementSize,
2090                tr_set_func in_a_cb,
2091                tr_set_func in_b_cb,
2092                tr_set_func in_both_cb,
2093                void * userData )
2094{
2095    const uint8_t * a = va;
2096    const uint8_t * b = vb;
2097    const uint8_t * aend = a + elementSize * aCount;
2098    const uint8_t * bend = b + elementSize * bCount;
2099
2100    while( a != aend || b != bend )
2101    {
2102        if( a == aend )
2103        {
2104            ( *in_b_cb )( (void*)b, userData );
2105            b += elementSize;
2106        }
2107        else if( b == bend )
2108        {
2109            ( *in_a_cb )( (void*)a, userData );
2110            a += elementSize;
2111        }
2112        else
2113        {
2114            const int val = ( *compare )( a, b );
2115
2116            if( !val )
2117            {
2118                ( *in_both_cb )( (void*)a, userData );
2119                a += elementSize;
2120                b += elementSize;
2121            }
2122            else if( val < 0 )
2123            {
2124                ( *in_a_cb )( (void*)a, userData );
2125                a += elementSize;
2126            }
2127            else if( val > 0 )
2128            {
2129                ( *in_b_cb )( (void*)b, userData );
2130                b += elementSize;
2131            }
2132        }
2133    }
2134}
2135
2136
2137static void
2138sendPex( tr_peermsgs * msgs )
2139{
2140    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2141    {
2142        PexDiffs diffs;
2143        PexDiffs diffs6;
2144        tr_pex * newPex = NULL;
2145        tr_pex * newPex6 = NULL;
2146        const int newCount = tr_peerMgrGetPeers( msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2147        const int newCount6 = tr_peerMgrGetPeers( msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2148
2149        /* build the diffs */
2150        diffs.added = tr_new( tr_pex, newCount );
2151        diffs.addedCount = 0;
2152        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2153        diffs.droppedCount = 0;
2154        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2155        diffs.elementCount = 0;
2156        tr_set_compare( msgs->pex, msgs->pexCount,
2157                        newPex, newCount,
2158                        tr_pexCompare, sizeof( tr_pex ),
2159                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2160        diffs6.added = tr_new( tr_pex, newCount6 );
2161        diffs6.addedCount = 0;
2162        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2163        diffs6.droppedCount = 0;
2164        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2165        diffs6.elementCount = 0;
2166        tr_set_compare( msgs->pex6, msgs->pexCount6,
2167                        newPex6, newCount6,
2168                        tr_pexCompare, sizeof( tr_pex ),
2169                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2170        dbgmsg(
2171            msgs,
2172            "pex: old peer count %d+%d, new peer count %d+%d, "
2173            "added %d+%d, removed %d+%d",
2174            msgs->pexCount, msgs->pexCount6, newCount, newCount6,
2175            diffs.addedCount, diffs6.addedCount,
2176            diffs.droppedCount, diffs6.droppedCount );
2177
2178        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2179            !diffs6.droppedCount )
2180        {
2181            tr_free( diffs.elements );
2182            tr_free( diffs6.elements );
2183        }
2184        else
2185        {
2186            int  i;
2187            tr_benc val;
2188            char * benc;
2189            int bencLen;
2190            uint8_t * tmp, *walk;
2191            struct evbuffer * out = msgs->outMessages;
2192
2193            /* update peer */
2194            tr_free( msgs->pex );
2195            msgs->pex = diffs.elements;
2196            msgs->pexCount = diffs.elementCount;
2197            tr_free( msgs->pex6 );
2198            msgs->pex6 = diffs6.elements;
2199            msgs->pexCount6 = diffs6.elementCount;
2200
2201            /* build the pex payload */
2202            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2203                                         * speed vs. likelihood? */
2204
2205            if( diffs.addedCount > 0)
2206            {
2207                /* "added" */
2208                tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2209                for( i = 0; i < diffs.addedCount; ++i ) {
2210                    memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2211                    memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2212                }
2213                assert( ( walk - tmp ) == diffs.addedCount * 6 );
2214                tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2215                tr_free( tmp );
2216
2217                /* "added.f"
2218                 * unset each holepunch flag because we don't support it. */
2219                tmp = walk = tr_new( uint8_t, diffs.addedCount );
2220                for( i = 0; i < diffs.addedCount; ++i )
2221                    *walk++ = diffs.added[i].flags & ~ADDED_F_HOLEPUNCH;
2222                assert( ( walk - tmp ) == diffs.addedCount );
2223                tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2224                tr_free( tmp );
2225            }
2226
2227            if( diffs.droppedCount > 0 )
2228            {
2229                /* "dropped" */
2230                tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2231                for( i = 0; i < diffs.droppedCount; ++i ) {
2232                    memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2233                    memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2234                }
2235                assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2236                tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2237                tr_free( tmp );
2238            }
2239
2240            if( diffs6.addedCount > 0 )
2241            {
2242                /* "added6" */
2243                tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2244                for( i = 0; i < diffs6.addedCount; ++i ) {
2245                    memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2246                    walk += 16;
2247                    memcpy( walk, &diffs6.added[i].port, 2 );
2248                    walk += 2;
2249                }
2250                assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2251                tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2252                tr_free( tmp );
2253
2254                /* "added6.f"
2255                 * unset each holepunch flag because we don't support it. */
2256                tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2257                for( i = 0; i < diffs6.addedCount; ++i )
2258                    *walk++ = diffs6.added[i].flags & ~ADDED_F_HOLEPUNCH;
2259                assert( ( walk - tmp ) == diffs6.addedCount );
2260                tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2261                tr_free( tmp );
2262            }
2263
2264            if( diffs6.droppedCount > 0 )
2265            {
2266                /* "dropped6" */
2267                tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2268                for( i = 0; i < diffs6.droppedCount; ++i ) {
2269                    memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2270                    walk += 16;
2271                    memcpy( walk, &diffs6.dropped[i].port, 2 );
2272                    walk += 2;
2273                }
2274                assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2275                tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2276                tr_free( tmp );
2277            }
2278
2279            /* write the pex message */
2280            benc = tr_bencToStr( &val, TR_FMT_BENC, &bencLen );
2281            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + bencLen );
2282            evbuffer_add_uint8 ( out, BT_LTEP );
2283            evbuffer_add_uint8 ( out, msgs->ut_pex_id );
2284            evbuffer_add       ( out, benc, bencLen );
2285            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2286            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", evbuffer_get_length( out ) );
2287            dbgOutMessageLen( msgs );
2288
2289            tr_free( benc );
2290            tr_bencFree( &val );
2291        }
2292
2293        /* cleanup */
2294        tr_free( diffs.added );
2295        tr_free( diffs.dropped );
2296        tr_free( newPex );
2297        tr_free( diffs6.added );
2298        tr_free( diffs6.dropped );
2299        tr_free( newPex6 );
2300
2301        /*msgs->clientSentPexAt = tr_time( );*/
2302    }
2303}
2304
2305static void
2306pexPulse( int foo UNUSED, short bar UNUSED, void * vmsgs )
2307{
2308    struct tr_peermsgs * msgs = vmsgs;
2309
2310    sendPex( msgs );
2311
2312    tr_timerAdd( msgs->pexTimer, PEX_INTERVAL_SECS, 0 );
2313}
2314
2315/**
2316***
2317**/
2318
2319tr_peermsgs*
2320tr_peerMsgsNew( struct tr_torrent    * torrent,
2321                struct tr_peer       * peer,
2322                tr_peer_callback     * callback,
2323                void                 * callbackData )
2324{
2325    tr_peermsgs * m;
2326
2327    assert( peer );
2328    assert( peer->io );
2329
2330    m = tr_new0( tr_peermsgs, 1 );
2331    m->callback = callback;
2332    m->callbackData = callbackData;
2333    m->peer = peer;
2334    m->torrent = torrent;
2335    m->peer->clientIsChoked = 1;
2336    m->peer->peerIsChoked = 1;
2337    m->peer->clientIsInterested = 0;
2338    m->peer->peerIsInterested = 0;
2339    m->state = AWAITING_BT_LENGTH;
2340    m->outMessages = evbuffer_new( );
2341    m->outMessagesBatchedAt = 0;
2342    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2343    m->incoming.block = evbuffer_new( );
2344    m->pexTimer = evtimer_new( torrent->session->event_base, pexPulse, m );
2345    peer->msgs = m;
2346    tr_timerAdd( m->pexTimer, PEX_INTERVAL_SECS, 0 );
2347
2348    if( tr_peerIoSupportsUTP( peer->io ) ) {
2349        const tr_address * addr = tr_peerIoGetAddress( peer->io, NULL );
2350        tr_peerMgrSetUtpSupported( torrent, addr );
2351        tr_peerMgrSetUtpFailed( torrent, addr, false );
2352    }
2353
2354    if( tr_peerIoSupportsLTEP( peer->io ) )
2355        sendLtepHandshake( m );
2356
2357    tellPeerWhatWeHave( m );
2358
2359    if( tr_dhtEnabled( torrent->session ) && tr_peerIoSupportsDHT( peer->io ))
2360    {
2361        /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2362        const struct tr_address *addr = tr_peerIoGetAddress( peer->io, NULL );
2363        if( addr->type == TR_AF_INET || tr_globalIPv6() ) {
2364            protocolSendPort( m, tr_dhtPort( torrent->session ) );
2365        }
2366    }
2367
2368    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2369    updateDesiredRequestCount( m );
2370
2371    return m;
2372}
2373
2374void
2375tr_peerMsgsFree( tr_peermsgs* msgs )
2376{
2377    if( msgs )
2378    {
2379        event_free( msgs->pexTimer );
2380
2381        evbuffer_free( msgs->incoming.block );
2382        evbuffer_free( msgs->outMessages );
2383        tr_free( msgs->pex6 );
2384        tr_free( msgs->pex );
2385
2386        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2387        tr_free( msgs );
2388    }
2389}
Note: See TracBrowser for help on using the repository browser.