source: trunk/libtransmission/peer-msgs.c @ 13300

Last change on this file since 13300 was 13300, checked in by jordan, 9 years ago

(trunk libT) #4894 -- don't use evbuffer_add_printf() and evbuffer_pullup() together.

  • Property svn:keywords set to Date Rev Author Id
File size: 71.5 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 13300 2012-05-17 17:40:31Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <stdarg.h>
16#include <stdlib.h>
17#include <string.h>
18
19#include <event2/buffer.h>
20#include <event2/bufferevent.h>
21#include <event2/event.h>
22
23#include "transmission.h"
24#include "bencode.h"
25#include "cache.h"
26#include "completion.h"
27#include "crypto.h" /* tr_sha1() */
28#include "peer-io.h"
29#include "peer-mgr.h"
30#include "peer-msgs.h"
31#include "session.h"
32#include "torrent.h"
33#include "torrent-magnet.h"
34#include "tr-dht.h"
35#include "utils.h"
36#include "version.h"
37
38/**
39***
40**/
41
42enum
43{
44    BT_CHOKE                = 0,
45    BT_UNCHOKE              = 1,
46    BT_INTERESTED           = 2,
47    BT_NOT_INTERESTED       = 3,
48    BT_HAVE                 = 4,
49    BT_BITFIELD             = 5,
50    BT_REQUEST              = 6,
51    BT_PIECE                = 7,
52    BT_CANCEL               = 8,
53    BT_PORT                 = 9,
54
55    BT_FEXT_SUGGEST         = 13,
56    BT_FEXT_HAVE_ALL        = 14,
57    BT_FEXT_HAVE_NONE       = 15,
58    BT_FEXT_REJECT          = 16,
59    BT_FEXT_ALLOWED_FAST    = 17,
60
61    BT_LTEP                 = 20,
62
63    LTEP_HANDSHAKE          = 0,
64
65    UT_PEX_ID               = 1,
66    UT_METADATA_ID          = 3,
67
68    MAX_PEX_PEER_COUNT      = 50,
69
70    MIN_CHOKE_PERIOD_SEC    = 10,
71
72    /* idle seconds before we send a keepalive */
73    KEEPALIVE_INTERVAL_SECS = 100,
74
75    PEX_INTERVAL_SECS       = 90, /* sec between sendPex() calls */
76
77    REQQ                    = 512,
78
79    METADATA_REQQ           = 64,
80
81    /* used in lowering the outMessages queue period */
82    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
83    HIGH_PRIORITY_INTERVAL_SECS = 2,
84    LOW_PRIORITY_INTERVAL_SECS = 10,
85
86    /* number of pieces we'll allow in our fast set */
87    MAX_FAST_SET_SIZE = 3,
88
89    /* defined in BEP #9 */
90    METADATA_MSG_TYPE_REQUEST = 0,
91    METADATA_MSG_TYPE_DATA = 1,
92    METADATA_MSG_TYPE_REJECT = 2
93};
94
95enum
96{
97    AWAITING_BT_LENGTH,
98    AWAITING_BT_ID,
99    AWAITING_BT_MESSAGE,
100    AWAITING_BT_PIECE
101};
102
103/**
104***
105**/
106
107struct peer_request
108{
109    uint32_t    index;
110    uint32_t    offset;
111    uint32_t    length;
112};
113
114static void
115blockToReq( const tr_torrent     * tor,
116            tr_block_index_t       block,
117            struct peer_request  * setme )
118{
119    tr_torrentGetBlockLocation( tor, block, &setme->index,
120                                            &setme->offset,
121                                            &setme->length );
122}
123
124/**
125***
126**/
127
128/* this is raw, unchanged data from the peer regarding
129 * the current message that it's sending us. */
130struct tr_incoming
131{
132    uint8_t                id;
133    uint32_t               length; /* includes the +1 for id length */
134    struct peer_request    blockReq; /* metadata for incoming blocks */
135    struct evbuffer *      block; /* piece data for incoming blocks */
136};
137
138/**
139 * Low-level communication state information about a connected peer.
140 *
141 * This structure remembers the low-level protocol states that we're
142 * in with this peer, such as active requests, pex messages, and so on.
143 * Its fields are all private to peer-msgs.c.
144 *
145 * Data not directly involved with sending & receiving messages is
146 * stored in tr_peer, where it can be accessed by both peermsgs and
147 * the peer manager.
148 *
149 * @see struct peer_atom
150 * @see tr_peer
151 */
152struct tr_peermsgs
153{
154    bool            peerSupportsPex;
155    bool            peerSupportsMetadataXfer;
156    bool            clientSentLtepHandshake;
157    bool            peerSentLtepHandshake;
158
159    /*bool          haveFastSet;*/
160
161    int             desiredRequestCount;
162
163    int             prefetchCount;
164
165    /* how long the outMessages batch should be allowed to grow before
166     * it's flushed -- some messages (like requests >:) should be sent
167     * very quickly; others aren't as urgent. */
168    int8_t          outMessagesBatchPeriod;
169
170    uint8_t         state;
171    uint8_t         ut_pex_id;
172    uint8_t         ut_metadata_id;
173    uint16_t        pexCount;
174    uint16_t        pexCount6;
175
176    size_t          metadata_size_hint;
177#if 0
178    size_t                 fastsetSize;
179    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
180#endif
181
182    tr_peer *              peer;
183
184    tr_torrent *           torrent;
185
186    tr_peer_callback      * callback;
187    void                  * callbackData;
188
189    struct evbuffer *      outMessages; /* all the non-piece messages */
190
191    struct peer_request    peerAskedFor[REQQ];
192
193    int                    peerAskedForMetadata[METADATA_REQQ];
194    int                    peerAskedForMetadataCount;
195
196    tr_pex               * pex;
197    tr_pex               * pex6;
198
199    /*time_t                 clientSentPexAt;*/
200    time_t                 clientSentAnythingAt;
201
202    /* when we started batching the outMessages */
203    time_t                outMessagesBatchedAt;
204
205    struct tr_incoming    incoming;
206
207    /* if the peer supports the Extension Protocol in BEP 10 and
208       supplied a reqq argument, it's stored here. Otherwise, the
209       value is zero and should be ignored. */
210    int64_t               reqq;
211
212    struct event        * pexTimer;
213};
214
215/**
216***
217**/
218
219static inline tr_session*
220getSession( struct tr_peermsgs * msgs )
221{
222    return msgs->torrent->session;
223}
224
225/**
226***
227**/
228
229static void
230myDebug( const char * file, int line,
231         const struct tr_peermsgs * msgs,
232         const char * fmt, ... )
233{
234    FILE * fp = tr_getLog( );
235
236    if( fp )
237    {
238        va_list           args;
239        char              timestr[64];
240        struct evbuffer * buf = evbuffer_new( );
241        char *            base = tr_basename( file );
242        char *            message;
243
244        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
245                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
246                             tr_torrentName( msgs->torrent ),
247                             tr_peerIoGetAddrStr( msgs->peer->io ),
248                             msgs->peer->client );
249        va_start( args, fmt );
250        evbuffer_add_vprintf( buf, fmt, args );
251        va_end( args );
252        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
253
254        message = evbuffer_free_to_str( buf );
255        fputs( message, fp );
256
257        tr_free( base );
258        tr_free( message );
259    }
260}
261
262#define dbgmsg( msgs, ... ) \
263    do { \
264        if( tr_deepLoggingIsActive( ) ) \
265            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
266    } while( 0 )
267
268/**
269***
270**/
271
272static void
273pokeBatchPeriod( tr_peermsgs * msgs, int interval )
274{
275    if( msgs->outMessagesBatchPeriod > interval )
276    {
277        msgs->outMessagesBatchPeriod = interval;
278        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
279    }
280}
281
282static void
283dbgOutMessageLen( tr_peermsgs * msgs )
284{
285    dbgmsg( msgs, "outMessage size is now %zu", evbuffer_get_length( msgs->outMessages ) );
286}
287
288static void
289protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
290{
291    struct evbuffer * out = msgs->outMessages;
292
293    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
294
295    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
296    evbuffer_add_uint8 ( out, BT_FEXT_REJECT );
297    evbuffer_add_uint32( out, req->index );
298    evbuffer_add_uint32( out, req->offset );
299    evbuffer_add_uint32( out, req->length );
300
301    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
302    dbgOutMessageLen( msgs );
303}
304
305static void
306protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
307{
308    struct evbuffer * out = msgs->outMessages;
309
310    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
311    evbuffer_add_uint8 ( out, BT_REQUEST );
312    evbuffer_add_uint32( out, req->index );
313    evbuffer_add_uint32( out, req->offset );
314    evbuffer_add_uint32( out, req->length );
315
316    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
317    dbgOutMessageLen( msgs );
318    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
319}
320
321static void
322protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
323{
324    struct evbuffer * out = msgs->outMessages;
325
326    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
327    evbuffer_add_uint8 ( out, BT_CANCEL );
328    evbuffer_add_uint32( out, req->index );
329    evbuffer_add_uint32( out, req->offset );
330    evbuffer_add_uint32( out, req->length );
331
332    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
333    dbgOutMessageLen( msgs );
334    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
335}
336
337static void
338protocolSendPort(tr_peermsgs *msgs, uint16_t port)
339{
340    struct evbuffer * out = msgs->outMessages;
341
342    dbgmsg( msgs, "sending Port %u", port);
343    evbuffer_add_uint32( out, 3 );
344    evbuffer_add_uint8 ( out, BT_PORT );
345    evbuffer_add_uint16( out, port);
346}
347
348static void
349protocolSendHave( tr_peermsgs * msgs, uint32_t index )
350{
351    struct evbuffer * out = msgs->outMessages;
352
353    evbuffer_add_uint32( out, sizeof(uint8_t) + sizeof(uint32_t) );
354    evbuffer_add_uint8 ( out, BT_HAVE );
355    evbuffer_add_uint32( out, index );
356
357    dbgmsg( msgs, "sending Have %u", index );
358    dbgOutMessageLen( msgs );
359    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
360}
361
362#if 0
363static void
364protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
365{
366    tr_peerIo       * io  = msgs->peer->io;
367    struct evbuffer * out = msgs->outMessages;
368
369    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
370
371    evbuffer_add_uint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
372    evbuffer_add_uint8 ( io, out, BT_FEXT_ALLOWED_FAST );
373    evbuffer_add_uint32( io, out, pieceIndex );
374
375    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
376    dbgOutMessageLen( msgs );
377}
378#endif
379
380static void
381protocolSendChoke( tr_peermsgs * msgs, int choke )
382{
383    struct evbuffer * out = msgs->outMessages;
384
385    evbuffer_add_uint32( out, sizeof( uint8_t ) );
386    evbuffer_add_uint8 ( out, choke ? BT_CHOKE : BT_UNCHOKE );
387
388    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
389    dbgOutMessageLen( msgs );
390    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
391}
392
393static void
394protocolSendHaveAll( tr_peermsgs * msgs )
395{
396    struct evbuffer * out = msgs->outMessages;
397
398    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
399
400    evbuffer_add_uint32( out, sizeof( uint8_t ) );
401    evbuffer_add_uint8 ( out, BT_FEXT_HAVE_ALL );
402
403    dbgmsg( msgs, "sending HAVE_ALL..." );
404    dbgOutMessageLen( msgs );
405    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
406}
407
408static void
409protocolSendHaveNone( tr_peermsgs * msgs )
410{
411    struct evbuffer * out = msgs->outMessages;
412
413    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
414
415    evbuffer_add_uint32( out, sizeof( uint8_t ) );
416    evbuffer_add_uint8 ( out, BT_FEXT_HAVE_NONE );
417
418    dbgmsg( msgs, "sending HAVE_NONE..." );
419    dbgOutMessageLen( msgs );
420    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
421}
422
423/**
424***  EVENTS
425**/
426
427static void
428publish( tr_peermsgs * msgs, tr_peer_event * e )
429{
430    assert( msgs->peer );
431    assert( msgs->peer->msgs == msgs );
432
433    if( msgs->callback != NULL )
434        msgs->callback( msgs->peer, e, msgs->callbackData );
435}
436
437static void
438fireError( tr_peermsgs * msgs, int err )
439{
440    tr_peer_event e = TR_PEER_EVENT_INIT;
441    e.eventType = TR_PEER_ERROR;
442    e.err = err;
443    publish( msgs, &e );
444}
445
446static void
447fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
448{
449    tr_peer_event e = TR_PEER_EVENT_INIT;
450    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
451    e.pieceIndex = req->index;
452    e.offset = req->offset;
453    e.length = req->length;
454    publish( msgs, &e );
455}
456
457static void
458fireGotRej( tr_peermsgs * msgs, const struct peer_request * req )
459{
460    tr_peer_event e = TR_PEER_EVENT_INIT;
461    e.eventType = TR_PEER_CLIENT_GOT_REJ;
462    e.pieceIndex = req->index;
463    e.offset = req->offset;
464    e.length = req->length;
465    publish( msgs, &e );
466}
467
468static void
469fireGotChoke( tr_peermsgs * msgs )
470{
471    tr_peer_event e = TR_PEER_EVENT_INIT;
472    e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
473    publish( msgs, &e );
474}
475
476static void
477fireClientGotHaveAll( tr_peermsgs * msgs )
478{
479    tr_peer_event e = TR_PEER_EVENT_INIT;
480    e.eventType = TR_PEER_CLIENT_GOT_HAVE_ALL;
481    publish( msgs, &e );
482}
483
484static void
485fireClientGotHaveNone( tr_peermsgs * msgs )
486{
487    tr_peer_event e = TR_PEER_EVENT_INIT;
488    e.eventType = TR_PEER_CLIENT_GOT_HAVE_NONE;
489    publish( msgs, &e );
490}
491
492static void
493fireClientGotData( tr_peermsgs * msgs, uint32_t length, int wasPieceData )
494{
495    tr_peer_event e = TR_PEER_EVENT_INIT;
496
497    e.length = length;
498    e.eventType = TR_PEER_CLIENT_GOT_DATA;
499    e.wasPieceData = wasPieceData;
500    publish( msgs, &e );
501}
502
503static void
504fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
505{
506    tr_peer_event e = TR_PEER_EVENT_INIT;
507    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
508    e.pieceIndex = pieceIndex;
509    publish( msgs, &e );
510}
511
512static void
513fireClientGotPort( tr_peermsgs * msgs, tr_port port )
514{
515    tr_peer_event e = TR_PEER_EVENT_INIT;
516    e.eventType = TR_PEER_CLIENT_GOT_PORT;
517    e.port = port;
518    publish( msgs, &e );
519}
520
521static void
522fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
523{
524    tr_peer_event e = TR_PEER_EVENT_INIT;
525    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
526    e.pieceIndex = pieceIndex;
527    publish( msgs, &e );
528}
529
530static void
531fireClientGotBitfield( tr_peermsgs * msgs, tr_bitfield * bitfield )
532{
533    tr_peer_event e = TR_PEER_EVENT_INIT;
534    e.eventType = TR_PEER_CLIENT_GOT_BITFIELD;
535    e.bitfield = bitfield;
536    publish( msgs, &e );
537}
538
539static void
540fireClientGotHave( tr_peermsgs * msgs, tr_piece_index_t index )
541{
542    tr_peer_event e = TR_PEER_EVENT_INIT;
543    e.eventType = TR_PEER_CLIENT_GOT_HAVE;
544    e.pieceIndex = index;
545    publish( msgs, &e );
546}
547
548static void
549firePeerGotData( tr_peermsgs * msgs, uint32_t length, bool wasPieceData )
550{
551    tr_peer_event e = TR_PEER_EVENT_INIT;
552
553    e.length = length;
554    e.eventType = TR_PEER_PEER_GOT_DATA;
555    e.wasPieceData = wasPieceData;
556
557    publish( msgs, &e );
558}
559
560/**
561***  ALLOWED FAST SET
562***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
563**/
564
565#if 0
566size_t
567tr_generateAllowedSet( tr_piece_index_t * setmePieces,
568                       size_t             desiredSetSize,
569                       size_t             pieceCount,
570                       const uint8_t    * infohash,
571                       const tr_address * addr )
572{
573    size_t setSize = 0;
574
575    assert( setmePieces );
576    assert( desiredSetSize <= pieceCount );
577    assert( desiredSetSize );
578    assert( pieceCount );
579    assert( infohash );
580    assert( addr );
581
582    if( addr->type == TR_AF_INET )
583    {
584        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
585        uint8_t x[SHA_DIGEST_LENGTH];
586
587        uint32_t ui32 = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
588        memcpy( w, &ui32, sizeof( uint32_t ) );
589        walk += sizeof( uint32_t );
590        memcpy( walk, infohash, SHA_DIGEST_LENGTH );                 /* (2) */
591        walk += SHA_DIGEST_LENGTH;
592        tr_sha1( x, w, walk-w, NULL );                               /* (3) */
593        assert( sizeof( w ) == walk-w );
594
595        while( setSize<desiredSetSize )
596        {
597            int i;
598            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
599            {
600                size_t k;
601                uint32_t j = i * 4;                                  /* (5) */
602                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
603                uint32_t index = y % pieceCount;                     /* (7) */
604
605                for( k=0; k<setSize; ++k )                           /* (8) */
606                    if( setmePieces[k] == index )
607                        break;
608
609                if( k == setSize )
610                    setmePieces[setSize++] = index;                  /* (9) */
611            }
612
613            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
614        }
615    }
616
617    return setSize;
618}
619
620static void
621updateFastSet( tr_peermsgs * msgs UNUSED )
622{
623    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
624    const int peerIsNeedy = msgs->peer->progress < 0.10;
625
626    if( fext && peerIsNeedy && !msgs->haveFastSet )
627    {
628        size_t i;
629        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
630        const tr_info * inf = &msgs->torrent->info;
631        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
632
633        /* build the fast set */
634        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
635        msgs->haveFastSet = 1;
636
637        /* send it to the peer */
638        for( i=0; i<msgs->fastsetSize; ++i )
639            protocolSendAllowedFast( msgs, msgs->fastset[i] );
640    }
641}
642#endif
643
644/**
645***  INTEREST
646**/
647
648static void
649sendInterest( tr_peermsgs * msgs, bool clientIsInterested )
650{
651    struct evbuffer * out = msgs->outMessages;
652
653    assert( msgs );
654    assert( tr_isBool( clientIsInterested ) );
655
656    msgs->peer->clientIsInterested = clientIsInterested;
657    dbgmsg( msgs, "Sending %s", clientIsInterested ? "Interested" : "Not Interested" );
658    evbuffer_add_uint32( out, sizeof( uint8_t ) );
659    evbuffer_add_uint8 ( out, clientIsInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
660
661    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
662    dbgOutMessageLen( msgs );
663}
664
665static void
666updateInterest( tr_peermsgs * msgs UNUSED )
667{
668    /* FIXME -- might need to poke the mgr on startup */
669}
670
671void
672tr_peerMsgsSetInterested( tr_peermsgs * msgs, bool clientIsInterested )
673{
674    assert( tr_isBool( clientIsInterested ) );
675
676    if( clientIsInterested != msgs->peer->clientIsInterested )
677        sendInterest( msgs, clientIsInterested );
678}
679
680static bool
681popNextMetadataRequest( tr_peermsgs * msgs, int * piece )
682{
683    if( msgs->peerAskedForMetadataCount == 0 )
684        return false;
685
686    *piece = msgs->peerAskedForMetadata[0];
687
688    tr_removeElementFromArray( msgs->peerAskedForMetadata, 0, sizeof( int ),
689                               msgs->peerAskedForMetadataCount-- );
690
691    return true;
692}
693
694static bool
695popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
696{
697    if( msgs->peer->pendingReqsToClient == 0 )
698        return false;
699
700    *setme = msgs->peerAskedFor[0];
701
702    tr_removeElementFromArray( msgs->peerAskedFor, 0, sizeof( struct peer_request ),
703                               msgs->peer->pendingReqsToClient-- );
704
705    return true;
706}
707
708static void
709cancelAllRequestsToClient( tr_peermsgs * msgs )
710{
711    struct peer_request req;
712    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
713
714    while( popNextRequest( msgs, &req ))
715        if( mustSendCancel )
716            protocolSendReject( msgs, &req );
717}
718
719void
720tr_peerMsgsSetChoke( tr_peermsgs * msgs, bool peerIsChoked )
721{
722    const time_t now = tr_time( );
723    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
724
725    assert( msgs );
726    assert( msgs->peer );
727    assert( tr_isBool( peerIsChoked ) );
728
729    if( msgs->peer->chokeChangedAt > fibrillationTime )
730    {
731        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", peerIsChoked );
732    }
733    else if( msgs->peer->peerIsChoked != peerIsChoked )
734    {
735        msgs->peer->peerIsChoked = peerIsChoked;
736        if( peerIsChoked )
737            cancelAllRequestsToClient( msgs );
738        protocolSendChoke( msgs, peerIsChoked );
739        msgs->peer->chokeChangedAt = now;
740    }
741}
742
743/**
744***
745**/
746
747void
748tr_peerMsgsHave( tr_peermsgs * msgs, uint32_t index )
749{
750    protocolSendHave( msgs, index );
751
752    /* since we have more pieces now, we might not be interested in this peer */
753    updateInterest( msgs );
754}
755
756/**
757***
758**/
759
760static bool
761reqIsValid( const tr_peermsgs * peer,
762            uint32_t            index,
763            uint32_t            offset,
764            uint32_t            length )
765{
766    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
767}
768
769static bool
770requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
771{
772    return reqIsValid( msgs, req->index, req->offset, req->length );
773}
774
775void
776tr_peerMsgsCancel( tr_peermsgs * msgs, tr_block_index_t block )
777{
778    struct peer_request req;
779/*fprintf( stderr, "SENDING CANCEL MESSAGE FOR BLOCK %zu\n\t\tFROM PEER %p ------------------------------------\n", (size_t)block, msgs->peer );*/
780    blockToReq( msgs->torrent, block, &req );
781    protocolSendCancel( msgs, &req );
782}
783
784/**
785***
786**/
787
788static void
789sendLtepHandshake( tr_peermsgs * msgs )
790{
791    tr_benc val;
792    bool allow_pex;
793    bool allow_metadata_xfer;
794    struct evbuffer * payload;
795    struct evbuffer * out = msgs->outMessages;
796    const unsigned char * ipv6 = tr_globalIPv6();
797
798    if( msgs->clientSentLtepHandshake )
799        return;
800
801    dbgmsg( msgs, "sending an ltep handshake" );
802    msgs->clientSentLtepHandshake = 1;
803
804    /* decide if we want to advertise metadata xfer support (BEP 9) */
805    if( tr_torrentIsPrivate( msgs->torrent ) )
806        allow_metadata_xfer = 0;
807    else
808        allow_metadata_xfer = 1;
809
810    /* decide if we want to advertise pex support */
811    if( !tr_torrentAllowsPex( msgs->torrent ) )
812        allow_pex = 0;
813    else if( msgs->peerSentLtepHandshake )
814        allow_pex = msgs->peerSupportsPex ? 1 : 0;
815    else
816        allow_pex = 1;
817
818    tr_bencInitDict( &val, 8 );
819    tr_bencDictAddInt( &val, "e", getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED );
820    if( ipv6 != NULL )
821        tr_bencDictAddRaw( &val, "ipv6", ipv6, 16 );
822    if( allow_metadata_xfer && tr_torrentHasMetadata( msgs->torrent )
823                            && ( msgs->torrent->infoDictLength > 0 ) )
824        tr_bencDictAddInt( &val, "metadata_size", msgs->torrent->infoDictLength );
825    tr_bencDictAddInt( &val, "p", tr_sessionGetPublicPeerPort( getSession(msgs) ) );
826    tr_bencDictAddInt( &val, "reqq", REQQ );
827    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
828    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
829    if( allow_metadata_xfer || allow_pex ) {
830        tr_benc * m  = tr_bencDictAddDict( &val, "m", 2 );
831        if( allow_metadata_xfer )
832            tr_bencDictAddInt( m, "ut_metadata", UT_METADATA_ID );
833        if( allow_pex )
834            tr_bencDictAddInt( m, "ut_pex", UT_PEX_ID );
835    }
836
837    payload = tr_bencToBuf( &val, TR_FMT_BENC );
838
839    evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + evbuffer_get_length( payload ) );
840    evbuffer_add_uint8 ( out, BT_LTEP );
841    evbuffer_add_uint8 ( out, LTEP_HANDSHAKE );
842    evbuffer_add_buffer( out, payload );
843    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
844    dbgOutMessageLen( msgs );
845
846    /* cleanup */
847    evbuffer_free( payload );
848    tr_bencFree( &val );
849}
850
851static void
852parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
853{
854    int64_t   i;
855    tr_benc   val, * sub;
856    uint8_t * tmp = tr_new( uint8_t, len );
857    const uint8_t *addr;
858    size_t addr_len;
859    tr_pex pex;
860    int8_t seedProbability = -1;
861
862    memset( &pex, 0, sizeof( tr_pex ) );
863
864    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
865    msgs->peerSentLtepHandshake = 1;
866
867    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
868    {
869        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
870        tr_free( tmp );
871        return;
872    }
873
874    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
875
876    /* does the peer prefer encrypted connections? */
877    if( tr_bencDictFindInt( &val, "e", &i ) ) {
878        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
879                                              : ENCRYPTION_PREFERENCE_NO;
880        if( i )
881            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
882    }
883
884    /* check supported messages for utorrent pex */
885    msgs->peerSupportsPex = 0;
886    msgs->peerSupportsMetadataXfer = 0;
887
888    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
889        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
890            msgs->peerSupportsPex = i != 0;
891            msgs->ut_pex_id = (uint8_t) i;
892            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
893        }
894        if( tr_bencDictFindInt( sub, "ut_metadata", &i ) ) {
895            msgs->peerSupportsMetadataXfer = i != 0;
896            msgs->ut_metadata_id = (uint8_t) i;
897            dbgmsg( msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id );
898        }
899        if( tr_bencDictFindInt( sub, "ut_holepunch", &i ) ) {
900            /* Mysterious µTorrent extension that we don't grok.  However,
901               it implies support for µTP, so use it to indicate that. */
902            tr_peerMgrSetUtpFailed( msgs->torrent,
903                                    tr_peerIoGetAddress( msgs->peer->io, NULL ),
904                                    false );
905        }
906    }
907
908    /* look for metainfo size (BEP 9) */
909    if( tr_bencDictFindInt( &val, "metadata_size", &i ) ) {
910        tr_torrentSetMetadataSizeHint( msgs->torrent, i );
911        msgs->metadata_size_hint = (size_t) i;
912    }
913
914    /* look for upload_only (BEP 21) */
915    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
916        seedProbability = i==0 ? 0 : 100;
917
918    /* get peer's listening port */
919    if( tr_bencDictFindInt( &val, "p", &i ) ) {
920        pex.port = htons( (uint16_t)i );
921        fireClientGotPort( msgs, pex.port );
922        dbgmsg( msgs, "peer's port is now %d", (int)i );
923    }
924
925    if( tr_peerIoIsIncoming( msgs->peer->io )
926        && tr_bencDictFindRaw( &val, "ipv4", &addr, &addr_len )
927        && ( addr_len == 4 ) )
928    {
929        pex.addr.type = TR_AF_INET;
930        memcpy( &pex.addr.addr.addr4, addr, 4 );
931        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
932    }
933
934    if( tr_peerIoIsIncoming( msgs->peer->io )
935        && tr_bencDictFindRaw( &val, "ipv6", &addr, &addr_len )
936        && ( addr_len == 16 ) )
937    {
938        pex.addr.type = TR_AF_INET6;
939        memcpy( &pex.addr.addr.addr6, addr, 16 );
940        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
941    }
942
943    /* get peer's maximum request queue size */
944    if( tr_bencDictFindInt( &val, "reqq", &i ) )
945        msgs->reqq = i;
946
947    tr_bencFree( &val );
948    tr_free( tmp );
949}
950
951static void
952parseUtMetadata( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
953{
954    tr_benc dict;
955    char * msg_end;
956    char * benc_end;
957    int64_t msg_type = -1;
958    int64_t piece = -1;
959    int64_t total_size = 0;
960    uint8_t * tmp = tr_new( uint8_t, msglen );
961
962    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
963    msg_end = (char*)tmp + msglen;
964
965    if( !tr_bencLoad( tmp, msglen, &dict, &benc_end ) )
966    {
967        tr_bencDictFindInt( &dict, "msg_type", &msg_type );
968        tr_bencDictFindInt( &dict, "piece", &piece );
969        tr_bencDictFindInt( &dict, "total_size", &total_size );
970        tr_bencFree( &dict );
971    }
972
973    dbgmsg( msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
974            (int)msg_type, (int)piece, (int)total_size );
975
976    if( msg_type == METADATA_MSG_TYPE_REJECT )
977    {
978        /* NOOP */
979    }
980
981    if( ( msg_type == METADATA_MSG_TYPE_DATA )
982        && ( !tr_torrentHasMetadata( msgs->torrent ) )
983        && ( msg_end - benc_end <= METADATA_PIECE_SIZE )
984        && ( piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size ) )
985    {
986        const int pieceLen = msg_end - benc_end;
987        tr_torrentSetMetadataPiece( msgs->torrent, piece, benc_end, pieceLen );
988    }
989
990    if( msg_type == METADATA_MSG_TYPE_REQUEST )
991    {
992        if( ( piece >= 0 )
993            && tr_torrentHasMetadata( msgs->torrent )
994            && !tr_torrentIsPrivate( msgs->torrent )
995            && ( msgs->peerAskedForMetadataCount < METADATA_REQQ ) )
996        {
997            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
998        }
999        else
1000        {
1001            tr_benc tmp;
1002            struct evbuffer * payload;
1003            struct evbuffer * out = msgs->outMessages;
1004
1005            /* build the rejection message */
1006            tr_bencInitDict( &tmp, 2 );
1007            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1008            tr_bencDictAddInt( &tmp, "piece", piece );
1009            payload = tr_bencToBuf( &tmp, TR_FMT_BENC );
1010
1011            /* write it out as a LTEP message to our outMessages buffer */
1012            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + evbuffer_get_length( payload ) );
1013            evbuffer_add_uint8 ( out, BT_LTEP );
1014            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1015            evbuffer_add_buffer( out, payload );
1016            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1017            dbgOutMessageLen( msgs );
1018
1019            /* cleanup */
1020            evbuffer_free( payload );
1021            tr_bencFree( &tmp );
1022        }
1023    }
1024
1025    tr_free( tmp );
1026}
1027
1028static void
1029parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1030{
1031    int loaded = 0;
1032    uint8_t * tmp = tr_new( uint8_t, msglen );
1033    tr_benc val;
1034    tr_torrent * tor = msgs->torrent;
1035    const uint8_t * added;
1036    size_t added_len;
1037
1038    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1039
1040    if( tr_torrentAllowsPex( tor )
1041      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1042    {
1043        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1044        {
1045            tr_pex * pex;
1046            size_t i, n;
1047            size_t added_f_len = 0;
1048            const uint8_t * added_f = NULL;
1049
1050            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1051            pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1052
1053            n = MIN( n, MAX_PEX_PEER_COUNT );
1054            for( i=0; i<n; ++i )
1055            {
1056                int seedProbability = -1;
1057                if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1058                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1059            }
1060
1061            tr_free( pex );
1062        }
1063
1064        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1065        {
1066            tr_pex * pex;
1067            size_t i, n;
1068            size_t added_f_len = 0;
1069            const uint8_t * added_f = NULL;
1070
1071            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1072            pex = tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len, &n );
1073
1074            n = MIN( n, MAX_PEX_PEER_COUNT );
1075            for( i=0; i<n; ++i )
1076            {
1077                int seedProbability = -1;
1078                if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1079                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1080            }
1081
1082            tr_free( pex );
1083        }
1084    }
1085
1086    if( loaded )
1087        tr_bencFree( &val );
1088    tr_free( tmp );
1089}
1090
1091static void sendPex( tr_peermsgs * msgs );
1092
1093static void
1094parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer  * inbuf )
1095{
1096    uint8_t ltep_msgid;
1097
1098    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1099    msglen--;
1100
1101    if( ltep_msgid == LTEP_HANDSHAKE )
1102    {
1103        dbgmsg( msgs, "got ltep handshake" );
1104        parseLtepHandshake( msgs, msglen, inbuf );
1105        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1106        {
1107            sendLtepHandshake( msgs );
1108            sendPex( msgs );
1109        }
1110    }
1111    else if( ltep_msgid == UT_PEX_ID )
1112    {
1113        dbgmsg( msgs, "got ut pex" );
1114        msgs->peerSupportsPex = 1;
1115        parseUtPex( msgs, msglen, inbuf );
1116    }
1117    else if( ltep_msgid == UT_METADATA_ID )
1118    {
1119        dbgmsg( msgs, "got ut metadata" );
1120        msgs->peerSupportsMetadataXfer = 1;
1121        parseUtMetadata( msgs, msglen, inbuf );
1122    }
1123    else
1124    {
1125        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1126        evbuffer_drain( inbuf, msglen );
1127    }
1128}
1129
1130static int
1131readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1132{
1133    uint32_t len;
1134
1135    if( inlen < sizeof( len ) )
1136        return READ_LATER;
1137
1138    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1139
1140    if( len == 0 ) /* peer sent us a keepalive message */
1141        dbgmsg( msgs, "got KeepAlive" );
1142    else
1143    {
1144        msgs->incoming.length = len;
1145        msgs->state = AWAITING_BT_ID;
1146    }
1147
1148    return READ_NOW;
1149}
1150
1151static int readBtMessage( tr_peermsgs *, struct evbuffer *, size_t );
1152
1153static int
1154readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1155{
1156    uint8_t id;
1157
1158    if( inlen < sizeof( uint8_t ) )
1159        return READ_LATER;
1160
1161    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1162    msgs->incoming.id = id;
1163    dbgmsg( msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length );
1164
1165    if( id == BT_PIECE )
1166    {
1167        msgs->state = AWAITING_BT_PIECE;
1168        return READ_NOW;
1169    }
1170    else if( msgs->incoming.length != 1 )
1171    {
1172        msgs->state = AWAITING_BT_MESSAGE;
1173        return READ_NOW;
1174    }
1175    else return readBtMessage( msgs, inbuf, inlen - 1 );
1176}
1177
1178static void
1179updatePeerProgress( tr_peermsgs * msgs )
1180{
1181    tr_peerUpdateProgress( msgs->torrent, msgs->peer );
1182
1183    /*updateFastSet( msgs );*/
1184    updateInterest( msgs );
1185}
1186
1187static void
1188prefetchPieces( tr_peermsgs *msgs )
1189{
1190    int i;
1191
1192    if( !getSession(msgs)->isPrefetchEnabled )
1193        return;
1194
1195    /* Maintain 12 prefetched blocks per unchoked peer */
1196    for( i=msgs->prefetchCount; i<msgs->peer->pendingReqsToClient && i<12; ++i )
1197    {
1198        const struct peer_request * req = msgs->peerAskedFor + i;
1199        if( requestIsValid( msgs, req ) )
1200        {
1201            tr_cachePrefetchBlock( getSession(msgs)->cache, msgs->torrent, req->index, req->offset, req->length );
1202            ++msgs->prefetchCount;
1203        }
1204    }
1205}
1206
1207static void
1208peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
1209{
1210    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1211    const int reqIsValid = requestIsValid( msgs, req );
1212    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1213    const int peerIsChoked = msgs->peer->peerIsChoked;
1214
1215    int allow = false;
1216
1217    if( !reqIsValid )
1218        dbgmsg( msgs, "rejecting an invalid request." );
1219    else if( !clientHasPiece )
1220        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1221    else if( peerIsChoked )
1222        dbgmsg( msgs, "rejecting request from choked peer" );
1223    else if( msgs->peer->pendingReqsToClient + 1 >= REQQ )
1224        dbgmsg( msgs, "rejecting request ... reqq is full" );
1225    else
1226        allow = true;
1227
1228    if( allow ) {
1229        msgs->peerAskedFor[msgs->peer->pendingReqsToClient++] = *req;
1230        prefetchPieces( msgs );
1231    } else if( fext ) {
1232        protocolSendReject( msgs, req );
1233    }
1234}
1235
1236static bool
1237messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1238{
1239    switch( id )
1240    {
1241        case BT_CHOKE:
1242        case BT_UNCHOKE:
1243        case BT_INTERESTED:
1244        case BT_NOT_INTERESTED:
1245        case BT_FEXT_HAVE_ALL:
1246        case BT_FEXT_HAVE_NONE:
1247            return len == 1;
1248
1249        case BT_HAVE:
1250        case BT_FEXT_SUGGEST:
1251        case BT_FEXT_ALLOWED_FAST:
1252            return len == 5;
1253
1254        case BT_BITFIELD:
1255            if( tr_torrentHasMetadata( msg->torrent ) )
1256                return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1257            /* we don't know the piece count yet,
1258               so we can only guess whether to send true or false */
1259            if( msg->metadata_size_hint > 0 )
1260                return len <= msg->metadata_size_hint;
1261            return true;
1262
1263        case BT_REQUEST:
1264        case BT_CANCEL:
1265        case BT_FEXT_REJECT:
1266            return len == 13;
1267
1268        case BT_PIECE:
1269            return len > 9 && len <= 16393;
1270
1271        case BT_PORT:
1272            return len == 3;
1273
1274        case BT_LTEP:
1275            return len >= 2;
1276
1277        default:
1278            return false;
1279    }
1280}
1281
1282static int clientGotBlock( tr_peermsgs *               msgs,
1283                           struct evbuffer *           block,
1284                           const struct peer_request * req );
1285
1286static int
1287readBtPiece( tr_peermsgs      * msgs,
1288             struct evbuffer  * inbuf,
1289             size_t             inlen,
1290             size_t           * setme_piece_bytes_read )
1291{
1292    struct peer_request * req = &msgs->incoming.blockReq;
1293
1294    assert( evbuffer_get_length( inbuf ) >= inlen );
1295    dbgmsg( msgs, "In readBtPiece" );
1296
1297    if( !req->length )
1298    {
1299        if( inlen < 8 )
1300            return READ_LATER;
1301
1302        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1303        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1304        req->length = msgs->incoming.length - 9;
1305        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1306        return READ_NOW;
1307    }
1308    else
1309    {
1310        int err;
1311        size_t n;
1312        size_t nLeft;
1313        struct evbuffer * block_buffer;
1314
1315        if( msgs->incoming.block == NULL )
1316            msgs->incoming.block = evbuffer_new( );
1317        block_buffer = msgs->incoming.block;
1318
1319        /* read in another chunk of data */
1320        nLeft = req->length - evbuffer_get_length( block_buffer );
1321        n = MIN( nLeft, inlen );
1322
1323        tr_peerIoReadBytesToBuf( msgs->peer->io, inbuf, block_buffer, n );
1324
1325        fireClientGotData( msgs, n, true );
1326        *setme_piece_bytes_read += n;
1327        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1328               n, req->index, req->offset, req->length,
1329               (int)( req->length - evbuffer_get_length( block_buffer ) ) );
1330        if( evbuffer_get_length( block_buffer ) < req->length )
1331            return READ_LATER;
1332
1333        /* pass the block along... */
1334        err = clientGotBlock( msgs, block_buffer, req );
1335        evbuffer_drain( block_buffer, evbuffer_get_length( block_buffer ) );
1336
1337        /* cleanup */
1338        req->length = 0;
1339        msgs->state = AWAITING_BT_LENGTH;
1340        return err ? READ_ERR : READ_NOW;
1341    }
1342}
1343
1344static void updateDesiredRequestCount( tr_peermsgs * msgs );
1345
1346static int
1347readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1348{
1349    uint32_t      ui32;
1350    uint32_t      msglen = msgs->incoming.length;
1351    const uint8_t id = msgs->incoming.id;
1352#ifndef NDEBUG
1353    const size_t  startBufLen = evbuffer_get_length( inbuf );
1354#endif
1355    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1356
1357    --msglen; /* id length */
1358
1359    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1360
1361    if( inlen < msglen )
1362        return READ_LATER;
1363
1364    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1365    {
1366        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1367        fireError( msgs, EMSGSIZE );
1368        return READ_ERR;
1369    }
1370
1371    switch( id )
1372    {
1373        case BT_CHOKE:
1374            dbgmsg( msgs, "got Choke" );
1375            msgs->peer->clientIsChoked = 1;
1376            if( !fext )
1377                fireGotChoke( msgs );
1378            break;
1379
1380        case BT_UNCHOKE:
1381            dbgmsg( msgs, "got Unchoke" );
1382            msgs->peer->clientIsChoked = 0;
1383            updateDesiredRequestCount( msgs );
1384            break;
1385
1386        case BT_INTERESTED:
1387            dbgmsg( msgs, "got Interested" );
1388            msgs->peer->peerIsInterested = 1;
1389            break;
1390
1391        case BT_NOT_INTERESTED:
1392            dbgmsg( msgs, "got Not Interested" );
1393            msgs->peer->peerIsInterested = 0;
1394            break;
1395
1396        case BT_HAVE:
1397            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1398            dbgmsg( msgs, "got Have: %u", ui32 );
1399            if( tr_torrentHasMetadata( msgs->torrent )
1400                    && ( ui32 >= msgs->torrent->info.pieceCount ) )
1401            {
1402                fireError( msgs, ERANGE );
1403                return READ_ERR;
1404            }
1405
1406            /* a peer can send the same HAVE message twice... */
1407            if( !tr_bitfieldHas( &msgs->peer->have, ui32 ) ) {
1408                tr_bitfieldAdd( &msgs->peer->have, ui32 );
1409                fireClientGotHave( msgs, ui32 );
1410            }
1411            updatePeerProgress( msgs );
1412            break;
1413
1414        case BT_BITFIELD: {
1415            uint8_t * tmp = tr_new( uint8_t, msglen );
1416            dbgmsg( msgs, "got a bitfield" );
1417            tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1418            tr_bitfieldSetRaw( &msgs->peer->have, tmp, msglen, tr_torrentHasMetadata( msgs->torrent ) );
1419            fireClientGotBitfield( msgs, &msgs->peer->have );
1420            updatePeerProgress( msgs );
1421            tr_free( tmp );
1422            break;
1423        }
1424
1425        case BT_REQUEST:
1426        {
1427            struct peer_request r;
1428            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1429            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1430            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1431            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1432            peerMadeRequest( msgs, &r );
1433            break;
1434        }
1435
1436        case BT_CANCEL:
1437        {
1438            int i;
1439            struct peer_request r;
1440            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1441            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1442            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1443            tr_historyAdd( &msgs->peer->cancelsSentToClient, tr_time( ), 1 );
1444            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1445
1446            for( i=0; i<msgs->peer->pendingReqsToClient; ++i ) {
1447                const struct peer_request * req = msgs->peerAskedFor + i;
1448                if( ( req->index == r.index ) && ( req->offset == r.offset ) && ( req->length == r.length ) )
1449                    break;
1450            }
1451
1452            if( i < msgs->peer->pendingReqsToClient )
1453                tr_removeElementFromArray( msgs->peerAskedFor, i, sizeof( struct peer_request ),
1454                                           msgs->peer->pendingReqsToClient-- );
1455            break;
1456        }
1457
1458        case BT_PIECE:
1459            assert( 0 ); /* handled elsewhere! */
1460            break;
1461
1462        case BT_PORT:
1463            dbgmsg( msgs, "Got a BT_PORT" );
1464            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->dht_port );
1465            if( msgs->peer->dht_port > 0 )
1466                tr_dhtAddNode( getSession(msgs),
1467                               tr_peerAddress( msgs->peer ),
1468                               msgs->peer->dht_port, 0 );
1469            break;
1470
1471        case BT_FEXT_SUGGEST:
1472            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1473            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1474            if( fext )
1475                fireClientGotSuggest( msgs, ui32 );
1476            else {
1477                fireError( msgs, EMSGSIZE );
1478                return READ_ERR;
1479            }
1480            break;
1481
1482        case BT_FEXT_ALLOWED_FAST:
1483            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1484            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1485            if( fext )
1486                fireClientGotAllowedFast( msgs, ui32 );
1487            else {
1488                fireError( msgs, EMSGSIZE );
1489                return READ_ERR;
1490            }
1491            break;
1492
1493        case BT_FEXT_HAVE_ALL:
1494            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1495            if( fext ) {
1496                tr_bitfieldSetHasAll( &msgs->peer->have );
1497assert( tr_bitfieldHasAll( &msgs->peer->have ) );
1498                fireClientGotHaveAll( msgs );
1499                updatePeerProgress( msgs );
1500            } else {
1501                fireError( msgs, EMSGSIZE );
1502                return READ_ERR;
1503            }
1504            break;
1505
1506        case BT_FEXT_HAVE_NONE:
1507            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1508            if( fext ) {
1509                tr_bitfieldSetHasNone( &msgs->peer->have );
1510                fireClientGotHaveNone( msgs );
1511                updatePeerProgress( msgs );
1512            } else {
1513                fireError( msgs, EMSGSIZE );
1514                return READ_ERR;
1515            }
1516            break;
1517
1518        case BT_FEXT_REJECT:
1519        {
1520            struct peer_request r;
1521            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1522            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1523            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1524            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1525            if( fext )
1526                fireGotRej( msgs, &r );
1527            else {
1528                fireError( msgs, EMSGSIZE );
1529                return READ_ERR;
1530            }
1531            break;
1532        }
1533
1534        case BT_LTEP:
1535            dbgmsg( msgs, "Got a BT_LTEP" );
1536            parseLtep( msgs, msglen, inbuf );
1537            break;
1538
1539        default:
1540            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1541            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1542            break;
1543    }
1544
1545    assert( msglen + 1 == msgs->incoming.length );
1546    assert( evbuffer_get_length( inbuf ) == startBufLen - msglen );
1547
1548    msgs->state = AWAITING_BT_LENGTH;
1549    return READ_NOW;
1550}
1551
1552/* returns 0 on success, or an errno on failure */
1553static int
1554clientGotBlock( tr_peermsgs                * msgs,
1555                struct evbuffer            * data,
1556                const struct peer_request  * req )
1557{
1558    int err;
1559    tr_torrent * tor = msgs->torrent;
1560    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1561
1562    assert( msgs );
1563    assert( req );
1564
1565    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1566        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1567                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1568        return EMSGSIZE;
1569    }
1570
1571    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1572
1573    if( !tr_peerMgrDidPeerRequest( msgs->torrent, msgs->peer, block ) ) {
1574        dbgmsg( msgs, "we didn't ask for this message..." );
1575        return 0;
1576    }
1577    if( tr_cpPieceIsComplete( &msgs->torrent->completion, req->index ) ) {
1578        dbgmsg( msgs, "we did ask for this message, but the piece is already complete..." );
1579        return 0;
1580    }
1581
1582    /**
1583    ***  Save the block
1584    **/
1585
1586    if(( err = tr_cacheWriteBlock( getSession(msgs)->cache, tor, req->index, req->offset, req->length, data )))
1587        return err;
1588
1589    tr_bitfieldAdd( &msgs->peer->blame, req->index );
1590    fireGotBlock( msgs, req );
1591    return 0;
1592}
1593
1594static int peerPulse( void * vmsgs );
1595
1596static void
1597didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1598{
1599    tr_peermsgs * msgs = vmsgs;
1600    firePeerGotData( msgs, bytesWritten, wasPieceData );
1601
1602    if ( tr_isPeerIo( io ) && io->userData )
1603        peerPulse( msgs );
1604}
1605
1606static ReadState
1607canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1608{
1609    ReadState         ret;
1610    tr_peermsgs *     msgs = vmsgs;
1611    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1612    const size_t      inlen = evbuffer_get_length( in );
1613
1614    dbgmsg( msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state );
1615
1616    if( !inlen )
1617    {
1618        ret = READ_LATER;
1619    }
1620    else if( msgs->state == AWAITING_BT_PIECE )
1621    {
1622        ret = readBtPiece( msgs, in, inlen, piece );
1623    }
1624    else switch( msgs->state )
1625    {
1626        case AWAITING_BT_LENGTH:
1627            ret = readBtLength ( msgs, in, inlen ); break;
1628
1629        case AWAITING_BT_ID:
1630            ret = readBtId     ( msgs, in, inlen ); break;
1631
1632        case AWAITING_BT_MESSAGE:
1633            ret = readBtMessage( msgs, in, inlen ); break;
1634
1635        default:
1636            ret = READ_ERR;
1637            assert( 0 );
1638    }
1639
1640    dbgmsg( msgs, "canRead: ret is %d", (int)ret );
1641
1642    /* log the raw data that was read */
1643    if( ( ret != READ_ERR ) && ( evbuffer_get_length( in ) != inlen ) )
1644        fireClientGotData( msgs, inlen - evbuffer_get_length( in ), false );
1645
1646    return ret;
1647}
1648
1649int
1650tr_peerMsgsIsReadingBlock( const tr_peermsgs * msgs, tr_block_index_t block )
1651{
1652    if( msgs->state != AWAITING_BT_PIECE )
1653        return false;
1654
1655    return block == _tr_block( msgs->torrent,
1656                               msgs->incoming.blockReq.index,
1657                               msgs->incoming.blockReq.offset );
1658}
1659
1660/**
1661***
1662**/
1663
1664static void
1665updateDesiredRequestCount( tr_peermsgs * msgs )
1666{
1667    const tr_torrent * const torrent = msgs->torrent;
1668
1669    /* there are lots of reasons we might not want to request any blocks... */
1670    if( tr_torrentIsSeed( torrent ) || !tr_torrentHasMetadata( torrent )
1671                                    || msgs->peer->clientIsChoked
1672                                    || !msgs->peer->clientIsInterested )
1673    {
1674        msgs->desiredRequestCount = 0;
1675    }
1676    else
1677    {
1678        int estimatedBlocksInPeriod;
1679        int rate_Bps;
1680        int irate_Bps;
1681        const int floor = 4;
1682        const int seconds = REQUEST_BUF_SECS;
1683        const uint64_t now = tr_time_msec( );
1684
1685        /* Get the rate limit we should use.
1686         * FIXME: this needs to consider all the other peers as well... */
1687        rate_Bps = tr_peerGetPieceSpeed_Bps( msgs->peer, now, TR_PEER_TO_CLIENT );
1688        if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) )
1689            rate_Bps = MIN( rate_Bps, tr_torrentGetSpeedLimit_Bps( torrent, TR_PEER_TO_CLIENT ) );
1690
1691        /* honor the session limits, if enabled */
1692        if( tr_torrentUsesSessionLimits( torrent ) )
1693            if( tr_sessionGetActiveSpeedLimit_Bps( torrent->session, TR_PEER_TO_CLIENT, &irate_Bps ) )
1694                rate_Bps = MIN( rate_Bps, irate_Bps );
1695
1696        /* use this desired rate to figure out how
1697         * many requests we should send to this peer */
1698        estimatedBlocksInPeriod = ( rate_Bps * seconds ) / torrent->blockSize;
1699        msgs->desiredRequestCount = MAX( floor, estimatedBlocksInPeriod );
1700
1701        /* honor the peer's maximum request count, if specified */
1702        if( msgs->reqq > 0 )
1703            if( msgs->desiredRequestCount > msgs->reqq )
1704                msgs->desiredRequestCount = msgs->reqq;
1705    }
1706}
1707
1708static void
1709updateMetadataRequests( tr_peermsgs * msgs, time_t now )
1710{
1711    int piece;
1712
1713    if( msgs->peerSupportsMetadataXfer
1714        && tr_torrentGetNextMetadataRequest( msgs->torrent, now, &piece ) )
1715    {
1716        tr_benc tmp;
1717        struct evbuffer * payload;
1718        struct evbuffer * out = msgs->outMessages;
1719
1720        /* build the data message */
1721        tr_bencInitDict( &tmp, 3 );
1722        tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REQUEST );
1723        tr_bencDictAddInt( &tmp, "piece", piece );
1724        payload = tr_bencToBuf( &tmp, TR_FMT_BENC );
1725
1726        dbgmsg( msgs, "requesting metadata piece #%d", piece );
1727
1728        /* write it out as a LTEP message to our outMessages buffer */
1729        evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + evbuffer_get_length( payload ) );
1730        evbuffer_add_uint8 ( out, BT_LTEP );
1731        evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1732        evbuffer_add_buffer( out, payload );
1733        pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1734        dbgOutMessageLen( msgs );
1735
1736        /* cleanup */
1737        evbuffer_free( payload );
1738        tr_bencFree( &tmp );
1739    }
1740}
1741
1742static void
1743updateBlockRequests( tr_peermsgs * msgs )
1744{
1745    if( tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT )
1746        && ( msgs->desiredRequestCount > 0 )
1747        && ( msgs->peer->pendingReqsToPeer <= ( msgs->desiredRequestCount * 0.66 ) ) )
1748    {
1749        int i;
1750        int n;
1751        const int numwant = msgs->desiredRequestCount - msgs->peer->pendingReqsToPeer;
1752        tr_block_index_t * blocks = tr_new( tr_block_index_t, numwant );
1753
1754        tr_peerMgrGetNextRequests( msgs->torrent, msgs->peer, numwant, blocks, &n, false );
1755
1756        for( i=0; i<n; ++i )
1757        {
1758            struct peer_request req;
1759            blockToReq( msgs->torrent, blocks[i], &req );
1760            protocolSendRequest( msgs, &req );
1761        }
1762
1763        tr_free( blocks );
1764    }
1765}
1766
1767static size_t
1768fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1769{
1770    int piece;
1771    size_t bytesWritten = 0;
1772    struct peer_request req;
1773    const bool haveMessages = evbuffer_get_length( msgs->outMessages ) != 0;
1774    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1775
1776    /**
1777    ***  Protocol messages
1778    **/
1779
1780    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1781    {
1782        dbgmsg( msgs, "started an outMessages batch (length is %zu)", evbuffer_get_length( msgs->outMessages ) );
1783        msgs->outMessagesBatchedAt = now;
1784    }
1785    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1786    {
1787        const size_t len = evbuffer_get_length( msgs->outMessages );
1788        /* flush the protocol messages */
1789        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1790        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, false );
1791        msgs->clientSentAnythingAt = now;
1792        msgs->outMessagesBatchedAt = 0;
1793        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1794        bytesWritten +=  len;
1795    }
1796
1797    /**
1798    ***  Metadata Pieces
1799    **/
1800
1801    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= METADATA_PIECE_SIZE )
1802        && popNextMetadataRequest( msgs, &piece ) )
1803    {
1804        char * data;
1805        int dataLen;
1806        bool ok = false;
1807
1808        data = tr_torrentGetMetadataPiece( msgs->torrent, piece, &dataLen );
1809        if( ( dataLen > 0 ) && ( data != NULL ) )
1810        {
1811            tr_benc tmp;
1812            struct evbuffer * payload;
1813            struct evbuffer * out = msgs->outMessages;
1814
1815            /* build the data message */
1816            tr_bencInitDict( &tmp, 3 );
1817            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_DATA );
1818            tr_bencDictAddInt( &tmp, "piece", piece );
1819            tr_bencDictAddInt( &tmp, "total_size", msgs->torrent->infoDictLength );
1820            payload = tr_bencToBuf( &tmp, TR_FMT_BENC );
1821
1822            /* write it out as a LTEP message to our outMessages buffer */
1823            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + evbuffer_get_length( payload ) + dataLen );
1824            evbuffer_add_uint8 ( out, BT_LTEP );
1825            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1826            evbuffer_add_buffer( out, payload );
1827            evbuffer_add       ( out, data, dataLen );
1828            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1829            dbgOutMessageLen( msgs );
1830
1831            evbuffer_free( payload );
1832            tr_bencFree( &tmp );
1833            tr_free( data );
1834
1835            ok = true;
1836        }
1837
1838        if( !ok ) /* send a rejection message */
1839        {
1840            tr_benc tmp;
1841            struct evbuffer * payload;
1842            struct evbuffer * out = msgs->outMessages;
1843
1844            /* build the rejection message */
1845            tr_bencInitDict( &tmp, 2 );
1846            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1847            tr_bencDictAddInt( &tmp, "piece", piece );
1848            payload = tr_bencToBuf( &tmp, TR_FMT_BENC );
1849
1850            /* write it out as a LTEP message to our outMessages buffer */
1851            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + evbuffer_get_length( payload ) );
1852            evbuffer_add_uint8 ( out, BT_LTEP );
1853            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1854            evbuffer_add_buffer( out, payload );
1855            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1856            dbgOutMessageLen( msgs );
1857
1858            evbuffer_free( payload );
1859            tr_bencFree( &tmp );
1860        }
1861    }
1862
1863    /**
1864    ***  Data Blocks
1865    **/
1866
1867    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= msgs->torrent->blockSize )
1868        && popNextRequest( msgs, &req ) )
1869    {
1870        --msgs->prefetchCount;
1871
1872        if( requestIsValid( msgs, &req )
1873            && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1874        {
1875            int err;
1876            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
1877            struct evbuffer * out;
1878            struct evbuffer_iovec iovec[1];
1879
1880            out = evbuffer_new( );
1881            evbuffer_expand( out, msglen );
1882
1883            evbuffer_add_uint32( out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1884            evbuffer_add_uint8 ( out, BT_PIECE );
1885            evbuffer_add_uint32( out, req.index );
1886            evbuffer_add_uint32( out, req.offset );
1887
1888            evbuffer_reserve_space( out, req.length, iovec, 1 );
1889            err = tr_cacheReadBlock( getSession(msgs)->cache, msgs->torrent, req.index, req.offset, req.length, iovec[0].iov_base );
1890            iovec[0].iov_len = req.length;
1891            evbuffer_commit_space( out, iovec, 1 );
1892
1893            /* check the piece if it needs checking... */
1894            if( !err && tr_torrentPieceNeedsCheck( msgs->torrent, req.index ) )
1895                if(( err = !tr_torrentCheckPiece( msgs->torrent, req.index )))
1896                    tr_torrentSetLocalError( msgs->torrent, _( "Please Verify Local Data! Piece #%zu is corrupt." ), (size_t)req.index );
1897
1898            if( err )
1899            {
1900                if( fext )
1901                    protocolSendReject( msgs, &req );
1902            }
1903            else
1904            {
1905                const size_t n = evbuffer_get_length( out );
1906                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1907                assert( n == msglen );
1908                tr_peerIoWriteBuf( msgs->peer->io, out, true );
1909                bytesWritten += n;
1910                msgs->clientSentAnythingAt = now;
1911                tr_historyAdd( &msgs->peer->blocksSentToPeer, tr_time( ), 1 );
1912            }
1913
1914            evbuffer_free( out );
1915
1916            if( err )
1917            {
1918                bytesWritten = 0;
1919                msgs = NULL;
1920            }
1921        }
1922        else if( fext ) /* peer needs a reject message */
1923        {
1924            protocolSendReject( msgs, &req );
1925        }
1926
1927        if( msgs != NULL )
1928            prefetchPieces( msgs );
1929    }
1930
1931    /**
1932    ***  Keepalive
1933    **/
1934
1935    if( ( msgs != NULL )
1936        && ( msgs->clientSentAnythingAt != 0 )
1937        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1938    {
1939        dbgmsg( msgs, "sending a keepalive message" );
1940        evbuffer_add_uint32( msgs->outMessages, 0 );
1941        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1942    }
1943
1944    return bytesWritten;
1945}
1946
1947static int
1948peerPulse( void * vmsgs )
1949{
1950    tr_peermsgs * msgs = vmsgs;
1951    const time_t  now = tr_time( );
1952
1953    if ( tr_isPeerIo( msgs->peer->io ) ) {
1954        updateDesiredRequestCount( msgs );
1955        updateBlockRequests( msgs );
1956        updateMetadataRequests( msgs, now );
1957    }
1958
1959    for( ;; )
1960        if( fillOutputBuffer( msgs, now ) < 1 )
1961            break;
1962
1963    return true; /* loop forever */
1964}
1965
1966void
1967tr_peerMsgsPulse( tr_peermsgs * msgs )
1968{
1969    if( msgs != NULL )
1970        peerPulse( msgs );
1971}
1972
1973static void
1974gotError( tr_peerIo * io UNUSED, short what, void * vmsgs )
1975{
1976    if( what & BEV_EVENT_TIMEOUT )
1977        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1978    if( what & ( BEV_EVENT_EOF | BEV_EVENT_ERROR ) )
1979        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1980               what, errno, tr_strerror( errno ) );
1981    fireError( vmsgs, ENOTCONN );
1982}
1983
1984static void
1985sendBitfield( tr_peermsgs * msgs )
1986{
1987    size_t byte_count = 0;
1988    struct evbuffer * out = msgs->outMessages;
1989    void * bytes = tr_cpCreatePieceBitfield( &msgs->torrent->completion, &byte_count );
1990
1991    evbuffer_add_uint32( out, sizeof( uint8_t ) + byte_count );
1992    evbuffer_add_uint8 ( out, BT_BITFIELD );
1993    evbuffer_add       ( out, bytes, byte_count );
1994    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu", evbuffer_get_length( out ) );
1995    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1996
1997    tr_free( bytes );
1998}
1999
2000static void
2001tellPeerWhatWeHave( tr_peermsgs * msgs )
2002{
2003    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
2004
2005    if( fext && tr_cpHasAll( &msgs->torrent->completion ) )
2006    {
2007        protocolSendHaveAll( msgs );
2008    }
2009    else if( fext && tr_cpHasNone( &msgs->torrent->completion ) )
2010    {
2011        protocolSendHaveNone( msgs );
2012    }
2013    else if( !tr_cpHasNone( &msgs->torrent->completion ) )
2014    {
2015        sendBitfield( msgs );
2016    }
2017}
2018
2019/**
2020***
2021**/
2022
2023/* some peers give us error messages if we send
2024   more than this many peers in a single pex message
2025   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2026#define MAX_PEX_ADDED 50
2027#define MAX_PEX_DROPPED 50
2028
2029typedef struct
2030{
2031    tr_pex *  added;
2032    tr_pex *  dropped;
2033    tr_pex *  elements;
2034    int       addedCount;
2035    int       droppedCount;
2036    int       elementCount;
2037}
2038PexDiffs;
2039
2040static void
2041pexAddedCb( void * vpex, void * userData )
2042{
2043    PexDiffs * diffs = userData;
2044    tr_pex *   pex = vpex;
2045
2046    if( diffs->addedCount < MAX_PEX_ADDED )
2047    {
2048        diffs->added[diffs->addedCount++] = *pex;
2049        diffs->elements[diffs->elementCount++] = *pex;
2050    }
2051}
2052
2053static inline void
2054pexDroppedCb( void * vpex, void * userData )
2055{
2056    PexDiffs * diffs = userData;
2057    tr_pex *   pex = vpex;
2058
2059    if( diffs->droppedCount < MAX_PEX_DROPPED )
2060    {
2061        diffs->dropped[diffs->droppedCount++] = *pex;
2062    }
2063}
2064
2065static inline void
2066pexElementCb( void * vpex, void * userData )
2067{
2068    PexDiffs * diffs = userData;
2069    tr_pex * pex = vpex;
2070
2071    diffs->elements[diffs->elementCount++] = *pex;
2072}
2073
2074typedef void ( tr_set_func )( void * element, void * userData );
2075
2076/**
2077 * @brief find the differences and commonalities in two sorted sets
2078 * @param a the first set
2079 * @param aCount the number of elements in the set 'a'
2080 * @param b the second set
2081 * @param bCount the number of elements in the set 'b'
2082 * @param compare the sorting method for both sets
2083 * @param elementSize the sizeof the element in the two sorted sets
2084 * @param in_a called for items in set 'a' but not set 'b'
2085 * @param in_b called for items in set 'b' but not set 'a'
2086 * @param in_both called for items that are in both sets
2087 * @param userData user data passed along to in_a, in_b, and in_both
2088 */
2089static void
2090tr_set_compare( const void * va, size_t aCount,
2091                const void * vb, size_t bCount,
2092                int compare( const void * a, const void * b ),
2093                size_t elementSize,
2094                tr_set_func in_a_cb,
2095                tr_set_func in_b_cb,
2096                tr_set_func in_both_cb,
2097                void * userData )
2098{
2099    const uint8_t * a = va;
2100    const uint8_t * b = vb;
2101    const uint8_t * aend = a + elementSize * aCount;
2102    const uint8_t * bend = b + elementSize * bCount;
2103
2104    while( a != aend || b != bend )
2105    {
2106        if( a == aend )
2107        {
2108            ( *in_b_cb )( (void*)b, userData );
2109            b += elementSize;
2110        }
2111        else if( b == bend )
2112        {
2113            ( *in_a_cb )( (void*)a, userData );
2114            a += elementSize;
2115        }
2116        else
2117        {
2118            const int val = ( *compare )( a, b );
2119
2120            if( !val )
2121            {
2122                ( *in_both_cb )( (void*)a, userData );
2123                a += elementSize;
2124                b += elementSize;
2125            }
2126            else if( val < 0 )
2127            {
2128                ( *in_a_cb )( (void*)a, userData );
2129                a += elementSize;
2130            }
2131            else if( val > 0 )
2132            {
2133                ( *in_b_cb )( (void*)b, userData );
2134                b += elementSize;
2135            }
2136        }
2137    }
2138}
2139
2140
2141static void
2142sendPex( tr_peermsgs * msgs )
2143{
2144    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2145    {
2146        PexDiffs diffs;
2147        PexDiffs diffs6;
2148        tr_pex * newPex = NULL;
2149        tr_pex * newPex6 = NULL;
2150        const int newCount = tr_peerMgrGetPeers( msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2151        const int newCount6 = tr_peerMgrGetPeers( msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2152
2153        /* build the diffs */
2154        diffs.added = tr_new( tr_pex, newCount );
2155        diffs.addedCount = 0;
2156        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2157        diffs.droppedCount = 0;
2158        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2159        diffs.elementCount = 0;
2160        tr_set_compare( msgs->pex, msgs->pexCount,
2161                        newPex, newCount,
2162                        tr_pexCompare, sizeof( tr_pex ),
2163                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2164        diffs6.added = tr_new( tr_pex, newCount6 );
2165        diffs6.addedCount = 0;
2166        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2167        diffs6.droppedCount = 0;
2168        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2169        diffs6.elementCount = 0;
2170        tr_set_compare( msgs->pex6, msgs->pexCount6,
2171                        newPex6, newCount6,
2172                        tr_pexCompare, sizeof( tr_pex ),
2173                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2174        dbgmsg(
2175            msgs,
2176            "pex: old peer count %d+%d, new peer count %d+%d, "
2177            "added %d+%d, removed %d+%d",
2178            msgs->pexCount, msgs->pexCount6, newCount, newCount6,
2179            diffs.addedCount, diffs6.addedCount,
2180            diffs.droppedCount, diffs6.droppedCount );
2181
2182        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2183            !diffs6.droppedCount )
2184        {
2185            tr_free( diffs.elements );
2186            tr_free( diffs6.elements );
2187        }
2188        else
2189        {
2190            int  i;
2191            tr_benc val;
2192            uint8_t * tmp, *walk;
2193            struct evbuffer * payload;
2194            struct evbuffer * out = msgs->outMessages;
2195
2196            /* update peer */
2197            tr_free( msgs->pex );
2198            msgs->pex = diffs.elements;
2199            msgs->pexCount = diffs.elementCount;
2200            tr_free( msgs->pex6 );
2201            msgs->pex6 = diffs6.elements;
2202            msgs->pexCount6 = diffs6.elementCount;
2203
2204            /* build the pex payload */
2205            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2206                                         * speed vs. likelihood? */
2207
2208            if( diffs.addedCount > 0)
2209            {
2210                /* "added" */
2211                tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2212                for( i = 0; i < diffs.addedCount; ++i ) {
2213                    memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2214                    memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2215                }
2216                assert( ( walk - tmp ) == diffs.addedCount * 6 );
2217                tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2218                tr_free( tmp );
2219
2220                /* "added.f"
2221                 * unset each holepunch flag because we don't support it. */
2222                tmp = walk = tr_new( uint8_t, diffs.addedCount );
2223                for( i = 0; i < diffs.addedCount; ++i )
2224                    *walk++ = diffs.added[i].flags & ~ADDED_F_HOLEPUNCH;
2225                assert( ( walk - tmp ) == diffs.addedCount );
2226                tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2227                tr_free( tmp );
2228            }
2229
2230            if( diffs.droppedCount > 0 )
2231            {
2232                /* "dropped" */
2233                tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2234                for( i = 0; i < diffs.droppedCount; ++i ) {
2235                    memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2236                    memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2237                }
2238                assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2239                tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2240                tr_free( tmp );
2241            }
2242
2243            if( diffs6.addedCount > 0 )
2244            {
2245                /* "added6" */
2246                tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2247                for( i = 0; i < diffs6.addedCount; ++i ) {
2248                    memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2249                    walk += 16;
2250                    memcpy( walk, &diffs6.added[i].port, 2 );
2251                    walk += 2;
2252                }
2253                assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2254                tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2255                tr_free( tmp );
2256
2257                /* "added6.f"
2258                 * unset each holepunch flag because we don't support it. */
2259                tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2260                for( i = 0; i < diffs6.addedCount; ++i )
2261                    *walk++ = diffs6.added[i].flags & ~ADDED_F_HOLEPUNCH;
2262                assert( ( walk - tmp ) == diffs6.addedCount );
2263                tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2264                tr_free( tmp );
2265            }
2266
2267            if( diffs6.droppedCount > 0 )
2268            {
2269                /* "dropped6" */
2270                tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2271                for( i = 0; i < diffs6.droppedCount; ++i ) {
2272                    memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2273                    walk += 16;
2274                    memcpy( walk, &diffs6.dropped[i].port, 2 );
2275                    walk += 2;
2276                }
2277                assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2278                tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2279                tr_free( tmp );
2280            }
2281
2282            /* write the pex message */
2283            payload = tr_bencToBuf( &val, TR_FMT_BENC );
2284            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + evbuffer_get_length( payload ) );
2285            evbuffer_add_uint8 ( out, BT_LTEP );
2286            evbuffer_add_uint8 ( out, msgs->ut_pex_id );
2287            evbuffer_add_buffer( out, payload );
2288            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2289            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", evbuffer_get_length( out ) );
2290            dbgOutMessageLen( msgs );
2291
2292            evbuffer_free( payload );
2293            tr_bencFree( &val );
2294        }
2295
2296        /* cleanup */
2297        tr_free( diffs.added );
2298        tr_free( diffs.dropped );
2299        tr_free( newPex );
2300        tr_free( diffs6.added );
2301        tr_free( diffs6.dropped );
2302        tr_free( newPex6 );
2303
2304        /*msgs->clientSentPexAt = tr_time( );*/
2305    }
2306}
2307
2308static void
2309pexPulse( int foo UNUSED, short bar UNUSED, void * vmsgs )
2310{
2311    struct tr_peermsgs * msgs = vmsgs;
2312
2313    sendPex( msgs );
2314
2315    assert( msgs->pexTimer != NULL );
2316    tr_timerAdd( msgs->pexTimer, PEX_INTERVAL_SECS, 0 );
2317}
2318
2319/**
2320***
2321**/
2322
2323tr_peermsgs*
2324tr_peerMsgsNew( struct tr_torrent    * torrent,
2325                struct tr_peer       * peer,
2326                tr_peer_callback     * callback,
2327                void                 * callbackData )
2328{
2329    tr_peermsgs * m;
2330
2331    assert( peer );
2332    assert( peer->io );
2333
2334    m = tr_new0( tr_peermsgs, 1 );
2335    m->callback = callback;
2336    m->callbackData = callbackData;
2337    m->peer = peer;
2338    m->torrent = torrent;
2339    m->peer->clientIsChoked = 1;
2340    m->peer->peerIsChoked = 1;
2341    m->peer->clientIsInterested = 0;
2342    m->peer->peerIsInterested = 0;
2343    m->state = AWAITING_BT_LENGTH;
2344    m->outMessages = evbuffer_new( );
2345    m->outMessagesBatchedAt = 0;
2346    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2347    peer->msgs = m;
2348
2349    if( tr_torrentAllowsPex( torrent ) ) {
2350        m->pexTimer = evtimer_new( torrent->session->event_base, pexPulse, m );
2351        tr_timerAdd( m->pexTimer, PEX_INTERVAL_SECS, 0 );
2352    }
2353
2354    if( tr_peerIoSupportsUTP( peer->io ) ) {
2355        const tr_address * addr = tr_peerIoGetAddress( peer->io, NULL );
2356        tr_peerMgrSetUtpSupported( torrent, addr );
2357        tr_peerMgrSetUtpFailed( torrent, addr, false );
2358    }
2359
2360    if( tr_peerIoSupportsLTEP( peer->io ) )
2361        sendLtepHandshake( m );
2362
2363    tellPeerWhatWeHave( m );
2364
2365    if( tr_dhtEnabled( torrent->session ) && tr_peerIoSupportsDHT( peer->io ))
2366    {
2367        /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2368        const struct tr_address *addr = tr_peerIoGetAddress( peer->io, NULL );
2369        if( addr->type == TR_AF_INET || tr_globalIPv6() ) {
2370            protocolSendPort( m, tr_dhtPort( torrent->session ) );
2371        }
2372    }
2373
2374    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2375    updateDesiredRequestCount( m );
2376
2377    return m;
2378}
2379
2380void
2381tr_peerMsgsFree( tr_peermsgs* msgs )
2382{
2383    if( msgs )
2384    {
2385        if( msgs->pexTimer != NULL )
2386            event_free( msgs->pexTimer );
2387
2388        if( msgs->incoming.block != NULL )
2389            evbuffer_free( msgs->incoming.block );
2390
2391        evbuffer_free( msgs->outMessages );
2392        tr_free( msgs->pex6 );
2393        tr_free( msgs->pex );
2394
2395        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2396        tr_free( msgs );
2397    }
2398}
Note: See TracBrowser for help on using the repository browser.