source: trunk/libtransmission/peer-msgs.c @ 12276

Last change on this file since 12276 was 12276, checked in by jordan, 11 years ago

(trunk libT) remove dead logic branch detected by clang static analyzer

  • Property svn:keywords set to Date Rev Author Id
File size: 71.3 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 12276 2011-03-31 04:33:49Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <stdarg.h>
16#include <stdlib.h>
17#include <string.h>
18
19#include <event2/buffer.h>
20#include <event2/bufferevent.h>
21#include <event2/event.h>
22
23#include "transmission.h"
24#include "bencode.h"
25#include "cache.h"
26#include "completion.h"
27#include "crypto.h" /* tr_sha1() */
28#include "peer-io.h"
29#include "peer-mgr.h"
30#include "peer-msgs.h"
31#include "session.h"
32#include "torrent.h"
33#include "torrent-magnet.h"
34#include "tr-dht.h"
35#include "utils.h"
36#include "version.h"
37
38/**
39***
40**/
41
42enum
43{
44    BT_CHOKE                = 0,
45    BT_UNCHOKE              = 1,
46    BT_INTERESTED           = 2,
47    BT_NOT_INTERESTED       = 3,
48    BT_HAVE                 = 4,
49    BT_BITFIELD             = 5,
50    BT_REQUEST              = 6,
51    BT_PIECE                = 7,
52    BT_CANCEL               = 8,
53    BT_PORT                 = 9,
54
55    BT_FEXT_SUGGEST         = 13,
56    BT_FEXT_HAVE_ALL        = 14,
57    BT_FEXT_HAVE_NONE       = 15,
58    BT_FEXT_REJECT          = 16,
59    BT_FEXT_ALLOWED_FAST    = 17,
60
61    BT_LTEP                 = 20,
62
63    LTEP_HANDSHAKE          = 0,
64
65    UT_PEX_ID               = 1,
66    UT_METADATA_ID          = 3,
67
68    MAX_PEX_PEER_COUNT      = 50,
69
70    MIN_CHOKE_PERIOD_SEC    = 10,
71
72    /* idle seconds before we send a keepalive */
73    KEEPALIVE_INTERVAL_SECS = 100,
74
75    PEX_INTERVAL_SECS       = 90, /* sec between sendPex() calls */
76
77    REQQ                    = 512,
78
79    METADATA_REQQ           = 64,
80
81    /* used in lowering the outMessages queue period */
82    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
83    HIGH_PRIORITY_INTERVAL_SECS = 2,
84    LOW_PRIORITY_INTERVAL_SECS = 10,
85
86    /* number of pieces we'll allow in our fast set */
87    MAX_FAST_SET_SIZE = 3,
88
89    /* defined in BEP #9 */
90    METADATA_MSG_TYPE_REQUEST = 0,
91    METADATA_MSG_TYPE_DATA = 1,
92    METADATA_MSG_TYPE_REJECT = 2
93};
94
95enum
96{
97    AWAITING_BT_LENGTH,
98    AWAITING_BT_ID,
99    AWAITING_BT_MESSAGE,
100    AWAITING_BT_PIECE
101};
102
103/**
104***
105**/
106
107struct peer_request
108{
109    uint32_t    index;
110    uint32_t    offset;
111    uint32_t    length;
112};
113
114static void
115blockToReq( const tr_torrent     * tor,
116            tr_block_index_t       block,
117            struct peer_request  * setme )
118{
119    tr_torrentGetBlockLocation( tor, block, &setme->index,
120                                            &setme->offset,
121                                            &setme->length );
122}
123
124/**
125***
126**/
127
128/* this is raw, unchanged data from the peer regarding
129 * the current message that it's sending us. */
130struct tr_incoming
131{
132    uint8_t                id;
133    uint32_t               length; /* includes the +1 for id length */
134    struct peer_request    blockReq; /* metadata for incoming blocks */
135    struct evbuffer *      block; /* piece data for incoming blocks */
136};
137
138/**
139 * Low-level communication state information about a connected peer.
140 *
141 * This structure remembers the low-level protocol states that we're
142 * in with this peer, such as active requests, pex messages, and so on.
143 * Its fields are all private to peer-msgs.c.
144 *
145 * Data not directly involved with sending & receiving messages is
146 * stored in tr_peer, where it can be accessed by both peermsgs and
147 * the peer manager.
148 *
149 * @see struct peer_atom
150 * @see tr_peer
151 */
152struct tr_peermsgs
153{
154    bool            peerSupportsPex;
155    bool            peerSupportsMetadataXfer;
156    bool            clientSentLtepHandshake;
157    bool            peerSentLtepHandshake;
158
159    /*bool          haveFastSet;*/
160
161    int             desiredRequestCount;
162
163    int             prefetchCount;
164
165    /* how long the outMessages batch should be allowed to grow before
166     * it's flushed -- some messages (like requests >:) should be sent
167     * very quickly; others aren't as urgent. */
168    int8_t          outMessagesBatchPeriod;
169
170    uint8_t         state;
171    uint8_t         ut_pex_id;
172    uint8_t         ut_metadata_id;
173    uint16_t        pexCount;
174    uint16_t        pexCount6;
175
176    size_t          metadata_size_hint;
177#if 0
178    size_t                 fastsetSize;
179    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
180#endif
181
182    tr_peer *              peer;
183
184    tr_torrent *           torrent;
185
186    tr_peer_callback      * callback;
187    void                  * callbackData;
188
189    struct evbuffer *      outMessages; /* all the non-piece messages */
190
191    struct peer_request    peerAskedFor[REQQ];
192
193    int                    peerAskedForMetadata[METADATA_REQQ];
194    int                    peerAskedForMetadataCount;
195
196    tr_pex               * pex;
197    tr_pex               * pex6;
198
199    /*time_t                 clientSentPexAt;*/
200    time_t                 clientSentAnythingAt;
201
202    /* when we started batching the outMessages */
203    time_t                outMessagesBatchedAt;
204
205    struct tr_incoming    incoming;
206
207    /* if the peer supports the Extension Protocol in BEP 10 and
208       supplied a reqq argument, it's stored here. Otherwise, the
209       value is zero and should be ignored. */
210    int64_t               reqq;
211
212    struct event        * pexTimer;
213};
214
215/**
216***
217**/
218
219static inline tr_session*
220getSession( struct tr_peermsgs * msgs )
221{
222    return msgs->torrent->session;
223}
224
225/**
226***
227**/
228
229static void
230myDebug( const char * file, int line,
231         const struct tr_peermsgs * msgs,
232         const char * fmt, ... )
233{
234    FILE * fp = tr_getLog( );
235
236    if( fp )
237    {
238        va_list           args;
239        char              timestr[64];
240        struct evbuffer * buf = evbuffer_new( );
241        char *            base = tr_basename( file );
242
243        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
244                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
245                             tr_torrentName( msgs->torrent ),
246                             tr_peerIoGetAddrStr( msgs->peer->io ),
247                             msgs->peer->client );
248        va_start( args, fmt );
249        evbuffer_add_vprintf( buf, fmt, args );
250        va_end( args );
251        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
252        fputs( (const char*)evbuffer_pullup( buf, -1 ), fp );
253
254        tr_free( base );
255        evbuffer_free( buf );
256    }
257}
258
259#define dbgmsg( msgs, ... ) \
260    do { \
261        if( tr_deepLoggingIsActive( ) ) \
262            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
263    } while( 0 )
264
265/**
266***
267**/
268
269static void
270pokeBatchPeriod( tr_peermsgs * msgs, int interval )
271{
272    if( msgs->outMessagesBatchPeriod > interval )
273    {
274        msgs->outMessagesBatchPeriod = interval;
275        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
276    }
277}
278
279static void
280dbgOutMessageLen( tr_peermsgs * msgs )
281{
282    dbgmsg( msgs, "outMessage size is now %zu", evbuffer_get_length( msgs->outMessages ) );
283}
284
285static void
286protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
287{
288    struct evbuffer * out = msgs->outMessages;
289
290    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
291
292    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
293    evbuffer_add_uint8 ( out, BT_FEXT_REJECT );
294    evbuffer_add_uint32( out, req->index );
295    evbuffer_add_uint32( out, req->offset );
296    evbuffer_add_uint32( out, req->length );
297
298    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
299    dbgOutMessageLen( msgs );
300}
301
302static void
303protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
304{
305    struct evbuffer * out = msgs->outMessages;
306
307    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
308    evbuffer_add_uint8 ( out, BT_REQUEST );
309    evbuffer_add_uint32( out, req->index );
310    evbuffer_add_uint32( out, req->offset );
311    evbuffer_add_uint32( out, req->length );
312
313    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
314    dbgOutMessageLen( msgs );
315    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
316}
317
318static void
319protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
320{
321    struct evbuffer * out = msgs->outMessages;
322
323    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
324    evbuffer_add_uint8 ( out, BT_CANCEL );
325    evbuffer_add_uint32( out, req->index );
326    evbuffer_add_uint32( out, req->offset );
327    evbuffer_add_uint32( out, req->length );
328
329    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
330    dbgOutMessageLen( msgs );
331    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
332}
333
334static void
335protocolSendPort(tr_peermsgs *msgs, uint16_t port)
336{
337    struct evbuffer * out = msgs->outMessages;
338
339    dbgmsg( msgs, "sending Port %u", port);
340    evbuffer_add_uint32( out, 3 );
341    evbuffer_add_uint8 ( out, BT_PORT );
342    evbuffer_add_uint16( out, port);
343}
344
345static void
346protocolSendHave( tr_peermsgs * msgs, uint32_t index )
347{
348    struct evbuffer * out = msgs->outMessages;
349
350    evbuffer_add_uint32( out, sizeof(uint8_t) + sizeof(uint32_t) );
351    evbuffer_add_uint8 ( out, BT_HAVE );
352    evbuffer_add_uint32( out, index );
353
354    dbgmsg( msgs, "sending Have %u", index );
355    dbgOutMessageLen( msgs );
356    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
357}
358
359#if 0
360static void
361protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
362{
363    tr_peerIo       * io  = msgs->peer->io;
364    struct evbuffer * out = msgs->outMessages;
365
366    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
367
368    evbuffer_add_uint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
369    evbuffer_add_uint8 ( io, out, BT_FEXT_ALLOWED_FAST );
370    evbuffer_add_uint32( io, out, pieceIndex );
371
372    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
373    dbgOutMessageLen( msgs );
374}
375#endif
376
377static void
378protocolSendChoke( tr_peermsgs * msgs, int choke )
379{
380    struct evbuffer * out = msgs->outMessages;
381
382    evbuffer_add_uint32( out, sizeof( uint8_t ) );
383    evbuffer_add_uint8 ( out, choke ? BT_CHOKE : BT_UNCHOKE );
384
385    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
386    dbgOutMessageLen( msgs );
387    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
388}
389
390static void
391protocolSendHaveAll( tr_peermsgs * msgs )
392{
393    struct evbuffer * out = msgs->outMessages;
394
395    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
396
397    evbuffer_add_uint32( out, sizeof( uint8_t ) );
398    evbuffer_add_uint8 ( out, BT_FEXT_HAVE_ALL );
399
400    dbgmsg( msgs, "sending HAVE_ALL..." );
401    dbgOutMessageLen( msgs );
402    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
403}
404
405static void
406protocolSendHaveNone( tr_peermsgs * msgs )
407{
408    struct evbuffer * out = msgs->outMessages;
409
410    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
411
412    evbuffer_add_uint32( out, sizeof( uint8_t ) );
413    evbuffer_add_uint8 ( out, BT_FEXT_HAVE_NONE );
414
415    dbgmsg( msgs, "sending HAVE_NONE..." );
416    dbgOutMessageLen( msgs );
417    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
418}
419
420/**
421***  EVENTS
422**/
423
424static void
425publish( tr_peermsgs * msgs, tr_peer_event * e )
426{
427    assert( msgs->peer );
428    assert( msgs->peer->msgs == msgs );
429
430    if( msgs->callback != NULL )
431        msgs->callback( msgs->peer, e, msgs->callbackData );
432}
433
434static void
435fireError( tr_peermsgs * msgs, int err )
436{
437    tr_peer_event e = TR_PEER_EVENT_INIT;
438    e.eventType = TR_PEER_ERROR;
439    e.err = err;
440    publish( msgs, &e );
441}
442
443static void
444fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
445{
446    tr_peer_event e = TR_PEER_EVENT_INIT;
447    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
448    e.pieceIndex = req->index;
449    e.offset = req->offset;
450    e.length = req->length;
451    publish( msgs, &e );
452}
453
454static void
455fireGotRej( tr_peermsgs * msgs, const struct peer_request * req )
456{
457    tr_peer_event e = TR_PEER_EVENT_INIT;
458    e.eventType = TR_PEER_CLIENT_GOT_REJ;
459    e.pieceIndex = req->index;
460    e.offset = req->offset;
461    e.length = req->length;
462    publish( msgs, &e );
463}
464
465static void
466fireGotChoke( tr_peermsgs * msgs )
467{
468    tr_peer_event e = TR_PEER_EVENT_INIT;
469    e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
470    publish( msgs, &e );
471}
472
473static void
474fireClientGotHaveAll( tr_peermsgs * msgs )
475{
476    tr_peer_event e = TR_PEER_EVENT_INIT;
477    e.eventType = TR_PEER_CLIENT_GOT_HAVE_ALL;
478    publish( msgs, &e );
479}
480
481static void
482fireClientGotHaveNone( tr_peermsgs * msgs )
483{
484    tr_peer_event e = TR_PEER_EVENT_INIT;
485    e.eventType = TR_PEER_CLIENT_GOT_HAVE_NONE;
486    publish( msgs, &e );
487}
488
489static void
490fireClientGotData( tr_peermsgs * msgs, uint32_t length, int wasPieceData )
491{
492    tr_peer_event e = TR_PEER_EVENT_INIT;
493
494    e.length = length;
495    e.eventType = TR_PEER_CLIENT_GOT_DATA;
496    e.wasPieceData = wasPieceData;
497    publish( msgs, &e );
498}
499
500static void
501fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
502{
503    tr_peer_event e = TR_PEER_EVENT_INIT;
504    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
505    e.pieceIndex = pieceIndex;
506    publish( msgs, &e );
507}
508
509static void
510fireClientGotPort( tr_peermsgs * msgs, tr_port port )
511{
512    tr_peer_event e = TR_PEER_EVENT_INIT;
513    e.eventType = TR_PEER_CLIENT_GOT_PORT;
514    e.port = port;
515    publish( msgs, &e );
516}
517
518static void
519fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
520{
521    tr_peer_event e = TR_PEER_EVENT_INIT;
522    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
523    e.pieceIndex = pieceIndex;
524    publish( msgs, &e );
525}
526
527static void
528fireClientGotBitfield( tr_peermsgs * msgs, tr_bitfield * bitfield )
529{
530    tr_peer_event e = TR_PEER_EVENT_INIT;
531    e.eventType = TR_PEER_CLIENT_GOT_BITFIELD;
532    e.bitfield = bitfield;
533    publish( msgs, &e );
534}
535
536static void
537fireClientGotHave( tr_peermsgs * msgs, tr_piece_index_t index )
538{
539    tr_peer_event e = TR_PEER_EVENT_INIT;
540    e.eventType = TR_PEER_CLIENT_GOT_HAVE;
541    e.pieceIndex = index;
542    publish( msgs, &e );
543}
544
545static void
546firePeerGotData( tr_peermsgs * msgs, uint32_t length, bool wasPieceData )
547{
548    tr_peer_event e = TR_PEER_EVENT_INIT;
549
550    e.length = length;
551    e.eventType = TR_PEER_PEER_GOT_DATA;
552    e.wasPieceData = wasPieceData;
553
554    publish( msgs, &e );
555}
556
557/**
558***  ALLOWED FAST SET
559***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
560**/
561
562#if 0
563size_t
564tr_generateAllowedSet( tr_piece_index_t * setmePieces,
565                       size_t             desiredSetSize,
566                       size_t             pieceCount,
567                       const uint8_t    * infohash,
568                       const tr_address * addr )
569{
570    size_t setSize = 0;
571
572    assert( setmePieces );
573    assert( desiredSetSize <= pieceCount );
574    assert( desiredSetSize );
575    assert( pieceCount );
576    assert( infohash );
577    assert( addr );
578
579    if( addr->type == TR_AF_INET )
580    {
581        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
582        uint8_t x[SHA_DIGEST_LENGTH];
583
584        uint32_t ui32 = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
585        memcpy( w, &ui32, sizeof( uint32_t ) );
586        walk += sizeof( uint32_t );
587        memcpy( walk, infohash, SHA_DIGEST_LENGTH );                 /* (2) */
588        walk += SHA_DIGEST_LENGTH;
589        tr_sha1( x, w, walk-w, NULL );                               /* (3) */
590        assert( sizeof( w ) == walk-w );
591
592        while( setSize<desiredSetSize )
593        {
594            int i;
595            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
596            {
597                size_t k;
598                uint32_t j = i * 4;                                  /* (5) */
599                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
600                uint32_t index = y % pieceCount;                     /* (7) */
601
602                for( k=0; k<setSize; ++k )                           /* (8) */
603                    if( setmePieces[k] == index )
604                        break;
605
606                if( k == setSize )
607                    setmePieces[setSize++] = index;                  /* (9) */
608            }
609
610            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
611        }
612    }
613
614    return setSize;
615}
616
617static void
618updateFastSet( tr_peermsgs * msgs UNUSED )
619{
620    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
621    const int peerIsNeedy = msgs->peer->progress < 0.10;
622
623    if( fext && peerIsNeedy && !msgs->haveFastSet )
624    {
625        size_t i;
626        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
627        const tr_info * inf = &msgs->torrent->info;
628        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
629
630        /* build the fast set */
631        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
632        msgs->haveFastSet = 1;
633
634        /* send it to the peer */
635        for( i=0; i<msgs->fastsetSize; ++i )
636            protocolSendAllowedFast( msgs, msgs->fastset[i] );
637    }
638}
639#endif
640
641/**
642***  INTEREST
643**/
644
645static void
646sendInterest( tr_peermsgs * msgs, bool clientIsInterested )
647{
648    struct evbuffer * out = msgs->outMessages;
649
650    assert( msgs );
651    assert( tr_isBool( clientIsInterested ) );
652
653    msgs->peer->clientIsInterested = clientIsInterested;
654    dbgmsg( msgs, "Sending %s", clientIsInterested ? "Interested" : "Not Interested" );
655    evbuffer_add_uint32( out, sizeof( uint8_t ) );
656    evbuffer_add_uint8 ( out, clientIsInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
657
658    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
659    dbgOutMessageLen( msgs );
660}
661
662static void
663updateInterest( tr_peermsgs * msgs UNUSED )
664{
665    /* FIXME -- might need to poke the mgr on startup */
666}
667
668void
669tr_peerMsgsSetInterested( tr_peermsgs * msgs, int isInterested )
670{
671    assert( tr_isBool( isInterested ) );
672
673    if( isInterested != msgs->peer->clientIsInterested )
674        sendInterest( msgs, isInterested );
675}
676
677static bool
678popNextMetadataRequest( tr_peermsgs * msgs, int * piece )
679{
680    if( msgs->peerAskedForMetadataCount == 0 )
681        return false;
682
683    *piece = msgs->peerAskedForMetadata[0];
684
685    tr_removeElementFromArray( msgs->peerAskedForMetadata, 0, sizeof( int ),
686                               msgs->peerAskedForMetadataCount-- );
687
688    return true;
689}
690
691static bool
692popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
693{
694    if( msgs->peer->pendingReqsToClient == 0 )
695        return false;
696
697    *setme = msgs->peerAskedFor[0];
698
699    tr_removeElementFromArray( msgs->peerAskedFor, 0, sizeof( struct peer_request ),
700                               msgs->peer->pendingReqsToClient-- );
701
702    return true;
703}
704
705static void
706cancelAllRequestsToClient( tr_peermsgs * msgs )
707{
708    struct peer_request req;
709    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
710
711    while( popNextRequest( msgs, &req ))
712        if( mustSendCancel )
713            protocolSendReject( msgs, &req );
714}
715
716void
717tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
718{
719    const time_t now = tr_time( );
720    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
721
722    assert( msgs );
723    assert( msgs->peer );
724    assert( choke == 0 || choke == 1 );
725
726    if( msgs->peer->chokeChangedAt > fibrillationTime )
727    {
728        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
729    }
730    else if( msgs->peer->peerIsChoked != choke )
731    {
732        msgs->peer->peerIsChoked = choke;
733        if( choke )
734            cancelAllRequestsToClient( msgs );
735        protocolSendChoke( msgs, choke );
736        msgs->peer->chokeChangedAt = now;
737    }
738}
739
740/**
741***
742**/
743
744void
745tr_peerMsgsHave( tr_peermsgs * msgs, uint32_t index )
746{
747    protocolSendHave( msgs, index );
748
749    /* since we have more pieces now, we might not be interested in this peer */
750    updateInterest( msgs );
751}
752
753/**
754***
755**/
756
757static bool
758reqIsValid( const tr_peermsgs * peer,
759            uint32_t            index,
760            uint32_t            offset,
761            uint32_t            length )
762{
763    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
764}
765
766static bool
767requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
768{
769    return reqIsValid( msgs, req->index, req->offset, req->length );
770}
771
772void
773tr_peerMsgsCancel( tr_peermsgs * msgs, tr_block_index_t block )
774{
775    struct peer_request req;
776/*fprintf( stderr, "SENDING CANCEL MESSAGE FOR BLOCK %zu\n\t\tFROM PEER %p ------------------------------------\n", (size_t)block, msgs->peer );*/
777    blockToReq( msgs->torrent, block, &req );
778    protocolSendCancel( msgs, &req );
779}
780
781/**
782***
783**/
784
785static void
786sendLtepHandshake( tr_peermsgs * msgs )
787{
788    tr_benc val, *m;
789    char * buf;
790    int len;
791    bool allow_pex;
792    bool allow_metadata_xfer;
793    struct evbuffer * out = msgs->outMessages;
794    const unsigned char * ipv6 = tr_globalIPv6();
795
796    if( msgs->clientSentLtepHandshake )
797        return;
798
799    dbgmsg( msgs, "sending an ltep handshake" );
800    msgs->clientSentLtepHandshake = 1;
801
802    /* decide if we want to advertise metadata xfer support (BEP 9) */
803    if( tr_torrentIsPrivate( msgs->torrent ) )
804        allow_metadata_xfer = 0;
805    else
806        allow_metadata_xfer = 1;
807
808    /* decide if we want to advertise pex support */
809    if( !tr_torrentAllowsPex( msgs->torrent ) )
810        allow_pex = 0;
811    else if( msgs->peerSentLtepHandshake )
812        allow_pex = msgs->peerSupportsPex ? 1 : 0;
813    else
814        allow_pex = 1;
815
816    tr_bencInitDict( &val, 8 );
817    tr_bencDictAddInt( &val, "e", getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED );
818    if( ipv6 != NULL )
819        tr_bencDictAddRaw( &val, "ipv6", ipv6, 16 );
820    if( allow_metadata_xfer && tr_torrentHasMetadata( msgs->torrent )
821                            && ( msgs->torrent->infoDictLength > 0 ) )
822        tr_bencDictAddInt( &val, "metadata_size", msgs->torrent->infoDictLength );
823    tr_bencDictAddInt( &val, "p", tr_sessionGetPublicPeerPort( getSession(msgs) ) );
824    tr_bencDictAddInt( &val, "reqq", REQQ );
825    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
826    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
827    m  = tr_bencDictAddDict( &val, "m", 2 );
828    if( allow_metadata_xfer )
829        tr_bencDictAddInt( m, "ut_metadata", UT_METADATA_ID );
830    if( allow_pex )
831        tr_bencDictAddInt( m, "ut_pex", UT_PEX_ID );
832
833    buf = tr_bencToStr( &val, TR_FMT_BENC, &len );
834
835    evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + len );
836    evbuffer_add_uint8 ( out, BT_LTEP );
837    evbuffer_add_uint8 ( out, LTEP_HANDSHAKE );
838    evbuffer_add       ( out, buf, len );
839    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
840    dbgOutMessageLen( msgs );
841
842    /* cleanup */
843    tr_bencFree( &val );
844    tr_free( buf );
845}
846
847static void
848parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
849{
850    int64_t   i;
851    tr_benc   val, * sub;
852    uint8_t * tmp = tr_new( uint8_t, len );
853    const uint8_t *addr;
854    size_t addr_len;
855    tr_pex pex;
856    int8_t seedProbability = -1;
857
858    memset( &pex, 0, sizeof( tr_pex ) );
859
860    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
861    msgs->peerSentLtepHandshake = 1;
862
863    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
864    {
865        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
866        tr_free( tmp );
867        return;
868    }
869
870    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
871
872    /* does the peer prefer encrypted connections? */
873    if( tr_bencDictFindInt( &val, "e", &i ) ) {
874        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
875                                              : ENCRYPTION_PREFERENCE_NO;
876        if( i )
877            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
878    }
879
880    /* check supported messages for utorrent pex */
881    msgs->peerSupportsPex = 0;
882    msgs->peerSupportsMetadataXfer = 0;
883
884    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
885        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
886            msgs->peerSupportsPex = i != 0;
887            msgs->ut_pex_id = (uint8_t) i;
888            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
889        }
890        if( tr_bencDictFindInt( sub, "ut_metadata", &i ) ) {
891            msgs->peerSupportsMetadataXfer = i != 0;
892            msgs->ut_metadata_id = (uint8_t) i;
893            dbgmsg( msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id );
894        }
895        if( tr_bencDictFindInt( sub, "ut_holepunch", &i ) ) {
896            /* Mysterious µTorrent extension that we don't grok.  However,
897               it implies support for µTP, so use it to indicate that. */
898            tr_peerMgrSetUtpFailed( msgs->torrent,
899                                    tr_peerIoGetAddress( msgs->peer->io, NULL ),
900                                    false );
901        }
902    }
903
904    /* look for metainfo size (BEP 9) */
905    if( tr_bencDictFindInt( &val, "metadata_size", &i ) ) {
906        tr_torrentSetMetadataSizeHint( msgs->torrent, i );
907        msgs->metadata_size_hint = (size_t) i;
908    }
909
910    /* look for upload_only (BEP 21) */
911    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
912        seedProbability = i==0 ? 0 : 100;
913
914    /* get peer's listening port */
915    if( tr_bencDictFindInt( &val, "p", &i ) ) {
916        pex.port = htons( (uint16_t)i );
917        fireClientGotPort( msgs, pex.port );
918        dbgmsg( msgs, "peer's port is now %d", (int)i );
919    }
920
921    if( tr_peerIoIsIncoming( msgs->peer->io )
922        && tr_bencDictFindRaw( &val, "ipv4", &addr, &addr_len )
923        && ( addr_len == 4 ) )
924    {
925        pex.addr.type = TR_AF_INET;
926        memcpy( &pex.addr.addr.addr4, addr, 4 );
927        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
928    }
929
930    if( tr_peerIoIsIncoming( msgs->peer->io )
931        && tr_bencDictFindRaw( &val, "ipv6", &addr, &addr_len )
932        && ( addr_len == 16 ) )
933    {
934        pex.addr.type = TR_AF_INET6;
935        memcpy( &pex.addr.addr.addr6, addr, 16 );
936        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
937    }
938
939    /* get peer's maximum request queue size */
940    if( tr_bencDictFindInt( &val, "reqq", &i ) )
941        msgs->reqq = i;
942
943    tr_bencFree( &val );
944    tr_free( tmp );
945}
946
947static void
948parseUtMetadata( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
949{
950    tr_benc dict;
951    char * msg_end;
952    char * benc_end;
953    int64_t msg_type = -1;
954    int64_t piece = -1;
955    int64_t total_size = 0;
956    uint8_t * tmp = tr_new( uint8_t, msglen );
957
958    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
959    msg_end = (char*)tmp + msglen;
960
961    if( !tr_bencLoad( tmp, msglen, &dict, &benc_end ) )
962    {
963        tr_bencDictFindInt( &dict, "msg_type", &msg_type );
964        tr_bencDictFindInt( &dict, "piece", &piece );
965        tr_bencDictFindInt( &dict, "total_size", &total_size );
966        tr_bencFree( &dict );
967    }
968
969    dbgmsg( msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
970            (int)msg_type, (int)piece, (int)total_size );
971
972    if( msg_type == METADATA_MSG_TYPE_REJECT )
973    {
974        /* NOOP */
975    }
976
977    if( ( msg_type == METADATA_MSG_TYPE_DATA )
978        && ( !tr_torrentHasMetadata( msgs->torrent ) )
979        && ( msg_end - benc_end <= METADATA_PIECE_SIZE )
980        && ( piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size ) )
981    {
982        const int pieceLen = msg_end - benc_end;
983        tr_torrentSetMetadataPiece( msgs->torrent, piece, benc_end, pieceLen );
984    }
985
986    if( msg_type == METADATA_MSG_TYPE_REQUEST )
987    {
988        if( ( piece >= 0 )
989            && tr_torrentHasMetadata( msgs->torrent )
990            && !tr_torrentIsPrivate( msgs->torrent )
991            && ( msgs->peerAskedForMetadataCount < METADATA_REQQ ) )
992        {
993            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
994        }
995        else
996        {
997            tr_benc tmp;
998            int payloadLen;
999            char * payload;
1000            struct evbuffer * out = msgs->outMessages;
1001
1002            /* build the rejection message */
1003            tr_bencInitDict( &tmp, 2 );
1004            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1005            tr_bencDictAddInt( &tmp, "piece", piece );
1006            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1007            tr_bencFree( &tmp );
1008
1009            /* write it out as a LTEP message to our outMessages buffer */
1010            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1011            evbuffer_add_uint8 ( out, BT_LTEP );
1012            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1013            evbuffer_add       ( out, payload, payloadLen );
1014            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1015            dbgOutMessageLen( msgs );
1016
1017            tr_free( payload );
1018        }
1019    }
1020
1021    tr_free( tmp );
1022}
1023
1024static void
1025parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1026{
1027    int loaded = 0;
1028    uint8_t * tmp = tr_new( uint8_t, msglen );
1029    tr_benc val;
1030    tr_torrent * tor = msgs->torrent;
1031    const uint8_t * added;
1032    size_t added_len;
1033
1034    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1035
1036    if( tr_torrentAllowsPex( tor )
1037      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1038    {
1039        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1040        {
1041            tr_pex * pex;
1042            size_t i, n;
1043            size_t added_f_len = 0;
1044            const uint8_t * added_f = NULL;
1045
1046            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1047            pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1048
1049            n = MIN( n, MAX_PEX_PEER_COUNT );
1050            for( i=0; i<n; ++i )
1051            {
1052                int seedProbability = -1;
1053                if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1054                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1055            }
1056
1057            tr_free( pex );
1058        }
1059
1060        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1061        {
1062            tr_pex * pex;
1063            size_t i, n;
1064            size_t added_f_len = 0;
1065            const uint8_t * added_f = NULL;
1066
1067            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1068            pex = tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len, &n );
1069
1070            n = MIN( n, MAX_PEX_PEER_COUNT );
1071            for( i=0; i<n; ++i )
1072            {
1073                int seedProbability = -1;
1074                if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1075                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1076            }
1077
1078            tr_free( pex );
1079        }
1080    }
1081
1082    if( loaded )
1083        tr_bencFree( &val );
1084    tr_free( tmp );
1085}
1086
1087static void sendPex( tr_peermsgs * msgs );
1088
1089static void
1090parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer  * inbuf )
1091{
1092    uint8_t ltep_msgid;
1093
1094    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1095    msglen--;
1096
1097    if( ltep_msgid == LTEP_HANDSHAKE )
1098    {
1099        dbgmsg( msgs, "got ltep handshake" );
1100        parseLtepHandshake( msgs, msglen, inbuf );
1101        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1102        {
1103            sendLtepHandshake( msgs );
1104            sendPex( msgs );
1105        }
1106    }
1107    else if( ltep_msgid == UT_PEX_ID )
1108    {
1109        dbgmsg( msgs, "got ut pex" );
1110        msgs->peerSupportsPex = 1;
1111        parseUtPex( msgs, msglen, inbuf );
1112    }
1113    else if( ltep_msgid == UT_METADATA_ID )
1114    {
1115        dbgmsg( msgs, "got ut metadata" );
1116        msgs->peerSupportsMetadataXfer = 1;
1117        parseUtMetadata( msgs, msglen, inbuf );
1118    }
1119    else
1120    {
1121        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1122        evbuffer_drain( inbuf, msglen );
1123    }
1124}
1125
1126static int
1127readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1128{
1129    uint32_t len;
1130
1131    if( inlen < sizeof( len ) )
1132        return READ_LATER;
1133
1134    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1135
1136    if( len == 0 ) /* peer sent us a keepalive message */
1137        dbgmsg( msgs, "got KeepAlive" );
1138    else
1139    {
1140        msgs->incoming.length = len;
1141        msgs->state = AWAITING_BT_ID;
1142    }
1143
1144    return READ_NOW;
1145}
1146
1147static int readBtMessage( tr_peermsgs *, struct evbuffer *, size_t );
1148
1149static int
1150readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1151{
1152    uint8_t id;
1153
1154    if( inlen < sizeof( uint8_t ) )
1155        return READ_LATER;
1156
1157    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1158    msgs->incoming.id = id;
1159    dbgmsg( msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length );
1160
1161    if( id == BT_PIECE )
1162    {
1163        msgs->state = AWAITING_BT_PIECE;
1164        return READ_NOW;
1165    }
1166    else if( msgs->incoming.length != 1 )
1167    {
1168        msgs->state = AWAITING_BT_MESSAGE;
1169        return READ_NOW;
1170    }
1171    else return readBtMessage( msgs, inbuf, inlen - 1 );
1172}
1173
1174static void
1175updatePeerProgress( tr_peermsgs * msgs )
1176{
1177    tr_peerUpdateProgress( msgs->torrent, msgs->peer );
1178
1179    /*updateFastSet( msgs );*/
1180    updateInterest( msgs );
1181}
1182
1183static void
1184prefetchPieces( tr_peermsgs *msgs )
1185{
1186    int i;
1187
1188    if( !getSession(msgs)->isPrefetchEnabled )
1189        return;
1190
1191    /* Maintain 12 prefetched blocks per unchoked peer */
1192    for( i=msgs->prefetchCount; i<msgs->peer->pendingReqsToClient && i<12; ++i )
1193    {
1194        const struct peer_request * req = msgs->peerAskedFor + i;
1195        if( requestIsValid( msgs, req ) )
1196        {
1197            tr_cachePrefetchBlock( getSession(msgs)->cache, msgs->torrent, req->index, req->offset, req->length );
1198            ++msgs->prefetchCount;
1199        }
1200    }
1201}
1202
1203static void
1204peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
1205{
1206    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1207    const int reqIsValid = requestIsValid( msgs, req );
1208    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1209    const int peerIsChoked = msgs->peer->peerIsChoked;
1210
1211    int allow = false;
1212
1213    if( !reqIsValid )
1214        dbgmsg( msgs, "rejecting an invalid request." );
1215    else if( !clientHasPiece )
1216        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1217    else if( peerIsChoked )
1218        dbgmsg( msgs, "rejecting request from choked peer" );
1219    else if( msgs->peer->pendingReqsToClient + 1 >= REQQ )
1220        dbgmsg( msgs, "rejecting request ... reqq is full" );
1221    else
1222        allow = true;
1223
1224    if( allow ) {
1225        msgs->peerAskedFor[msgs->peer->pendingReqsToClient++] = *req;
1226        prefetchPieces( msgs );
1227    } else if( fext ) {
1228        protocolSendReject( msgs, req );
1229    }
1230}
1231
1232static bool
1233messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1234{
1235    switch( id )
1236    {
1237        case BT_CHOKE:
1238        case BT_UNCHOKE:
1239        case BT_INTERESTED:
1240        case BT_NOT_INTERESTED:
1241        case BT_FEXT_HAVE_ALL:
1242        case BT_FEXT_HAVE_NONE:
1243            return len == 1;
1244
1245        case BT_HAVE:
1246        case BT_FEXT_SUGGEST:
1247        case BT_FEXT_ALLOWED_FAST:
1248            return len == 5;
1249
1250        case BT_BITFIELD:
1251            if( tr_torrentHasMetadata( msg->torrent ) )
1252                return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1253            /* we don't know the piece count yet,
1254               so we can only guess whether to send true or false */
1255            if( msg->metadata_size_hint > 0 )
1256                return len <= msg->metadata_size_hint;
1257            return true;
1258
1259        case BT_REQUEST:
1260        case BT_CANCEL:
1261        case BT_FEXT_REJECT:
1262            return len == 13;
1263
1264        case BT_PIECE:
1265            return len > 9 && len <= 16393;
1266
1267        case BT_PORT:
1268            return len == 3;
1269
1270        case BT_LTEP:
1271            return len >= 2;
1272
1273        default:
1274            return false;
1275    }
1276}
1277
1278static int clientGotBlock( tr_peermsgs *               msgs,
1279                           struct evbuffer *           block,
1280                           const struct peer_request * req );
1281
1282static int
1283readBtPiece( tr_peermsgs      * msgs,
1284             struct evbuffer  * inbuf,
1285             size_t             inlen,
1286             size_t           * setme_piece_bytes_read )
1287{
1288    struct peer_request * req = &msgs->incoming.blockReq;
1289
1290    assert( evbuffer_get_length( inbuf ) >= inlen );
1291    dbgmsg( msgs, "In readBtPiece" );
1292
1293    if( !req->length )
1294    {
1295        if( inlen < 8 )
1296            return READ_LATER;
1297
1298        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1299        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1300        req->length = msgs->incoming.length - 9;
1301        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1302        return READ_NOW;
1303    }
1304    else
1305    {
1306        int err;
1307
1308        /* read in another chunk of data */
1309        const size_t nLeft = req->length - evbuffer_get_length( msgs->incoming.block );
1310        size_t n = MIN( nLeft, inlen );
1311        size_t i = n;
1312        void * buf = tr_sessionGetBuffer( getSession( msgs ) );
1313        const size_t buflen = SESSION_BUFFER_SIZE;
1314
1315        while( i > 0 )
1316        {
1317            const size_t thisPass = MIN( i, buflen );
1318            tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass );
1319            evbuffer_add( msgs->incoming.block, buf, thisPass );
1320            i -= thisPass;
1321        }
1322
1323        tr_sessionReleaseBuffer( getSession( msgs ) );
1324        buf = NULL;
1325
1326        fireClientGotData( msgs, n, true );
1327        *setme_piece_bytes_read += n;
1328        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1329               n, req->index, req->offset, req->length,
1330               (int)( req->length - evbuffer_get_length( msgs->incoming.block ) ) );
1331        if( evbuffer_get_length( msgs->incoming.block ) < req->length )
1332            return READ_LATER;
1333
1334        /* we've got the whole block ... process it */
1335        err = clientGotBlock( msgs, msgs->incoming.block, req );
1336
1337        /* cleanup */
1338        evbuffer_free( msgs->incoming.block );
1339        msgs->incoming.block = evbuffer_new( );
1340        req->length = 0;
1341        msgs->state = AWAITING_BT_LENGTH;
1342        return err ? READ_ERR : READ_NOW;
1343    }
1344}
1345
1346static void updateDesiredRequestCount( tr_peermsgs * msgs );
1347
1348static int
1349readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1350{
1351    uint32_t      ui32;
1352    uint32_t      msglen = msgs->incoming.length;
1353    const uint8_t id = msgs->incoming.id;
1354#ifndef NDEBUG
1355    const size_t  startBufLen = evbuffer_get_length( inbuf );
1356#endif
1357    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1358
1359    --msglen; /* id length */
1360
1361    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1362
1363    if( inlen < msglen )
1364        return READ_LATER;
1365
1366    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1367    {
1368        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1369        fireError( msgs, EMSGSIZE );
1370        return READ_ERR;
1371    }
1372
1373    switch( id )
1374    {
1375        case BT_CHOKE:
1376            dbgmsg( msgs, "got Choke" );
1377            msgs->peer->clientIsChoked = 1;
1378            if( !fext )
1379                fireGotChoke( msgs );
1380            break;
1381
1382        case BT_UNCHOKE:
1383            dbgmsg( msgs, "got Unchoke" );
1384            msgs->peer->clientIsChoked = 0;
1385            updateDesiredRequestCount( msgs );
1386            break;
1387
1388        case BT_INTERESTED:
1389            dbgmsg( msgs, "got Interested" );
1390            msgs->peer->peerIsInterested = 1;
1391            break;
1392
1393        case BT_NOT_INTERESTED:
1394            dbgmsg( msgs, "got Not Interested" );
1395            msgs->peer->peerIsInterested = 0;
1396            break;
1397
1398        case BT_HAVE:
1399            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1400            dbgmsg( msgs, "got Have: %u", ui32 );
1401            if( tr_torrentHasMetadata( msgs->torrent )
1402                    && ( ui32 >= msgs->torrent->info.pieceCount ) )
1403            {
1404                fireError( msgs, ERANGE );
1405                return READ_ERR;
1406            }
1407
1408            /* a peer can send the same HAVE message twice... */
1409            if( !tr_bitfieldHas( &msgs->peer->have, ui32 ) ) {
1410                tr_bitfieldAdd( &msgs->peer->have, ui32 );
1411                fireClientGotHave( msgs, ui32 );
1412            }
1413            updatePeerProgress( msgs );
1414            break;
1415
1416        case BT_BITFIELD: {
1417            uint8_t * tmp = tr_new( uint8_t, msglen );
1418            dbgmsg( msgs, "got a bitfield" );
1419            tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1420            tr_bitfieldSetRaw( &msgs->peer->have, tmp, msglen );
1421            fireClientGotBitfield( msgs, &msgs->peer->have );
1422            updatePeerProgress( msgs );
1423            tr_free( tmp );
1424            break;
1425        }
1426
1427        case BT_REQUEST:
1428        {
1429            struct peer_request r;
1430            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1431            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1432            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1433            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1434            peerMadeRequest( msgs, &r );
1435            break;
1436        }
1437
1438        case BT_CANCEL:
1439        {
1440            int i;
1441            struct peer_request r;
1442            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1443            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1444            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1445            tr_historyAdd( &msgs->peer->cancelsSentToClient, tr_time( ), 1 );
1446            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1447
1448            for( i=0; i<msgs->peer->pendingReqsToClient; ++i ) {
1449                const struct peer_request * req = msgs->peerAskedFor + i;
1450                if( ( req->index == r.index ) && ( req->offset == r.offset ) && ( req->length == r.length ) )
1451                    break;
1452            }
1453
1454            if( i < msgs->peer->pendingReqsToClient )
1455                tr_removeElementFromArray( msgs->peerAskedFor, i, sizeof( struct peer_request ),
1456                                           msgs->peer->pendingReqsToClient-- );
1457            break;
1458        }
1459
1460        case BT_PIECE:
1461            assert( 0 ); /* handled elsewhere! */
1462            break;
1463
1464        case BT_PORT:
1465            dbgmsg( msgs, "Got a BT_PORT" );
1466            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->dht_port );
1467            if( msgs->peer->dht_port > 0 )
1468                tr_dhtAddNode( getSession(msgs),
1469                               tr_peerAddress( msgs->peer ),
1470                               msgs->peer->dht_port, 0 );
1471            break;
1472
1473        case BT_FEXT_SUGGEST:
1474            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1475            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1476            if( fext )
1477                fireClientGotSuggest( msgs, ui32 );
1478            else {
1479                fireError( msgs, EMSGSIZE );
1480                return READ_ERR;
1481            }
1482            break;
1483
1484        case BT_FEXT_ALLOWED_FAST:
1485            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1486            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1487            if( fext )
1488                fireClientGotAllowedFast( msgs, ui32 );
1489            else {
1490                fireError( msgs, EMSGSIZE );
1491                return READ_ERR;
1492            }
1493            break;
1494
1495        case BT_FEXT_HAVE_ALL:
1496            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1497            if( fext ) {
1498                tr_bitfieldSetHasAll( &msgs->peer->have );
1499assert( tr_bitfieldHasAll( &msgs->peer->have ) );
1500                fireClientGotHaveAll( msgs );
1501                updatePeerProgress( msgs );
1502            } else {
1503                fireError( msgs, EMSGSIZE );
1504                return READ_ERR;
1505            }
1506            break;
1507
1508        case BT_FEXT_HAVE_NONE:
1509            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1510            if( fext ) {
1511                tr_bitfieldSetHasNone( &msgs->peer->have );
1512                fireClientGotHaveNone( msgs );
1513                updatePeerProgress( msgs );
1514            } else {
1515                fireError( msgs, EMSGSIZE );
1516                return READ_ERR;
1517            }
1518            break;
1519
1520        case BT_FEXT_REJECT:
1521        {
1522            struct peer_request r;
1523            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1524            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1525            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1526            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1527            if( fext )
1528                fireGotRej( msgs, &r );
1529            else {
1530                fireError( msgs, EMSGSIZE );
1531                return READ_ERR;
1532            }
1533            break;
1534        }
1535
1536        case BT_LTEP:
1537            dbgmsg( msgs, "Got a BT_LTEP" );
1538            parseLtep( msgs, msglen, inbuf );
1539            break;
1540
1541        default:
1542            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1543            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1544            break;
1545    }
1546
1547    assert( msglen + 1 == msgs->incoming.length );
1548    assert( evbuffer_get_length( inbuf ) == startBufLen - msglen );
1549
1550    msgs->state = AWAITING_BT_LENGTH;
1551    return READ_NOW;
1552}
1553
1554/* returns 0 on success, or an errno on failure */
1555static int
1556clientGotBlock( tr_peermsgs                * msgs,
1557                struct evbuffer            * data,
1558                const struct peer_request  * req )
1559{
1560    int err;
1561    tr_torrent * tor = msgs->torrent;
1562    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1563
1564    assert( msgs );
1565    assert( req );
1566
1567    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1568        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1569                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1570        return EMSGSIZE;
1571    }
1572
1573    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1574
1575    if( !tr_peerMgrDidPeerRequest( msgs->torrent, msgs->peer, block ) ) {
1576        dbgmsg( msgs, "we didn't ask for this message..." );
1577        return 0;
1578    }
1579    if( tr_cpPieceIsComplete( &msgs->torrent->completion, req->index ) ) {
1580        dbgmsg( msgs, "we did ask for this message, but the piece is already complete..." );
1581        return 0;
1582    }
1583
1584    /**
1585    ***  Save the block
1586    **/
1587
1588    if(( err = tr_cacheWriteBlock( getSession(msgs)->cache, tor, req->index, req->offset, req->length, data )))
1589        return err;
1590
1591    tr_bitfieldAdd( &msgs->peer->blame, req->index );
1592    fireGotBlock( msgs, req );
1593    return 0;
1594}
1595
1596static int peerPulse( void * vmsgs );
1597
1598static void
1599didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1600{
1601    tr_peermsgs * msgs = vmsgs;
1602    firePeerGotData( msgs, bytesWritten, wasPieceData );
1603
1604    if ( tr_isPeerIo( io ) && io->userData )
1605        peerPulse( msgs );
1606}
1607
1608static ReadState
1609canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1610{
1611    ReadState         ret;
1612    tr_peermsgs *     msgs = vmsgs;
1613    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1614    const size_t      inlen = evbuffer_get_length( in );
1615
1616    dbgmsg( msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state );
1617
1618    if( !inlen )
1619    {
1620        ret = READ_LATER;
1621    }
1622    else if( msgs->state == AWAITING_BT_PIECE )
1623    {
1624        ret = readBtPiece( msgs, in, inlen, piece );
1625    }
1626    else switch( msgs->state )
1627    {
1628        case AWAITING_BT_LENGTH:
1629            ret = readBtLength ( msgs, in, inlen ); break;
1630
1631        case AWAITING_BT_ID:
1632            ret = readBtId     ( msgs, in, inlen ); break;
1633
1634        case AWAITING_BT_MESSAGE:
1635            ret = readBtMessage( msgs, in, inlen ); break;
1636
1637        default:
1638            ret = READ_ERR;
1639            assert( 0 );
1640    }
1641
1642    dbgmsg( msgs, "canRead: ret is %d", (int)ret );
1643
1644    /* log the raw data that was read */
1645    if( ( ret != READ_ERR ) && ( evbuffer_get_length( in ) != inlen ) )
1646        fireClientGotData( msgs, inlen - evbuffer_get_length( in ), false );
1647
1648    return ret;
1649}
1650
1651int
1652tr_peerMsgsIsReadingBlock( const tr_peermsgs * msgs, tr_block_index_t block )
1653{
1654    if( msgs->state != AWAITING_BT_PIECE )
1655        return false;
1656
1657    return block == _tr_block( msgs->torrent,
1658                               msgs->incoming.blockReq.index,
1659                               msgs->incoming.blockReq.offset );
1660}
1661
1662/**
1663***
1664**/
1665
1666static void
1667updateDesiredRequestCount( tr_peermsgs * msgs )
1668{
1669    const tr_torrent * const torrent = msgs->torrent;
1670
1671    if( tr_torrentIsSeed( msgs->torrent ) )
1672    {
1673        msgs->desiredRequestCount = 0;
1674    }
1675    else if( msgs->peer->clientIsChoked )
1676    {
1677        msgs->desiredRequestCount = 0;
1678    }
1679    else if( !msgs->peer->clientIsInterested )
1680    {
1681        msgs->desiredRequestCount = 0;
1682    }
1683    else
1684    {
1685        int estimatedBlocksInPeriod;
1686        int rate_Bps;
1687        int irate_Bps;
1688        const int floor = 4;
1689        const int seconds = REQUEST_BUF_SECS;
1690        const uint64_t now = tr_time_msec( );
1691
1692        /* Get the rate limit we should use.
1693         * FIXME: this needs to consider all the other peers as well... */
1694        rate_Bps = tr_peerGetPieceSpeed_Bps( msgs->peer, now, TR_PEER_TO_CLIENT );
1695        if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) )
1696            rate_Bps = MIN( rate_Bps, tr_torrentGetSpeedLimit_Bps( torrent, TR_PEER_TO_CLIENT ) );
1697
1698        /* honor the session limits, if enabled */
1699        if( tr_torrentUsesSessionLimits( torrent ) )
1700            if( tr_sessionGetActiveSpeedLimit_Bps( torrent->session, TR_PEER_TO_CLIENT, &irate_Bps ) )
1701                rate_Bps = MIN( rate_Bps, irate_Bps );
1702
1703        /* use this desired rate to figure out how
1704         * many requests we should send to this peer */
1705        estimatedBlocksInPeriod = ( rate_Bps * seconds ) / torrent->blockSize;
1706        msgs->desiredRequestCount = MAX( floor, estimatedBlocksInPeriod );
1707
1708        /* honor the peer's maximum request count, if specified */
1709        if( msgs->reqq > 0 )
1710            if( msgs->desiredRequestCount > msgs->reqq )
1711                msgs->desiredRequestCount = msgs->reqq;
1712    }
1713}
1714
1715static void
1716updateMetadataRequests( tr_peermsgs * msgs, time_t now )
1717{
1718    int piece;
1719
1720    if( msgs->peerSupportsMetadataXfer
1721        && tr_torrentGetNextMetadataRequest( msgs->torrent, now, &piece ) )
1722    {
1723        tr_benc tmp;
1724        int payloadLen;
1725        char * payload;
1726        struct evbuffer * out = msgs->outMessages;
1727
1728        /* build the data message */
1729        tr_bencInitDict( &tmp, 3 );
1730        tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REQUEST );
1731        tr_bencDictAddInt( &tmp, "piece", piece );
1732        payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1733        tr_bencFree( &tmp );
1734
1735        dbgmsg( msgs, "requesting metadata piece #%d", piece );
1736
1737        /* write it out as a LTEP message to our outMessages buffer */
1738        evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1739        evbuffer_add_uint8 ( out, BT_LTEP );
1740        evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1741        evbuffer_add       ( out, payload, payloadLen );
1742        pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1743        dbgOutMessageLen( msgs );
1744
1745        tr_free( payload );
1746    }
1747}
1748
1749static void
1750updateBlockRequests( tr_peermsgs * msgs )
1751{
1752    if( tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT )
1753        && ( msgs->desiredRequestCount > 0 )
1754        && ( msgs->peer->pendingReqsToPeer <= ( msgs->desiredRequestCount * 0.66 ) ) )
1755    {
1756        int i;
1757        int n;
1758        const int numwant = msgs->desiredRequestCount - msgs->peer->pendingReqsToPeer;
1759        tr_block_index_t * blocks = tr_new( tr_block_index_t, numwant );
1760
1761        tr_peerMgrGetNextRequests( msgs->torrent, msgs->peer, numwant, blocks, &n );
1762
1763        for( i=0; i<n; ++i )
1764        {
1765            struct peer_request req;
1766            blockToReq( msgs->torrent, blocks[i], &req );
1767            protocolSendRequest( msgs, &req );
1768        }
1769
1770        tr_free( blocks );
1771    }
1772}
1773
1774static size_t
1775fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1776{
1777    int piece;
1778    size_t bytesWritten = 0;
1779    struct peer_request req;
1780    const bool haveMessages = evbuffer_get_length( msgs->outMessages ) != 0;
1781    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1782
1783    /**
1784    ***  Protocol messages
1785    **/
1786
1787    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1788    {
1789        dbgmsg( msgs, "started an outMessages batch (length is %zu)", evbuffer_get_length( msgs->outMessages ) );
1790        msgs->outMessagesBatchedAt = now;
1791    }
1792    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1793    {
1794        const size_t len = evbuffer_get_length( msgs->outMessages );
1795        /* flush the protocol messages */
1796        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1797        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, false );
1798        msgs->clientSentAnythingAt = now;
1799        msgs->outMessagesBatchedAt = 0;
1800        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1801        bytesWritten +=  len;
1802    }
1803
1804    /**
1805    ***  Metadata Pieces
1806    **/
1807
1808    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= METADATA_PIECE_SIZE )
1809        && popNextMetadataRequest( msgs, &piece ) )
1810    {
1811        char * data;
1812        int dataLen;
1813        bool ok = false;
1814
1815        data = tr_torrentGetMetadataPiece( msgs->torrent, piece, &dataLen );
1816        if( ( dataLen > 0 ) && ( data != NULL ) )
1817        {
1818            tr_benc tmp;
1819            int payloadLen;
1820            char * payload;
1821            struct evbuffer * out = msgs->outMessages;
1822
1823            /* build the data message */
1824            tr_bencInitDict( &tmp, 3 );
1825            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_DATA );
1826            tr_bencDictAddInt( &tmp, "piece", piece );
1827            tr_bencDictAddInt( &tmp, "total_size", msgs->torrent->infoDictLength );
1828            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1829            tr_bencFree( &tmp );
1830
1831            /* write it out as a LTEP message to our outMessages buffer */
1832            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen + dataLen );
1833            evbuffer_add_uint8 ( out, BT_LTEP );
1834            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1835            evbuffer_add       ( out, payload, payloadLen );
1836            evbuffer_add       ( out, data, dataLen );
1837            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1838            dbgOutMessageLen( msgs );
1839
1840            tr_free( payload );
1841            tr_free( data );
1842
1843            ok = true;
1844        }
1845
1846        if( !ok ) /* send a rejection message */
1847        {
1848            tr_benc tmp;
1849            int payloadLen;
1850            char * payload;
1851            struct evbuffer * out = msgs->outMessages;
1852
1853            /* build the rejection message */
1854            tr_bencInitDict( &tmp, 2 );
1855            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1856            tr_bencDictAddInt( &tmp, "piece", piece );
1857            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1858            tr_bencFree( &tmp );
1859
1860            /* write it out as a LTEP message to our outMessages buffer */
1861            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1862            evbuffer_add_uint8 ( out, BT_LTEP );
1863            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1864            evbuffer_add       ( out, payload, payloadLen );
1865            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1866            dbgOutMessageLen( msgs );
1867
1868            tr_free( payload );
1869        }
1870    }
1871
1872    /**
1873    ***  Data Blocks
1874    **/
1875
1876    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= msgs->torrent->blockSize )
1877        && popNextRequest( msgs, &req ) )
1878    {
1879        --msgs->prefetchCount;
1880
1881        if( requestIsValid( msgs, &req )
1882            && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1883        {
1884            int err;
1885            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
1886            struct evbuffer * out;
1887            struct evbuffer_iovec iovec[1];
1888
1889            out = evbuffer_new( );
1890            evbuffer_expand( out, msglen );
1891
1892            evbuffer_add_uint32( out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1893            evbuffer_add_uint8 ( out, BT_PIECE );
1894            evbuffer_add_uint32( out, req.index );
1895            evbuffer_add_uint32( out, req.offset );
1896
1897            evbuffer_reserve_space( out, req.length, iovec, 1 );
1898            err = tr_cacheReadBlock( getSession(msgs)->cache, msgs->torrent, req.index, req.offset, req.length, iovec[0].iov_base );
1899            iovec[0].iov_len = req.length;
1900            evbuffer_commit_space( out, iovec, 1 );
1901
1902            /* check the piece if it needs checking... */
1903            if( !err && tr_torrentPieceNeedsCheck( msgs->torrent, req.index ) )
1904                if(( err = !tr_torrentCheckPiece( msgs->torrent, req.index )))
1905                    tr_torrentSetLocalError( msgs->torrent, _( "Please Verify Local Data! Piece #%zu is corrupt." ), (size_t)req.index );
1906
1907            if( err )
1908            {
1909                if( fext )
1910                    protocolSendReject( msgs, &req );
1911            }
1912            else
1913            {
1914                const size_t n = evbuffer_get_length( out );
1915                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1916                assert( n == msglen );
1917                tr_peerIoWriteBuf( msgs->peer->io, out, true );
1918                bytesWritten += n;
1919                msgs->clientSentAnythingAt = now;
1920                tr_historyAdd( &msgs->peer->blocksSentToPeer, tr_time( ), 1 );
1921            }
1922
1923            evbuffer_free( out );
1924
1925            if( err )
1926            {
1927                bytesWritten = 0;
1928                msgs = NULL;
1929            }
1930        }
1931        else if( fext ) /* peer needs a reject message */
1932        {
1933            protocolSendReject( msgs, &req );
1934        }
1935
1936        if( msgs != NULL )
1937            prefetchPieces( msgs );
1938    }
1939
1940    /**
1941    ***  Keepalive
1942    **/
1943
1944    if( ( msgs != NULL )
1945        && ( msgs->clientSentAnythingAt != 0 )
1946        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1947    {
1948        dbgmsg( msgs, "sending a keepalive message" );
1949        evbuffer_add_uint32( msgs->outMessages, 0 );
1950        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1951    }
1952
1953    return bytesWritten;
1954}
1955
1956static int
1957peerPulse( void * vmsgs )
1958{
1959    tr_peermsgs * msgs = vmsgs;
1960    const time_t  now = tr_time( );
1961
1962    if ( tr_isPeerIo( msgs->peer->io ) ) {
1963        updateDesiredRequestCount( msgs );
1964        updateBlockRequests( msgs );
1965        updateMetadataRequests( msgs, now );
1966    }
1967
1968    for( ;; )
1969        if( fillOutputBuffer( msgs, now ) < 1 )
1970            break;
1971
1972    return true; /* loop forever */
1973}
1974
1975void
1976tr_peerMsgsPulse( tr_peermsgs * msgs )
1977{
1978    if( msgs != NULL )
1979        peerPulse( msgs );
1980}
1981
1982static void
1983gotError( tr_peerIo * io UNUSED, short what, void * vmsgs )
1984{
1985    if( what & BEV_EVENT_TIMEOUT )
1986        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1987    if( what & ( BEV_EVENT_EOF | BEV_EVENT_ERROR ) )
1988        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1989               what, errno, tr_strerror( errno ) );
1990    fireError( vmsgs, ENOTCONN );
1991}
1992
1993static void
1994sendBitfield( tr_peermsgs * msgs )
1995{
1996    size_t byte_count = 0;
1997    struct evbuffer * out = msgs->outMessages;
1998    void * bytes = tr_cpCreatePieceBitfield( &msgs->torrent->completion, &byte_count );
1999
2000    evbuffer_add_uint32( out, sizeof( uint8_t ) + byte_count );
2001    evbuffer_add_uint8 ( out, BT_BITFIELD );
2002    evbuffer_add       ( out, bytes, byte_count );
2003    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu", evbuffer_get_length( out ) );
2004    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
2005
2006    tr_free( bytes );
2007}
2008
2009static void
2010tellPeerWhatWeHave( tr_peermsgs * msgs )
2011{
2012    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
2013
2014    if( fext && tr_cpHasAll( &msgs->torrent->completion ) )
2015    {
2016        protocolSendHaveAll( msgs );
2017    }
2018    else if( fext && tr_cpHasNone( &msgs->torrent->completion ) )
2019    {
2020        protocolSendHaveNone( msgs );
2021    }
2022    else if( !tr_cpHasNone( &msgs->torrent->completion ) )
2023    {
2024        sendBitfield( msgs );
2025    }
2026}
2027
2028/**
2029***
2030**/
2031
2032/* some peers give us error messages if we send
2033   more than this many peers in a single pex message
2034   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2035#define MAX_PEX_ADDED 50
2036#define MAX_PEX_DROPPED 50
2037
2038typedef struct
2039{
2040    tr_pex *  added;
2041    tr_pex *  dropped;
2042    tr_pex *  elements;
2043    int       addedCount;
2044    int       droppedCount;
2045    int       elementCount;
2046}
2047PexDiffs;
2048
2049static void
2050pexAddedCb( void * vpex, void * userData )
2051{
2052    PexDiffs * diffs = userData;
2053    tr_pex *   pex = vpex;
2054
2055    if( diffs->addedCount < MAX_PEX_ADDED )
2056    {
2057        diffs->added[diffs->addedCount++] = *pex;
2058        diffs->elements[diffs->elementCount++] = *pex;
2059    }
2060}
2061
2062static inline void
2063pexDroppedCb( void * vpex, void * userData )
2064{
2065    PexDiffs * diffs = userData;
2066    tr_pex *   pex = vpex;
2067
2068    if( diffs->droppedCount < MAX_PEX_DROPPED )
2069    {
2070        diffs->dropped[diffs->droppedCount++] = *pex;
2071    }
2072}
2073
2074static inline void
2075pexElementCb( void * vpex, void * userData )
2076{
2077    PexDiffs * diffs = userData;
2078    tr_pex * pex = vpex;
2079
2080    diffs->elements[diffs->elementCount++] = *pex;
2081}
2082
2083typedef void ( tr_set_func )( void * element, void * userData );
2084
2085/**
2086 * @brief find the differences and commonalities in two sorted sets
2087 * @param a the first set
2088 * @param aCount the number of elements in the set 'a'
2089 * @param b the second set
2090 * @param bCount the number of elements in the set 'b'
2091 * @param compare the sorting method for both sets
2092 * @param elementSize the sizeof the element in the two sorted sets
2093 * @param in_a called for items in set 'a' but not set 'b'
2094 * @param in_b called for items in set 'b' but not set 'a'
2095 * @param in_both called for items that are in both sets
2096 * @param userData user data passed along to in_a, in_b, and in_both
2097 */
2098static void
2099tr_set_compare( const void * va, size_t aCount,
2100                const void * vb, size_t bCount,
2101                int compare( const void * a, const void * b ),
2102                size_t elementSize,
2103                tr_set_func in_a_cb,
2104                tr_set_func in_b_cb,
2105                tr_set_func in_both_cb,
2106                void * userData )
2107{
2108    const uint8_t * a = va;
2109    const uint8_t * b = vb;
2110    const uint8_t * aend = a + elementSize * aCount;
2111    const uint8_t * bend = b + elementSize * bCount;
2112
2113    while( a != aend || b != bend )
2114    {
2115        if( a == aend )
2116        {
2117            ( *in_b_cb )( (void*)b, userData );
2118            b += elementSize;
2119        }
2120        else if( b == bend )
2121        {
2122            ( *in_a_cb )( (void*)a, userData );
2123            a += elementSize;
2124        }
2125        else
2126        {
2127            const int val = ( *compare )( a, b );
2128
2129            if( !val )
2130            {
2131                ( *in_both_cb )( (void*)a, userData );
2132                a += elementSize;
2133                b += elementSize;
2134            }
2135            else if( val < 0 )
2136            {
2137                ( *in_a_cb )( (void*)a, userData );
2138                a += elementSize;
2139            }
2140            else if( val > 0 )
2141            {
2142                ( *in_b_cb )( (void*)b, userData );
2143                b += elementSize;
2144            }
2145        }
2146    }
2147}
2148
2149
2150static void
2151sendPex( tr_peermsgs * msgs )
2152{
2153    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2154    {
2155        PexDiffs diffs;
2156        PexDiffs diffs6;
2157        tr_pex * newPex = NULL;
2158        tr_pex * newPex6 = NULL;
2159        const int newCount = tr_peerMgrGetPeers( msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2160        const int newCount6 = tr_peerMgrGetPeers( msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2161
2162        /* build the diffs */
2163        diffs.added = tr_new( tr_pex, newCount );
2164        diffs.addedCount = 0;
2165        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2166        diffs.droppedCount = 0;
2167        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2168        diffs.elementCount = 0;
2169        tr_set_compare( msgs->pex, msgs->pexCount,
2170                        newPex, newCount,
2171                        tr_pexCompare, sizeof( tr_pex ),
2172                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2173        diffs6.added = tr_new( tr_pex, newCount6 );
2174        diffs6.addedCount = 0;
2175        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2176        diffs6.droppedCount = 0;
2177        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2178        diffs6.elementCount = 0;
2179        tr_set_compare( msgs->pex6, msgs->pexCount6,
2180                        newPex6, newCount6,
2181                        tr_pexCompare, sizeof( tr_pex ),
2182                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2183        dbgmsg(
2184            msgs,
2185            "pex: old peer count %d+%d, new peer count %d+%d, "
2186            "added %d+%d, removed %d+%d",
2187            msgs->pexCount, msgs->pexCount6, newCount, newCount6,
2188            diffs.addedCount, diffs6.addedCount,
2189            diffs.droppedCount, diffs6.droppedCount );
2190
2191        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2192            !diffs6.droppedCount )
2193        {
2194            tr_free( diffs.elements );
2195            tr_free( diffs6.elements );
2196        }
2197        else
2198        {
2199            int  i;
2200            tr_benc val;
2201            char * benc;
2202            int bencLen;
2203            uint8_t * tmp, *walk;
2204            struct evbuffer * out = msgs->outMessages;
2205
2206            /* update peer */
2207            tr_free( msgs->pex );
2208            msgs->pex = diffs.elements;
2209            msgs->pexCount = diffs.elementCount;
2210            tr_free( msgs->pex6 );
2211            msgs->pex6 = diffs6.elements;
2212            msgs->pexCount6 = diffs6.elementCount;
2213
2214            /* build the pex payload */
2215            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2216                                         * speed vs. likelihood? */
2217
2218            if( diffs.addedCount > 0)
2219            {
2220                /* "added" */
2221                tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2222                for( i = 0; i < diffs.addedCount; ++i ) {
2223                    memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2224                    memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2225                }
2226                assert( ( walk - tmp ) == diffs.addedCount * 6 );
2227                tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2228                tr_free( tmp );
2229
2230                /* "added.f"
2231                 * unset each holepunch flag because we don't support it. */
2232                tmp = walk = tr_new( uint8_t, diffs.addedCount );
2233                for( i = 0; i < diffs.addedCount; ++i )
2234                    *walk++ = diffs.added[i].flags & ~ADDED_F_HOLEPUNCH;
2235                assert( ( walk - tmp ) == diffs.addedCount );
2236                tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2237                tr_free( tmp );
2238            }
2239
2240            if( diffs.droppedCount > 0 )
2241            {
2242                /* "dropped" */
2243                tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2244                for( i = 0; i < diffs.droppedCount; ++i ) {
2245                    memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2246                    memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2247                }
2248                assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2249                tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2250                tr_free( tmp );
2251            }
2252
2253            if( diffs6.addedCount > 0 )
2254            {
2255                /* "added6" */
2256                tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2257                for( i = 0; i < diffs6.addedCount; ++i ) {
2258                    memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2259                    walk += 16;
2260                    memcpy( walk, &diffs6.added[i].port, 2 );
2261                    walk += 2;
2262                }
2263                assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2264                tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2265                tr_free( tmp );
2266
2267                /* "added6.f"
2268                 * unset each holepunch flag because we don't support it. */
2269                tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2270                for( i = 0; i < diffs6.addedCount; ++i )
2271                    *walk++ = diffs6.added[i].flags & ~ADDED_F_HOLEPUNCH;
2272                assert( ( walk - tmp ) == diffs6.addedCount );
2273                tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2274                tr_free( tmp );
2275            }
2276
2277            if( diffs6.droppedCount > 0 )
2278            {
2279                /* "dropped6" */
2280                tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2281                for( i = 0; i < diffs6.droppedCount; ++i ) {
2282                    memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2283                    walk += 16;
2284                    memcpy( walk, &diffs6.dropped[i].port, 2 );
2285                    walk += 2;
2286                }
2287                assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2288                tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2289                tr_free( tmp );
2290            }
2291
2292            /* write the pex message */
2293            benc = tr_bencToStr( &val, TR_FMT_BENC, &bencLen );
2294            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + bencLen );
2295            evbuffer_add_uint8 ( out, BT_LTEP );
2296            evbuffer_add_uint8 ( out, msgs->ut_pex_id );
2297            evbuffer_add       ( out, benc, bencLen );
2298            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2299            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", evbuffer_get_length( out ) );
2300            dbgOutMessageLen( msgs );
2301
2302            tr_free( benc );
2303            tr_bencFree( &val );
2304        }
2305
2306        /* cleanup */
2307        tr_free( diffs.added );
2308        tr_free( diffs.dropped );
2309        tr_free( newPex );
2310        tr_free( diffs6.added );
2311        tr_free( diffs6.dropped );
2312        tr_free( newPex6 );
2313
2314        /*msgs->clientSentPexAt = tr_time( );*/
2315    }
2316}
2317
2318static void
2319pexPulse( int foo UNUSED, short bar UNUSED, void * vmsgs )
2320{
2321    struct tr_peermsgs * msgs = vmsgs;
2322
2323    sendPex( msgs );
2324
2325    tr_timerAdd( msgs->pexTimer, PEX_INTERVAL_SECS, 0 );
2326}
2327
2328/**
2329***
2330**/
2331
2332tr_peermsgs*
2333tr_peerMsgsNew( struct tr_torrent    * torrent,
2334                struct tr_peer       * peer,
2335                tr_peer_callback     * callback,
2336                void                 * callbackData )
2337{
2338    tr_peermsgs * m;
2339
2340    assert( peer );
2341    assert( peer->io );
2342
2343    m = tr_new0( tr_peermsgs, 1 );
2344    m->callback = callback;
2345    m->callbackData = callbackData;
2346    m->peer = peer;
2347    m->torrent = torrent;
2348    m->peer->clientIsChoked = 1;
2349    m->peer->peerIsChoked = 1;
2350    m->peer->clientIsInterested = 0;
2351    m->peer->peerIsInterested = 0;
2352    m->state = AWAITING_BT_LENGTH;
2353    m->outMessages = evbuffer_new( );
2354    m->outMessagesBatchedAt = 0;
2355    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2356    m->incoming.block = evbuffer_new( );
2357    m->pexTimer = evtimer_new( torrent->session->event_base, pexPulse, m );
2358    peer->msgs = m;
2359    tr_timerAdd( m->pexTimer, PEX_INTERVAL_SECS, 0 );
2360
2361    if( tr_peerIoSupportsUTP( peer->io ) ) {
2362        const tr_address * addr = tr_peerIoGetAddress( peer->io, NULL );
2363        tr_peerMgrSetUtpSupported( torrent, addr );
2364        tr_peerMgrSetUtpFailed( torrent, addr, false );
2365    }
2366
2367    if( tr_peerIoSupportsLTEP( peer->io ) )
2368        sendLtepHandshake( m );
2369
2370    tellPeerWhatWeHave( m );
2371
2372    if( tr_dhtEnabled( torrent->session ) && tr_peerIoSupportsDHT( peer->io ))
2373    {
2374        /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2375        const struct tr_address *addr = tr_peerIoGetAddress( peer->io, NULL );
2376        if( addr->type == TR_AF_INET || tr_globalIPv6() ) {
2377            protocolSendPort( m, tr_dhtPort( torrent->session ) );
2378        }
2379    }
2380
2381    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2382    updateDesiredRequestCount( m );
2383
2384    return m;
2385}
2386
2387void
2388tr_peerMsgsFree( tr_peermsgs* msgs )
2389{
2390    if( msgs )
2391    {
2392        event_free( msgs->pexTimer );
2393
2394        evbuffer_free( msgs->incoming.block );
2395        evbuffer_free( msgs->outMessages );
2396        tr_free( msgs->pex6 );
2397        tr_free( msgs->pex );
2398
2399        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2400        tr_free( msgs );
2401    }
2402}
Note: See TracBrowser for help on using the repository browser.