source: trunk/libtransmission/peer-msgs.c @ 12248

Last change on this file since 12248 was 12248, checked in by jordan, 11 years ago

(trunk libT) break the mac build and introduce new crashes.

This is partially to address #4145 "Downloads stuck at 100%" by refactoring the bitset, bitfield, and tr_completion; however, the ripple effect is larger than usual so things may get worse in the short term before getting better.

livings124: to fix the mac build, remove bitset.[ch] from xcode

  • Property svn:keywords set to Date Rev Author Id
File size: 71.3 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 12248 2011-03-28 16:31:05Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <stdarg.h>
16#include <stdlib.h>
17#include <string.h>
18
19#include <event2/buffer.h>
20#include <event2/bufferevent.h>
21#include <event2/event.h>
22
23#include "transmission.h"
24#include "bencode.h"
25#include "cache.h"
26#include "completion.h"
27#include "crypto.h" /* tr_sha1() */
28#include "peer-io.h"
29#include "peer-mgr.h"
30#include "peer-msgs.h"
31#include "session.h"
32#include "torrent.h"
33#include "torrent-magnet.h"
34#include "tr-dht.h"
35#include "utils.h"
36#include "version.h"
37
38/**
39***
40**/
41
42enum
43{
44    BT_CHOKE                = 0,
45    BT_UNCHOKE              = 1,
46    BT_INTERESTED           = 2,
47    BT_NOT_INTERESTED       = 3,
48    BT_HAVE                 = 4,
49    BT_BITFIELD             = 5,
50    BT_REQUEST              = 6,
51    BT_PIECE                = 7,
52    BT_CANCEL               = 8,
53    BT_PORT                 = 9,
54
55    BT_FEXT_SUGGEST         = 13,
56    BT_FEXT_HAVE_ALL        = 14,
57    BT_FEXT_HAVE_NONE       = 15,
58    BT_FEXT_REJECT          = 16,
59    BT_FEXT_ALLOWED_FAST    = 17,
60
61    BT_LTEP                 = 20,
62
63    LTEP_HANDSHAKE          = 0,
64
65    UT_PEX_ID               = 1,
66    UT_METADATA_ID          = 3,
67
68    MAX_PEX_PEER_COUNT      = 50,
69
70    MIN_CHOKE_PERIOD_SEC    = 10,
71
72    /* idle seconds before we send a keepalive */
73    KEEPALIVE_INTERVAL_SECS = 100,
74
75    PEX_INTERVAL_SECS       = 90, /* sec between sendPex() calls */
76
77    REQQ                    = 512,
78
79    METADATA_REQQ           = 64,
80
81    /* used in lowering the outMessages queue period */
82    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
83    HIGH_PRIORITY_INTERVAL_SECS = 2,
84    LOW_PRIORITY_INTERVAL_SECS = 10,
85
86    /* number of pieces we'll allow in our fast set */
87    MAX_FAST_SET_SIZE = 3,
88
89    /* defined in BEP #9 */
90    METADATA_MSG_TYPE_REQUEST = 0,
91    METADATA_MSG_TYPE_DATA = 1,
92    METADATA_MSG_TYPE_REJECT = 2
93};
94
95enum
96{
97    AWAITING_BT_LENGTH,
98    AWAITING_BT_ID,
99    AWAITING_BT_MESSAGE,
100    AWAITING_BT_PIECE
101};
102
103/**
104***
105**/
106
107struct peer_request
108{
109    uint32_t    index;
110    uint32_t    offset;
111    uint32_t    length;
112};
113
114static void
115blockToReq( const tr_torrent     * tor,
116            tr_block_index_t       block,
117            struct peer_request  * setme )
118{
119    tr_torrentGetBlockLocation( tor, block, &setme->index,
120                                            &setme->offset,
121                                            &setme->length );
122}
123
124/**
125***
126**/
127
128/* this is raw, unchanged data from the peer regarding
129 * the current message that it's sending us. */
130struct tr_incoming
131{
132    uint8_t                id;
133    uint32_t               length; /* includes the +1 for id length */
134    struct peer_request    blockReq; /* metadata for incoming blocks */
135    struct evbuffer *      block; /* piece data for incoming blocks */
136};
137
138/**
139 * Low-level communication state information about a connected peer.
140 *
141 * This structure remembers the low-level protocol states that we're
142 * in with this peer, such as active requests, pex messages, and so on.
143 * Its fields are all private to peer-msgs.c.
144 *
145 * Data not directly involved with sending & receiving messages is
146 * stored in tr_peer, where it can be accessed by both peermsgs and
147 * the peer manager.
148 *
149 * @see struct peer_atom
150 * @see tr_peer
151 */
152struct tr_peermsgs
153{
154    bool            peerSupportsPex;
155    bool            peerSupportsMetadataXfer;
156    bool            clientSentLtepHandshake;
157    bool            peerSentLtepHandshake;
158
159    /*bool          haveFastSet;*/
160
161    int             desiredRequestCount;
162
163    int             prefetchCount;
164
165    /* how long the outMessages batch should be allowed to grow before
166     * it's flushed -- some messages (like requests >:) should be sent
167     * very quickly; others aren't as urgent. */
168    int8_t          outMessagesBatchPeriod;
169
170    uint8_t         state;
171    uint8_t         ut_pex_id;
172    uint8_t         ut_metadata_id;
173    uint16_t        pexCount;
174    uint16_t        pexCount6;
175
176    size_t          metadata_size_hint;
177#if 0
178    size_t                 fastsetSize;
179    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
180#endif
181
182    tr_peer *              peer;
183
184    tr_torrent *           torrent;
185
186    tr_peer_callback      * callback;
187    void                  * callbackData;
188
189    struct evbuffer *      outMessages; /* all the non-piece messages */
190
191    struct peer_request    peerAskedFor[REQQ];
192
193    int                    peerAskedForMetadata[METADATA_REQQ];
194    int                    peerAskedForMetadataCount;
195
196    tr_pex               * pex;
197    tr_pex               * pex6;
198
199    /*time_t                 clientSentPexAt;*/
200    time_t                 clientSentAnythingAt;
201
202    /* when we started batching the outMessages */
203    time_t                outMessagesBatchedAt;
204
205    struct tr_incoming    incoming;
206
207    /* if the peer supports the Extension Protocol in BEP 10 and
208       supplied a reqq argument, it's stored here. Otherwise, the
209       value is zero and should be ignored. */
210    int64_t               reqq;
211
212    struct event        * pexTimer;
213};
214
215/**
216***
217**/
218
219static inline tr_session*
220getSession( struct tr_peermsgs * msgs )
221{
222    return msgs->torrent->session;
223}
224
225/**
226***
227**/
228
229static void
230myDebug( const char * file, int line,
231         const struct tr_peermsgs * msgs,
232         const char * fmt, ... )
233{
234    FILE * fp = tr_getLog( );
235
236    if( fp )
237    {
238        va_list           args;
239        char              timestr[64];
240        struct evbuffer * buf = evbuffer_new( );
241        char *            base = tr_basename( file );
242
243        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
244                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
245                             tr_torrentName( msgs->torrent ),
246                             tr_peerIoGetAddrStr( msgs->peer->io ),
247                             msgs->peer->client );
248        va_start( args, fmt );
249        evbuffer_add_vprintf( buf, fmt, args );
250        va_end( args );
251        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
252        fputs( (const char*)evbuffer_pullup( buf, -1 ), fp );
253
254        tr_free( base );
255        evbuffer_free( buf );
256    }
257}
258
259#define dbgmsg( msgs, ... ) \
260    do { \
261        if( tr_deepLoggingIsActive( ) ) \
262            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
263    } while( 0 )
264
265/**
266***
267**/
268
269static void
270pokeBatchPeriod( tr_peermsgs * msgs, int interval )
271{
272    if( msgs->outMessagesBatchPeriod > interval )
273    {
274        msgs->outMessagesBatchPeriod = interval;
275        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
276    }
277}
278
279static void
280dbgOutMessageLen( tr_peermsgs * msgs )
281{
282    dbgmsg( msgs, "outMessage size is now %zu", evbuffer_get_length( msgs->outMessages ) );
283}
284
285static void
286protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
287{
288    struct evbuffer * out = msgs->outMessages;
289
290    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
291
292    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
293    evbuffer_add_uint8 ( out, BT_FEXT_REJECT );
294    evbuffer_add_uint32( out, req->index );
295    evbuffer_add_uint32( out, req->offset );
296    evbuffer_add_uint32( out, req->length );
297
298    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
299    dbgOutMessageLen( msgs );
300}
301
302static void
303protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
304{
305    struct evbuffer * out = msgs->outMessages;
306
307    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
308    evbuffer_add_uint8 ( out, BT_REQUEST );
309    evbuffer_add_uint32( out, req->index );
310    evbuffer_add_uint32( out, req->offset );
311    evbuffer_add_uint32( out, req->length );
312
313    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
314    dbgOutMessageLen( msgs );
315    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
316}
317
318static void
319protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
320{
321    struct evbuffer * out = msgs->outMessages;
322
323    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
324    evbuffer_add_uint8 ( out, BT_CANCEL );
325    evbuffer_add_uint32( out, req->index );
326    evbuffer_add_uint32( out, req->offset );
327    evbuffer_add_uint32( out, req->length );
328
329    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
330    dbgOutMessageLen( msgs );
331    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
332}
333
334static void
335protocolSendPort(tr_peermsgs *msgs, uint16_t port)
336{
337    struct evbuffer * out = msgs->outMessages;
338
339    dbgmsg( msgs, "sending Port %u", port);
340    evbuffer_add_uint32( out, 3 );
341    evbuffer_add_uint8 ( out, BT_PORT );
342    evbuffer_add_uint16( out, port);
343}
344
345static void
346protocolSendHave( tr_peermsgs * msgs, uint32_t index )
347{
348    struct evbuffer * out = msgs->outMessages;
349
350    evbuffer_add_uint32( out, sizeof(uint8_t) + sizeof(uint32_t) );
351    evbuffer_add_uint8 ( out, BT_HAVE );
352    evbuffer_add_uint32( out, index );
353
354    dbgmsg( msgs, "sending Have %u", index );
355    dbgOutMessageLen( msgs );
356    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
357}
358
359#if 0
360static void
361protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
362{
363    tr_peerIo       * io  = msgs->peer->io;
364    struct evbuffer * out = msgs->outMessages;
365
366    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
367
368    evbuffer_add_uint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
369    evbuffer_add_uint8 ( io, out, BT_FEXT_ALLOWED_FAST );
370    evbuffer_add_uint32( io, out, pieceIndex );
371
372    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
373    dbgOutMessageLen( msgs );
374}
375#endif
376
377static void
378protocolSendChoke( tr_peermsgs * msgs, int choke )
379{
380    struct evbuffer * out = msgs->outMessages;
381
382    evbuffer_add_uint32( out, sizeof( uint8_t ) );
383    evbuffer_add_uint8 ( out, choke ? BT_CHOKE : BT_UNCHOKE );
384
385    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
386    dbgOutMessageLen( msgs );
387    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
388}
389
390static void
391protocolSendHaveAll( tr_peermsgs * msgs )
392{
393    struct evbuffer * out = msgs->outMessages;
394
395    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
396
397    evbuffer_add_uint32( out, sizeof( uint8_t ) );
398    evbuffer_add_uint8 ( out, BT_FEXT_HAVE_ALL );
399
400    dbgmsg( msgs, "sending HAVE_ALL..." );
401    dbgOutMessageLen( msgs );
402    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
403}
404
405static void
406protocolSendHaveNone( tr_peermsgs * msgs )
407{
408    struct evbuffer * out = msgs->outMessages;
409
410    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
411
412    evbuffer_add_uint32( out, sizeof( uint8_t ) );
413    evbuffer_add_uint8 ( out, BT_FEXT_HAVE_NONE );
414
415    dbgmsg( msgs, "sending HAVE_NONE..." );
416    dbgOutMessageLen( msgs );
417    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
418}
419
420/**
421***  EVENTS
422**/
423
424static void
425publish( tr_peermsgs * msgs, tr_peer_event * e )
426{
427    assert( msgs->peer );
428    assert( msgs->peer->msgs == msgs );
429
430    if( msgs->callback != NULL )
431        msgs->callback( msgs->peer, e, msgs->callbackData );
432}
433
434static void
435fireError( tr_peermsgs * msgs, int err )
436{
437    tr_peer_event e = TR_PEER_EVENT_INIT;
438    e.eventType = TR_PEER_ERROR;
439    e.err = err;
440    publish( msgs, &e );
441}
442
443static void
444fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
445{
446    tr_peer_event e = TR_PEER_EVENT_INIT;
447    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
448    e.pieceIndex = req->index;
449    e.offset = req->offset;
450    e.length = req->length;
451    publish( msgs, &e );
452}
453
454static void
455fireGotRej( tr_peermsgs * msgs, const struct peer_request * req )
456{
457    tr_peer_event e = TR_PEER_EVENT_INIT;
458    e.eventType = TR_PEER_CLIENT_GOT_REJ;
459    e.pieceIndex = req->index;
460    e.offset = req->offset;
461    e.length = req->length;
462    publish( msgs, &e );
463}
464
465static void
466fireGotChoke( tr_peermsgs * msgs )
467{
468    tr_peer_event e = TR_PEER_EVENT_INIT;
469    e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
470    publish( msgs, &e );
471}
472
473static void
474fireClientGotHaveAll( tr_peermsgs * msgs )
475{
476    tr_peer_event e = TR_PEER_EVENT_INIT;
477    e.eventType = TR_PEER_CLIENT_GOT_HAVE_ALL;
478    publish( msgs, &e );
479}
480
481static void
482fireClientGotHaveNone( tr_peermsgs * msgs )
483{
484    tr_peer_event e = TR_PEER_EVENT_INIT;
485    e.eventType = TR_PEER_CLIENT_GOT_HAVE_NONE;
486    publish( msgs, &e );
487}
488
489static void
490fireClientGotData( tr_peermsgs * msgs, uint32_t length, int wasPieceData )
491{
492    tr_peer_event e = TR_PEER_EVENT_INIT;
493
494    e.length = length;
495    e.eventType = TR_PEER_CLIENT_GOT_DATA;
496    e.wasPieceData = wasPieceData;
497    publish( msgs, &e );
498}
499
500static void
501fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
502{
503    tr_peer_event e = TR_PEER_EVENT_INIT;
504    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
505    e.pieceIndex = pieceIndex;
506    publish( msgs, &e );
507}
508
509static void
510fireClientGotPort( tr_peermsgs * msgs, tr_port port )
511{
512    tr_peer_event e = TR_PEER_EVENT_INIT;
513    e.eventType = TR_PEER_CLIENT_GOT_PORT;
514    e.port = port;
515    publish( msgs, &e );
516}
517
518static void
519fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
520{
521    tr_peer_event e = TR_PEER_EVENT_INIT;
522    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
523    e.pieceIndex = pieceIndex;
524    publish( msgs, &e );
525}
526
527static void
528fireClientGotBitfield( tr_peermsgs * msgs, tr_bitfield * bitfield )
529{
530    tr_peer_event e = TR_PEER_EVENT_INIT;
531    e.eventType = TR_PEER_CLIENT_GOT_BITFIELD;
532    e.bitfield = bitfield;
533    publish( msgs, &e );
534}
535
536static void
537fireClientGotHave( tr_peermsgs * msgs, tr_piece_index_t index )
538{
539    tr_peer_event e = TR_PEER_EVENT_INIT;
540    e.eventType = TR_PEER_CLIENT_GOT_HAVE;
541    e.pieceIndex = index;
542    publish( msgs, &e );
543}
544
545static void
546firePeerGotData( tr_peermsgs * msgs, uint32_t length, bool wasPieceData )
547{
548    tr_peer_event e = TR_PEER_EVENT_INIT;
549
550    e.length = length;
551    e.eventType = TR_PEER_PEER_GOT_DATA;
552    e.wasPieceData = wasPieceData;
553
554    publish( msgs, &e );
555}
556
557/**
558***  ALLOWED FAST SET
559***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
560**/
561
562#if 0
563size_t
564tr_generateAllowedSet( tr_piece_index_t * setmePieces,
565                       size_t             desiredSetSize,
566                       size_t             pieceCount,
567                       const uint8_t    * infohash,
568                       const tr_address * addr )
569{
570    size_t setSize = 0;
571
572    assert( setmePieces );
573    assert( desiredSetSize <= pieceCount );
574    assert( desiredSetSize );
575    assert( pieceCount );
576    assert( infohash );
577    assert( addr );
578
579    if( addr->type == TR_AF_INET )
580    {
581        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
582        uint8_t x[SHA_DIGEST_LENGTH];
583
584        uint32_t ui32 = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
585        memcpy( w, &ui32, sizeof( uint32_t ) );
586        walk += sizeof( uint32_t );
587        memcpy( walk, infohash, SHA_DIGEST_LENGTH );                 /* (2) */
588        walk += SHA_DIGEST_LENGTH;
589        tr_sha1( x, w, walk-w, NULL );                               /* (3) */
590        assert( sizeof( w ) == walk-w );
591
592        while( setSize<desiredSetSize )
593        {
594            int i;
595            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
596            {
597                size_t k;
598                uint32_t j = i * 4;                                  /* (5) */
599                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
600                uint32_t index = y % pieceCount;                     /* (7) */
601
602                for( k=0; k<setSize; ++k )                           /* (8) */
603                    if( setmePieces[k] == index )
604                        break;
605
606                if( k == setSize )
607                    setmePieces[setSize++] = index;                  /* (9) */
608            }
609
610            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
611        }
612    }
613
614    return setSize;
615}
616
617static void
618updateFastSet( tr_peermsgs * msgs UNUSED )
619{
620    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
621    const int peerIsNeedy = msgs->peer->progress < 0.10;
622
623    if( fext && peerIsNeedy && !msgs->haveFastSet )
624    {
625        size_t i;
626        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
627        const tr_info * inf = &msgs->torrent->info;
628        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
629
630        /* build the fast set */
631        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
632        msgs->haveFastSet = 1;
633
634        /* send it to the peer */
635        for( i=0; i<msgs->fastsetSize; ++i )
636            protocolSendAllowedFast( msgs, msgs->fastset[i] );
637    }
638}
639#endif
640
641/**
642***  INTEREST
643**/
644
645static void
646sendInterest( tr_peermsgs * msgs, bool clientIsInterested )
647{
648    struct evbuffer * out = msgs->outMessages;
649
650    assert( msgs );
651    assert( tr_isBool( clientIsInterested ) );
652
653    msgs->peer->clientIsInterested = clientIsInterested;
654    dbgmsg( msgs, "Sending %s", clientIsInterested ? "Interested" : "Not Interested" );
655    evbuffer_add_uint32( out, sizeof( uint8_t ) );
656    evbuffer_add_uint8 ( out, clientIsInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
657
658    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
659    dbgOutMessageLen( msgs );
660}
661
662static void
663updateInterest( tr_peermsgs * msgs UNUSED )
664{
665    /* FIXME -- might need to poke the mgr on startup */
666}
667
668void
669tr_peerMsgsSetInterested( tr_peermsgs * msgs, int isInterested )
670{
671    assert( tr_isBool( isInterested ) );
672
673    if( isInterested != msgs->peer->clientIsInterested )
674        sendInterest( msgs, isInterested );
675}
676
677static bool
678popNextMetadataRequest( tr_peermsgs * msgs, int * piece )
679{
680    if( msgs->peerAskedForMetadataCount == 0 )
681        return false;
682
683    *piece = msgs->peerAskedForMetadata[0];
684
685    tr_removeElementFromArray( msgs->peerAskedForMetadata, 0, sizeof( int ),
686                               msgs->peerAskedForMetadataCount-- );
687
688    return true;
689}
690
691static bool
692popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
693{
694    if( msgs->peer->pendingReqsToClient == 0 )
695        return false;
696
697    *setme = msgs->peerAskedFor[0];
698
699    tr_removeElementFromArray( msgs->peerAskedFor, 0, sizeof( struct peer_request ),
700                               msgs->peer->pendingReqsToClient-- );
701
702    return true;
703}
704
705static void
706cancelAllRequestsToClient( tr_peermsgs * msgs )
707{
708    struct peer_request req;
709    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
710
711    while( popNextRequest( msgs, &req ))
712        if( mustSendCancel )
713            protocolSendReject( msgs, &req );
714}
715
716void
717tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
718{
719    const time_t now = tr_time( );
720    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
721
722    assert( msgs );
723    assert( msgs->peer );
724    assert( choke == 0 || choke == 1 );
725
726    if( msgs->peer->chokeChangedAt > fibrillationTime )
727    {
728        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
729    }
730    else if( msgs->peer->peerIsChoked != choke )
731    {
732        msgs->peer->peerIsChoked = choke;
733        if( choke )
734            cancelAllRequestsToClient( msgs );
735        protocolSendChoke( msgs, choke );
736        msgs->peer->chokeChangedAt = now;
737    }
738}
739
740/**
741***
742**/
743
744void
745tr_peerMsgsHave( tr_peermsgs * msgs, uint32_t index )
746{
747    protocolSendHave( msgs, index );
748
749    /* since we have more pieces now, we might not be interested in this peer */
750    updateInterest( msgs );
751}
752
753/**
754***
755**/
756
757static bool
758reqIsValid( const tr_peermsgs * peer,
759            uint32_t            index,
760            uint32_t            offset,
761            uint32_t            length )
762{
763    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
764}
765
766static bool
767requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
768{
769    return reqIsValid( msgs, req->index, req->offset, req->length );
770}
771
772void
773tr_peerMsgsCancel( tr_peermsgs * msgs, tr_block_index_t block )
774{
775    struct peer_request req;
776/*fprintf( stderr, "SENDING CANCEL MESSAGE FOR BLOCK %zu\n\t\tFROM PEER %p ------------------------------------\n", (size_t)block, msgs->peer );*/
777    blockToReq( msgs->torrent, block, &req );
778    protocolSendCancel( msgs, &req );
779}
780
781/**
782***
783**/
784
785static void
786sendLtepHandshake( tr_peermsgs * msgs )
787{
788    tr_benc val, *m;
789    char * buf;
790    int len;
791    bool allow_pex;
792    bool allow_metadata_xfer;
793    struct evbuffer * out = msgs->outMessages;
794    const unsigned char * ipv6 = tr_globalIPv6();
795
796    if( msgs->clientSentLtepHandshake )
797        return;
798
799    dbgmsg( msgs, "sending an ltep handshake" );
800    msgs->clientSentLtepHandshake = 1;
801
802    /* decide if we want to advertise metadata xfer support (BEP 9) */
803    if( tr_torrentIsPrivate( msgs->torrent ) )
804        allow_metadata_xfer = 0;
805    else
806        allow_metadata_xfer = 1;
807
808    /* decide if we want to advertise pex support */
809    if( !tr_torrentAllowsPex( msgs->torrent ) )
810        allow_pex = 0;
811    else if( msgs->peerSentLtepHandshake )
812        allow_pex = msgs->peerSupportsPex ? 1 : 0;
813    else
814        allow_pex = 1;
815
816    tr_bencInitDict( &val, 8 );
817    tr_bencDictAddInt( &val, "e", getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED );
818    if( ipv6 != NULL )
819        tr_bencDictAddRaw( &val, "ipv6", ipv6, 16 );
820    if( allow_metadata_xfer && tr_torrentHasMetadata( msgs->torrent )
821                            && ( msgs->torrent->infoDictLength > 0 ) )
822        tr_bencDictAddInt( &val, "metadata_size", msgs->torrent->infoDictLength );
823    tr_bencDictAddInt( &val, "p", tr_sessionGetPublicPeerPort( getSession(msgs) ) );
824    tr_bencDictAddInt( &val, "reqq", REQQ );
825    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
826    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
827    m  = tr_bencDictAddDict( &val, "m", 2 );
828    if( allow_metadata_xfer )
829        tr_bencDictAddInt( m, "ut_metadata", UT_METADATA_ID );
830    if( allow_pex )
831        tr_bencDictAddInt( m, "ut_pex", UT_PEX_ID );
832
833    buf = tr_bencToStr( &val, TR_FMT_BENC, &len );
834
835    evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + len );
836    evbuffer_add_uint8 ( out, BT_LTEP );
837    evbuffer_add_uint8 ( out, LTEP_HANDSHAKE );
838    evbuffer_add       ( out, buf, len );
839    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
840    dbgOutMessageLen( msgs );
841
842    /* cleanup */
843    tr_bencFree( &val );
844    tr_free( buf );
845}
846
847static void
848parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
849{
850    int64_t   i;
851    tr_benc   val, * sub;
852    uint8_t * tmp = tr_new( uint8_t, len );
853    const uint8_t *addr;
854    size_t addr_len;
855    tr_pex pex;
856    int8_t seedProbability = -1;
857
858    memset( &pex, 0, sizeof( tr_pex ) );
859
860    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
861    msgs->peerSentLtepHandshake = 1;
862
863    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
864    {
865        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
866        tr_free( tmp );
867        return;
868    }
869
870    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
871
872    /* does the peer prefer encrypted connections? */
873    if( tr_bencDictFindInt( &val, "e", &i ) ) {
874        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
875                                              : ENCRYPTION_PREFERENCE_NO;
876        if( i )
877            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
878    }
879
880    /* check supported messages for utorrent pex */
881    msgs->peerSupportsPex = 0;
882    msgs->peerSupportsMetadataXfer = 0;
883
884    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
885        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
886            msgs->peerSupportsPex = i != 0;
887            msgs->ut_pex_id = (uint8_t) i;
888            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
889        }
890        if( tr_bencDictFindInt( sub, "ut_metadata", &i ) ) {
891            msgs->peerSupportsMetadataXfer = i != 0;
892            msgs->ut_metadata_id = (uint8_t) i;
893            dbgmsg( msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id );
894        }
895        if( tr_bencDictFindInt( sub, "ut_holepunch", &i ) ) {
896            /* Mysterious µTorrent extension that we don't grok.  However,
897               it implies support for µTP, so use it to indicate that. */
898            tr_peerMgrSetUtpFailed( msgs->torrent,
899                                    tr_peerIoGetAddress( msgs->peer->io, NULL ),
900                                    false );
901        }
902    }
903
904    /* look for metainfo size (BEP 9) */
905    if( tr_bencDictFindInt( &val, "metadata_size", &i ) ) {
906        tr_torrentSetMetadataSizeHint( msgs->torrent, i );
907        msgs->metadata_size_hint = (size_t) i;
908    }
909
910    /* look for upload_only (BEP 21) */
911    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
912        seedProbability = i==0 ? 0 : 100;
913
914    /* get peer's listening port */
915    if( tr_bencDictFindInt( &val, "p", &i ) ) {
916        pex.port = htons( (uint16_t)i );
917        fireClientGotPort( msgs, pex.port );
918        dbgmsg( msgs, "peer's port is now %d", (int)i );
919    }
920
921    if( tr_peerIoIsIncoming( msgs->peer->io )
922        && tr_bencDictFindRaw( &val, "ipv4", &addr, &addr_len )
923        && ( addr_len == 4 ) )
924    {
925        pex.addr.type = TR_AF_INET;
926        memcpy( &pex.addr.addr.addr4, addr, 4 );
927        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
928    }
929
930    if( tr_peerIoIsIncoming( msgs->peer->io )
931        && tr_bencDictFindRaw( &val, "ipv6", &addr, &addr_len )
932        && ( addr_len == 16 ) )
933    {
934        pex.addr.type = TR_AF_INET6;
935        memcpy( &pex.addr.addr.addr6, addr, 16 );
936        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
937    }
938
939    /* get peer's maximum request queue size */
940    if( tr_bencDictFindInt( &val, "reqq", &i ) )
941        msgs->reqq = i;
942
943    tr_bencFree( &val );
944    tr_free( tmp );
945}
946
947static void
948parseUtMetadata( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
949{
950    tr_benc dict;
951    char * msg_end;
952    char * benc_end;
953    int64_t msg_type = -1;
954    int64_t piece = -1;
955    int64_t total_size = 0;
956    uint8_t * tmp = tr_new( uint8_t, msglen );
957
958    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
959    msg_end = (char*)tmp + msglen;
960
961    if( !tr_bencLoad( tmp, msglen, &dict, &benc_end ) )
962    {
963        tr_bencDictFindInt( &dict, "msg_type", &msg_type );
964        tr_bencDictFindInt( &dict, "piece", &piece );
965        tr_bencDictFindInt( &dict, "total_size", &total_size );
966        tr_bencFree( &dict );
967    }
968
969    dbgmsg( msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
970            (int)msg_type, (int)piece, (int)total_size );
971
972    if( msg_type == METADATA_MSG_TYPE_REJECT )
973    {
974        /* NOOP */
975    }
976
977    if( ( msg_type == METADATA_MSG_TYPE_DATA )
978        && ( !tr_torrentHasMetadata( msgs->torrent ) )
979        && ( msg_end - benc_end <= METADATA_PIECE_SIZE )
980        && ( piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size ) )
981    {
982        const int pieceLen = msg_end - benc_end;
983        tr_torrentSetMetadataPiece( msgs->torrent, piece, benc_end, pieceLen );
984    }
985
986    if( msg_type == METADATA_MSG_TYPE_REQUEST )
987    {
988        if( ( piece >= 0 )
989            && tr_torrentHasMetadata( msgs->torrent )
990            && !tr_torrentIsPrivate( msgs->torrent )
991            && ( msgs->peerAskedForMetadataCount < METADATA_REQQ ) )
992        {
993            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
994        }
995        else
996        {
997            tr_benc tmp;
998            int payloadLen;
999            char * payload;
1000            struct evbuffer * out = msgs->outMessages;
1001
1002            /* build the rejection message */
1003            tr_bencInitDict( &tmp, 2 );
1004            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1005            tr_bencDictAddInt( &tmp, "piece", piece );
1006            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1007            tr_bencFree( &tmp );
1008
1009            /* write it out as a LTEP message to our outMessages buffer */
1010            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1011            evbuffer_add_uint8 ( out, BT_LTEP );
1012            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1013            evbuffer_add       ( out, payload, payloadLen );
1014            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1015            dbgOutMessageLen( msgs );
1016
1017            tr_free( payload );
1018        }
1019    }
1020
1021    tr_free( tmp );
1022}
1023
1024static void
1025parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1026{
1027    int loaded = 0;
1028    uint8_t * tmp = tr_new( uint8_t, msglen );
1029    tr_benc val;
1030    tr_torrent * tor = msgs->torrent;
1031    const uint8_t * added;
1032    size_t added_len;
1033
1034    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1035
1036    if( tr_torrentAllowsPex( tor )
1037      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1038    {
1039        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1040        {
1041            tr_pex * pex;
1042            size_t i, n;
1043            size_t added_f_len = 0;
1044            const uint8_t * added_f = NULL;
1045
1046            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1047            pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1048
1049            n = MIN( n, MAX_PEX_PEER_COUNT );
1050            for( i=0; i<n; ++i )
1051            {
1052                int seedProbability = -1;
1053                if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1054                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1055            }
1056
1057            tr_free( pex );
1058        }
1059
1060        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1061        {
1062            tr_pex * pex;
1063            size_t i, n;
1064            size_t added_f_len = 0;
1065            const uint8_t * added_f = NULL;
1066
1067            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1068            pex = tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len, &n );
1069
1070            n = MIN( n, MAX_PEX_PEER_COUNT );
1071            for( i=0; i<n; ++i )
1072            {
1073                int seedProbability = -1;
1074                if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1075                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1076            }
1077
1078            tr_free( pex );
1079        }
1080    }
1081
1082    if( loaded )
1083        tr_bencFree( &val );
1084    tr_free( tmp );
1085}
1086
1087static void sendPex( tr_peermsgs * msgs );
1088
1089static void
1090parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer  * inbuf )
1091{
1092    uint8_t ltep_msgid;
1093
1094    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1095    msglen--;
1096
1097    if( ltep_msgid == LTEP_HANDSHAKE )
1098    {
1099        dbgmsg( msgs, "got ltep handshake" );
1100        parseLtepHandshake( msgs, msglen, inbuf );
1101        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1102        {
1103            sendLtepHandshake( msgs );
1104            sendPex( msgs );
1105        }
1106    }
1107    else if( ltep_msgid == UT_PEX_ID )
1108    {
1109        dbgmsg( msgs, "got ut pex" );
1110        msgs->peerSupportsPex = 1;
1111        parseUtPex( msgs, msglen, inbuf );
1112    }
1113    else if( ltep_msgid == UT_METADATA_ID )
1114    {
1115        dbgmsg( msgs, "got ut metadata" );
1116        msgs->peerSupportsMetadataXfer = 1;
1117        parseUtMetadata( msgs, msglen, inbuf );
1118    }
1119    else
1120    {
1121        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1122        evbuffer_drain( inbuf, msglen );
1123    }
1124}
1125
1126static int
1127readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1128{
1129    uint32_t len;
1130
1131    if( inlen < sizeof( len ) )
1132        return READ_LATER;
1133
1134    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1135
1136    if( len == 0 ) /* peer sent us a keepalive message */
1137        dbgmsg( msgs, "got KeepAlive" );
1138    else
1139    {
1140        msgs->incoming.length = len;
1141        msgs->state = AWAITING_BT_ID;
1142    }
1143
1144    return READ_NOW;
1145}
1146
1147static int readBtMessage( tr_peermsgs *, struct evbuffer *, size_t );
1148
1149static int
1150readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1151{
1152    uint8_t id;
1153
1154    if( inlen < sizeof( uint8_t ) )
1155        return READ_LATER;
1156
1157    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1158    msgs->incoming.id = id;
1159    dbgmsg( msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length );
1160
1161    if( id == BT_PIECE )
1162    {
1163        msgs->state = AWAITING_BT_PIECE;
1164        return READ_NOW;
1165    }
1166    else if( msgs->incoming.length != 1 )
1167    {
1168        msgs->state = AWAITING_BT_MESSAGE;
1169        return READ_NOW;
1170    }
1171    else return readBtMessage( msgs, inbuf, inlen - 1 );
1172}
1173
1174static void
1175updatePeerProgress( tr_peermsgs * msgs )
1176{
1177    tr_peerUpdateProgress( msgs->torrent, msgs->peer );
1178
1179    /*updateFastSet( msgs );*/
1180    updateInterest( msgs );
1181}
1182
1183static void
1184prefetchPieces( tr_peermsgs *msgs )
1185{
1186    int i;
1187
1188    if( !getSession(msgs)->isPrefetchEnabled )
1189        return;
1190
1191    /* Maintain 12 prefetched blocks per unchoked peer */
1192    for( i=msgs->prefetchCount; i<msgs->peer->pendingReqsToClient && i<12; ++i )
1193    {
1194        const struct peer_request * req = msgs->peerAskedFor + i;
1195        if( requestIsValid( msgs, req ) )
1196        {
1197            tr_cachePrefetchBlock( getSession(msgs)->cache, msgs->torrent, req->index, req->offset, req->length );
1198            ++msgs->prefetchCount;
1199        }
1200    }
1201}
1202
1203static void
1204peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
1205{
1206    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1207    const int reqIsValid = requestIsValid( msgs, req );
1208    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1209    const int peerIsChoked = msgs->peer->peerIsChoked;
1210
1211    int allow = false;
1212
1213    if( !reqIsValid )
1214        dbgmsg( msgs, "rejecting an invalid request." );
1215    else if( !clientHasPiece )
1216        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1217    else if( peerIsChoked )
1218        dbgmsg( msgs, "rejecting request from choked peer" );
1219    else if( msgs->peer->pendingReqsToClient + 1 >= REQQ )
1220        dbgmsg( msgs, "rejecting request ... reqq is full" );
1221    else
1222        allow = true;
1223
1224    if( allow ) {
1225        msgs->peerAskedFor[msgs->peer->pendingReqsToClient++] = *req;
1226        prefetchPieces( msgs );
1227    } else if( fext ) {
1228        protocolSendReject( msgs, req );
1229    }
1230}
1231
1232static bool
1233messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1234{
1235    switch( id )
1236    {
1237        case BT_CHOKE:
1238        case BT_UNCHOKE:
1239        case BT_INTERESTED:
1240        case BT_NOT_INTERESTED:
1241        case BT_FEXT_HAVE_ALL:
1242        case BT_FEXT_HAVE_NONE:
1243            return len == 1;
1244
1245        case BT_HAVE:
1246        case BT_FEXT_SUGGEST:
1247        case BT_FEXT_ALLOWED_FAST:
1248            return len == 5;
1249
1250        case BT_BITFIELD:
1251            if( tr_torrentHasMetadata( msg->torrent ) )
1252                return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1253            /* we don't know the piece count yet,
1254               so we can only guess whether to send true or false */
1255            if( msg->metadata_size_hint > 0 )
1256                return len <= msg->metadata_size_hint;
1257            return true;
1258
1259        case BT_REQUEST:
1260        case BT_CANCEL:
1261        case BT_FEXT_REJECT:
1262            return len == 13;
1263
1264        case BT_PIECE:
1265            return len > 9 && len <= 16393;
1266
1267        case BT_PORT:
1268            return len == 3;
1269
1270        case BT_LTEP:
1271            return len >= 2;
1272
1273        default:
1274            return false;
1275    }
1276}
1277
1278static int clientGotBlock( tr_peermsgs *               msgs,
1279                           struct evbuffer *           block,
1280                           const struct peer_request * req );
1281
1282static int
1283readBtPiece( tr_peermsgs      * msgs,
1284             struct evbuffer  * inbuf,
1285             size_t             inlen,
1286             size_t           * setme_piece_bytes_read )
1287{
1288    struct peer_request * req = &msgs->incoming.blockReq;
1289
1290    assert( evbuffer_get_length( inbuf ) >= inlen );
1291    dbgmsg( msgs, "In readBtPiece" );
1292
1293    if( !req->length )
1294    {
1295        if( inlen < 8 )
1296            return READ_LATER;
1297
1298        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1299        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1300        req->length = msgs->incoming.length - 9;
1301        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1302        return READ_NOW;
1303    }
1304    else
1305    {
1306        int err;
1307
1308        /* read in another chunk of data */
1309        const size_t nLeft = req->length - evbuffer_get_length( msgs->incoming.block );
1310        size_t n = MIN( nLeft, inlen );
1311        size_t i = n;
1312        void * buf = tr_sessionGetBuffer( getSession( msgs ) );
1313        const size_t buflen = SESSION_BUFFER_SIZE;
1314
1315        while( i > 0 )
1316        {
1317            const size_t thisPass = MIN( i, buflen );
1318            tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass );
1319            evbuffer_add( msgs->incoming.block, buf, thisPass );
1320            i -= thisPass;
1321        }
1322
1323        tr_sessionReleaseBuffer( getSession( msgs ) );
1324        buf = NULL;
1325
1326        fireClientGotData( msgs, n, true );
1327        *setme_piece_bytes_read += n;
1328        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1329               n, req->index, req->offset, req->length,
1330               (int)( req->length - evbuffer_get_length( msgs->incoming.block ) ) );
1331        if( evbuffer_get_length( msgs->incoming.block ) < req->length )
1332            return READ_LATER;
1333
1334        /* we've got the whole block ... process it */
1335        err = clientGotBlock( msgs, msgs->incoming.block, req );
1336
1337        /* cleanup */
1338        evbuffer_free( msgs->incoming.block );
1339        msgs->incoming.block = evbuffer_new( );
1340        req->length = 0;
1341        msgs->state = AWAITING_BT_LENGTH;
1342        return err ? READ_ERR : READ_NOW;
1343    }
1344}
1345
1346static void updateDesiredRequestCount( tr_peermsgs * msgs );
1347
1348static int
1349readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1350{
1351    uint32_t      ui32;
1352    uint32_t      msglen = msgs->incoming.length;
1353    const uint8_t id = msgs->incoming.id;
1354#ifndef NDEBUG
1355    const size_t  startBufLen = evbuffer_get_length( inbuf );
1356#endif
1357    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1358
1359    --msglen; /* id length */
1360
1361    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1362
1363    if( inlen < msglen )
1364        return READ_LATER;
1365
1366    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1367    {
1368        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1369        fireError( msgs, EMSGSIZE );
1370        return READ_ERR;
1371    }
1372
1373    switch( id )
1374    {
1375        case BT_CHOKE:
1376            dbgmsg( msgs, "got Choke" );
1377            msgs->peer->clientIsChoked = 1;
1378            if( !fext )
1379                fireGotChoke( msgs );
1380            break;
1381
1382        case BT_UNCHOKE:
1383            dbgmsg( msgs, "got Unchoke" );
1384            msgs->peer->clientIsChoked = 0;
1385            updateDesiredRequestCount( msgs );
1386            break;
1387
1388        case BT_INTERESTED:
1389            dbgmsg( msgs, "got Interested" );
1390            msgs->peer->peerIsInterested = 1;
1391            break;
1392
1393        case BT_NOT_INTERESTED:
1394            dbgmsg( msgs, "got Not Interested" );
1395            msgs->peer->peerIsInterested = 0;
1396            break;
1397
1398        case BT_HAVE:
1399            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1400            dbgmsg( msgs, "got Have: %u", ui32 );
1401            if( tr_torrentHasMetadata( msgs->torrent )
1402                    && ( ui32 >= msgs->torrent->info.pieceCount ) )
1403            {
1404                fireError( msgs, ERANGE );
1405                return READ_ERR;
1406            }
1407
1408            /* a peer can send the same HAVE message twice... */
1409            if( !tr_bitfieldHas( &msgs->peer->have, ui32 ) ) {
1410                tr_bitfieldAdd( &msgs->peer->have, ui32 );
1411                fireClientGotHave( msgs, ui32 );
1412            }
1413            updatePeerProgress( msgs );
1414            break;
1415
1416        case BT_BITFIELD: {
1417            uint8_t * tmp = tr_new( uint8_t, msglen );
1418            dbgmsg( msgs, "got a bitfield" );
1419            tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1420            tr_bitfieldSetRaw( &msgs->peer->have, tmp, msglen );
1421            fireClientGotBitfield( msgs, &msgs->peer->have );
1422            updatePeerProgress( msgs );
1423            tr_free( tmp );
1424            break;
1425        }
1426
1427        case BT_REQUEST:
1428        {
1429            struct peer_request r;
1430            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1431            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1432            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1433            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1434            peerMadeRequest( msgs, &r );
1435            break;
1436        }
1437
1438        case BT_CANCEL:
1439        {
1440            int i;
1441            struct peer_request r;
1442            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1443            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1444            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1445            tr_historyAdd( &msgs->peer->cancelsSentToClient, tr_time( ), 1 );
1446            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1447
1448            for( i=0; i<msgs->peer->pendingReqsToClient; ++i ) {
1449                const struct peer_request * req = msgs->peerAskedFor + i;
1450                if( ( req->index == r.index ) && ( req->offset == r.offset ) && ( req->length == r.length ) )
1451                    break;
1452            }
1453
1454            if( i < msgs->peer->pendingReqsToClient )
1455                tr_removeElementFromArray( msgs->peerAskedFor, i, sizeof( struct peer_request ),
1456                                           msgs->peer->pendingReqsToClient-- );
1457            break;
1458        }
1459
1460        case BT_PIECE:
1461            assert( 0 ); /* handled elsewhere! */
1462            break;
1463
1464        case BT_PORT:
1465            dbgmsg( msgs, "Got a BT_PORT" );
1466            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->dht_port );
1467            if( msgs->peer->dht_port > 0 )
1468                tr_dhtAddNode( getSession(msgs),
1469                               tr_peerAddress( msgs->peer ),
1470                               msgs->peer->dht_port, 0 );
1471            break;
1472
1473        case BT_FEXT_SUGGEST:
1474            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1475            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1476            if( fext )
1477                fireClientGotSuggest( msgs, ui32 );
1478            else {
1479                fireError( msgs, EMSGSIZE );
1480                return READ_ERR;
1481            }
1482            break;
1483
1484        case BT_FEXT_ALLOWED_FAST:
1485            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1486            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1487            if( fext )
1488                fireClientGotAllowedFast( msgs, ui32 );
1489            else {
1490                fireError( msgs, EMSGSIZE );
1491                return READ_ERR;
1492            }
1493            break;
1494
1495        case BT_FEXT_HAVE_ALL:
1496            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1497            if( fext ) {
1498                tr_bitfieldSetHasAll( &msgs->peer->have );
1499assert( tr_bitfieldHasAll( &msgs->peer->have ) );
1500                fireClientGotHaveAll( msgs );
1501                updatePeerProgress( msgs );
1502            } else {
1503                fireError( msgs, EMSGSIZE );
1504                return READ_ERR;
1505            }
1506            break;
1507
1508        case BT_FEXT_HAVE_NONE:
1509            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1510            if( fext ) {
1511                tr_bitfieldSetHasNone( &msgs->peer->have );
1512                fireClientGotHaveNone( msgs );
1513                updatePeerProgress( msgs );
1514            } else {
1515                fireError( msgs, EMSGSIZE );
1516                return READ_ERR;
1517            }
1518            break;
1519
1520        case BT_FEXT_REJECT:
1521        {
1522            struct peer_request r;
1523            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1524            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1525            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1526            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1527            if( fext )
1528                fireGotRej( msgs, &r );
1529            else {
1530                fireError( msgs, EMSGSIZE );
1531                return READ_ERR;
1532            }
1533            break;
1534        }
1535
1536        case BT_LTEP:
1537            dbgmsg( msgs, "Got a BT_LTEP" );
1538            parseLtep( msgs, msglen, inbuf );
1539            break;
1540
1541        default:
1542            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1543            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1544            break;
1545    }
1546
1547    assert( msglen + 1 == msgs->incoming.length );
1548    assert( evbuffer_get_length( inbuf ) == startBufLen - msglen );
1549
1550    msgs->state = AWAITING_BT_LENGTH;
1551    return READ_NOW;
1552}
1553
1554/* returns 0 on success, or an errno on failure */
1555static int
1556clientGotBlock( tr_peermsgs                * msgs,
1557                struct evbuffer            * data,
1558                const struct peer_request  * req )
1559{
1560    int err;
1561    tr_torrent * tor = msgs->torrent;
1562    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1563
1564    assert( msgs );
1565    assert( req );
1566
1567    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1568        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1569                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1570        return EMSGSIZE;
1571    }
1572
1573    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1574
1575    if( !tr_peerMgrDidPeerRequest( msgs->torrent, msgs->peer, block ) ) {
1576        dbgmsg( msgs, "we didn't ask for this message..." );
1577        return 0;
1578    }
1579    if( tr_cpPieceIsComplete( &msgs->torrent->completion, req->index ) ) {
1580        dbgmsg( msgs, "we did ask for this message, but the piece is already complete..." );
1581        return 0;
1582    }
1583
1584    /**
1585    ***  Save the block
1586    **/
1587
1588    if(( err = tr_cacheWriteBlock( getSession(msgs)->cache, tor, req->index, req->offset, req->length, data )))
1589        return err;
1590
1591    tr_bitfieldAdd( &msgs->peer->blame, req->index );
1592    fireGotBlock( msgs, req );
1593    return 0;
1594}
1595
1596static int peerPulse( void * vmsgs );
1597
1598static void
1599didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1600{
1601    tr_peermsgs * msgs = vmsgs;
1602    firePeerGotData( msgs, bytesWritten, wasPieceData );
1603
1604    if ( tr_isPeerIo( io ) && io->userData )
1605        peerPulse( msgs );
1606}
1607
1608static ReadState
1609canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1610{
1611    ReadState         ret;
1612    tr_peermsgs *     msgs = vmsgs;
1613    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1614    const size_t      inlen = evbuffer_get_length( in );
1615
1616    dbgmsg( msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state );
1617
1618    if( !inlen )
1619    {
1620        ret = READ_LATER;
1621    }
1622    else if( msgs->state == AWAITING_BT_PIECE )
1623    {
1624        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1625    }
1626    else switch( msgs->state )
1627    {
1628        case AWAITING_BT_LENGTH:
1629            ret = readBtLength ( msgs, in, inlen ); break;
1630
1631        case AWAITING_BT_ID:
1632            ret = readBtId     ( msgs, in, inlen ); break;
1633
1634        case AWAITING_BT_MESSAGE:
1635            ret = readBtMessage( msgs, in, inlen ); break;
1636
1637        default:
1638            ret = READ_ERR;
1639            assert( 0 );
1640    }
1641
1642    dbgmsg( msgs, "canRead: ret is %d", (int)ret );
1643
1644    /* log the raw data that was read */
1645    if( ( ret != READ_ERR ) && ( evbuffer_get_length( in ) != inlen ) )
1646        fireClientGotData( msgs, inlen - evbuffer_get_length( in ), false );
1647
1648    return ret;
1649}
1650
1651int
1652tr_peerMsgsIsReadingBlock( const tr_peermsgs * msgs, tr_block_index_t block )
1653{
1654    if( msgs->state != AWAITING_BT_PIECE )
1655        return false;
1656
1657    return block == _tr_block( msgs->torrent,
1658                               msgs->incoming.blockReq.index,
1659                               msgs->incoming.blockReq.offset );
1660}
1661
1662/**
1663***
1664**/
1665
1666static void
1667updateDesiredRequestCount( tr_peermsgs * msgs )
1668{
1669    const tr_torrent * const torrent = msgs->torrent;
1670
1671    if( tr_torrentIsSeed( msgs->torrent ) )
1672    {
1673        msgs->desiredRequestCount = 0;
1674    }
1675    else if( msgs->peer->clientIsChoked )
1676    {
1677        msgs->desiredRequestCount = 0;
1678    }
1679    else if( !msgs->peer->clientIsInterested )
1680    {
1681        msgs->desiredRequestCount = 0;
1682    }
1683    else
1684    {
1685        int estimatedBlocksInPeriod;
1686        int rate_Bps;
1687        int irate_Bps;
1688        const int floor = 4;
1689        const int seconds = REQUEST_BUF_SECS;
1690        const uint64_t now = tr_time_msec( );
1691
1692        /* Get the rate limit we should use.
1693         * FIXME: this needs to consider all the other peers as well... */
1694        rate_Bps = tr_peerGetPieceSpeed_Bps( msgs->peer, now, TR_PEER_TO_CLIENT );
1695        if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) )
1696            rate_Bps = MIN( rate_Bps, tr_torrentGetSpeedLimit_Bps( torrent, TR_PEER_TO_CLIENT ) );
1697
1698        /* honor the session limits, if enabled */
1699        if( tr_torrentUsesSessionLimits( torrent ) )
1700            if( tr_sessionGetActiveSpeedLimit_Bps( torrent->session, TR_PEER_TO_CLIENT, &irate_Bps ) )
1701                rate_Bps = MIN( rate_Bps, irate_Bps );
1702
1703        /* use this desired rate to figure out how
1704         * many requests we should send to this peer */
1705        estimatedBlocksInPeriod = ( rate_Bps * seconds ) / torrent->blockSize;
1706        msgs->desiredRequestCount = MAX( floor, estimatedBlocksInPeriod );
1707
1708        /* honor the peer's maximum request count, if specified */
1709        if( msgs->reqq > 0 )
1710            if( msgs->desiredRequestCount > msgs->reqq )
1711                msgs->desiredRequestCount = msgs->reqq;
1712    }
1713}
1714
1715static void
1716updateMetadataRequests( tr_peermsgs * msgs, time_t now )
1717{
1718    int piece;
1719
1720    if( msgs->peerSupportsMetadataXfer
1721        && tr_torrentGetNextMetadataRequest( msgs->torrent, now, &piece ) )
1722    {
1723        tr_benc tmp;
1724        int payloadLen;
1725        char * payload;
1726        struct evbuffer * out = msgs->outMessages;
1727
1728        /* build the data message */
1729        tr_bencInitDict( &tmp, 3 );
1730        tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REQUEST );
1731        tr_bencDictAddInt( &tmp, "piece", piece );
1732        payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1733        tr_bencFree( &tmp );
1734
1735        dbgmsg( msgs, "requesting metadata piece #%d", piece );
1736
1737        /* write it out as a LTEP message to our outMessages buffer */
1738        evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1739        evbuffer_add_uint8 ( out, BT_LTEP );
1740        evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1741        evbuffer_add       ( out, payload, payloadLen );
1742        pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1743        dbgOutMessageLen( msgs );
1744
1745        tr_free( payload );
1746    }
1747}
1748
1749static void
1750updateBlockRequests( tr_peermsgs * msgs )
1751{
1752    if( tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT )
1753        && ( msgs->desiredRequestCount > 0 )
1754        && ( msgs->peer->pendingReqsToPeer <= ( msgs->desiredRequestCount * 0.66 ) ) )
1755    {
1756        int i;
1757        int n;
1758        const int numwant = msgs->desiredRequestCount - msgs->peer->pendingReqsToPeer;
1759        tr_block_index_t * blocks = tr_new( tr_block_index_t, numwant );
1760
1761        tr_peerMgrGetNextRequests( msgs->torrent, msgs->peer, numwant, blocks, &n );
1762
1763        for( i=0; i<n; ++i )
1764        {
1765            struct peer_request req;
1766            blockToReq( msgs->torrent, blocks[i], &req );
1767            protocolSendRequest( msgs, &req );
1768        }
1769
1770        tr_free( blocks );
1771    }
1772}
1773
1774static size_t
1775fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1776{
1777    int piece;
1778    size_t bytesWritten = 0;
1779    struct peer_request req;
1780    const bool haveMessages = evbuffer_get_length( msgs->outMessages ) != 0;
1781    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1782
1783    /**
1784    ***  Protocol messages
1785    **/
1786
1787    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1788    {
1789        dbgmsg( msgs, "started an outMessages batch (length is %zu)", evbuffer_get_length( msgs->outMessages ) );
1790        msgs->outMessagesBatchedAt = now;
1791    }
1792    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1793    {
1794        const size_t len = evbuffer_get_length( msgs->outMessages );
1795        /* flush the protocol messages */
1796        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1797        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, false );
1798        msgs->clientSentAnythingAt = now;
1799        msgs->outMessagesBatchedAt = 0;
1800        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1801        bytesWritten +=  len;
1802    }
1803
1804    /**
1805    ***  Metadata Pieces
1806    **/
1807
1808    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= METADATA_PIECE_SIZE )
1809        && popNextMetadataRequest( msgs, &piece ) )
1810    {
1811        char * data;
1812        int dataLen;
1813        bool ok = false;
1814
1815        data = tr_torrentGetMetadataPiece( msgs->torrent, piece, &dataLen );
1816        if( ( dataLen > 0 ) && ( data != NULL ) )
1817        {
1818            tr_benc tmp;
1819            int payloadLen;
1820            char * payload;
1821            struct evbuffer * out = msgs->outMessages;
1822
1823            /* build the data message */
1824            tr_bencInitDict( &tmp, 3 );
1825            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_DATA );
1826            tr_bencDictAddInt( &tmp, "piece", piece );
1827            tr_bencDictAddInt( &tmp, "total_size", msgs->torrent->infoDictLength );
1828            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1829            tr_bencFree( &tmp );
1830
1831            /* write it out as a LTEP message to our outMessages buffer */
1832            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen + dataLen );
1833            evbuffer_add_uint8 ( out, BT_LTEP );
1834            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1835            evbuffer_add       ( out, payload, payloadLen );
1836            evbuffer_add       ( out, data, dataLen );
1837            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1838            dbgOutMessageLen( msgs );
1839
1840            tr_free( payload );
1841            tr_free( data );
1842
1843            ok = true;
1844        }
1845
1846        if( !ok ) /* send a rejection message */
1847        {
1848            tr_benc tmp;
1849            int payloadLen;
1850            char * payload;
1851            struct evbuffer * out = msgs->outMessages;
1852
1853            /* build the rejection message */
1854            tr_bencInitDict( &tmp, 2 );
1855            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1856            tr_bencDictAddInt( &tmp, "piece", piece );
1857            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1858            tr_bencFree( &tmp );
1859
1860            /* write it out as a LTEP message to our outMessages buffer */
1861            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1862            evbuffer_add_uint8 ( out, BT_LTEP );
1863            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1864            evbuffer_add       ( out, payload, payloadLen );
1865            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1866            dbgOutMessageLen( msgs );
1867
1868            tr_free( payload );
1869        }
1870    }
1871
1872    /**
1873    ***  Data Blocks
1874    **/
1875
1876    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= msgs->torrent->blockSize )
1877        && popNextRequest( msgs, &req ) )
1878    {
1879        --msgs->prefetchCount;
1880
1881        if( requestIsValid( msgs, &req )
1882            && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1883        {
1884            int err;
1885            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
1886            struct evbuffer * out;
1887            struct evbuffer_iovec iovec[1];
1888
1889            out = evbuffer_new( );
1890            evbuffer_expand( out, msglen );
1891
1892            evbuffer_add_uint32( out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1893            evbuffer_add_uint8 ( out, BT_PIECE );
1894            evbuffer_add_uint32( out, req.index );
1895            evbuffer_add_uint32( out, req.offset );
1896
1897            evbuffer_reserve_space( out, req.length, iovec, 1 );
1898            err = tr_cacheReadBlock( getSession(msgs)->cache, msgs->torrent, req.index, req.offset, req.length, iovec[0].iov_base );
1899            iovec[0].iov_len = req.length;
1900            evbuffer_commit_space( out, iovec, 1 );
1901
1902            /* check the piece if it needs checking... */
1903            if( !err && tr_torrentPieceNeedsCheck( msgs->torrent, req.index ) )
1904                if(( err = !tr_torrentCheckPiece( msgs->torrent, req.index )))
1905                    tr_torrentSetLocalError( msgs->torrent, _( "Please Verify Local Data! Piece #%zu is corrupt." ), (size_t)req.index );
1906
1907            if( err )
1908            {
1909                if( fext )
1910                    protocolSendReject( msgs, &req );
1911            }
1912            else
1913            {
1914                const size_t n = evbuffer_get_length( out );
1915                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1916                assert( n == msglen );
1917                tr_peerIoWriteBuf( msgs->peer->io, out, true );
1918                bytesWritten += n;
1919                msgs->clientSentAnythingAt = now;
1920                tr_historyAdd( &msgs->peer->blocksSentToPeer, tr_time( ), 1 );
1921            }
1922
1923            evbuffer_free( out );
1924
1925            if( err )
1926            {
1927                bytesWritten = 0;
1928                msgs = NULL;
1929            }
1930        }
1931        else if( fext ) /* peer needs a reject message */
1932        {
1933            protocolSendReject( msgs, &req );
1934        }
1935
1936        if( msgs != NULL )
1937            prefetchPieces( msgs );
1938    }
1939
1940    /**
1941    ***  Keepalive
1942    **/
1943
1944    if( ( msgs != NULL )
1945        && ( msgs->clientSentAnythingAt != 0 )
1946        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1947    {
1948        dbgmsg( msgs, "sending a keepalive message" );
1949        evbuffer_add_uint32( msgs->outMessages, 0 );
1950        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1951    }
1952
1953    return bytesWritten;
1954}
1955
1956static int
1957peerPulse( void * vmsgs )
1958{
1959    tr_peermsgs * msgs = vmsgs;
1960    const time_t  now = tr_time( );
1961
1962    if ( tr_isPeerIo( msgs->peer->io ) ) {
1963        updateDesiredRequestCount( msgs );
1964        updateBlockRequests( msgs );
1965        updateMetadataRequests( msgs, now );
1966    }
1967
1968    for( ;; )
1969        if( fillOutputBuffer( msgs, now ) < 1 )
1970            break;
1971
1972    return true; /* loop forever */
1973}
1974
1975void
1976tr_peerMsgsPulse( tr_peermsgs * msgs )
1977{
1978    if( msgs != NULL )
1979        peerPulse( msgs );
1980}
1981
1982static void
1983gotError( tr_peerIo * io UNUSED, short what, void * vmsgs )
1984{
1985    if( what & BEV_EVENT_TIMEOUT )
1986        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1987    if( what & ( BEV_EVENT_EOF | BEV_EVENT_ERROR ) )
1988        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1989               what, errno, tr_strerror( errno ) );
1990    fireError( vmsgs, ENOTCONN );
1991}
1992
1993static void
1994sendBitfield( tr_peermsgs * msgs )
1995{
1996    size_t byte_count = 0;
1997    struct evbuffer * out = msgs->outMessages;
1998    void * bytes = tr_cpCreatePieceBitfield( &msgs->torrent->completion, &byte_count );
1999
2000    evbuffer_add_uint32( out, sizeof( uint8_t ) + byte_count );
2001    evbuffer_add_uint8 ( out, BT_BITFIELD );
2002    evbuffer_add       ( out, bytes, byte_count );
2003    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu", evbuffer_get_length( out ) );
2004    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
2005
2006    tr_free( bytes );
2007}
2008
2009static void
2010tellPeerWhatWeHave( tr_peermsgs * msgs )
2011{
2012    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
2013
2014    if( fext && tr_cpHasAll( &msgs->torrent->completion ) )
2015    {
2016        protocolSendHaveAll( msgs );
2017    }
2018    else if( fext && tr_cpHasNone( &msgs->torrent->completion ) )
2019    {
2020        protocolSendHaveNone( msgs );
2021    }
2022    else if( !tr_cpHasNone( &msgs->torrent->completion ) )
2023    {
2024        sendBitfield( msgs );
2025    }
2026}
2027
2028/**
2029***
2030**/
2031
2032/* some peers give us error messages if we send
2033   more than this many peers in a single pex message
2034   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2035#define MAX_PEX_ADDED 50
2036#define MAX_PEX_DROPPED 50
2037
2038typedef struct
2039{
2040    tr_pex *  added;
2041    tr_pex *  dropped;
2042    tr_pex *  elements;
2043    int       addedCount;
2044    int       droppedCount;
2045    int       elementCount;
2046}
2047PexDiffs;
2048
2049static void
2050pexAddedCb( void * vpex, void * userData )
2051{
2052    PexDiffs * diffs = userData;
2053    tr_pex *   pex = vpex;
2054
2055    if( diffs->addedCount < MAX_PEX_ADDED )
2056    {
2057        diffs->added[diffs->addedCount++] = *pex;
2058        diffs->elements[diffs->elementCount++] = *pex;
2059    }
2060}
2061
2062static inline void
2063pexDroppedCb( void * vpex, void * userData )
2064{
2065    PexDiffs * diffs = userData;
2066    tr_pex *   pex = vpex;
2067
2068    if( diffs->droppedCount < MAX_PEX_DROPPED )
2069    {
2070        diffs->dropped[diffs->droppedCount++] = *pex;
2071    }
2072}
2073
2074static inline void
2075pexElementCb( void * vpex, void * userData )
2076{
2077    PexDiffs * diffs = userData;
2078    tr_pex * pex = vpex;
2079
2080    diffs->elements[diffs->elementCount++] = *pex;
2081}
2082
2083typedef void ( tr_set_func )( void * element, void * userData );
2084
2085/**
2086 * @brief find the differences and commonalities in two sorted sets
2087 * @param a the first set
2088 * @param aCount the number of elements in the set 'a'
2089 * @param b the second set
2090 * @param bCount the number of elements in the set 'b'
2091 * @param compare the sorting method for both sets
2092 * @param elementSize the sizeof the element in the two sorted sets
2093 * @param in_a called for items in set 'a' but not set 'b'
2094 * @param in_b called for items in set 'b' but not set 'a'
2095 * @param in_both called for items that are in both sets
2096 * @param userData user data passed along to in_a, in_b, and in_both
2097 */
2098static void
2099tr_set_compare( const void * va, size_t aCount,
2100                const void * vb, size_t bCount,
2101                int compare( const void * a, const void * b ),
2102                size_t elementSize,
2103                tr_set_func in_a_cb,
2104                tr_set_func in_b_cb,
2105                tr_set_func in_both_cb,
2106                void * userData )
2107{
2108    const uint8_t * a = va;
2109    const uint8_t * b = vb;
2110    const uint8_t * aend = a + elementSize * aCount;
2111    const uint8_t * bend = b + elementSize * bCount;
2112
2113    while( a != aend || b != bend )
2114    {
2115        if( a == aend )
2116        {
2117            ( *in_b_cb )( (void*)b, userData );
2118            b += elementSize;
2119        }
2120        else if( b == bend )
2121        {
2122            ( *in_a_cb )( (void*)a, userData );
2123            a += elementSize;
2124        }
2125        else
2126        {
2127            const int val = ( *compare )( a, b );
2128
2129            if( !val )
2130            {
2131                ( *in_both_cb )( (void*)a, userData );
2132                a += elementSize;
2133                b += elementSize;
2134            }
2135            else if( val < 0 )
2136            {
2137                ( *in_a_cb )( (void*)a, userData );
2138                a += elementSize;
2139            }
2140            else if( val > 0 )
2141            {
2142                ( *in_b_cb )( (void*)b, userData );
2143                b += elementSize;
2144            }
2145        }
2146    }
2147}
2148
2149
2150static void
2151sendPex( tr_peermsgs * msgs )
2152{
2153    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2154    {
2155        PexDiffs diffs;
2156        PexDiffs diffs6;
2157        tr_pex * newPex = NULL;
2158        tr_pex * newPex6 = NULL;
2159        const int newCount = tr_peerMgrGetPeers( msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2160        const int newCount6 = tr_peerMgrGetPeers( msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2161
2162        /* build the diffs */
2163        diffs.added = tr_new( tr_pex, newCount );
2164        diffs.addedCount = 0;
2165        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2166        diffs.droppedCount = 0;
2167        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2168        diffs.elementCount = 0;
2169        tr_set_compare( msgs->pex, msgs->pexCount,
2170                        newPex, newCount,
2171                        tr_pexCompare, sizeof( tr_pex ),
2172                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2173        diffs6.added = tr_new( tr_pex, newCount6 );
2174        diffs6.addedCount = 0;
2175        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2176        diffs6.droppedCount = 0;
2177        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2178        diffs6.elementCount = 0;
2179        tr_set_compare( msgs->pex6, msgs->pexCount6,
2180                        newPex6, newCount6,
2181                        tr_pexCompare, sizeof( tr_pex ),
2182                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2183        dbgmsg(
2184            msgs,
2185            "pex: old peer count %d+%d, new peer count %d+%d, "
2186            "added %d+%d, removed %d+%d",
2187            msgs->pexCount, msgs->pexCount6, newCount, newCount6,
2188            diffs.addedCount, diffs6.addedCount,
2189            diffs.droppedCount, diffs6.droppedCount );
2190
2191        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2192            !diffs6.droppedCount )
2193        {
2194            tr_free( diffs.elements );
2195            tr_free( diffs6.elements );
2196        }
2197        else
2198        {
2199            int  i;
2200            tr_benc val;
2201            char * benc;
2202            int bencLen;
2203            uint8_t * tmp, *walk;
2204            struct evbuffer * out = msgs->outMessages;
2205
2206            /* update peer */
2207            tr_free( msgs->pex );
2208            msgs->pex = diffs.elements;
2209            msgs->pexCount = diffs.elementCount;
2210            tr_free( msgs->pex6 );
2211            msgs->pex6 = diffs6.elements;
2212            msgs->pexCount6 = diffs6.elementCount;
2213
2214            /* build the pex payload */
2215            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2216                                         * speed vs. likelihood? */
2217
2218            if( diffs.addedCount > 0)
2219            {
2220                /* "added" */
2221                tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2222                for( i = 0; i < diffs.addedCount; ++i ) {
2223                    memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2224                    memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2225                }
2226                assert( ( walk - tmp ) == diffs.addedCount * 6 );
2227                tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2228                tr_free( tmp );
2229
2230                /* "added.f"
2231                 * unset each holepunch flag because we don't support it. */
2232                tmp = walk = tr_new( uint8_t, diffs.addedCount );
2233                for( i = 0; i < diffs.addedCount; ++i )
2234                    *walk++ = diffs.added[i].flags & ~ADDED_F_HOLEPUNCH;
2235                assert( ( walk - tmp ) == diffs.addedCount );
2236                tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2237                tr_free( tmp );
2238            }
2239
2240            if( diffs.droppedCount > 0 )
2241            {
2242                /* "dropped" */
2243                tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2244                for( i = 0; i < diffs.droppedCount; ++i ) {
2245                    memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2246                    memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2247                }
2248                assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2249                tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2250                tr_free( tmp );
2251            }
2252
2253            if( diffs6.addedCount > 0 )
2254            {
2255                /* "added6" */
2256                tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2257                for( i = 0; i < diffs6.addedCount; ++i ) {
2258                    memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2259                    walk += 16;
2260                    memcpy( walk, &diffs6.added[i].port, 2 );
2261                    walk += 2;
2262                }
2263                assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2264                tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2265                tr_free( tmp );
2266
2267                /* "added6.f"
2268                 * unset each holepunch flag because we don't support it. */
2269                tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2270                for( i = 0; i < diffs6.addedCount; ++i )
2271                    *walk++ = diffs6.added[i].flags & ~ADDED_F_HOLEPUNCH;
2272                assert( ( walk - tmp ) == diffs6.addedCount );
2273                tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2274                tr_free( tmp );
2275            }
2276
2277            if( diffs6.droppedCount > 0 )
2278            {
2279                /* "dropped6" */
2280                tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2281                for( i = 0; i < diffs6.droppedCount; ++i ) {
2282                    memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2283                    walk += 16;
2284                    memcpy( walk, &diffs6.dropped[i].port, 2 );
2285                    walk += 2;
2286                }
2287                assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2288                tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2289                tr_free( tmp );
2290            }
2291
2292            /* write the pex message */
2293            benc = tr_bencToStr( &val, TR_FMT_BENC, &bencLen );
2294            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + bencLen );
2295            evbuffer_add_uint8 ( out, BT_LTEP );
2296            evbuffer_add_uint8 ( out, msgs->ut_pex_id );
2297            evbuffer_add       ( out, benc, bencLen );
2298            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2299            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", evbuffer_get_length( out ) );
2300            dbgOutMessageLen( msgs );
2301
2302            tr_free( benc );
2303            tr_bencFree( &val );
2304        }
2305
2306        /* cleanup */
2307        tr_free( diffs.added );
2308        tr_free( diffs.dropped );
2309        tr_free( newPex );
2310        tr_free( diffs6.added );
2311        tr_free( diffs6.dropped );
2312        tr_free( newPex6 );
2313
2314        /*msgs->clientSentPexAt = tr_time( );*/
2315    }
2316}
2317
2318static void
2319pexPulse( int foo UNUSED, short bar UNUSED, void * vmsgs )
2320{
2321    struct tr_peermsgs * msgs = vmsgs;
2322
2323    sendPex( msgs );
2324
2325    tr_timerAdd( msgs->pexTimer, PEX_INTERVAL_SECS, 0 );
2326}
2327
2328/**
2329***
2330**/
2331
2332tr_peermsgs*
2333tr_peerMsgsNew( struct tr_torrent    * torrent,
2334                struct tr_peer       * peer,
2335                tr_peer_callback     * callback,
2336                void                 * callbackData )
2337{
2338    tr_peermsgs * m;
2339
2340    assert( peer );
2341    assert( peer->io );
2342
2343    m = tr_new0( tr_peermsgs, 1 );
2344    m->callback = callback;
2345    m->callbackData = callbackData;
2346    m->peer = peer;
2347    m->torrent = torrent;
2348    m->peer->clientIsChoked = 1;
2349    m->peer->peerIsChoked = 1;
2350    m->peer->clientIsInterested = 0;
2351    m->peer->peerIsInterested = 0;
2352    m->state = AWAITING_BT_LENGTH;
2353    m->outMessages = evbuffer_new( );
2354    m->outMessagesBatchedAt = 0;
2355    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2356    m->incoming.block = evbuffer_new( );
2357    m->pexTimer = evtimer_new( torrent->session->event_base, pexPulse, m );
2358    peer->msgs = m;
2359    tr_timerAdd( m->pexTimer, PEX_INTERVAL_SECS, 0 );
2360
2361    if( tr_peerIoSupportsUTP( peer->io ) ) {
2362        const tr_address * addr = tr_peerIoGetAddress( peer->io, NULL );
2363        tr_peerMgrSetUtpSupported( torrent, addr );
2364        tr_peerMgrSetUtpFailed( torrent, addr, false );
2365    }
2366
2367    if( tr_peerIoSupportsLTEP( peer->io ) )
2368        sendLtepHandshake( m );
2369
2370    tellPeerWhatWeHave( m );
2371
2372    if( tr_dhtEnabled( torrent->session ) && tr_peerIoSupportsDHT( peer->io ))
2373    {
2374        /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2375        const struct tr_address *addr = tr_peerIoGetAddress( peer->io, NULL );
2376        if( addr->type == TR_AF_INET || tr_globalIPv6() ) {
2377            protocolSendPort( m, tr_dhtPort( torrent->session ) );
2378        }
2379    }
2380
2381    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2382    updateDesiredRequestCount( m );
2383
2384    return m;
2385}
2386
2387void
2388tr_peerMsgsFree( tr_peermsgs* msgs )
2389{
2390    if( msgs )
2391    {
2392        event_free( msgs->pexTimer );
2393
2394        evbuffer_free( msgs->incoming.block );
2395        evbuffer_free( msgs->outMessages );
2396        tr_free( msgs->pex6 );
2397        tr_free( msgs->pex );
2398
2399        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2400        tr_free( msgs );
2401    }
2402}
Note: See TracBrowser for help on using the repository browser.