source: trunk/libtransmission/peer-msgs.c @ 12366

Last change on this file since 12366 was 12366, checked in by jordan, 11 years ago

(trunk libT) heap and event pruning: don't create evtimers for periodic pex messages if the torrent doesn't allow pex (such as, if it's on a private tracker).

Previously, we unconditionally created the evtimer, and then checked each time to see if pex was allowed.

  • Property svn:keywords set to Date Rev Author Id
File size: 71.0 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 12366 2011-04-17 05:55:46Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <stdarg.h>
16#include <stdlib.h>
17#include <string.h>
18
19#include <alloca.h>
20
21#include <event2/buffer.h>
22#include <event2/bufferevent.h>
23#include <event2/event.h>
24
25#include "transmission.h"
26#include "bencode.h"
27#include "cache.h"
28#include "completion.h"
29#include "crypto.h" /* tr_sha1() */
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-msgs.h"
33#include "session.h"
34#include "torrent.h"
35#include "torrent-magnet.h"
36#include "tr-dht.h"
37#include "utils.h"
38#include "version.h"
39
40/**
41***
42**/
43
44enum
45{
46    BT_CHOKE                = 0,
47    BT_UNCHOKE              = 1,
48    BT_INTERESTED           = 2,
49    BT_NOT_INTERESTED       = 3,
50    BT_HAVE                 = 4,
51    BT_BITFIELD             = 5,
52    BT_REQUEST              = 6,
53    BT_PIECE                = 7,
54    BT_CANCEL               = 8,
55    BT_PORT                 = 9,
56
57    BT_FEXT_SUGGEST         = 13,
58    BT_FEXT_HAVE_ALL        = 14,
59    BT_FEXT_HAVE_NONE       = 15,
60    BT_FEXT_REJECT          = 16,
61    BT_FEXT_ALLOWED_FAST    = 17,
62
63    BT_LTEP                 = 20,
64
65    LTEP_HANDSHAKE          = 0,
66
67    UT_PEX_ID               = 1,
68    UT_METADATA_ID          = 3,
69
70    MAX_PEX_PEER_COUNT      = 50,
71
72    MIN_CHOKE_PERIOD_SEC    = 10,
73
74    /* idle seconds before we send a keepalive */
75    KEEPALIVE_INTERVAL_SECS = 100,
76
77    PEX_INTERVAL_SECS       = 90, /* sec between sendPex() calls */
78
79    REQQ                    = 512,
80
81    METADATA_REQQ           = 64,
82
83    /* used in lowering the outMessages queue period */
84    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
85    HIGH_PRIORITY_INTERVAL_SECS = 2,
86    LOW_PRIORITY_INTERVAL_SECS = 10,
87
88    /* number of pieces we'll allow in our fast set */
89    MAX_FAST_SET_SIZE = 3,
90
91    /* defined in BEP #9 */
92    METADATA_MSG_TYPE_REQUEST = 0,
93    METADATA_MSG_TYPE_DATA = 1,
94    METADATA_MSG_TYPE_REJECT = 2
95};
96
97enum
98{
99    AWAITING_BT_LENGTH,
100    AWAITING_BT_ID,
101    AWAITING_BT_MESSAGE,
102    AWAITING_BT_PIECE
103};
104
105/**
106***
107**/
108
109struct peer_request
110{
111    uint32_t    index;
112    uint32_t    offset;
113    uint32_t    length;
114};
115
116static void
117blockToReq( const tr_torrent     * tor,
118            tr_block_index_t       block,
119            struct peer_request  * setme )
120{
121    tr_torrentGetBlockLocation( tor, block, &setme->index,
122                                            &setme->offset,
123                                            &setme->length );
124}
125
126/**
127***
128**/
129
130/* this is raw, unchanged data from the peer regarding
131 * the current message that it's sending us. */
132struct tr_incoming
133{
134    uint8_t                id;
135    uint32_t               length; /* includes the +1 for id length */
136    struct peer_request    blockReq; /* metadata for incoming blocks */
137    struct evbuffer *      block; /* piece data for incoming blocks */
138};
139
140/**
141 * Low-level communication state information about a connected peer.
142 *
143 * This structure remembers the low-level protocol states that we're
144 * in with this peer, such as active requests, pex messages, and so on.
145 * Its fields are all private to peer-msgs.c.
146 *
147 * Data not directly involved with sending & receiving messages is
148 * stored in tr_peer, where it can be accessed by both peermsgs and
149 * the peer manager.
150 *
151 * @see struct peer_atom
152 * @see tr_peer
153 */
154struct tr_peermsgs
155{
156    bool            peerSupportsPex;
157    bool            peerSupportsMetadataXfer;
158    bool            clientSentLtepHandshake;
159    bool            peerSentLtepHandshake;
160
161    /*bool          haveFastSet;*/
162
163    int             desiredRequestCount;
164
165    int             prefetchCount;
166
167    /* how long the outMessages batch should be allowed to grow before
168     * it's flushed -- some messages (like requests >:) should be sent
169     * very quickly; others aren't as urgent. */
170    int8_t          outMessagesBatchPeriod;
171
172    uint8_t         state;
173    uint8_t         ut_pex_id;
174    uint8_t         ut_metadata_id;
175    uint16_t        pexCount;
176    uint16_t        pexCount6;
177
178    size_t          metadata_size_hint;
179#if 0
180    size_t                 fastsetSize;
181    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
182#endif
183
184    tr_peer *              peer;
185
186    tr_torrent *           torrent;
187
188    tr_peer_callback      * callback;
189    void                  * callbackData;
190
191    struct evbuffer *      outMessages; /* all the non-piece messages */
192
193    struct peer_request    peerAskedFor[REQQ];
194
195    int                    peerAskedForMetadata[METADATA_REQQ];
196    int                    peerAskedForMetadataCount;
197
198    tr_pex               * pex;
199    tr_pex               * pex6;
200
201    /*time_t                 clientSentPexAt;*/
202    time_t                 clientSentAnythingAt;
203
204    /* when we started batching the outMessages */
205    time_t                outMessagesBatchedAt;
206
207    struct tr_incoming    incoming;
208
209    /* if the peer supports the Extension Protocol in BEP 10 and
210       supplied a reqq argument, it's stored here. Otherwise, the
211       value is zero and should be ignored. */
212    int64_t               reqq;
213
214    struct event        * pexTimer;
215};
216
217/**
218***
219**/
220
221static inline tr_session*
222getSession( struct tr_peermsgs * msgs )
223{
224    return msgs->torrent->session;
225}
226
227/**
228***
229**/
230
231static void
232myDebug( const char * file, int line,
233         const struct tr_peermsgs * msgs,
234         const char * fmt, ... )
235{
236    FILE * fp = tr_getLog( );
237
238    if( fp )
239    {
240        va_list           args;
241        char              timestr[64];
242        struct evbuffer * buf = evbuffer_new( );
243        char *            base = tr_basename( file );
244
245        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
246                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
247                             tr_torrentName( msgs->torrent ),
248                             tr_peerIoGetAddrStr( msgs->peer->io ),
249                             msgs->peer->client );
250        va_start( args, fmt );
251        evbuffer_add_vprintf( buf, fmt, args );
252        va_end( args );
253        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
254        fputs( (const char*)evbuffer_pullup( buf, -1 ), fp );
255
256        tr_free( base );
257        evbuffer_free( buf );
258    }
259}
260
261#define dbgmsg( msgs, ... ) \
262    do { \
263        if( tr_deepLoggingIsActive( ) ) \
264            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
265    } while( 0 )
266
267/**
268***
269**/
270
271static void
272pokeBatchPeriod( tr_peermsgs * msgs, int interval )
273{
274    if( msgs->outMessagesBatchPeriod > interval )
275    {
276        msgs->outMessagesBatchPeriod = interval;
277        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
278    }
279}
280
281static void
282dbgOutMessageLen( tr_peermsgs * msgs )
283{
284    dbgmsg( msgs, "outMessage size is now %zu", evbuffer_get_length( msgs->outMessages ) );
285}
286
287static void
288protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
289{
290    struct evbuffer * out = msgs->outMessages;
291
292    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
293
294    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
295    evbuffer_add_uint8 ( out, BT_FEXT_REJECT );
296    evbuffer_add_uint32( out, req->index );
297    evbuffer_add_uint32( out, req->offset );
298    evbuffer_add_uint32( out, req->length );
299
300    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
301    dbgOutMessageLen( msgs );
302}
303
304static void
305protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
306{
307    struct evbuffer * out = msgs->outMessages;
308
309    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
310    evbuffer_add_uint8 ( out, BT_REQUEST );
311    evbuffer_add_uint32( out, req->index );
312    evbuffer_add_uint32( out, req->offset );
313    evbuffer_add_uint32( out, req->length );
314
315    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
316    dbgOutMessageLen( msgs );
317    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
318}
319
320static void
321protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
322{
323    struct evbuffer * out = msgs->outMessages;
324
325    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
326    evbuffer_add_uint8 ( out, BT_CANCEL );
327    evbuffer_add_uint32( out, req->index );
328    evbuffer_add_uint32( out, req->offset );
329    evbuffer_add_uint32( out, req->length );
330
331    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
332    dbgOutMessageLen( msgs );
333    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
334}
335
336static void
337protocolSendPort(tr_peermsgs *msgs, uint16_t port)
338{
339    struct evbuffer * out = msgs->outMessages;
340
341    dbgmsg( msgs, "sending Port %u", port);
342    evbuffer_add_uint32( out, 3 );
343    evbuffer_add_uint8 ( out, BT_PORT );
344    evbuffer_add_uint16( out, port);
345}
346
347static void
348protocolSendHave( tr_peermsgs * msgs, uint32_t index )
349{
350    struct evbuffer * out = msgs->outMessages;
351
352    evbuffer_add_uint32( out, sizeof(uint8_t) + sizeof(uint32_t) );
353    evbuffer_add_uint8 ( out, BT_HAVE );
354    evbuffer_add_uint32( out, index );
355
356    dbgmsg( msgs, "sending Have %u", index );
357    dbgOutMessageLen( msgs );
358    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
359}
360
361#if 0
362static void
363protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
364{
365    tr_peerIo       * io  = msgs->peer->io;
366    struct evbuffer * out = msgs->outMessages;
367
368    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
369
370    evbuffer_add_uint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
371    evbuffer_add_uint8 ( io, out, BT_FEXT_ALLOWED_FAST );
372    evbuffer_add_uint32( io, out, pieceIndex );
373
374    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
375    dbgOutMessageLen( msgs );
376}
377#endif
378
379static void
380protocolSendChoke( tr_peermsgs * msgs, int choke )
381{
382    struct evbuffer * out = msgs->outMessages;
383
384    evbuffer_add_uint32( out, sizeof( uint8_t ) );
385    evbuffer_add_uint8 ( out, choke ? BT_CHOKE : BT_UNCHOKE );
386
387    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
388    dbgOutMessageLen( msgs );
389    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
390}
391
392static void
393protocolSendHaveAll( tr_peermsgs * msgs )
394{
395    struct evbuffer * out = msgs->outMessages;
396
397    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
398
399    evbuffer_add_uint32( out, sizeof( uint8_t ) );
400    evbuffer_add_uint8 ( out, BT_FEXT_HAVE_ALL );
401
402    dbgmsg( msgs, "sending HAVE_ALL..." );
403    dbgOutMessageLen( msgs );
404    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
405}
406
407static void
408protocolSendHaveNone( tr_peermsgs * msgs )
409{
410    struct evbuffer * out = msgs->outMessages;
411
412    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
413
414    evbuffer_add_uint32( out, sizeof( uint8_t ) );
415    evbuffer_add_uint8 ( out, BT_FEXT_HAVE_NONE );
416
417    dbgmsg( msgs, "sending HAVE_NONE..." );
418    dbgOutMessageLen( msgs );
419    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
420}
421
422/**
423***  EVENTS
424**/
425
426static void
427publish( tr_peermsgs * msgs, tr_peer_event * e )
428{
429    assert( msgs->peer );
430    assert( msgs->peer->msgs == msgs );
431
432    if( msgs->callback != NULL )
433        msgs->callback( msgs->peer, e, msgs->callbackData );
434}
435
436static void
437fireError( tr_peermsgs * msgs, int err )
438{
439    tr_peer_event e = TR_PEER_EVENT_INIT;
440    e.eventType = TR_PEER_ERROR;
441    e.err = err;
442    publish( msgs, &e );
443}
444
445static void
446fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
447{
448    tr_peer_event e = TR_PEER_EVENT_INIT;
449    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
450    e.pieceIndex = req->index;
451    e.offset = req->offset;
452    e.length = req->length;
453    publish( msgs, &e );
454}
455
456static void
457fireGotRej( tr_peermsgs * msgs, const struct peer_request * req )
458{
459    tr_peer_event e = TR_PEER_EVENT_INIT;
460    e.eventType = TR_PEER_CLIENT_GOT_REJ;
461    e.pieceIndex = req->index;
462    e.offset = req->offset;
463    e.length = req->length;
464    publish( msgs, &e );
465}
466
467static void
468fireGotChoke( tr_peermsgs * msgs )
469{
470    tr_peer_event e = TR_PEER_EVENT_INIT;
471    e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
472    publish( msgs, &e );
473}
474
475static void
476fireClientGotHaveAll( tr_peermsgs * msgs )
477{
478    tr_peer_event e = TR_PEER_EVENT_INIT;
479    e.eventType = TR_PEER_CLIENT_GOT_HAVE_ALL;
480    publish( msgs, &e );
481}
482
483static void
484fireClientGotHaveNone( tr_peermsgs * msgs )
485{
486    tr_peer_event e = TR_PEER_EVENT_INIT;
487    e.eventType = TR_PEER_CLIENT_GOT_HAVE_NONE;
488    publish( msgs, &e );
489}
490
491static void
492fireClientGotData( tr_peermsgs * msgs, uint32_t length, int wasPieceData )
493{
494    tr_peer_event e = TR_PEER_EVENT_INIT;
495
496    e.length = length;
497    e.eventType = TR_PEER_CLIENT_GOT_DATA;
498    e.wasPieceData = wasPieceData;
499    publish( msgs, &e );
500}
501
502static void
503fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
504{
505    tr_peer_event e = TR_PEER_EVENT_INIT;
506    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
507    e.pieceIndex = pieceIndex;
508    publish( msgs, &e );
509}
510
511static void
512fireClientGotPort( tr_peermsgs * msgs, tr_port port )
513{
514    tr_peer_event e = TR_PEER_EVENT_INIT;
515    e.eventType = TR_PEER_CLIENT_GOT_PORT;
516    e.port = port;
517    publish( msgs, &e );
518}
519
520static void
521fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
522{
523    tr_peer_event e = TR_PEER_EVENT_INIT;
524    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
525    e.pieceIndex = pieceIndex;
526    publish( msgs, &e );
527}
528
529static void
530fireClientGotBitfield( tr_peermsgs * msgs, tr_bitfield * bitfield )
531{
532    tr_peer_event e = TR_PEER_EVENT_INIT;
533    e.eventType = TR_PEER_CLIENT_GOT_BITFIELD;
534    e.bitfield = bitfield;
535    publish( msgs, &e );
536}
537
538static void
539fireClientGotHave( tr_peermsgs * msgs, tr_piece_index_t index )
540{
541    tr_peer_event e = TR_PEER_EVENT_INIT;
542    e.eventType = TR_PEER_CLIENT_GOT_HAVE;
543    e.pieceIndex = index;
544    publish( msgs, &e );
545}
546
547static void
548firePeerGotData( tr_peermsgs * msgs, uint32_t length, bool wasPieceData )
549{
550    tr_peer_event e = TR_PEER_EVENT_INIT;
551
552    e.length = length;
553    e.eventType = TR_PEER_PEER_GOT_DATA;
554    e.wasPieceData = wasPieceData;
555
556    publish( msgs, &e );
557}
558
559/**
560***  ALLOWED FAST SET
561***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
562**/
563
564#if 0
565size_t
566tr_generateAllowedSet( tr_piece_index_t * setmePieces,
567                       size_t             desiredSetSize,
568                       size_t             pieceCount,
569                       const uint8_t    * infohash,
570                       const tr_address * addr )
571{
572    size_t setSize = 0;
573
574    assert( setmePieces );
575    assert( desiredSetSize <= pieceCount );
576    assert( desiredSetSize );
577    assert( pieceCount );
578    assert( infohash );
579    assert( addr );
580
581    if( addr->type == TR_AF_INET )
582    {
583        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
584        uint8_t x[SHA_DIGEST_LENGTH];
585
586        uint32_t ui32 = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
587        memcpy( w, &ui32, sizeof( uint32_t ) );
588        walk += sizeof( uint32_t );
589        memcpy( walk, infohash, SHA_DIGEST_LENGTH );                 /* (2) */
590        walk += SHA_DIGEST_LENGTH;
591        tr_sha1( x, w, walk-w, NULL );                               /* (3) */
592        assert( sizeof( w ) == walk-w );
593
594        while( setSize<desiredSetSize )
595        {
596            int i;
597            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
598            {
599                size_t k;
600                uint32_t j = i * 4;                                  /* (5) */
601                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
602                uint32_t index = y % pieceCount;                     /* (7) */
603
604                for( k=0; k<setSize; ++k )                           /* (8) */
605                    if( setmePieces[k] == index )
606                        break;
607
608                if( k == setSize )
609                    setmePieces[setSize++] = index;                  /* (9) */
610            }
611
612            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
613        }
614    }
615
616    return setSize;
617}
618
619static void
620updateFastSet( tr_peermsgs * msgs UNUSED )
621{
622    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
623    const int peerIsNeedy = msgs->peer->progress < 0.10;
624
625    if( fext && peerIsNeedy && !msgs->haveFastSet )
626    {
627        size_t i;
628        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
629        const tr_info * inf = &msgs->torrent->info;
630        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
631
632        /* build the fast set */
633        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
634        msgs->haveFastSet = 1;
635
636        /* send it to the peer */
637        for( i=0; i<msgs->fastsetSize; ++i )
638            protocolSendAllowedFast( msgs, msgs->fastset[i] );
639    }
640}
641#endif
642
643/**
644***  INTEREST
645**/
646
647static void
648sendInterest( tr_peermsgs * msgs, bool clientIsInterested )
649{
650    struct evbuffer * out = msgs->outMessages;
651
652    assert( msgs );
653    assert( tr_isBool( clientIsInterested ) );
654
655    msgs->peer->clientIsInterested = clientIsInterested;
656    dbgmsg( msgs, "Sending %s", clientIsInterested ? "Interested" : "Not Interested" );
657    evbuffer_add_uint32( out, sizeof( uint8_t ) );
658    evbuffer_add_uint8 ( out, clientIsInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
659
660    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
661    dbgOutMessageLen( msgs );
662}
663
664static void
665updateInterest( tr_peermsgs * msgs UNUSED )
666{
667    /* FIXME -- might need to poke the mgr on startup */
668}
669
670void
671tr_peerMsgsSetInterested( tr_peermsgs * msgs, int isInterested )
672{
673    assert( tr_isBool( isInterested ) );
674
675    if( isInterested != msgs->peer->clientIsInterested )
676        sendInterest( msgs, isInterested );
677}
678
679static bool
680popNextMetadataRequest( tr_peermsgs * msgs, int * piece )
681{
682    if( msgs->peerAskedForMetadataCount == 0 )
683        return false;
684
685    *piece = msgs->peerAskedForMetadata[0];
686
687    tr_removeElementFromArray( msgs->peerAskedForMetadata, 0, sizeof( int ),
688                               msgs->peerAskedForMetadataCount-- );
689
690    return true;
691}
692
693static bool
694popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
695{
696    if( msgs->peer->pendingReqsToClient == 0 )
697        return false;
698
699    *setme = msgs->peerAskedFor[0];
700
701    tr_removeElementFromArray( msgs->peerAskedFor, 0, sizeof( struct peer_request ),
702                               msgs->peer->pendingReqsToClient-- );
703
704    return true;
705}
706
707static void
708cancelAllRequestsToClient( tr_peermsgs * msgs )
709{
710    struct peer_request req;
711    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
712
713    while( popNextRequest( msgs, &req ))
714        if( mustSendCancel )
715            protocolSendReject( msgs, &req );
716}
717
718void
719tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
720{
721    const time_t now = tr_time( );
722    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
723
724    assert( msgs );
725    assert( msgs->peer );
726    assert( choke == 0 || choke == 1 );
727
728    if( msgs->peer->chokeChangedAt > fibrillationTime )
729    {
730        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
731    }
732    else if( msgs->peer->peerIsChoked != choke )
733    {
734        msgs->peer->peerIsChoked = choke;
735        if( choke )
736            cancelAllRequestsToClient( msgs );
737        protocolSendChoke( msgs, choke );
738        msgs->peer->chokeChangedAt = now;
739    }
740}
741
742/**
743***
744**/
745
746void
747tr_peerMsgsHave( tr_peermsgs * msgs, uint32_t index )
748{
749    protocolSendHave( msgs, index );
750
751    /* since we have more pieces now, we might not be interested in this peer */
752    updateInterest( msgs );
753}
754
755/**
756***
757**/
758
759static bool
760reqIsValid( const tr_peermsgs * peer,
761            uint32_t            index,
762            uint32_t            offset,
763            uint32_t            length )
764{
765    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
766}
767
768static bool
769requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
770{
771    return reqIsValid( msgs, req->index, req->offset, req->length );
772}
773
774void
775tr_peerMsgsCancel( tr_peermsgs * msgs, tr_block_index_t block )
776{
777    struct peer_request req;
778/*fprintf( stderr, "SENDING CANCEL MESSAGE FOR BLOCK %zu\n\t\tFROM PEER %p ------------------------------------\n", (size_t)block, msgs->peer );*/
779    blockToReq( msgs->torrent, block, &req );
780    protocolSendCancel( msgs, &req );
781}
782
783/**
784***
785**/
786
787static void
788sendLtepHandshake( tr_peermsgs * msgs )
789{
790    tr_benc val, *m;
791    char * buf;
792    int len;
793    bool allow_pex;
794    bool allow_metadata_xfer;
795    struct evbuffer * out = msgs->outMessages;
796    const unsigned char * ipv6 = tr_globalIPv6();
797
798    if( msgs->clientSentLtepHandshake )
799        return;
800
801    dbgmsg( msgs, "sending an ltep handshake" );
802    msgs->clientSentLtepHandshake = 1;
803
804    /* decide if we want to advertise metadata xfer support (BEP 9) */
805    if( tr_torrentIsPrivate( msgs->torrent ) )
806        allow_metadata_xfer = 0;
807    else
808        allow_metadata_xfer = 1;
809
810    /* decide if we want to advertise pex support */
811    if( !tr_torrentAllowsPex( msgs->torrent ) )
812        allow_pex = 0;
813    else if( msgs->peerSentLtepHandshake )
814        allow_pex = msgs->peerSupportsPex ? 1 : 0;
815    else
816        allow_pex = 1;
817
818    tr_bencInitDict( &val, 8 );
819    tr_bencDictAddInt( &val, "e", getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED );
820    if( ipv6 != NULL )
821        tr_bencDictAddRaw( &val, "ipv6", ipv6, 16 );
822    if( allow_metadata_xfer && tr_torrentHasMetadata( msgs->torrent )
823                            && ( msgs->torrent->infoDictLength > 0 ) )
824        tr_bencDictAddInt( &val, "metadata_size", msgs->torrent->infoDictLength );
825    tr_bencDictAddInt( &val, "p", tr_sessionGetPublicPeerPort( getSession(msgs) ) );
826    tr_bencDictAddInt( &val, "reqq", REQQ );
827    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
828    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
829    m  = tr_bencDictAddDict( &val, "m", 2 );
830    if( allow_metadata_xfer )
831        tr_bencDictAddInt( m, "ut_metadata", UT_METADATA_ID );
832    if( allow_pex )
833        tr_bencDictAddInt( m, "ut_pex", UT_PEX_ID );
834
835    buf = tr_bencToStr( &val, TR_FMT_BENC, &len );
836
837    evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + len );
838    evbuffer_add_uint8 ( out, BT_LTEP );
839    evbuffer_add_uint8 ( out, LTEP_HANDSHAKE );
840    evbuffer_add       ( out, buf, len );
841    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
842    dbgOutMessageLen( msgs );
843
844    /* cleanup */
845    tr_bencFree( &val );
846    tr_free( buf );
847}
848
849static void
850parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
851{
852    int64_t   i;
853    tr_benc   val, * sub;
854    uint8_t * tmp = tr_new( uint8_t, len );
855    const uint8_t *addr;
856    size_t addr_len;
857    tr_pex pex;
858    int8_t seedProbability = -1;
859
860    memset( &pex, 0, sizeof( tr_pex ) );
861
862    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
863    msgs->peerSentLtepHandshake = 1;
864
865    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
866    {
867        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
868        tr_free( tmp );
869        return;
870    }
871
872    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
873
874    /* does the peer prefer encrypted connections? */
875    if( tr_bencDictFindInt( &val, "e", &i ) ) {
876        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
877                                              : ENCRYPTION_PREFERENCE_NO;
878        if( i )
879            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
880    }
881
882    /* check supported messages for utorrent pex */
883    msgs->peerSupportsPex = 0;
884    msgs->peerSupportsMetadataXfer = 0;
885
886    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
887        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
888            msgs->peerSupportsPex = i != 0;
889            msgs->ut_pex_id = (uint8_t) i;
890            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
891        }
892        if( tr_bencDictFindInt( sub, "ut_metadata", &i ) ) {
893            msgs->peerSupportsMetadataXfer = i != 0;
894            msgs->ut_metadata_id = (uint8_t) i;
895            dbgmsg( msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id );
896        }
897        if( tr_bencDictFindInt( sub, "ut_holepunch", &i ) ) {
898            /* Mysterious µTorrent extension that we don't grok.  However,
899               it implies support for µTP, so use it to indicate that. */
900            tr_peerMgrSetUtpFailed( msgs->torrent,
901                                    tr_peerIoGetAddress( msgs->peer->io, NULL ),
902                                    false );
903        }
904    }
905
906    /* look for metainfo size (BEP 9) */
907    if( tr_bencDictFindInt( &val, "metadata_size", &i ) ) {
908        tr_torrentSetMetadataSizeHint( msgs->torrent, i );
909        msgs->metadata_size_hint = (size_t) i;
910    }
911
912    /* look for upload_only (BEP 21) */
913    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
914        seedProbability = i==0 ? 0 : 100;
915
916    /* get peer's listening port */
917    if( tr_bencDictFindInt( &val, "p", &i ) ) {
918        pex.port = htons( (uint16_t)i );
919        fireClientGotPort( msgs, pex.port );
920        dbgmsg( msgs, "peer's port is now %d", (int)i );
921    }
922
923    if( tr_peerIoIsIncoming( msgs->peer->io )
924        && tr_bencDictFindRaw( &val, "ipv4", &addr, &addr_len )
925        && ( addr_len == 4 ) )
926    {
927        pex.addr.type = TR_AF_INET;
928        memcpy( &pex.addr.addr.addr4, addr, 4 );
929        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
930    }
931
932    if( tr_peerIoIsIncoming( msgs->peer->io )
933        && tr_bencDictFindRaw( &val, "ipv6", &addr, &addr_len )
934        && ( addr_len == 16 ) )
935    {
936        pex.addr.type = TR_AF_INET6;
937        memcpy( &pex.addr.addr.addr6, addr, 16 );
938        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
939    }
940
941    /* get peer's maximum request queue size */
942    if( tr_bencDictFindInt( &val, "reqq", &i ) )
943        msgs->reqq = i;
944
945    tr_bencFree( &val );
946    tr_free( tmp );
947}
948
949static void
950parseUtMetadata( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
951{
952    tr_benc dict;
953    char * msg_end;
954    char * benc_end;
955    int64_t msg_type = -1;
956    int64_t piece = -1;
957    int64_t total_size = 0;
958    uint8_t * tmp = tr_new( uint8_t, msglen );
959
960    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
961    msg_end = (char*)tmp + msglen;
962
963    if( !tr_bencLoad( tmp, msglen, &dict, &benc_end ) )
964    {
965        tr_bencDictFindInt( &dict, "msg_type", &msg_type );
966        tr_bencDictFindInt( &dict, "piece", &piece );
967        tr_bencDictFindInt( &dict, "total_size", &total_size );
968        tr_bencFree( &dict );
969    }
970
971    dbgmsg( msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
972            (int)msg_type, (int)piece, (int)total_size );
973
974    if( msg_type == METADATA_MSG_TYPE_REJECT )
975    {
976        /* NOOP */
977    }
978
979    if( ( msg_type == METADATA_MSG_TYPE_DATA )
980        && ( !tr_torrentHasMetadata( msgs->torrent ) )
981        && ( msg_end - benc_end <= METADATA_PIECE_SIZE )
982        && ( piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size ) )
983    {
984        const int pieceLen = msg_end - benc_end;
985        tr_torrentSetMetadataPiece( msgs->torrent, piece, benc_end, pieceLen );
986    }
987
988    if( msg_type == METADATA_MSG_TYPE_REQUEST )
989    {
990        if( ( piece >= 0 )
991            && tr_torrentHasMetadata( msgs->torrent )
992            && !tr_torrentIsPrivate( msgs->torrent )
993            && ( msgs->peerAskedForMetadataCount < METADATA_REQQ ) )
994        {
995            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
996        }
997        else
998        {
999            tr_benc tmp;
1000            int payloadLen;
1001            char * payload;
1002            struct evbuffer * out = msgs->outMessages;
1003
1004            /* build the rejection message */
1005            tr_bencInitDict( &tmp, 2 );
1006            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1007            tr_bencDictAddInt( &tmp, "piece", piece );
1008            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1009            tr_bencFree( &tmp );
1010
1011            /* write it out as a LTEP message to our outMessages buffer */
1012            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1013            evbuffer_add_uint8 ( out, BT_LTEP );
1014            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1015            evbuffer_add       ( out, payload, payloadLen );
1016            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1017            dbgOutMessageLen( msgs );
1018
1019            tr_free( payload );
1020        }
1021    }
1022
1023    tr_free( tmp );
1024}
1025
1026static void
1027parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1028{
1029    int loaded = 0;
1030    uint8_t * tmp = tr_new( uint8_t, msglen );
1031    tr_benc val;
1032    tr_torrent * tor = msgs->torrent;
1033    const uint8_t * added;
1034    size_t added_len;
1035
1036    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1037
1038    if( tr_torrentAllowsPex( tor )
1039      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1040    {
1041        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1042        {
1043            tr_pex * pex;
1044            size_t i, n;
1045            size_t added_f_len = 0;
1046            const uint8_t * added_f = NULL;
1047
1048            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1049            pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1050
1051            n = MIN( n, MAX_PEX_PEER_COUNT );
1052            for( i=0; i<n; ++i )
1053            {
1054                int seedProbability = -1;
1055                if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1056                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1057            }
1058
1059            tr_free( pex );
1060        }
1061
1062        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1063        {
1064            tr_pex * pex;
1065            size_t i, n;
1066            size_t added_f_len = 0;
1067            const uint8_t * added_f = NULL;
1068
1069            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1070            pex = tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len, &n );
1071
1072            n = MIN( n, MAX_PEX_PEER_COUNT );
1073            for( i=0; i<n; ++i )
1074            {
1075                int seedProbability = -1;
1076                if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1077                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1078            }
1079
1080            tr_free( pex );
1081        }
1082    }
1083
1084    if( loaded )
1085        tr_bencFree( &val );
1086    tr_free( tmp );
1087}
1088
1089static void sendPex( tr_peermsgs * msgs );
1090
1091static void
1092parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer  * inbuf )
1093{
1094    uint8_t ltep_msgid;
1095
1096    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1097    msglen--;
1098
1099    if( ltep_msgid == LTEP_HANDSHAKE )
1100    {
1101        dbgmsg( msgs, "got ltep handshake" );
1102        parseLtepHandshake( msgs, msglen, inbuf );
1103        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1104        {
1105            sendLtepHandshake( msgs );
1106            sendPex( msgs );
1107        }
1108    }
1109    else if( ltep_msgid == UT_PEX_ID )
1110    {
1111        dbgmsg( msgs, "got ut pex" );
1112        msgs->peerSupportsPex = 1;
1113        parseUtPex( msgs, msglen, inbuf );
1114    }
1115    else if( ltep_msgid == UT_METADATA_ID )
1116    {
1117        dbgmsg( msgs, "got ut metadata" );
1118        msgs->peerSupportsMetadataXfer = 1;
1119        parseUtMetadata( msgs, msglen, inbuf );
1120    }
1121    else
1122    {
1123        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1124        evbuffer_drain( inbuf, msglen );
1125    }
1126}
1127
1128static int
1129readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1130{
1131    uint32_t len;
1132
1133    if( inlen < sizeof( len ) )
1134        return READ_LATER;
1135
1136    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1137
1138    if( len == 0 ) /* peer sent us a keepalive message */
1139        dbgmsg( msgs, "got KeepAlive" );
1140    else
1141    {
1142        msgs->incoming.length = len;
1143        msgs->state = AWAITING_BT_ID;
1144    }
1145
1146    return READ_NOW;
1147}
1148
1149static int readBtMessage( tr_peermsgs *, struct evbuffer *, size_t );
1150
1151static int
1152readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1153{
1154    uint8_t id;
1155
1156    if( inlen < sizeof( uint8_t ) )
1157        return READ_LATER;
1158
1159    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1160    msgs->incoming.id = id;
1161    dbgmsg( msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length );
1162
1163    if( id == BT_PIECE )
1164    {
1165        msgs->state = AWAITING_BT_PIECE;
1166        return READ_NOW;
1167    }
1168    else if( msgs->incoming.length != 1 )
1169    {
1170        msgs->state = AWAITING_BT_MESSAGE;
1171        return READ_NOW;
1172    }
1173    else return readBtMessage( msgs, inbuf, inlen - 1 );
1174}
1175
1176static void
1177updatePeerProgress( tr_peermsgs * msgs )
1178{
1179    tr_peerUpdateProgress( msgs->torrent, msgs->peer );
1180
1181    /*updateFastSet( msgs );*/
1182    updateInterest( msgs );
1183}
1184
1185static void
1186prefetchPieces( tr_peermsgs *msgs )
1187{
1188    int i;
1189
1190    if( !getSession(msgs)->isPrefetchEnabled )
1191        return;
1192
1193    /* Maintain 12 prefetched blocks per unchoked peer */
1194    for( i=msgs->prefetchCount; i<msgs->peer->pendingReqsToClient && i<12; ++i )
1195    {
1196        const struct peer_request * req = msgs->peerAskedFor + i;
1197        if( requestIsValid( msgs, req ) )
1198        {
1199            tr_cachePrefetchBlock( getSession(msgs)->cache, msgs->torrent, req->index, req->offset, req->length );
1200            ++msgs->prefetchCount;
1201        }
1202    }
1203}
1204
1205static void
1206peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
1207{
1208    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1209    const int reqIsValid = requestIsValid( msgs, req );
1210    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1211    const int peerIsChoked = msgs->peer->peerIsChoked;
1212
1213    int allow = false;
1214
1215    if( !reqIsValid )
1216        dbgmsg( msgs, "rejecting an invalid request." );
1217    else if( !clientHasPiece )
1218        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1219    else if( peerIsChoked )
1220        dbgmsg( msgs, "rejecting request from choked peer" );
1221    else if( msgs->peer->pendingReqsToClient + 1 >= REQQ )
1222        dbgmsg( msgs, "rejecting request ... reqq is full" );
1223    else
1224        allow = true;
1225
1226    if( allow ) {
1227        msgs->peerAskedFor[msgs->peer->pendingReqsToClient++] = *req;
1228        prefetchPieces( msgs );
1229    } else if( fext ) {
1230        protocolSendReject( msgs, req );
1231    }
1232}
1233
1234static bool
1235messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1236{
1237    switch( id )
1238    {
1239        case BT_CHOKE:
1240        case BT_UNCHOKE:
1241        case BT_INTERESTED:
1242        case BT_NOT_INTERESTED:
1243        case BT_FEXT_HAVE_ALL:
1244        case BT_FEXT_HAVE_NONE:
1245            return len == 1;
1246
1247        case BT_HAVE:
1248        case BT_FEXT_SUGGEST:
1249        case BT_FEXT_ALLOWED_FAST:
1250            return len == 5;
1251
1252        case BT_BITFIELD:
1253            if( tr_torrentHasMetadata( msg->torrent ) )
1254                return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1255            /* we don't know the piece count yet,
1256               so we can only guess whether to send true or false */
1257            if( msg->metadata_size_hint > 0 )
1258                return len <= msg->metadata_size_hint;
1259            return true;
1260
1261        case BT_REQUEST:
1262        case BT_CANCEL:
1263        case BT_FEXT_REJECT:
1264            return len == 13;
1265
1266        case BT_PIECE:
1267            return len > 9 && len <= 16393;
1268
1269        case BT_PORT:
1270            return len == 3;
1271
1272        case BT_LTEP:
1273            return len >= 2;
1274
1275        default:
1276            return false;
1277    }
1278}
1279
1280static int clientGotBlock( tr_peermsgs *               msgs,
1281                           struct evbuffer *           block,
1282                           const struct peer_request * req );
1283
1284static int
1285readBtPiece( tr_peermsgs      * msgs,
1286             struct evbuffer  * inbuf,
1287             size_t             inlen,
1288             size_t           * setme_piece_bytes_read )
1289{
1290    struct peer_request * req = &msgs->incoming.blockReq;
1291
1292    assert( evbuffer_get_length( inbuf ) >= inlen );
1293    dbgmsg( msgs, "In readBtPiece" );
1294
1295    if( !req->length )
1296    {
1297        if( inlen < 8 )
1298            return READ_LATER;
1299
1300        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1301        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1302        req->length = msgs->incoming.length - 9;
1303        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1304        return READ_NOW;
1305    }
1306    else
1307    {
1308        int err;
1309
1310        /* read in another chunk of data */
1311        const size_t nLeft = req->length - evbuffer_get_length( msgs->incoming.block );
1312        size_t n = MIN( nLeft, inlen );
1313
1314        tr_peerIoReadBytesToBuf( msgs->peer->io, inbuf, msgs->incoming.block, n );
1315
1316        fireClientGotData( msgs, n, true );
1317        *setme_piece_bytes_read += n;
1318        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1319               n, req->index, req->offset, req->length,
1320               (int)( req->length - evbuffer_get_length( msgs->incoming.block ) ) );
1321        if( evbuffer_get_length( msgs->incoming.block ) < req->length )
1322            return READ_LATER;
1323
1324        /* pass the block along... */
1325        err = clientGotBlock( msgs, msgs->incoming.block, req );
1326        evbuffer_drain( msgs->incoming.block, evbuffer_get_length( msgs->incoming.block ) );
1327
1328        /* cleanup */
1329        req->length = 0;
1330        msgs->state = AWAITING_BT_LENGTH;
1331        return err ? READ_ERR : READ_NOW;
1332    }
1333}
1334
1335static void updateDesiredRequestCount( tr_peermsgs * msgs );
1336
1337static int
1338readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1339{
1340    uint32_t      ui32;
1341    uint32_t      msglen = msgs->incoming.length;
1342    const uint8_t id = msgs->incoming.id;
1343#ifndef NDEBUG
1344    const size_t  startBufLen = evbuffer_get_length( inbuf );
1345#endif
1346    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1347
1348    --msglen; /* id length */
1349
1350    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1351
1352    if( inlen < msglen )
1353        return READ_LATER;
1354
1355    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1356    {
1357        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1358        fireError( msgs, EMSGSIZE );
1359        return READ_ERR;
1360    }
1361
1362    switch( id )
1363    {
1364        case BT_CHOKE:
1365            dbgmsg( msgs, "got Choke" );
1366            msgs->peer->clientIsChoked = 1;
1367            if( !fext )
1368                fireGotChoke( msgs );
1369            break;
1370
1371        case BT_UNCHOKE:
1372            dbgmsg( msgs, "got Unchoke" );
1373            msgs->peer->clientIsChoked = 0;
1374            updateDesiredRequestCount( msgs );
1375            break;
1376
1377        case BT_INTERESTED:
1378            dbgmsg( msgs, "got Interested" );
1379            msgs->peer->peerIsInterested = 1;
1380            break;
1381
1382        case BT_NOT_INTERESTED:
1383            dbgmsg( msgs, "got Not Interested" );
1384            msgs->peer->peerIsInterested = 0;
1385            break;
1386
1387        case BT_HAVE:
1388            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1389            dbgmsg( msgs, "got Have: %u", ui32 );
1390            if( tr_torrentHasMetadata( msgs->torrent )
1391                    && ( ui32 >= msgs->torrent->info.pieceCount ) )
1392            {
1393                fireError( msgs, ERANGE );
1394                return READ_ERR;
1395            }
1396
1397            /* a peer can send the same HAVE message twice... */
1398            if( !tr_bitfieldHas( &msgs->peer->have, ui32 ) ) {
1399                tr_bitfieldAdd( &msgs->peer->have, ui32 );
1400                fireClientGotHave( msgs, ui32 );
1401            }
1402            updatePeerProgress( msgs );
1403            break;
1404
1405        case BT_BITFIELD: {
1406            uint8_t * tmp = tr_new( uint8_t, msglen );
1407            dbgmsg( msgs, "got a bitfield" );
1408            tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1409            tr_bitfieldSetRaw( &msgs->peer->have, tmp, msglen );
1410            fireClientGotBitfield( msgs, &msgs->peer->have );
1411            updatePeerProgress( msgs );
1412            tr_free( tmp );
1413            break;
1414        }
1415
1416        case BT_REQUEST:
1417        {
1418            struct peer_request r;
1419            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1420            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1421            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1422            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1423            peerMadeRequest( msgs, &r );
1424            break;
1425        }
1426
1427        case BT_CANCEL:
1428        {
1429            int i;
1430            struct peer_request r;
1431            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1432            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1433            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1434            tr_historyAdd( &msgs->peer->cancelsSentToClient, tr_time( ), 1 );
1435            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1436
1437            for( i=0; i<msgs->peer->pendingReqsToClient; ++i ) {
1438                const struct peer_request * req = msgs->peerAskedFor + i;
1439                if( ( req->index == r.index ) && ( req->offset == r.offset ) && ( req->length == r.length ) )
1440                    break;
1441            }
1442
1443            if( i < msgs->peer->pendingReqsToClient )
1444                tr_removeElementFromArray( msgs->peerAskedFor, i, sizeof( struct peer_request ),
1445                                           msgs->peer->pendingReqsToClient-- );
1446            break;
1447        }
1448
1449        case BT_PIECE:
1450            assert( 0 ); /* handled elsewhere! */
1451            break;
1452
1453        case BT_PORT:
1454            dbgmsg( msgs, "Got a BT_PORT" );
1455            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->dht_port );
1456            if( msgs->peer->dht_port > 0 )
1457                tr_dhtAddNode( getSession(msgs),
1458                               tr_peerAddress( msgs->peer ),
1459                               msgs->peer->dht_port, 0 );
1460            break;
1461
1462        case BT_FEXT_SUGGEST:
1463            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1464            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1465            if( fext )
1466                fireClientGotSuggest( msgs, ui32 );
1467            else {
1468                fireError( msgs, EMSGSIZE );
1469                return READ_ERR;
1470            }
1471            break;
1472
1473        case BT_FEXT_ALLOWED_FAST:
1474            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1475            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1476            if( fext )
1477                fireClientGotAllowedFast( msgs, ui32 );
1478            else {
1479                fireError( msgs, EMSGSIZE );
1480                return READ_ERR;
1481            }
1482            break;
1483
1484        case BT_FEXT_HAVE_ALL:
1485            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1486            if( fext ) {
1487                tr_bitfieldSetHasAll( &msgs->peer->have );
1488assert( tr_bitfieldHasAll( &msgs->peer->have ) );
1489                fireClientGotHaveAll( msgs );
1490                updatePeerProgress( msgs );
1491            } else {
1492                fireError( msgs, EMSGSIZE );
1493                return READ_ERR;
1494            }
1495            break;
1496
1497        case BT_FEXT_HAVE_NONE:
1498            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1499            if( fext ) {
1500                tr_bitfieldSetHasNone( &msgs->peer->have );
1501                fireClientGotHaveNone( msgs );
1502                updatePeerProgress( msgs );
1503            } else {
1504                fireError( msgs, EMSGSIZE );
1505                return READ_ERR;
1506            }
1507            break;
1508
1509        case BT_FEXT_REJECT:
1510        {
1511            struct peer_request r;
1512            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1513            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1514            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1515            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1516            if( fext )
1517                fireGotRej( msgs, &r );
1518            else {
1519                fireError( msgs, EMSGSIZE );
1520                return READ_ERR;
1521            }
1522            break;
1523        }
1524
1525        case BT_LTEP:
1526            dbgmsg( msgs, "Got a BT_LTEP" );
1527            parseLtep( msgs, msglen, inbuf );
1528            break;
1529
1530        default:
1531            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1532            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1533            break;
1534    }
1535
1536    assert( msglen + 1 == msgs->incoming.length );
1537    assert( evbuffer_get_length( inbuf ) == startBufLen - msglen );
1538
1539    msgs->state = AWAITING_BT_LENGTH;
1540    return READ_NOW;
1541}
1542
1543/* returns 0 on success, or an errno on failure */
1544static int
1545clientGotBlock( tr_peermsgs                * msgs,
1546                struct evbuffer            * data,
1547                const struct peer_request  * req )
1548{
1549    int err;
1550    tr_torrent * tor = msgs->torrent;
1551    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1552
1553    assert( msgs );
1554    assert( req );
1555
1556    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1557        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1558                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1559        return EMSGSIZE;
1560    }
1561
1562    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1563
1564    if( !tr_peerMgrDidPeerRequest( msgs->torrent, msgs->peer, block ) ) {
1565        dbgmsg( msgs, "we didn't ask for this message..." );
1566        return 0;
1567    }
1568    if( tr_cpPieceIsComplete( &msgs->torrent->completion, req->index ) ) {
1569        dbgmsg( msgs, "we did ask for this message, but the piece is already complete..." );
1570        return 0;
1571    }
1572
1573    /**
1574    ***  Save the block
1575    **/
1576
1577    if(( err = tr_cacheWriteBlock( getSession(msgs)->cache, tor, req->index, req->offset, req->length, data )))
1578        return err;
1579
1580    tr_bitfieldAdd( &msgs->peer->blame, req->index );
1581    fireGotBlock( msgs, req );
1582    return 0;
1583}
1584
1585static int peerPulse( void * vmsgs );
1586
1587static void
1588didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1589{
1590    tr_peermsgs * msgs = vmsgs;
1591    firePeerGotData( msgs, bytesWritten, wasPieceData );
1592
1593    if ( tr_isPeerIo( io ) && io->userData )
1594        peerPulse( msgs );
1595}
1596
1597static ReadState
1598canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1599{
1600    ReadState         ret;
1601    tr_peermsgs *     msgs = vmsgs;
1602    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1603    const size_t      inlen = evbuffer_get_length( in );
1604
1605    dbgmsg( msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state );
1606
1607    if( !inlen )
1608    {
1609        ret = READ_LATER;
1610    }
1611    else if( msgs->state == AWAITING_BT_PIECE )
1612    {
1613        ret = readBtPiece( msgs, in, inlen, piece );
1614    }
1615    else switch( msgs->state )
1616    {
1617        case AWAITING_BT_LENGTH:
1618            ret = readBtLength ( msgs, in, inlen ); break;
1619
1620        case AWAITING_BT_ID:
1621            ret = readBtId     ( msgs, in, inlen ); break;
1622
1623        case AWAITING_BT_MESSAGE:
1624            ret = readBtMessage( msgs, in, inlen ); break;
1625
1626        default:
1627            ret = READ_ERR;
1628            assert( 0 );
1629    }
1630
1631    dbgmsg( msgs, "canRead: ret is %d", (int)ret );
1632
1633    /* log the raw data that was read */
1634    if( ( ret != READ_ERR ) && ( evbuffer_get_length( in ) != inlen ) )
1635        fireClientGotData( msgs, inlen - evbuffer_get_length( in ), false );
1636
1637    return ret;
1638}
1639
1640int
1641tr_peerMsgsIsReadingBlock( const tr_peermsgs * msgs, tr_block_index_t block )
1642{
1643    if( msgs->state != AWAITING_BT_PIECE )
1644        return false;
1645
1646    return block == _tr_block( msgs->torrent,
1647                               msgs->incoming.blockReq.index,
1648                               msgs->incoming.blockReq.offset );
1649}
1650
1651/**
1652***
1653**/
1654
1655static void
1656updateDesiredRequestCount( tr_peermsgs * msgs )
1657{
1658    const tr_torrent * const torrent = msgs->torrent;
1659
1660    if( tr_torrentIsSeed( msgs->torrent ) )
1661    {
1662        msgs->desiredRequestCount = 0;
1663    }
1664    else if( msgs->peer->clientIsChoked )
1665    {
1666        msgs->desiredRequestCount = 0;
1667    }
1668    else if( !msgs->peer->clientIsInterested )
1669    {
1670        msgs->desiredRequestCount = 0;
1671    }
1672    else
1673    {
1674        int estimatedBlocksInPeriod;
1675        int rate_Bps;
1676        int irate_Bps;
1677        const int floor = 4;
1678        const int seconds = REQUEST_BUF_SECS;
1679        const uint64_t now = tr_time_msec( );
1680
1681        /* Get the rate limit we should use.
1682         * FIXME: this needs to consider all the other peers as well... */
1683        rate_Bps = tr_peerGetPieceSpeed_Bps( msgs->peer, now, TR_PEER_TO_CLIENT );
1684        if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) )
1685            rate_Bps = MIN( rate_Bps, tr_torrentGetSpeedLimit_Bps( torrent, TR_PEER_TO_CLIENT ) );
1686
1687        /* honor the session limits, if enabled */
1688        if( tr_torrentUsesSessionLimits( torrent ) )
1689            if( tr_sessionGetActiveSpeedLimit_Bps( torrent->session, TR_PEER_TO_CLIENT, &irate_Bps ) )
1690                rate_Bps = MIN( rate_Bps, irate_Bps );
1691
1692        /* use this desired rate to figure out how
1693         * many requests we should send to this peer */
1694        estimatedBlocksInPeriod = ( rate_Bps * seconds ) / torrent->blockSize;
1695        msgs->desiredRequestCount = MAX( floor, estimatedBlocksInPeriod );
1696
1697        /* honor the peer's maximum request count, if specified */
1698        if( msgs->reqq > 0 )
1699            if( msgs->desiredRequestCount > msgs->reqq )
1700                msgs->desiredRequestCount = msgs->reqq;
1701    }
1702}
1703
1704static void
1705updateMetadataRequests( tr_peermsgs * msgs, time_t now )
1706{
1707    int piece;
1708
1709    if( msgs->peerSupportsMetadataXfer
1710        && tr_torrentGetNextMetadataRequest( msgs->torrent, now, &piece ) )
1711    {
1712        tr_benc tmp;
1713        int payloadLen;
1714        char * payload;
1715        struct evbuffer * out = msgs->outMessages;
1716
1717        /* build the data message */
1718        tr_bencInitDict( &tmp, 3 );
1719        tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REQUEST );
1720        tr_bencDictAddInt( &tmp, "piece", piece );
1721        payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1722        tr_bencFree( &tmp );
1723
1724        dbgmsg( msgs, "requesting metadata piece #%d", piece );
1725
1726        /* write it out as a LTEP message to our outMessages buffer */
1727        evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1728        evbuffer_add_uint8 ( out, BT_LTEP );
1729        evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1730        evbuffer_add       ( out, payload, payloadLen );
1731        pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1732        dbgOutMessageLen( msgs );
1733
1734        tr_free( payload );
1735    }
1736}
1737
1738static void
1739updateBlockRequests( tr_peermsgs * msgs )
1740{
1741    if( tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT )
1742        && ( msgs->desiredRequestCount > 0 )
1743        && ( msgs->peer->pendingReqsToPeer <= ( msgs->desiredRequestCount * 0.66 ) ) )
1744    {
1745        int i;
1746        int n;
1747        const int numwant = msgs->desiredRequestCount - msgs->peer->pendingReqsToPeer;
1748        tr_block_index_t * blocks = alloca( sizeof( tr_block_index_t ) * numwant );
1749
1750        tr_peerMgrGetNextRequests( msgs->torrent, msgs->peer, numwant, blocks, &n );
1751
1752        for( i=0; i<n; ++i )
1753        {
1754            struct peer_request req;
1755            blockToReq( msgs->torrent, blocks[i], &req );
1756            protocolSendRequest( msgs, &req );
1757        }
1758    }
1759}
1760
1761static size_t
1762fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1763{
1764    int piece;
1765    size_t bytesWritten = 0;
1766    struct peer_request req;
1767    const bool haveMessages = evbuffer_get_length( msgs->outMessages ) != 0;
1768    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1769
1770    /**
1771    ***  Protocol messages
1772    **/
1773
1774    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1775    {
1776        dbgmsg( msgs, "started an outMessages batch (length is %zu)", evbuffer_get_length( msgs->outMessages ) );
1777        msgs->outMessagesBatchedAt = now;
1778    }
1779    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1780    {
1781        const size_t len = evbuffer_get_length( msgs->outMessages );
1782        /* flush the protocol messages */
1783        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1784        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, false );
1785        msgs->clientSentAnythingAt = now;
1786        msgs->outMessagesBatchedAt = 0;
1787        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1788        bytesWritten +=  len;
1789    }
1790
1791    /**
1792    ***  Metadata Pieces
1793    **/
1794
1795    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= METADATA_PIECE_SIZE )
1796        && popNextMetadataRequest( msgs, &piece ) )
1797    {
1798        char * data;
1799        int dataLen;
1800        bool ok = false;
1801
1802        data = tr_torrentGetMetadataPiece( msgs->torrent, piece, &dataLen );
1803        if( ( dataLen > 0 ) && ( data != NULL ) )
1804        {
1805            tr_benc tmp;
1806            int payloadLen;
1807            char * payload;
1808            struct evbuffer * out = msgs->outMessages;
1809
1810            /* build the data message */
1811            tr_bencInitDict( &tmp, 3 );
1812            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_DATA );
1813            tr_bencDictAddInt( &tmp, "piece", piece );
1814            tr_bencDictAddInt( &tmp, "total_size", msgs->torrent->infoDictLength );
1815            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1816            tr_bencFree( &tmp );
1817
1818            /* write it out as a LTEP message to our outMessages buffer */
1819            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen + dataLen );
1820            evbuffer_add_uint8 ( out, BT_LTEP );
1821            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1822            evbuffer_add       ( out, payload, payloadLen );
1823            evbuffer_add       ( out, data, dataLen );
1824            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1825            dbgOutMessageLen( msgs );
1826
1827            tr_free( payload );
1828            tr_free( data );
1829
1830            ok = true;
1831        }
1832
1833        if( !ok ) /* send a rejection message */
1834        {
1835            tr_benc tmp;
1836            int payloadLen;
1837            char * payload;
1838            struct evbuffer * out = msgs->outMessages;
1839
1840            /* build the rejection message */
1841            tr_bencInitDict( &tmp, 2 );
1842            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1843            tr_bencDictAddInt( &tmp, "piece", piece );
1844            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1845            tr_bencFree( &tmp );
1846
1847            /* write it out as a LTEP message to our outMessages buffer */
1848            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1849            evbuffer_add_uint8 ( out, BT_LTEP );
1850            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1851            evbuffer_add       ( out, payload, payloadLen );
1852            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1853            dbgOutMessageLen( msgs );
1854
1855            tr_free( payload );
1856        }
1857    }
1858
1859    /**
1860    ***  Data Blocks
1861    **/
1862
1863    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= msgs->torrent->blockSize )
1864        && popNextRequest( msgs, &req ) )
1865    {
1866        --msgs->prefetchCount;
1867
1868        if( requestIsValid( msgs, &req )
1869            && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1870        {
1871            int err;
1872            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
1873            struct evbuffer * out;
1874            struct evbuffer_iovec iovec[1];
1875
1876            out = evbuffer_new( );
1877            evbuffer_expand( out, msglen );
1878
1879            evbuffer_add_uint32( out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1880            evbuffer_add_uint8 ( out, BT_PIECE );
1881            evbuffer_add_uint32( out, req.index );
1882            evbuffer_add_uint32( out, req.offset );
1883
1884            evbuffer_reserve_space( out, req.length, iovec, 1 );
1885            err = tr_cacheReadBlock( getSession(msgs)->cache, msgs->torrent, req.index, req.offset, req.length, iovec[0].iov_base );
1886            iovec[0].iov_len = req.length;
1887            evbuffer_commit_space( out, iovec, 1 );
1888
1889            /* check the piece if it needs checking... */
1890            if( !err && tr_torrentPieceNeedsCheck( msgs->torrent, req.index ) )
1891                if(( err = !tr_torrentCheckPiece( msgs->torrent, req.index )))
1892                    tr_torrentSetLocalError( msgs->torrent, _( "Please Verify Local Data! Piece #%zu is corrupt." ), (size_t)req.index );
1893
1894            if( err )
1895            {
1896                if( fext )
1897                    protocolSendReject( msgs, &req );
1898            }
1899            else
1900            {
1901                const size_t n = evbuffer_get_length( out );
1902                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1903                assert( n == msglen );
1904                tr_peerIoWriteBuf( msgs->peer->io, out, true );
1905                bytesWritten += n;
1906                msgs->clientSentAnythingAt = now;
1907                tr_historyAdd( &msgs->peer->blocksSentToPeer, tr_time( ), 1 );
1908            }
1909
1910            evbuffer_free( out );
1911
1912            if( err )
1913            {
1914                bytesWritten = 0;
1915                msgs = NULL;
1916            }
1917        }
1918        else if( fext ) /* peer needs a reject message */
1919        {
1920            protocolSendReject( msgs, &req );
1921        }
1922
1923        if( msgs != NULL )
1924            prefetchPieces( msgs );
1925    }
1926
1927    /**
1928    ***  Keepalive
1929    **/
1930
1931    if( ( msgs != NULL )
1932        && ( msgs->clientSentAnythingAt != 0 )
1933        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1934    {
1935        dbgmsg( msgs, "sending a keepalive message" );
1936        evbuffer_add_uint32( msgs->outMessages, 0 );
1937        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1938    }
1939
1940    return bytesWritten;
1941}
1942
1943static int
1944peerPulse( void * vmsgs )
1945{
1946    tr_peermsgs * msgs = vmsgs;
1947    const time_t  now = tr_time( );
1948
1949    if ( tr_isPeerIo( msgs->peer->io ) ) {
1950        updateDesiredRequestCount( msgs );
1951        updateBlockRequests( msgs );
1952        updateMetadataRequests( msgs, now );
1953    }
1954
1955    for( ;; )
1956        if( fillOutputBuffer( msgs, now ) < 1 )
1957            break;
1958
1959    return true; /* loop forever */
1960}
1961
1962void
1963tr_peerMsgsPulse( tr_peermsgs * msgs )
1964{
1965    if( msgs != NULL )
1966        peerPulse( msgs );
1967}
1968
1969static void
1970gotError( tr_peerIo * io UNUSED, short what, void * vmsgs )
1971{
1972    if( what & BEV_EVENT_TIMEOUT )
1973        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1974    if( what & ( BEV_EVENT_EOF | BEV_EVENT_ERROR ) )
1975        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1976               what, errno, tr_strerror( errno ) );
1977    fireError( vmsgs, ENOTCONN );
1978}
1979
1980static void
1981sendBitfield( tr_peermsgs * msgs )
1982{
1983    size_t byte_count = 0;
1984    struct evbuffer * out = msgs->outMessages;
1985    void * bytes = tr_cpCreatePieceBitfield( &msgs->torrent->completion, &byte_count );
1986
1987    evbuffer_add_uint32( out, sizeof( uint8_t ) + byte_count );
1988    evbuffer_add_uint8 ( out, BT_BITFIELD );
1989    evbuffer_add       ( out, bytes, byte_count );
1990    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu", evbuffer_get_length( out ) );
1991    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1992
1993    tr_free( bytes );
1994}
1995
1996static void
1997tellPeerWhatWeHave( tr_peermsgs * msgs )
1998{
1999    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
2000
2001    if( fext && tr_cpHasAll( &msgs->torrent->completion ) )
2002    {
2003        protocolSendHaveAll( msgs );
2004    }
2005    else if( fext && tr_cpHasNone( &msgs->torrent->completion ) )
2006    {
2007        protocolSendHaveNone( msgs );
2008    }
2009    else if( !tr_cpHasNone( &msgs->torrent->completion ) )
2010    {
2011        sendBitfield( msgs );
2012    }
2013}
2014
2015/**
2016***
2017**/
2018
2019/* some peers give us error messages if we send
2020   more than this many peers in a single pex message
2021   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2022#define MAX_PEX_ADDED 50
2023#define MAX_PEX_DROPPED 50
2024
2025typedef struct
2026{
2027    tr_pex *  added;
2028    tr_pex *  dropped;
2029    tr_pex *  elements;
2030    int       addedCount;
2031    int       droppedCount;
2032    int       elementCount;
2033}
2034PexDiffs;
2035
2036static void
2037pexAddedCb( void * vpex, void * userData )
2038{
2039    PexDiffs * diffs = userData;
2040    tr_pex *   pex = vpex;
2041
2042    if( diffs->addedCount < MAX_PEX_ADDED )
2043    {
2044        diffs->added[diffs->addedCount++] = *pex;
2045        diffs->elements[diffs->elementCount++] = *pex;
2046    }
2047}
2048
2049static inline void
2050pexDroppedCb( void * vpex, void * userData )
2051{
2052    PexDiffs * diffs = userData;
2053    tr_pex *   pex = vpex;
2054
2055    if( diffs->droppedCount < MAX_PEX_DROPPED )
2056    {
2057        diffs->dropped[diffs->droppedCount++] = *pex;
2058    }
2059}
2060
2061static inline void
2062pexElementCb( void * vpex, void * userData )
2063{
2064    PexDiffs * diffs = userData;
2065    tr_pex * pex = vpex;
2066
2067    diffs->elements[diffs->elementCount++] = *pex;
2068}
2069
2070typedef void ( tr_set_func )( void * element, void * userData );
2071
2072/**
2073 * @brief find the differences and commonalities in two sorted sets
2074 * @param a the first set
2075 * @param aCount the number of elements in the set 'a'
2076 * @param b the second set
2077 * @param bCount the number of elements in the set 'b'
2078 * @param compare the sorting method for both sets
2079 * @param elementSize the sizeof the element in the two sorted sets
2080 * @param in_a called for items in set 'a' but not set 'b'
2081 * @param in_b called for items in set 'b' but not set 'a'
2082 * @param in_both called for items that are in both sets
2083 * @param userData user data passed along to in_a, in_b, and in_both
2084 */
2085static void
2086tr_set_compare( const void * va, size_t aCount,
2087                const void * vb, size_t bCount,
2088                int compare( const void * a, const void * b ),
2089                size_t elementSize,
2090                tr_set_func in_a_cb,
2091                tr_set_func in_b_cb,
2092                tr_set_func in_both_cb,
2093                void * userData )
2094{
2095    const uint8_t * a = va;
2096    const uint8_t * b = vb;
2097    const uint8_t * aend = a + elementSize * aCount;
2098    const uint8_t * bend = b + elementSize * bCount;
2099
2100    while( a != aend || b != bend )
2101    {
2102        if( a == aend )
2103        {
2104            ( *in_b_cb )( (void*)b, userData );
2105            b += elementSize;
2106        }
2107        else if( b == bend )
2108        {
2109            ( *in_a_cb )( (void*)a, userData );
2110            a += elementSize;
2111        }
2112        else
2113        {
2114            const int val = ( *compare )( a, b );
2115
2116            if( !val )
2117            {
2118                ( *in_both_cb )( (void*)a, userData );
2119                a += elementSize;
2120                b += elementSize;
2121            }
2122            else if( val < 0 )
2123            {
2124                ( *in_a_cb )( (void*)a, userData );
2125                a += elementSize;
2126            }
2127            else if( val > 0 )
2128            {
2129                ( *in_b_cb )( (void*)b, userData );
2130                b += elementSize;
2131            }
2132        }
2133    }
2134}
2135
2136
2137static void
2138sendPex( tr_peermsgs * msgs )
2139{
2140    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2141    {
2142        PexDiffs diffs;
2143        PexDiffs diffs6;
2144        tr_pex * newPex = NULL;
2145        tr_pex * newPex6 = NULL;
2146        const int newCount = tr_peerMgrGetPeers( msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2147        const int newCount6 = tr_peerMgrGetPeers( msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2148
2149        /* build the diffs */
2150        diffs.added = tr_new( tr_pex, newCount );
2151        diffs.addedCount = 0;
2152        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2153        diffs.droppedCount = 0;
2154        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2155        diffs.elementCount = 0;
2156        tr_set_compare( msgs->pex, msgs->pexCount,
2157                        newPex, newCount,
2158                        tr_pexCompare, sizeof( tr_pex ),
2159                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2160        diffs6.added = tr_new( tr_pex, newCount6 );
2161        diffs6.addedCount = 0;
2162        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2163        diffs6.droppedCount = 0;
2164        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2165        diffs6.elementCount = 0;
2166        tr_set_compare( msgs->pex6, msgs->pexCount6,
2167                        newPex6, newCount6,
2168                        tr_pexCompare, sizeof( tr_pex ),
2169                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2170        dbgmsg(
2171            msgs,
2172            "pex: old peer count %d+%d, new peer count %d+%d, "
2173            "added %d+%d, removed %d+%d",
2174            msgs->pexCount, msgs->pexCount6, newCount, newCount6,
2175            diffs.addedCount, diffs6.addedCount,
2176            diffs.droppedCount, diffs6.droppedCount );
2177
2178        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2179            !diffs6.droppedCount )
2180        {
2181            tr_free( diffs.elements );
2182            tr_free( diffs6.elements );
2183        }
2184        else
2185        {
2186            int  i;
2187            tr_benc val;
2188            char * benc;
2189            int bencLen;
2190            uint8_t * tmp, *walk;
2191            struct evbuffer * out = msgs->outMessages;
2192
2193            /* update peer */
2194            tr_free( msgs->pex );
2195            msgs->pex = diffs.elements;
2196            msgs->pexCount = diffs.elementCount;
2197            tr_free( msgs->pex6 );
2198            msgs->pex6 = diffs6.elements;
2199            msgs->pexCount6 = diffs6.elementCount;
2200
2201            /* build the pex payload */
2202            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2203                                         * speed vs. likelihood? */
2204
2205            if( diffs.addedCount > 0)
2206            {
2207                /* "added" */
2208                tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2209                for( i = 0; i < diffs.addedCount; ++i ) {
2210                    memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2211                    memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2212                }
2213                assert( ( walk - tmp ) == diffs.addedCount * 6 );
2214                tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2215                tr_free( tmp );
2216
2217                /* "added.f"
2218                 * unset each holepunch flag because we don't support it. */
2219                tmp = walk = tr_new( uint8_t, diffs.addedCount );
2220                for( i = 0; i < diffs.addedCount; ++i )
2221                    *walk++ = diffs.added[i].flags & ~ADDED_F_HOLEPUNCH;
2222                assert( ( walk - tmp ) == diffs.addedCount );
2223                tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2224                tr_free( tmp );
2225            }
2226
2227            if( diffs.droppedCount > 0 )
2228            {
2229                /* "dropped" */
2230                tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2231                for( i = 0; i < diffs.droppedCount; ++i ) {
2232                    memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2233                    memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2234                }
2235                assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2236                tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2237                tr_free( tmp );
2238            }
2239
2240            if( diffs6.addedCount > 0 )
2241            {
2242                /* "added6" */
2243                tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2244                for( i = 0; i < diffs6.addedCount; ++i ) {
2245                    memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2246                    walk += 16;
2247                    memcpy( walk, &diffs6.added[i].port, 2 );
2248                    walk += 2;
2249                }
2250                assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2251                tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2252                tr_free( tmp );
2253
2254                /* "added6.f"
2255                 * unset each holepunch flag because we don't support it. */
2256                tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2257                for( i = 0; i < diffs6.addedCount; ++i )
2258                    *walk++ = diffs6.added[i].flags & ~ADDED_F_HOLEPUNCH;
2259                assert( ( walk - tmp ) == diffs6.addedCount );
2260                tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2261                tr_free( tmp );
2262            }
2263
2264            if( diffs6.droppedCount > 0 )
2265            {
2266                /* "dropped6" */
2267                tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2268                for( i = 0; i < diffs6.droppedCount; ++i ) {
2269                    memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2270                    walk += 16;
2271                    memcpy( walk, &diffs6.dropped[i].port, 2 );
2272                    walk += 2;
2273                }
2274                assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2275                tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2276                tr_free( tmp );
2277            }
2278
2279            /* write the pex message */
2280            benc = tr_bencToStr( &val, TR_FMT_BENC, &bencLen );
2281            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + bencLen );
2282            evbuffer_add_uint8 ( out, BT_LTEP );
2283            evbuffer_add_uint8 ( out, msgs->ut_pex_id );
2284            evbuffer_add       ( out, benc, bencLen );
2285            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2286            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", evbuffer_get_length( out ) );
2287            dbgOutMessageLen( msgs );
2288
2289            tr_free( benc );
2290            tr_bencFree( &val );
2291        }
2292
2293        /* cleanup */
2294        tr_free( diffs.added );
2295        tr_free( diffs.dropped );
2296        tr_free( newPex );
2297        tr_free( diffs6.added );
2298        tr_free( diffs6.dropped );
2299        tr_free( newPex6 );
2300
2301        /*msgs->clientSentPexAt = tr_time( );*/
2302    }
2303}
2304
2305static void
2306pexPulse( int foo UNUSED, short bar UNUSED, void * vmsgs )
2307{
2308    struct tr_peermsgs * msgs = vmsgs;
2309
2310    sendPex( msgs );
2311
2312    assert( msgs->pexTimer != NULL );
2313    tr_timerAdd( msgs->pexTimer, PEX_INTERVAL_SECS, 0 );
2314}
2315
2316/**
2317***
2318**/
2319
2320tr_peermsgs*
2321tr_peerMsgsNew( struct tr_torrent    * torrent,
2322                struct tr_peer       * peer,
2323                tr_peer_callback     * callback,
2324                void                 * callbackData )
2325{
2326    tr_peermsgs * m;
2327
2328    assert( peer );
2329    assert( peer->io );
2330
2331    m = tr_new0( tr_peermsgs, 1 );
2332    m->callback = callback;
2333    m->callbackData = callbackData;
2334    m->peer = peer;
2335    m->torrent = torrent;
2336    m->peer->clientIsChoked = 1;
2337    m->peer->peerIsChoked = 1;
2338    m->peer->clientIsInterested = 0;
2339    m->peer->peerIsInterested = 0;
2340    m->state = AWAITING_BT_LENGTH;
2341    m->outMessages = evbuffer_new( );
2342    m->outMessagesBatchedAt = 0;
2343    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2344    m->incoming.block = evbuffer_new( );
2345    peer->msgs = m;
2346
2347    if( tr_torrentAllowsPex( torrent ) ) {
2348        m->pexTimer = evtimer_new( torrent->session->event_base, pexPulse, m );
2349        tr_timerAdd( m->pexTimer, PEX_INTERVAL_SECS, 0 );
2350    }
2351
2352    if( tr_peerIoSupportsUTP( peer->io ) ) {
2353        const tr_address * addr = tr_peerIoGetAddress( peer->io, NULL );
2354        tr_peerMgrSetUtpSupported( torrent, addr );
2355        tr_peerMgrSetUtpFailed( torrent, addr, false );
2356    }
2357
2358    if( tr_peerIoSupportsLTEP( peer->io ) )
2359        sendLtepHandshake( m );
2360
2361    tellPeerWhatWeHave( m );
2362
2363    if( tr_dhtEnabled( torrent->session ) && tr_peerIoSupportsDHT( peer->io ))
2364    {
2365        /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2366        const struct tr_address *addr = tr_peerIoGetAddress( peer->io, NULL );
2367        if( addr->type == TR_AF_INET || tr_globalIPv6() ) {
2368            protocolSendPort( m, tr_dhtPort( torrent->session ) );
2369        }
2370    }
2371
2372    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2373    updateDesiredRequestCount( m );
2374
2375    return m;
2376}
2377
2378void
2379tr_peerMsgsFree( tr_peermsgs* msgs )
2380{
2381    if( msgs )
2382    {
2383        if( msgs->pexTimer != NULL )
2384            event_free( msgs->pexTimer );
2385
2386        evbuffer_free( msgs->incoming.block );
2387        evbuffer_free( msgs->outMessages );
2388        tr_free( msgs->pex6 );
2389        tr_free( msgs->pex );
2390
2391        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2392        tr_free( msgs );
2393    }
2394}
Note: See TracBrowser for help on using the repository browser.