source: trunk/libtransmission/peer-msgs.c @ 12230

Last change on this file since 12230 was 12230, checked in by jordan, 11 years ago

(trunk libT) copyediting: yes, removing more unnecessary #includes

  • Property svn:keywords set to Date Rev Author Id
File size: 71.9 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 12230 2011-03-25 06:20:12Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdarg.h>
17#include <stdio.h>
18#include <stdlib.h>
19#include <string.h>
20
21#include <event2/buffer.h>
22#include <event2/bufferevent.h>
23#include <event2/event.h>
24
25#include "transmission.h"
26#include "bencode.h"
27#include "cache.h"
28#include "completion.h"
29#include "crypto.h" /* tr_sha1() */
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-msgs.h"
33#include "session.h"
34#include "torrent.h"
35#include "torrent-magnet.h"
36#include "tr-dht.h"
37#include "utils.h"
38#include "version.h"
39
40/**
41***
42**/
43
44enum
45{
46    BT_CHOKE                = 0,
47    BT_UNCHOKE              = 1,
48    BT_INTERESTED           = 2,
49    BT_NOT_INTERESTED       = 3,
50    BT_HAVE                 = 4,
51    BT_BITFIELD             = 5,
52    BT_REQUEST              = 6,
53    BT_PIECE                = 7,
54    BT_CANCEL               = 8,
55    BT_PORT                 = 9,
56
57    BT_FEXT_SUGGEST         = 13,
58    BT_FEXT_HAVE_ALL        = 14,
59    BT_FEXT_HAVE_NONE       = 15,
60    BT_FEXT_REJECT          = 16,
61    BT_FEXT_ALLOWED_FAST    = 17,
62
63    BT_LTEP                 = 20,
64
65    LTEP_HANDSHAKE          = 0,
66
67    UT_PEX_ID               = 1,
68    UT_METADATA_ID          = 3,
69
70    MAX_PEX_PEER_COUNT      = 50,
71
72    MIN_CHOKE_PERIOD_SEC    = 10,
73
74    /* idle seconds before we send a keepalive */
75    KEEPALIVE_INTERVAL_SECS = 100,
76
77    PEX_INTERVAL_SECS       = 90, /* sec between sendPex() calls */
78
79    REQQ                    = 512,
80
81    METADATA_REQQ           = 64,
82
83    /* used in lowering the outMessages queue period */
84    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
85    HIGH_PRIORITY_INTERVAL_SECS = 2,
86    LOW_PRIORITY_INTERVAL_SECS = 10,
87
88    /* number of pieces we'll allow in our fast set */
89    MAX_FAST_SET_SIZE = 3,
90
91    /* defined in BEP #9 */
92    METADATA_MSG_TYPE_REQUEST = 0,
93    METADATA_MSG_TYPE_DATA = 1,
94    METADATA_MSG_TYPE_REJECT = 2
95};
96
97enum
98{
99    AWAITING_BT_LENGTH,
100    AWAITING_BT_ID,
101    AWAITING_BT_MESSAGE,
102    AWAITING_BT_PIECE
103};
104
105/**
106***
107**/
108
109struct peer_request
110{
111    uint32_t    index;
112    uint32_t    offset;
113    uint32_t    length;
114};
115
116static void
117blockToReq( const tr_torrent     * tor,
118            tr_block_index_t       block,
119            struct peer_request  * setme )
120{
121    tr_torrentGetBlockLocation( tor, block, &setme->index,
122                                            &setme->offset,
123                                            &setme->length );
124}
125
126/**
127***
128**/
129
130/* this is raw, unchanged data from the peer regarding
131 * the current message that it's sending us. */
132struct tr_incoming
133{
134    uint8_t                id;
135    uint32_t               length; /* includes the +1 for id length */
136    struct peer_request    blockReq; /* metadata for incoming blocks */
137    struct evbuffer *      block; /* piece data for incoming blocks */
138};
139
140/**
141 * Low-level communication state information about a connected peer.
142 *
143 * This structure remembers the low-level protocol states that we're
144 * in with this peer, such as active requests, pex messages, and so on.
145 * Its fields are all private to peer-msgs.c.
146 *
147 * Data not directly involved with sending & receiving messages is
148 * stored in tr_peer, where it can be accessed by both peermsgs and
149 * the peer manager.
150 *
151 * @see struct peer_atom
152 * @see tr_peer
153 */
154struct tr_peermsgs
155{
156    bool            peerSupportsPex;
157    bool            peerSupportsMetadataXfer;
158    bool            clientSentLtepHandshake;
159    bool            peerSentLtepHandshake;
160
161    /*bool          haveFastSet;*/
162
163    int             desiredRequestCount;
164
165    int             prefetchCount;
166
167    /* how long the outMessages batch should be allowed to grow before
168     * it's flushed -- some messages (like requests >:) should be sent
169     * very quickly; others aren't as urgent. */
170    int8_t          outMessagesBatchPeriod;
171
172    uint8_t         state;
173    uint8_t         ut_pex_id;
174    uint8_t         ut_metadata_id;
175    uint16_t        pexCount;
176    uint16_t        pexCount6;
177
178    size_t          metadata_size_hint;
179#if 0
180    size_t                 fastsetSize;
181    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
182#endif
183
184    tr_peer *              peer;
185
186    tr_torrent *           torrent;
187
188    tr_peer_callback      * callback;
189    void                  * callbackData;
190
191    struct evbuffer *      outMessages; /* all the non-piece messages */
192
193    struct peer_request    peerAskedFor[REQQ];
194
195    int                    peerAskedForMetadata[METADATA_REQQ];
196    int                    peerAskedForMetadataCount;
197
198    tr_pex               * pex;
199    tr_pex               * pex6;
200
201    /*time_t                 clientSentPexAt;*/
202    time_t                 clientSentAnythingAt;
203
204    /* when we started batching the outMessages */
205    time_t                outMessagesBatchedAt;
206
207    struct tr_incoming    incoming;
208
209    /* if the peer supports the Extension Protocol in BEP 10 and
210       supplied a reqq argument, it's stored here. Otherwise, the
211       value is zero and should be ignored. */
212    int64_t               reqq;
213
214    struct event        * pexTimer;
215};
216
217/**
218***
219**/
220
221#if 0
222static tr_bitfield*
223getHave( const struct tr_peermsgs * msgs )
224{
225    if( msgs->peer->have == NULL )
226        msgs->peer->have = tr_bitfieldNew( msgs->torrent->info.pieceCount );
227    return msgs->peer->have;
228}
229#endif
230
231static inline tr_session*
232getSession( struct tr_peermsgs * msgs )
233{
234    return msgs->torrent->session;
235}
236
237/**
238***
239**/
240
241static void
242myDebug( const char * file, int line,
243         const struct tr_peermsgs * msgs,
244         const char * fmt, ... )
245{
246    FILE * fp = tr_getLog( );
247
248    if( fp )
249    {
250        va_list           args;
251        char              timestr[64];
252        struct evbuffer * buf = evbuffer_new( );
253        char *            base = tr_basename( file );
254
255        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
256                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
257                             tr_torrentName( msgs->torrent ),
258                             tr_peerIoGetAddrStr( msgs->peer->io ),
259                             msgs->peer->client );
260        va_start( args, fmt );
261        evbuffer_add_vprintf( buf, fmt, args );
262        va_end( args );
263        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
264        fputs( (const char*)evbuffer_pullup( buf, -1 ), fp );
265
266        tr_free( base );
267        evbuffer_free( buf );
268    }
269}
270
271#define dbgmsg( msgs, ... ) \
272    do { \
273        if( tr_deepLoggingIsActive( ) ) \
274            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
275    } while( 0 )
276
277/**
278***
279**/
280
281static void
282pokeBatchPeriod( tr_peermsgs * msgs, int interval )
283{
284    if( msgs->outMessagesBatchPeriod > interval )
285    {
286        msgs->outMessagesBatchPeriod = interval;
287        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
288    }
289}
290
291static void
292dbgOutMessageLen( tr_peermsgs * msgs )
293{
294    dbgmsg( msgs, "outMessage size is now %zu", evbuffer_get_length( msgs->outMessages ) );
295}
296
297static void
298protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
299{
300    struct evbuffer * out = msgs->outMessages;
301
302    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
303
304    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
305    evbuffer_add_uint8 ( out, BT_FEXT_REJECT );
306    evbuffer_add_uint32( out, req->index );
307    evbuffer_add_uint32( out, req->offset );
308    evbuffer_add_uint32( out, req->length );
309
310    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
311    dbgOutMessageLen( msgs );
312}
313
314static void
315protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
316{
317    struct evbuffer * out = msgs->outMessages;
318
319    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
320    evbuffer_add_uint8 ( out, BT_REQUEST );
321    evbuffer_add_uint32( out, req->index );
322    evbuffer_add_uint32( out, req->offset );
323    evbuffer_add_uint32( out, req->length );
324
325    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
326    dbgOutMessageLen( msgs );
327    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
328}
329
330static void
331protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
332{
333    struct evbuffer * out = msgs->outMessages;
334
335    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
336    evbuffer_add_uint8 ( out, BT_CANCEL );
337    evbuffer_add_uint32( out, req->index );
338    evbuffer_add_uint32( out, req->offset );
339    evbuffer_add_uint32( out, req->length );
340
341    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
342    dbgOutMessageLen( msgs );
343    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
344}
345
346static void
347protocolSendPort(tr_peermsgs *msgs, uint16_t port)
348{
349    struct evbuffer * out = msgs->outMessages;
350
351    dbgmsg( msgs, "sending Port %u", port);
352    evbuffer_add_uint32( out, 3 );
353    evbuffer_add_uint8 ( out, BT_PORT );
354    evbuffer_add_uint16( out, port);
355}
356
357static void
358protocolSendHave( tr_peermsgs * msgs, uint32_t index )
359{
360    struct evbuffer * out = msgs->outMessages;
361
362    evbuffer_add_uint32( out, sizeof(uint8_t) + sizeof(uint32_t) );
363    evbuffer_add_uint8 ( out, BT_HAVE );
364    evbuffer_add_uint32( out, index );
365
366    dbgmsg( msgs, "sending Have %u", index );
367    dbgOutMessageLen( msgs );
368    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
369}
370
371#if 0
372static void
373protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
374{
375    tr_peerIo       * io  = msgs->peer->io;
376    struct evbuffer * out = msgs->outMessages;
377
378    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
379
380    evbuffer_add_uint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
381    evbuffer_add_uint8 ( io, out, BT_FEXT_ALLOWED_FAST );
382    evbuffer_add_uint32( io, out, pieceIndex );
383
384    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
385    dbgOutMessageLen( msgs );
386}
387#endif
388
389static void
390protocolSendChoke( tr_peermsgs * msgs, int choke )
391{
392    struct evbuffer * out = msgs->outMessages;
393
394    evbuffer_add_uint32( out, sizeof( uint8_t ) );
395    evbuffer_add_uint8 ( out, choke ? BT_CHOKE : BT_UNCHOKE );
396
397    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
398    dbgOutMessageLen( msgs );
399    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
400}
401
402static void
403protocolSendHaveAll( tr_peermsgs * msgs )
404{
405    struct evbuffer * out = msgs->outMessages;
406
407    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
408
409    evbuffer_add_uint32( out, sizeof( uint8_t ) );
410    evbuffer_add_uint8 ( out, BT_FEXT_HAVE_ALL );
411
412    dbgmsg( msgs, "sending HAVE_ALL..." );
413    dbgOutMessageLen( msgs );
414    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
415}
416
417static void
418protocolSendHaveNone( tr_peermsgs * msgs )
419{
420    struct evbuffer * out = msgs->outMessages;
421
422    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
423
424    evbuffer_add_uint32( out, sizeof( uint8_t ) );
425    evbuffer_add_uint8 ( out, BT_FEXT_HAVE_NONE );
426
427    dbgmsg( msgs, "sending HAVE_NONE..." );
428    dbgOutMessageLen( msgs );
429    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
430}
431
432/**
433***  EVENTS
434**/
435
436static void
437publish( tr_peermsgs * msgs, tr_peer_event * e )
438{
439    assert( msgs->peer );
440    assert( msgs->peer->msgs == msgs );
441
442    if( msgs->callback != NULL )
443        msgs->callback( msgs->peer, e, msgs->callbackData );
444}
445
446static void
447fireError( tr_peermsgs * msgs, int err )
448{
449    tr_peer_event e = TR_PEER_EVENT_INIT;
450    e.eventType = TR_PEER_ERROR;
451    e.err = err;
452    publish( msgs, &e );
453}
454
455static void
456fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
457{
458    tr_peer_event e = TR_PEER_EVENT_INIT;
459    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
460    e.pieceIndex = req->index;
461    e.offset = req->offset;
462    e.length = req->length;
463    publish( msgs, &e );
464}
465
466static void
467fireGotRej( tr_peermsgs * msgs, const struct peer_request * req )
468{
469    tr_peer_event e = TR_PEER_EVENT_INIT;
470    e.eventType = TR_PEER_CLIENT_GOT_REJ;
471    e.pieceIndex = req->index;
472    e.offset = req->offset;
473    e.length = req->length;
474    publish( msgs, &e );
475}
476
477static void
478fireGotChoke( tr_peermsgs * msgs )
479{
480    tr_peer_event e = TR_PEER_EVENT_INIT;
481    e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
482    publish( msgs, &e );
483}
484
485static void
486fireClientGotHaveAll( tr_peermsgs * msgs )
487{
488    tr_peer_event e = TR_PEER_EVENT_INIT;
489    e.eventType = TR_PEER_CLIENT_GOT_HAVE_ALL;
490    publish( msgs, &e );
491}
492
493static void
494fireClientGotHaveNone( tr_peermsgs * msgs )
495{
496    tr_peer_event e = TR_PEER_EVENT_INIT;
497    e.eventType = TR_PEER_CLIENT_GOT_HAVE_NONE;
498    publish( msgs, &e );
499}
500
501static void
502fireClientGotData( tr_peermsgs * msgs, uint32_t length, int wasPieceData )
503{
504    tr_peer_event e = TR_PEER_EVENT_INIT;
505
506    e.length = length;
507    e.eventType = TR_PEER_CLIENT_GOT_DATA;
508    e.wasPieceData = wasPieceData;
509    publish( msgs, &e );
510}
511
512static void
513fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
514{
515    tr_peer_event e = TR_PEER_EVENT_INIT;
516    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
517    e.pieceIndex = pieceIndex;
518    publish( msgs, &e );
519}
520
521static void
522fireClientGotPort( tr_peermsgs * msgs, tr_port port )
523{
524    tr_peer_event e = TR_PEER_EVENT_INIT;
525    e.eventType = TR_PEER_CLIENT_GOT_PORT;
526    e.port = port;
527    publish( msgs, &e );
528}
529
530static void
531fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
532{
533    tr_peer_event e = TR_PEER_EVENT_INIT;
534    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
535    e.pieceIndex = pieceIndex;
536    publish( msgs, &e );
537}
538
539static void
540fireClientGotBitfield( tr_peermsgs * msgs, tr_bitfield * bitfield )
541{
542    tr_peer_event e = TR_PEER_EVENT_INIT;
543    e.eventType = TR_PEER_CLIENT_GOT_BITFIELD;
544    e.bitfield = bitfield;
545    publish( msgs, &e );
546}
547
548static void
549fireClientGotHave( tr_peermsgs * msgs, tr_piece_index_t index )
550{
551    tr_peer_event e = TR_PEER_EVENT_INIT;
552    e.eventType = TR_PEER_CLIENT_GOT_HAVE;
553    e.pieceIndex = index;
554    publish( msgs, &e );
555}
556
557static void
558firePeerGotData( tr_peermsgs * msgs, uint32_t length, bool wasPieceData )
559{
560    tr_peer_event e = TR_PEER_EVENT_INIT;
561
562    e.length = length;
563    e.eventType = TR_PEER_PEER_GOT_DATA;
564    e.wasPieceData = wasPieceData;
565
566    publish( msgs, &e );
567}
568
569/**
570***  ALLOWED FAST SET
571***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
572**/
573
574#if 0
575size_t
576tr_generateAllowedSet( tr_piece_index_t * setmePieces,
577                       size_t             desiredSetSize,
578                       size_t             pieceCount,
579                       const uint8_t    * infohash,
580                       const tr_address * addr )
581{
582    size_t setSize = 0;
583
584    assert( setmePieces );
585    assert( desiredSetSize <= pieceCount );
586    assert( desiredSetSize );
587    assert( pieceCount );
588    assert( infohash );
589    assert( addr );
590
591    if( addr->type == TR_AF_INET )
592    {
593        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
594        uint8_t x[SHA_DIGEST_LENGTH];
595
596        uint32_t ui32 = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
597        memcpy( w, &ui32, sizeof( uint32_t ) );
598        walk += sizeof( uint32_t );
599        memcpy( walk, infohash, SHA_DIGEST_LENGTH );                 /* (2) */
600        walk += SHA_DIGEST_LENGTH;
601        tr_sha1( x, w, walk-w, NULL );                               /* (3) */
602        assert( sizeof( w ) == walk-w );
603
604        while( setSize<desiredSetSize )
605        {
606            int i;
607            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
608            {
609                size_t k;
610                uint32_t j = i * 4;                                  /* (5) */
611                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
612                uint32_t index = y % pieceCount;                     /* (7) */
613
614                for( k=0; k<setSize; ++k )                           /* (8) */
615                    if( setmePieces[k] == index )
616                        break;
617
618                if( k == setSize )
619                    setmePieces[setSize++] = index;                  /* (9) */
620            }
621
622            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
623        }
624    }
625
626    return setSize;
627}
628
629static void
630updateFastSet( tr_peermsgs * msgs UNUSED )
631{
632    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
633    const int peerIsNeedy = msgs->peer->progress < 0.10;
634
635    if( fext && peerIsNeedy && !msgs->haveFastSet )
636    {
637        size_t i;
638        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
639        const tr_info * inf = &msgs->torrent->info;
640        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
641
642        /* build the fast set */
643        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
644        msgs->haveFastSet = 1;
645
646        /* send it to the peer */
647        for( i=0; i<msgs->fastsetSize; ++i )
648            protocolSendAllowedFast( msgs, msgs->fastset[i] );
649    }
650}
651#endif
652
653/**
654***  INTEREST
655**/
656
657static void
658sendInterest( tr_peermsgs * msgs, bool clientIsInterested )
659{
660    struct evbuffer * out = msgs->outMessages;
661
662    assert( msgs );
663    assert( tr_isBool( clientIsInterested ) );
664
665    msgs->peer->clientIsInterested = clientIsInterested;
666    dbgmsg( msgs, "Sending %s", clientIsInterested ? "Interested" : "Not Interested" );
667    evbuffer_add_uint32( out, sizeof( uint8_t ) );
668    evbuffer_add_uint8 ( out, clientIsInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
669
670    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
671    dbgOutMessageLen( msgs );
672}
673
674static void
675updateInterest( tr_peermsgs * msgs UNUSED )
676{
677    /* FIXME -- might need to poke the mgr on startup */
678}
679
680void
681tr_peerMsgsSetInterested( tr_peermsgs * msgs, int isInterested )
682{
683    assert( tr_isBool( isInterested ) );
684
685    if( isInterested != msgs->peer->clientIsInterested )
686        sendInterest( msgs, isInterested );
687}
688
689static bool
690popNextMetadataRequest( tr_peermsgs * msgs, int * piece )
691{
692    if( msgs->peerAskedForMetadataCount == 0 )
693        return false;
694
695    *piece = msgs->peerAskedForMetadata[0];
696
697    tr_removeElementFromArray( msgs->peerAskedForMetadata, 0, sizeof( int ),
698                               msgs->peerAskedForMetadataCount-- );
699
700    return true;
701}
702
703static bool
704popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
705{
706    if( msgs->peer->pendingReqsToClient == 0 )
707        return false;
708
709    *setme = msgs->peerAskedFor[0];
710
711    tr_removeElementFromArray( msgs->peerAskedFor, 0, sizeof( struct peer_request ),
712                               msgs->peer->pendingReqsToClient-- );
713
714    return true;
715}
716
717static void
718cancelAllRequestsToClient( tr_peermsgs * msgs )
719{
720    struct peer_request req;
721    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
722
723    while( popNextRequest( msgs, &req ))
724        if( mustSendCancel )
725            protocolSendReject( msgs, &req );
726}
727
728void
729tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
730{
731    const time_t now = tr_time( );
732    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
733
734    assert( msgs );
735    assert( msgs->peer );
736    assert( choke == 0 || choke == 1 );
737
738    if( msgs->peer->chokeChangedAt > fibrillationTime )
739    {
740        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
741    }
742    else if( msgs->peer->peerIsChoked != choke )
743    {
744        msgs->peer->peerIsChoked = choke;
745        if( choke )
746            cancelAllRequestsToClient( msgs );
747        protocolSendChoke( msgs, choke );
748        msgs->peer->chokeChangedAt = now;
749    }
750}
751
752/**
753***
754**/
755
756void
757tr_peerMsgsHave( tr_peermsgs * msgs, uint32_t index )
758{
759    protocolSendHave( msgs, index );
760
761    /* since we have more pieces now, we might not be interested in this peer */
762    updateInterest( msgs );
763}
764
765/**
766***
767**/
768
769static bool
770reqIsValid( const tr_peermsgs * peer,
771            uint32_t            index,
772            uint32_t            offset,
773            uint32_t            length )
774{
775    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
776}
777
778static bool
779requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
780{
781    return reqIsValid( msgs, req->index, req->offset, req->length );
782}
783
784void
785tr_peerMsgsCancel( tr_peermsgs * msgs, tr_block_index_t block )
786{
787    struct peer_request req;
788/*fprintf( stderr, "SENDING CANCEL MESSAGE FOR BLOCK %zu\n\t\tFROM PEER %p ------------------------------------\n", (size_t)block, msgs->peer );*/
789    blockToReq( msgs->torrent, block, &req );
790    protocolSendCancel( msgs, &req );
791}
792
793/**
794***
795**/
796
797static void
798sendLtepHandshake( tr_peermsgs * msgs )
799{
800    tr_benc val, *m;
801    char * buf;
802    int len;
803    bool allow_pex;
804    bool allow_metadata_xfer;
805    struct evbuffer * out = msgs->outMessages;
806    const unsigned char * ipv6 = tr_globalIPv6();
807
808    if( msgs->clientSentLtepHandshake )
809        return;
810
811    dbgmsg( msgs, "sending an ltep handshake" );
812    msgs->clientSentLtepHandshake = 1;
813
814    /* decide if we want to advertise metadata xfer support (BEP 9) */
815    if( tr_torrentIsPrivate( msgs->torrent ) )
816        allow_metadata_xfer = 0;
817    else
818        allow_metadata_xfer = 1;
819
820    /* decide if we want to advertise pex support */
821    if( !tr_torrentAllowsPex( msgs->torrent ) )
822        allow_pex = 0;
823    else if( msgs->peerSentLtepHandshake )
824        allow_pex = msgs->peerSupportsPex ? 1 : 0;
825    else
826        allow_pex = 1;
827
828    tr_bencInitDict( &val, 8 );
829    tr_bencDictAddInt( &val, "e", getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED );
830    if( ipv6 != NULL )
831        tr_bencDictAddRaw( &val, "ipv6", ipv6, 16 );
832    if( allow_metadata_xfer && tr_torrentHasMetadata( msgs->torrent )
833                            && ( msgs->torrent->infoDictLength > 0 ) )
834        tr_bencDictAddInt( &val, "metadata_size", msgs->torrent->infoDictLength );
835    tr_bencDictAddInt( &val, "p", tr_sessionGetPublicPeerPort( getSession(msgs) ) );
836    tr_bencDictAddInt( &val, "reqq", REQQ );
837    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
838    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
839    m  = tr_bencDictAddDict( &val, "m", 2 );
840    if( allow_metadata_xfer )
841        tr_bencDictAddInt( m, "ut_metadata", UT_METADATA_ID );
842    if( allow_pex )
843        tr_bencDictAddInt( m, "ut_pex", UT_PEX_ID );
844
845    buf = tr_bencToStr( &val, TR_FMT_BENC, &len );
846
847    evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + len );
848    evbuffer_add_uint8 ( out, BT_LTEP );
849    evbuffer_add_uint8 ( out, LTEP_HANDSHAKE );
850    evbuffer_add       ( out, buf, len );
851    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
852    dbgOutMessageLen( msgs );
853
854    /* cleanup */
855    tr_bencFree( &val );
856    tr_free( buf );
857}
858
859static void
860parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
861{
862    int64_t   i;
863    tr_benc   val, * sub;
864    uint8_t * tmp = tr_new( uint8_t, len );
865    const uint8_t *addr;
866    size_t addr_len;
867    tr_pex pex;
868    int8_t seedProbability = -1;
869
870    memset( &pex, 0, sizeof( tr_pex ) );
871
872    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
873    msgs->peerSentLtepHandshake = 1;
874
875    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
876    {
877        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
878        tr_free( tmp );
879        return;
880    }
881
882    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
883
884    /* does the peer prefer encrypted connections? */
885    if( tr_bencDictFindInt( &val, "e", &i ) ) {
886        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
887                                              : ENCRYPTION_PREFERENCE_NO;
888        if( i )
889            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
890    }
891
892    /* check supported messages for utorrent pex */
893    msgs->peerSupportsPex = 0;
894    msgs->peerSupportsMetadataXfer = 0;
895
896    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
897        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
898            msgs->peerSupportsPex = i != 0;
899            msgs->ut_pex_id = (uint8_t) i;
900            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
901        }
902        if( tr_bencDictFindInt( sub, "ut_metadata", &i ) ) {
903            msgs->peerSupportsMetadataXfer = i != 0;
904            msgs->ut_metadata_id = (uint8_t) i;
905            dbgmsg( msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id );
906        }
907        if( tr_bencDictFindInt( sub, "ut_holepunch", &i ) ) {
908            /* Mysterious µTorrent extension that we don't grok.  However,
909               it implies support for µTP, so use it to indicate that. */
910            tr_peerMgrSetUtpFailed( msgs->torrent,
911                                    tr_peerIoGetAddress( msgs->peer->io, NULL ),
912                                    false );
913        }
914    }
915
916    /* look for metainfo size (BEP 9) */
917    if( tr_bencDictFindInt( &val, "metadata_size", &i ) ) {
918        tr_torrentSetMetadataSizeHint( msgs->torrent, i );
919        msgs->metadata_size_hint = (size_t) i;
920    }
921
922    /* look for upload_only (BEP 21) */
923    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
924        seedProbability = i==0 ? 0 : 100;
925
926    /* get peer's listening port */
927    if( tr_bencDictFindInt( &val, "p", &i ) ) {
928        pex.port = htons( (uint16_t)i );
929        fireClientGotPort( msgs, pex.port );
930        dbgmsg( msgs, "peer's port is now %d", (int)i );
931    }
932
933    if( tr_peerIoIsIncoming( msgs->peer->io )
934        && tr_bencDictFindRaw( &val, "ipv4", &addr, &addr_len )
935        && ( addr_len == 4 ) )
936    {
937        pex.addr.type = TR_AF_INET;
938        memcpy( &pex.addr.addr.addr4, addr, 4 );
939        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
940    }
941
942    if( tr_peerIoIsIncoming( msgs->peer->io )
943        && tr_bencDictFindRaw( &val, "ipv6", &addr, &addr_len )
944        && ( addr_len == 16 ) )
945    {
946        pex.addr.type = TR_AF_INET6;
947        memcpy( &pex.addr.addr.addr6, addr, 16 );
948        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
949    }
950
951    /* get peer's maximum request queue size */
952    if( tr_bencDictFindInt( &val, "reqq", &i ) )
953        msgs->reqq = i;
954
955    tr_bencFree( &val );
956    tr_free( tmp );
957}
958
959static void
960parseUtMetadata( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
961{
962    tr_benc dict;
963    char * msg_end;
964    char * benc_end;
965    int64_t msg_type = -1;
966    int64_t piece = -1;
967    int64_t total_size = 0;
968    uint8_t * tmp = tr_new( uint8_t, msglen );
969
970    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
971    msg_end = (char*)tmp + msglen;
972
973    if( !tr_bencLoad( tmp, msglen, &dict, &benc_end ) )
974    {
975        tr_bencDictFindInt( &dict, "msg_type", &msg_type );
976        tr_bencDictFindInt( &dict, "piece", &piece );
977        tr_bencDictFindInt( &dict, "total_size", &total_size );
978        tr_bencFree( &dict );
979    }
980
981    dbgmsg( msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
982            (int)msg_type, (int)piece, (int)total_size );
983
984    if( msg_type == METADATA_MSG_TYPE_REJECT )
985    {
986        /* NOOP */
987    }
988
989    if( ( msg_type == METADATA_MSG_TYPE_DATA )
990        && ( !tr_torrentHasMetadata( msgs->torrent ) )
991        && ( msg_end - benc_end <= METADATA_PIECE_SIZE )
992        && ( piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size ) )
993    {
994        const int pieceLen = msg_end - benc_end;
995        tr_torrentSetMetadataPiece( msgs->torrent, piece, benc_end, pieceLen );
996    }
997
998    if( msg_type == METADATA_MSG_TYPE_REQUEST )
999    {
1000        if( ( piece >= 0 )
1001            && tr_torrentHasMetadata( msgs->torrent )
1002            && !tr_torrentIsPrivate( msgs->torrent )
1003            && ( msgs->peerAskedForMetadataCount < METADATA_REQQ ) )
1004        {
1005            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
1006        }
1007        else
1008        {
1009            tr_benc tmp;
1010            int payloadLen;
1011            char * payload;
1012            struct evbuffer * out = msgs->outMessages;
1013
1014            /* build the rejection message */
1015            tr_bencInitDict( &tmp, 2 );
1016            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1017            tr_bencDictAddInt( &tmp, "piece", piece );
1018            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1019            tr_bencFree( &tmp );
1020
1021            /* write it out as a LTEP message to our outMessages buffer */
1022            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1023            evbuffer_add_uint8 ( out, BT_LTEP );
1024            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1025            evbuffer_add       ( out, payload, payloadLen );
1026            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1027            dbgOutMessageLen( msgs );
1028
1029            tr_free( payload );
1030        }
1031    }
1032
1033    tr_free( tmp );
1034}
1035
1036static void
1037parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1038{
1039    int loaded = 0;
1040    uint8_t * tmp = tr_new( uint8_t, msglen );
1041    tr_benc val;
1042    tr_torrent * tor = msgs->torrent;
1043    const uint8_t * added;
1044    size_t added_len;
1045
1046    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1047
1048    if( tr_torrentAllowsPex( tor )
1049      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1050    {
1051        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1052        {
1053            tr_pex * pex;
1054            size_t i, n;
1055            size_t added_f_len = 0;
1056            const uint8_t * added_f = NULL;
1057
1058            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1059            pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1060
1061            n = MIN( n, MAX_PEX_PEER_COUNT );
1062            for( i=0; i<n; ++i )
1063            {
1064                int seedProbability = -1;
1065                if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1066                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1067            }
1068
1069            tr_free( pex );
1070        }
1071
1072        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1073        {
1074            tr_pex * pex;
1075            size_t i, n;
1076            size_t added_f_len = 0;
1077            const uint8_t * added_f = NULL;
1078
1079            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1080            pex = tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len, &n );
1081
1082            n = MIN( n, MAX_PEX_PEER_COUNT );
1083            for( i=0; i<n; ++i )
1084            {
1085                int seedProbability = -1;
1086                if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1087                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1088            }
1089
1090            tr_free( pex );
1091        }
1092    }
1093
1094    if( loaded )
1095        tr_bencFree( &val );
1096    tr_free( tmp );
1097}
1098
1099static void sendPex( tr_peermsgs * msgs );
1100
1101static void
1102parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer  * inbuf )
1103{
1104    uint8_t ltep_msgid;
1105
1106    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1107    msglen--;
1108
1109    if( ltep_msgid == LTEP_HANDSHAKE )
1110    {
1111        dbgmsg( msgs, "got ltep handshake" );
1112        parseLtepHandshake( msgs, msglen, inbuf );
1113        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1114        {
1115            sendLtepHandshake( msgs );
1116            sendPex( msgs );
1117        }
1118    }
1119    else if( ltep_msgid == UT_PEX_ID )
1120    {
1121        dbgmsg( msgs, "got ut pex" );
1122        msgs->peerSupportsPex = 1;
1123        parseUtPex( msgs, msglen, inbuf );
1124    }
1125    else if( ltep_msgid == UT_METADATA_ID )
1126    {
1127        dbgmsg( msgs, "got ut metadata" );
1128        msgs->peerSupportsMetadataXfer = 1;
1129        parseUtMetadata( msgs, msglen, inbuf );
1130    }
1131    else
1132    {
1133        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1134        evbuffer_drain( inbuf, msglen );
1135    }
1136}
1137
1138static int
1139readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1140{
1141    uint32_t len;
1142
1143    if( inlen < sizeof( len ) )
1144        return READ_LATER;
1145
1146    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1147
1148    if( len == 0 ) /* peer sent us a keepalive message */
1149        dbgmsg( msgs, "got KeepAlive" );
1150    else
1151    {
1152        msgs->incoming.length = len;
1153        msgs->state = AWAITING_BT_ID;
1154    }
1155
1156    return READ_NOW;
1157}
1158
1159static int readBtMessage( tr_peermsgs *, struct evbuffer *, size_t );
1160
1161static int
1162readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1163{
1164    uint8_t id;
1165
1166    if( inlen < sizeof( uint8_t ) )
1167        return READ_LATER;
1168
1169    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1170    msgs->incoming.id = id;
1171    dbgmsg( msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length );
1172
1173    if( id == BT_PIECE )
1174    {
1175        msgs->state = AWAITING_BT_PIECE;
1176        return READ_NOW;
1177    }
1178    else if( msgs->incoming.length != 1 )
1179    {
1180        msgs->state = AWAITING_BT_MESSAGE;
1181        return READ_NOW;
1182    }
1183    else return readBtMessage( msgs, inbuf, inlen - 1 );
1184}
1185
1186static void
1187updatePeerProgress( tr_peermsgs * msgs )
1188{
1189    tr_peerUpdateProgress( msgs->torrent, msgs->peer );
1190
1191    /*updateFastSet( msgs );*/
1192    updateInterest( msgs );
1193}
1194
1195static void
1196prefetchPieces( tr_peermsgs *msgs )
1197{
1198    int i;
1199
1200    if( !getSession(msgs)->isPrefetchEnabled )
1201        return;
1202
1203    /* Maintain 12 prefetched blocks per unchoked peer */
1204    for( i=msgs->prefetchCount; i<msgs->peer->pendingReqsToClient && i<12; ++i )
1205    {
1206        const struct peer_request * req = msgs->peerAskedFor + i;
1207        if( requestIsValid( msgs, req ) )
1208        {
1209            tr_cachePrefetchBlock( getSession(msgs)->cache, msgs->torrent, req->index, req->offset, req->length );
1210            ++msgs->prefetchCount;
1211        }
1212    }
1213}
1214
1215static void
1216peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
1217{
1218    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1219    const int reqIsValid = requestIsValid( msgs, req );
1220    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1221    const int peerIsChoked = msgs->peer->peerIsChoked;
1222
1223    int allow = false;
1224
1225    if( !reqIsValid )
1226        dbgmsg( msgs, "rejecting an invalid request." );
1227    else if( !clientHasPiece )
1228        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1229    else if( peerIsChoked )
1230        dbgmsg( msgs, "rejecting request from choked peer" );
1231    else if( msgs->peer->pendingReqsToClient + 1 >= REQQ )
1232        dbgmsg( msgs, "rejecting request ... reqq is full" );
1233    else
1234        allow = true;
1235
1236    if( allow ) {
1237        msgs->peerAskedFor[msgs->peer->pendingReqsToClient++] = *req;
1238        prefetchPieces( msgs );
1239    } else if( fext ) {
1240        protocolSendReject( msgs, req );
1241    }
1242}
1243
1244static bool
1245messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1246{
1247    switch( id )
1248    {
1249        case BT_CHOKE:
1250        case BT_UNCHOKE:
1251        case BT_INTERESTED:
1252        case BT_NOT_INTERESTED:
1253        case BT_FEXT_HAVE_ALL:
1254        case BT_FEXT_HAVE_NONE:
1255            return len == 1;
1256
1257        case BT_HAVE:
1258        case BT_FEXT_SUGGEST:
1259        case BT_FEXT_ALLOWED_FAST:
1260            return len == 5;
1261
1262        case BT_BITFIELD:
1263            if( tr_torrentHasMetadata( msg->torrent ) )
1264                return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1265            /* we don't know the piece count yet,
1266               so we can only guess whether to send true or false */
1267            if( msg->metadata_size_hint > 0 )
1268                return len <= msg->metadata_size_hint;
1269            return true;
1270
1271        case BT_REQUEST:
1272        case BT_CANCEL:
1273        case BT_FEXT_REJECT:
1274            return len == 13;
1275
1276        case BT_PIECE:
1277            return len > 9 && len <= 16393;
1278
1279        case BT_PORT:
1280            return len == 3;
1281
1282        case BT_LTEP:
1283            return len >= 2;
1284
1285        default:
1286            return false;
1287    }
1288}
1289
1290static int clientGotBlock( tr_peermsgs *               msgs,
1291                           struct evbuffer *           block,
1292                           const struct peer_request * req );
1293
1294static int
1295readBtPiece( tr_peermsgs      * msgs,
1296             struct evbuffer  * inbuf,
1297             size_t             inlen,
1298             size_t           * setme_piece_bytes_read )
1299{
1300    struct peer_request * req = &msgs->incoming.blockReq;
1301
1302    assert( evbuffer_get_length( inbuf ) >= inlen );
1303    dbgmsg( msgs, "In readBtPiece" );
1304
1305    if( !req->length )
1306    {
1307        if( inlen < 8 )
1308            return READ_LATER;
1309
1310        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1311        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1312        req->length = msgs->incoming.length - 9;
1313        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1314        return READ_NOW;
1315    }
1316    else
1317    {
1318        int err;
1319
1320        /* read in another chunk of data */
1321        const size_t nLeft = req->length - evbuffer_get_length( msgs->incoming.block );
1322        size_t n = MIN( nLeft, inlen );
1323        size_t i = n;
1324        void * buf = tr_sessionGetBuffer( getSession( msgs ) );
1325        const size_t buflen = SESSION_BUFFER_SIZE;
1326
1327        while( i > 0 )
1328        {
1329            const size_t thisPass = MIN( i, buflen );
1330            tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass );
1331            evbuffer_add( msgs->incoming.block, buf, thisPass );
1332            i -= thisPass;
1333        }
1334
1335        tr_sessionReleaseBuffer( getSession( msgs ) );
1336        buf = NULL;
1337
1338        fireClientGotData( msgs, n, true );
1339        *setme_piece_bytes_read += n;
1340        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1341               n, req->index, req->offset, req->length,
1342               (int)( req->length - evbuffer_get_length( msgs->incoming.block ) ) );
1343        if( evbuffer_get_length( msgs->incoming.block ) < req->length )
1344            return READ_LATER;
1345
1346        /* we've got the whole block ... process it */
1347        err = clientGotBlock( msgs, msgs->incoming.block, req );
1348
1349        /* cleanup */
1350        evbuffer_free( msgs->incoming.block );
1351        msgs->incoming.block = evbuffer_new( );
1352        req->length = 0;
1353        msgs->state = AWAITING_BT_LENGTH;
1354        return err ? READ_ERR : READ_NOW;
1355    }
1356}
1357
1358static void updateDesiredRequestCount( tr_peermsgs * msgs );
1359
1360static int
1361readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1362{
1363    uint32_t      ui32;
1364    uint32_t      msglen = msgs->incoming.length;
1365    const uint8_t id = msgs->incoming.id;
1366#ifndef NDEBUG
1367    const size_t  startBufLen = evbuffer_get_length( inbuf );
1368#endif
1369    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1370
1371    --msglen; /* id length */
1372
1373    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1374
1375    if( inlen < msglen )
1376        return READ_LATER;
1377
1378    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1379    {
1380        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1381        fireError( msgs, EMSGSIZE );
1382        return READ_ERR;
1383    }
1384
1385    switch( id )
1386    {
1387        case BT_CHOKE:
1388            dbgmsg( msgs, "got Choke" );
1389            msgs->peer->clientIsChoked = 1;
1390            if( !fext )
1391                fireGotChoke( msgs );
1392            break;
1393
1394        case BT_UNCHOKE:
1395            dbgmsg( msgs, "got Unchoke" );
1396            msgs->peer->clientIsChoked = 0;
1397            updateDesiredRequestCount( msgs );
1398            break;
1399
1400        case BT_INTERESTED:
1401            dbgmsg( msgs, "got Interested" );
1402            msgs->peer->peerIsInterested = 1;
1403            break;
1404
1405        case BT_NOT_INTERESTED:
1406            dbgmsg( msgs, "got Not Interested" );
1407            msgs->peer->peerIsInterested = 0;
1408            break;
1409
1410        case BT_HAVE:
1411            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1412            dbgmsg( msgs, "got Have: %u", ui32 );
1413            if( tr_torrentHasMetadata( msgs->torrent )
1414                    && ( ui32 >= msgs->torrent->info.pieceCount ) )
1415            {
1416                fireError( msgs, ERANGE );
1417                return READ_ERR;
1418            }
1419
1420            /* a peer can send the same HAVE message twice... */
1421            if( !tr_bitsetHas( &msgs->peer->have, ui32 ) ) {
1422                tr_bitsetAdd( &msgs->peer->have, ui32 );
1423                fireClientGotHave( msgs, ui32 );
1424            }
1425            updatePeerProgress( msgs );
1426            break;
1427
1428        case BT_BITFIELD: {
1429            tr_bitfield tmp = TR_BITFIELD_INIT;
1430            const size_t bitCount = tr_torrentHasMetadata( msgs->torrent )
1431                                  ? msgs->torrent->info.pieceCount
1432                                  : msglen * 8;
1433            tr_bitfieldConstruct( &tmp, bitCount );
1434            dbgmsg( msgs, "got a bitfield" );
1435            tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp.bits, msglen );
1436            tr_bitsetSetBitfield( &msgs->peer->have, &tmp );
1437            fireClientGotBitfield( msgs, &tmp );
1438            tr_bitfieldDestruct( &tmp );
1439            updatePeerProgress( msgs );
1440            break;
1441        }
1442
1443        case BT_REQUEST:
1444        {
1445            struct peer_request r;
1446            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1447            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1448            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1449            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1450            peerMadeRequest( msgs, &r );
1451            break;
1452        }
1453
1454        case BT_CANCEL:
1455        {
1456            int i;
1457            struct peer_request r;
1458            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1459            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1460            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1461            tr_historyAdd( &msgs->peer->cancelsSentToClient, tr_time( ), 1 );
1462            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1463
1464            for( i=0; i<msgs->peer->pendingReqsToClient; ++i ) {
1465                const struct peer_request * req = msgs->peerAskedFor + i;
1466                if( ( req->index == r.index ) && ( req->offset == r.offset ) && ( req->length == r.length ) )
1467                    break;
1468            }
1469
1470            if( i < msgs->peer->pendingReqsToClient )
1471                tr_removeElementFromArray( msgs->peerAskedFor, i, sizeof( struct peer_request ),
1472                                           msgs->peer->pendingReqsToClient-- );
1473            break;
1474        }
1475
1476        case BT_PIECE:
1477            assert( 0 ); /* handled elsewhere! */
1478            break;
1479
1480        case BT_PORT:
1481            dbgmsg( msgs, "Got a BT_PORT" );
1482            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->dht_port );
1483            if( msgs->peer->dht_port > 0 )
1484                tr_dhtAddNode( getSession(msgs),
1485                               tr_peerAddress( msgs->peer ),
1486                               msgs->peer->dht_port, 0 );
1487            break;
1488
1489        case BT_FEXT_SUGGEST:
1490            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1491            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1492            if( fext )
1493                fireClientGotSuggest( msgs, ui32 );
1494            else {
1495                fireError( msgs, EMSGSIZE );
1496                return READ_ERR;
1497            }
1498            break;
1499
1500        case BT_FEXT_ALLOWED_FAST:
1501            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1502            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1503            if( fext )
1504                fireClientGotAllowedFast( msgs, ui32 );
1505            else {
1506                fireError( msgs, EMSGSIZE );
1507                return READ_ERR;
1508            }
1509            break;
1510
1511        case BT_FEXT_HAVE_ALL:
1512            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1513            if( fext ) {
1514                tr_bitsetSetHaveAll( &msgs->peer->have );
1515                fireClientGotHaveAll( msgs );
1516                updatePeerProgress( msgs );
1517            } else {
1518                fireError( msgs, EMSGSIZE );
1519                return READ_ERR;
1520            }
1521            break;
1522
1523        case BT_FEXT_HAVE_NONE:
1524            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1525            if( fext ) {
1526                tr_bitsetSetHaveNone( &msgs->peer->have );
1527                fireClientGotHaveNone( msgs );
1528                updatePeerProgress( msgs );
1529            } else {
1530                fireError( msgs, EMSGSIZE );
1531                return READ_ERR;
1532            }
1533            break;
1534
1535        case BT_FEXT_REJECT:
1536        {
1537            struct peer_request r;
1538            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1539            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1540            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1541            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1542            if( fext )
1543                fireGotRej( msgs, &r );
1544            else {
1545                fireError( msgs, EMSGSIZE );
1546                return READ_ERR;
1547            }
1548            break;
1549        }
1550
1551        case BT_LTEP:
1552            dbgmsg( msgs, "Got a BT_LTEP" );
1553            parseLtep( msgs, msglen, inbuf );
1554            break;
1555
1556        default:
1557            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1558            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1559            break;
1560    }
1561
1562    assert( msglen + 1 == msgs->incoming.length );
1563    assert( evbuffer_get_length( inbuf ) == startBufLen - msglen );
1564
1565    msgs->state = AWAITING_BT_LENGTH;
1566    return READ_NOW;
1567}
1568
1569static void
1570addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1571{
1572    if( !msgs->peer->blame )
1573         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1574    tr_bitfieldAdd( msgs->peer->blame, index );
1575}
1576
1577/* returns 0 on success, or an errno on failure */
1578static int
1579clientGotBlock( tr_peermsgs                * msgs,
1580                struct evbuffer            * data,
1581                const struct peer_request  * req )
1582{
1583    int err;
1584    tr_torrent * tor = msgs->torrent;
1585    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1586
1587    assert( msgs );
1588    assert( req );
1589
1590    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1591        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1592                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1593        return EMSGSIZE;
1594    }
1595
1596    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1597
1598    if( !tr_peerMgrDidPeerRequest( msgs->torrent, msgs->peer, block ) ) {
1599        dbgmsg( msgs, "we didn't ask for this message..." );
1600        return 0;
1601    }
1602    if( tr_cpPieceIsComplete( &msgs->torrent->completion, req->index ) ) {
1603        dbgmsg( msgs, "we did ask for this message, but the piece is already complete..." );
1604        return 0;
1605    }
1606
1607    /**
1608    ***  Save the block
1609    **/
1610
1611    if(( err = tr_cacheWriteBlock( getSession(msgs)->cache, tor, req->index, req->offset, req->length, data )))
1612        return err;
1613
1614    addPeerToBlamefield( msgs, req->index );
1615    fireGotBlock( msgs, req );
1616    return 0;
1617}
1618
1619static int peerPulse( void * vmsgs );
1620
1621static void
1622didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1623{
1624    tr_peermsgs * msgs = vmsgs;
1625    firePeerGotData( msgs, bytesWritten, wasPieceData );
1626
1627    if ( tr_isPeerIo( io ) && io->userData )
1628        peerPulse( msgs );
1629}
1630
1631static ReadState
1632canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1633{
1634    ReadState         ret;
1635    tr_peermsgs *     msgs = vmsgs;
1636    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1637    const size_t      inlen = evbuffer_get_length( in );
1638
1639    dbgmsg( msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state );
1640
1641    if( !inlen )
1642    {
1643        ret = READ_LATER;
1644    }
1645    else if( msgs->state == AWAITING_BT_PIECE )
1646    {
1647        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1648    }
1649    else switch( msgs->state )
1650    {
1651        case AWAITING_BT_LENGTH:
1652            ret = readBtLength ( msgs, in, inlen ); break;
1653
1654        case AWAITING_BT_ID:
1655            ret = readBtId     ( msgs, in, inlen ); break;
1656
1657        case AWAITING_BT_MESSAGE:
1658            ret = readBtMessage( msgs, in, inlen ); break;
1659
1660        default:
1661            ret = READ_ERR;
1662            assert( 0 );
1663    }
1664
1665    dbgmsg( msgs, "canRead: ret is %d", (int)ret );
1666
1667    /* log the raw data that was read */
1668    if( ( ret != READ_ERR ) && ( evbuffer_get_length( in ) != inlen ) )
1669        fireClientGotData( msgs, inlen - evbuffer_get_length( in ), false );
1670
1671    return ret;
1672}
1673
1674int
1675tr_peerMsgsIsReadingBlock( const tr_peermsgs * msgs, tr_block_index_t block )
1676{
1677    if( msgs->state != AWAITING_BT_PIECE )
1678        return false;
1679
1680    return block == _tr_block( msgs->torrent,
1681                               msgs->incoming.blockReq.index,
1682                               msgs->incoming.blockReq.offset );
1683}
1684
1685/**
1686***
1687**/
1688
1689static void
1690updateDesiredRequestCount( tr_peermsgs * msgs )
1691{
1692    const tr_torrent * const torrent = msgs->torrent;
1693
1694    if( tr_torrentIsSeed( msgs->torrent ) )
1695    {
1696        msgs->desiredRequestCount = 0;
1697    }
1698    else if( msgs->peer->clientIsChoked )
1699    {
1700        msgs->desiredRequestCount = 0;
1701    }
1702    else if( !msgs->peer->clientIsInterested )
1703    {
1704        msgs->desiredRequestCount = 0;
1705    }
1706    else
1707    {
1708        int estimatedBlocksInPeriod;
1709        int rate_Bps;
1710        int irate_Bps;
1711        const int floor = 4;
1712        const int seconds = REQUEST_BUF_SECS;
1713        const uint64_t now = tr_time_msec( );
1714
1715        /* Get the rate limit we should use.
1716         * FIXME: this needs to consider all the other peers as well... */
1717        rate_Bps = tr_peerGetPieceSpeed_Bps( msgs->peer, now, TR_PEER_TO_CLIENT );
1718        if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) )
1719            rate_Bps = MIN( rate_Bps, tr_torrentGetSpeedLimit_Bps( torrent, TR_PEER_TO_CLIENT ) );
1720
1721        /* honor the session limits, if enabled */
1722        if( tr_torrentUsesSessionLimits( torrent ) )
1723            if( tr_sessionGetActiveSpeedLimit_Bps( torrent->session, TR_PEER_TO_CLIENT, &irate_Bps ) )
1724                rate_Bps = MIN( rate_Bps, irate_Bps );
1725
1726        /* use this desired rate to figure out how
1727         * many requests we should send to this peer */
1728        estimatedBlocksInPeriod = ( rate_Bps * seconds ) / torrent->blockSize;
1729        msgs->desiredRequestCount = MAX( floor, estimatedBlocksInPeriod );
1730
1731        /* honor the peer's maximum request count, if specified */
1732        if( msgs->reqq > 0 )
1733            if( msgs->desiredRequestCount > msgs->reqq )
1734                msgs->desiredRequestCount = msgs->reqq;
1735    }
1736}
1737
1738static void
1739updateMetadataRequests( tr_peermsgs * msgs, time_t now )
1740{
1741    int piece;
1742
1743    if( msgs->peerSupportsMetadataXfer
1744        && tr_torrentGetNextMetadataRequest( msgs->torrent, now, &piece ) )
1745    {
1746        tr_benc tmp;
1747        int payloadLen;
1748        char * payload;
1749        struct evbuffer * out = msgs->outMessages;
1750
1751        /* build the data message */
1752        tr_bencInitDict( &tmp, 3 );
1753        tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REQUEST );
1754        tr_bencDictAddInt( &tmp, "piece", piece );
1755        payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1756        tr_bencFree( &tmp );
1757
1758        dbgmsg( msgs, "requesting metadata piece #%d", piece );
1759
1760        /* write it out as a LTEP message to our outMessages buffer */
1761        evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1762        evbuffer_add_uint8 ( out, BT_LTEP );
1763        evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1764        evbuffer_add       ( out, payload, payloadLen );
1765        pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1766        dbgOutMessageLen( msgs );
1767
1768        tr_free( payload );
1769    }
1770}
1771
1772static void
1773updateBlockRequests( tr_peermsgs * msgs )
1774{
1775    if( tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT )
1776        && ( msgs->desiredRequestCount > 0 )
1777        && ( msgs->peer->pendingReqsToPeer <= ( msgs->desiredRequestCount * 0.66 ) ) )
1778    {
1779        int i;
1780        int n;
1781        const int numwant = msgs->desiredRequestCount - msgs->peer->pendingReqsToPeer;
1782        tr_block_index_t * blocks = tr_new( tr_block_index_t, numwant );
1783
1784        tr_peerMgrGetNextRequests( msgs->torrent, msgs->peer, numwant, blocks, &n );
1785
1786        for( i=0; i<n; ++i )
1787        {
1788            struct peer_request req;
1789            blockToReq( msgs->torrent, blocks[i], &req );
1790            protocolSendRequest( msgs, &req );
1791        }
1792
1793        tr_free( blocks );
1794    }
1795}
1796
1797static size_t
1798fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1799{
1800    int piece;
1801    size_t bytesWritten = 0;
1802    struct peer_request req;
1803    const bool haveMessages = evbuffer_get_length( msgs->outMessages ) != 0;
1804    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1805
1806    /**
1807    ***  Protocol messages
1808    **/
1809
1810    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1811    {
1812        dbgmsg( msgs, "started an outMessages batch (length is %zu)", evbuffer_get_length( msgs->outMessages ) );
1813        msgs->outMessagesBatchedAt = now;
1814    }
1815    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1816    {
1817        const size_t len = evbuffer_get_length( msgs->outMessages );
1818        /* flush the protocol messages */
1819        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1820        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, false );
1821        msgs->clientSentAnythingAt = now;
1822        msgs->outMessagesBatchedAt = 0;
1823        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1824        bytesWritten +=  len;
1825    }
1826
1827    /**
1828    ***  Metadata Pieces
1829    **/
1830
1831    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= METADATA_PIECE_SIZE )
1832        && popNextMetadataRequest( msgs, &piece ) )
1833    {
1834        char * data;
1835        int dataLen;
1836        bool ok = false;
1837
1838        data = tr_torrentGetMetadataPiece( msgs->torrent, piece, &dataLen );
1839        if( ( dataLen > 0 ) && ( data != NULL ) )
1840        {
1841            tr_benc tmp;
1842            int payloadLen;
1843            char * payload;
1844            struct evbuffer * out = msgs->outMessages;
1845
1846            /* build the data message */
1847            tr_bencInitDict( &tmp, 3 );
1848            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_DATA );
1849            tr_bencDictAddInt( &tmp, "piece", piece );
1850            tr_bencDictAddInt( &tmp, "total_size", msgs->torrent->infoDictLength );
1851            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1852            tr_bencFree( &tmp );
1853
1854            /* write it out as a LTEP message to our outMessages buffer */
1855            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen + dataLen );
1856            evbuffer_add_uint8 ( out, BT_LTEP );
1857            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1858            evbuffer_add       ( out, payload, payloadLen );
1859            evbuffer_add       ( out, data, dataLen );
1860            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1861            dbgOutMessageLen( msgs );
1862
1863            tr_free( payload );
1864            tr_free( data );
1865
1866            ok = true;
1867        }
1868
1869        if( !ok ) /* send a rejection message */
1870        {
1871            tr_benc tmp;
1872            int payloadLen;
1873            char * payload;
1874            struct evbuffer * out = msgs->outMessages;
1875
1876            /* build the rejection message */
1877            tr_bencInitDict( &tmp, 2 );
1878            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1879            tr_bencDictAddInt( &tmp, "piece", piece );
1880            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1881            tr_bencFree( &tmp );
1882
1883            /* write it out as a LTEP message to our outMessages buffer */
1884            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1885            evbuffer_add_uint8 ( out, BT_LTEP );
1886            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1887            evbuffer_add       ( out, payload, payloadLen );
1888            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1889            dbgOutMessageLen( msgs );
1890
1891            tr_free( payload );
1892        }
1893    }
1894
1895    /**
1896    ***  Data Blocks
1897    **/
1898
1899    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= msgs->torrent->blockSize )
1900        && popNextRequest( msgs, &req ) )
1901    {
1902        --msgs->prefetchCount;
1903
1904        if( requestIsValid( msgs, &req )
1905            && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1906        {
1907            int err;
1908            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
1909            struct evbuffer * out;
1910            struct evbuffer_iovec iovec[1];
1911
1912            out = evbuffer_new( );
1913            evbuffer_expand( out, msglen );
1914
1915            evbuffer_add_uint32( out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1916            evbuffer_add_uint8 ( out, BT_PIECE );
1917            evbuffer_add_uint32( out, req.index );
1918            evbuffer_add_uint32( out, req.offset );
1919
1920            evbuffer_reserve_space( out, req.length, iovec, 1 );
1921            err = tr_cacheReadBlock( getSession(msgs)->cache, msgs->torrent, req.index, req.offset, req.length, iovec[0].iov_base );
1922            iovec[0].iov_len = req.length;
1923            evbuffer_commit_space( out, iovec, 1 );
1924
1925            /* check the piece if it needs checking... */
1926            if( !err && tr_torrentPieceNeedsCheck( msgs->torrent, req.index ) )
1927                if(( err = !tr_torrentCheckPiece( msgs->torrent, req.index )))
1928                    tr_torrentSetLocalError( msgs->torrent, _( "Please Verify Local Data! Piece #%zu is corrupt." ), (size_t)req.index );
1929
1930            if( err )
1931            {
1932                if( fext )
1933                    protocolSendReject( msgs, &req );
1934            }
1935            else
1936            {
1937                const size_t n = evbuffer_get_length( out );
1938                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1939                assert( n == msglen );
1940                tr_peerIoWriteBuf( msgs->peer->io, out, true );
1941                bytesWritten += n;
1942                msgs->clientSentAnythingAt = now;
1943                tr_historyAdd( &msgs->peer->blocksSentToPeer, tr_time( ), 1 );
1944            }
1945
1946            evbuffer_free( out );
1947
1948            if( err )
1949            {
1950                bytesWritten = 0;
1951                msgs = NULL;
1952            }
1953        }
1954        else if( fext ) /* peer needs a reject message */
1955        {
1956            protocolSendReject( msgs, &req );
1957        }
1958
1959        if( msgs != NULL )
1960            prefetchPieces( msgs );
1961    }
1962
1963    /**
1964    ***  Keepalive
1965    **/
1966
1967    if( ( msgs != NULL )
1968        && ( msgs->clientSentAnythingAt != 0 )
1969        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1970    {
1971        dbgmsg( msgs, "sending a keepalive message" );
1972        evbuffer_add_uint32( msgs->outMessages, 0 );
1973        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1974    }
1975
1976    return bytesWritten;
1977}
1978
1979static int
1980peerPulse( void * vmsgs )
1981{
1982    tr_peermsgs * msgs = vmsgs;
1983    const time_t  now = tr_time( );
1984
1985    if ( tr_isPeerIo( msgs->peer->io ) ) {
1986        updateDesiredRequestCount( msgs );
1987        updateBlockRequests( msgs );
1988        updateMetadataRequests( msgs, now );
1989    }
1990
1991    for( ;; )
1992        if( fillOutputBuffer( msgs, now ) < 1 )
1993            break;
1994
1995    return true; /* loop forever */
1996}
1997
1998void
1999tr_peerMsgsPulse( tr_peermsgs * msgs )
2000{
2001    if( msgs != NULL )
2002        peerPulse( msgs );
2003}
2004
2005static void
2006gotError( tr_peerIo * io UNUSED, short what, void * vmsgs )
2007{
2008    if( what & BEV_EVENT_TIMEOUT )
2009        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
2010    if( what & ( BEV_EVENT_EOF | BEV_EVENT_ERROR ) )
2011        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
2012               what, errno, tr_strerror( errno ) );
2013    fireError( vmsgs, ENOTCONN );
2014}
2015
2016static void
2017sendBitfield( tr_peermsgs * msgs )
2018{
2019    struct evbuffer * out = msgs->outMessages;
2020    tr_bitfield * bf = tr_cpCreatePieceBitfield( &msgs->torrent->completion );
2021
2022    evbuffer_add_uint32( out, sizeof( uint8_t ) + bf->byteCount );
2023    evbuffer_add_uint8 ( out, BT_BITFIELD );
2024    evbuffer_add       ( out, bf->bits, bf->byteCount );
2025    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu", evbuffer_get_length( out ) );
2026    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
2027
2028    tr_bitfieldFree( bf );
2029}
2030
2031static void
2032tellPeerWhatWeHave( tr_peermsgs * msgs )
2033{
2034    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
2035
2036    if( fext && ( tr_cpBlockBitset( &msgs->torrent->completion )->haveAll ) )
2037    {
2038        protocolSendHaveAll( msgs );
2039    }
2040    else if( fext && ( tr_cpBlockBitset( &msgs->torrent->completion )->haveNone ) )
2041    {
2042        protocolSendHaveNone( msgs );
2043    }
2044    else
2045    {
2046        sendBitfield( msgs );
2047    }
2048}
2049
2050/**
2051***
2052**/
2053
2054/* some peers give us error messages if we send
2055   more than this many peers in a single pex message
2056   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2057#define MAX_PEX_ADDED 50
2058#define MAX_PEX_DROPPED 50
2059
2060typedef struct
2061{
2062    tr_pex *  added;
2063    tr_pex *  dropped;
2064    tr_pex *  elements;
2065    int       addedCount;
2066    int       droppedCount;
2067    int       elementCount;
2068}
2069PexDiffs;
2070
2071static void
2072pexAddedCb( void * vpex, void * userData )
2073{
2074    PexDiffs * diffs = userData;
2075    tr_pex *   pex = vpex;
2076
2077    if( diffs->addedCount < MAX_PEX_ADDED )
2078    {
2079        diffs->added[diffs->addedCount++] = *pex;
2080        diffs->elements[diffs->elementCount++] = *pex;
2081    }
2082}
2083
2084static inline void
2085pexDroppedCb( void * vpex, void * userData )
2086{
2087    PexDiffs * diffs = userData;
2088    tr_pex *   pex = vpex;
2089
2090    if( diffs->droppedCount < MAX_PEX_DROPPED )
2091    {
2092        diffs->dropped[diffs->droppedCount++] = *pex;
2093    }
2094}
2095
2096static inline void
2097pexElementCb( void * vpex, void * userData )
2098{
2099    PexDiffs * diffs = userData;
2100    tr_pex * pex = vpex;
2101
2102    diffs->elements[diffs->elementCount++] = *pex;
2103}
2104
2105typedef void ( tr_set_func )( void * element, void * userData );
2106
2107/**
2108 * @brief find the differences and commonalities in two sorted sets
2109 * @param a the first set
2110 * @param aCount the number of elements in the set 'a'
2111 * @param b the second set
2112 * @param bCount the number of elements in the set 'b'
2113 * @param compare the sorting method for both sets
2114 * @param elementSize the sizeof the element in the two sorted sets
2115 * @param in_a called for items in set 'a' but not set 'b'
2116 * @param in_b called for items in set 'b' but not set 'a'
2117 * @param in_both called for items that are in both sets
2118 * @param userData user data passed along to in_a, in_b, and in_both
2119 */
2120static void
2121tr_set_compare( const void * va, size_t aCount,
2122                const void * vb, size_t bCount,
2123                int compare( const void * a, const void * b ),
2124                size_t elementSize,
2125                tr_set_func in_a_cb,
2126                tr_set_func in_b_cb,
2127                tr_set_func in_both_cb,
2128                void * userData )
2129{
2130    const uint8_t * a = va;
2131    const uint8_t * b = vb;
2132    const uint8_t * aend = a + elementSize * aCount;
2133    const uint8_t * bend = b + elementSize * bCount;
2134
2135    while( a != aend || b != bend )
2136    {
2137        if( a == aend )
2138        {
2139            ( *in_b_cb )( (void*)b, userData );
2140            b += elementSize;
2141        }
2142        else if( b == bend )
2143        {
2144            ( *in_a_cb )( (void*)a, userData );
2145            a += elementSize;
2146        }
2147        else
2148        {
2149            const int val = ( *compare )( a, b );
2150
2151            if( !val )
2152            {
2153                ( *in_both_cb )( (void*)a, userData );
2154                a += elementSize;
2155                b += elementSize;
2156            }
2157            else if( val < 0 )
2158            {
2159                ( *in_a_cb )( (void*)a, userData );
2160                a += elementSize;
2161            }
2162            else if( val > 0 )
2163            {
2164                ( *in_b_cb )( (void*)b, userData );
2165                b += elementSize;
2166            }
2167        }
2168    }
2169}
2170
2171
2172static void
2173sendPex( tr_peermsgs * msgs )
2174{
2175    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2176    {
2177        PexDiffs diffs;
2178        PexDiffs diffs6;
2179        tr_pex * newPex = NULL;
2180        tr_pex * newPex6 = NULL;
2181        const int newCount = tr_peerMgrGetPeers( msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2182        const int newCount6 = tr_peerMgrGetPeers( msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2183
2184        /* build the diffs */
2185        diffs.added = tr_new( tr_pex, newCount );
2186        diffs.addedCount = 0;
2187        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2188        diffs.droppedCount = 0;
2189        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2190        diffs.elementCount = 0;
2191        tr_set_compare( msgs->pex, msgs->pexCount,
2192                        newPex, newCount,
2193                        tr_pexCompare, sizeof( tr_pex ),
2194                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2195        diffs6.added = tr_new( tr_pex, newCount6 );
2196        diffs6.addedCount = 0;
2197        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2198        diffs6.droppedCount = 0;
2199        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2200        diffs6.elementCount = 0;
2201        tr_set_compare( msgs->pex6, msgs->pexCount6,
2202                        newPex6, newCount6,
2203                        tr_pexCompare, sizeof( tr_pex ),
2204                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2205        dbgmsg(
2206            msgs,
2207            "pex: old peer count %d+%d, new peer count %d+%d, "
2208            "added %d+%d, removed %d+%d",
2209            msgs->pexCount, msgs->pexCount6, newCount, newCount6,
2210            diffs.addedCount, diffs6.addedCount,
2211            diffs.droppedCount, diffs6.droppedCount );
2212
2213        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2214            !diffs6.droppedCount )
2215        {
2216            tr_free( diffs.elements );
2217            tr_free( diffs6.elements );
2218        }
2219        else
2220        {
2221            int  i;
2222            tr_benc val;
2223            char * benc;
2224            int bencLen;
2225            uint8_t * tmp, *walk;
2226            struct evbuffer * out = msgs->outMessages;
2227
2228            /* update peer */
2229            tr_free( msgs->pex );
2230            msgs->pex = diffs.elements;
2231            msgs->pexCount = diffs.elementCount;
2232            tr_free( msgs->pex6 );
2233            msgs->pex6 = diffs6.elements;
2234            msgs->pexCount6 = diffs6.elementCount;
2235
2236            /* build the pex payload */
2237            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2238                                         * speed vs. likelihood? */
2239
2240            if( diffs.addedCount > 0)
2241            {
2242                /* "added" */
2243                tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2244                for( i = 0; i < diffs.addedCount; ++i ) {
2245                    memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2246                    memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2247                }
2248                assert( ( walk - tmp ) == diffs.addedCount * 6 );
2249                tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2250                tr_free( tmp );
2251
2252                /* "added.f"
2253                 * unset each holepunch flag because we don't support it. */
2254                tmp = walk = tr_new( uint8_t, diffs.addedCount );
2255                for( i = 0; i < diffs.addedCount; ++i )
2256                    *walk++ = diffs.added[i].flags & ~ADDED_F_HOLEPUNCH;
2257                assert( ( walk - tmp ) == diffs.addedCount );
2258                tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2259                tr_free( tmp );
2260            }
2261
2262            if( diffs.droppedCount > 0 )
2263            {
2264                /* "dropped" */
2265                tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2266                for( i = 0; i < diffs.droppedCount; ++i ) {
2267                    memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2268                    memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2269                }
2270                assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2271                tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2272                tr_free( tmp );
2273            }
2274
2275            if( diffs6.addedCount > 0 )
2276            {
2277                /* "added6" */
2278                tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2279                for( i = 0; i < diffs6.addedCount; ++i ) {
2280                    memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2281                    walk += 16;
2282                    memcpy( walk, &diffs6.added[i].port, 2 );
2283                    walk += 2;
2284                }
2285                assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2286                tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2287                tr_free( tmp );
2288
2289                /* "added6.f"
2290                 * unset each holepunch flag because we don't support it. */
2291                tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2292                for( i = 0; i < diffs6.addedCount; ++i )
2293                    *walk++ = diffs6.added[i].flags & ~ADDED_F_HOLEPUNCH;
2294                assert( ( walk - tmp ) == diffs6.addedCount );
2295                tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2296                tr_free( tmp );
2297            }
2298
2299            if( diffs6.droppedCount > 0 )
2300            {
2301                /* "dropped6" */
2302                tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2303                for( i = 0; i < diffs6.droppedCount; ++i ) {
2304                    memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2305                    walk += 16;
2306                    memcpy( walk, &diffs6.dropped[i].port, 2 );
2307                    walk += 2;
2308                }
2309                assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2310                tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2311                tr_free( tmp );
2312            }
2313
2314            /* write the pex message */
2315            benc = tr_bencToStr( &val, TR_FMT_BENC, &bencLen );
2316            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + bencLen );
2317            evbuffer_add_uint8 ( out, BT_LTEP );
2318            evbuffer_add_uint8 ( out, msgs->ut_pex_id );
2319            evbuffer_add       ( out, benc, bencLen );
2320            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2321            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", evbuffer_get_length( out ) );
2322            dbgOutMessageLen( msgs );
2323
2324            tr_free( benc );
2325            tr_bencFree( &val );
2326        }
2327
2328        /* cleanup */
2329        tr_free( diffs.added );
2330        tr_free( diffs.dropped );
2331        tr_free( newPex );
2332        tr_free( diffs6.added );
2333        tr_free( diffs6.dropped );
2334        tr_free( newPex6 );
2335
2336        /*msgs->clientSentPexAt = tr_time( );*/
2337    }
2338}
2339
2340static void
2341pexPulse( int foo UNUSED, short bar UNUSED, void * vmsgs )
2342{
2343    struct tr_peermsgs * msgs = vmsgs;
2344
2345    sendPex( msgs );
2346
2347    tr_timerAdd( msgs->pexTimer, PEX_INTERVAL_SECS, 0 );
2348}
2349
2350/**
2351***
2352**/
2353
2354tr_peermsgs*
2355tr_peerMsgsNew( struct tr_torrent    * torrent,
2356                struct tr_peer       * peer,
2357                tr_peer_callback     * callback,
2358                void                 * callbackData )
2359{
2360    tr_peermsgs * m;
2361
2362    assert( peer );
2363    assert( peer->io );
2364
2365    m = tr_new0( tr_peermsgs, 1 );
2366    m->callback = callback;
2367    m->callbackData = callbackData;
2368    m->peer = peer;
2369    m->torrent = torrent;
2370    m->peer->clientIsChoked = 1;
2371    m->peer->peerIsChoked = 1;
2372    m->peer->clientIsInterested = 0;
2373    m->peer->peerIsInterested = 0;
2374    m->state = AWAITING_BT_LENGTH;
2375    m->outMessages = evbuffer_new( );
2376    m->outMessagesBatchedAt = 0;
2377    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2378    m->incoming.block = evbuffer_new( );
2379    m->pexTimer = evtimer_new( torrent->session->event_base, pexPulse, m );
2380    peer->msgs = m;
2381    tr_timerAdd( m->pexTimer, PEX_INTERVAL_SECS, 0 );
2382
2383    if( tr_peerIoSupportsUTP( peer->io ) ) {
2384        const tr_address * addr = tr_peerIoGetAddress( peer->io, NULL );
2385        tr_peerMgrSetUtpSupported( torrent, addr );
2386        tr_peerMgrSetUtpFailed( torrent, addr, false );
2387    }
2388
2389    if( tr_peerIoSupportsLTEP( peer->io ) )
2390        sendLtepHandshake( m );
2391
2392    tellPeerWhatWeHave( m );
2393
2394    if( tr_dhtEnabled( torrent->session ) && tr_peerIoSupportsDHT( peer->io ))
2395    {
2396        /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2397        const struct tr_address *addr = tr_peerIoGetAddress( peer->io, NULL );
2398        if( addr->type == TR_AF_INET || tr_globalIPv6() ) {
2399            protocolSendPort( m, tr_dhtPort( torrent->session ) );
2400        }
2401    }
2402
2403    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2404    updateDesiredRequestCount( m );
2405
2406    return m;
2407}
2408
2409void
2410tr_peerMsgsFree( tr_peermsgs* msgs )
2411{
2412    if( msgs )
2413    {
2414        event_free( msgs->pexTimer );
2415
2416        evbuffer_free( msgs->incoming.block );
2417        evbuffer_free( msgs->outMessages );
2418        tr_free( msgs->pex6 );
2419        tr_free( msgs->pex );
2420
2421        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2422        tr_free( msgs );
2423    }
2424}
Note: See TracBrowser for help on using the repository browser.