source: trunk/libtransmission/peer-msgs.c @ 12233

Last change on this file since 12233 was 12233, checked in by jordan, 11 years ago

(trunk) yet more "#include" tweaks

  • Property svn:keywords set to Date Rev Author Id
File size: 71.9 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 12233 2011-03-25 15:03:42Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <stdarg.h>
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event2/buffer.h>
21#include <event2/bufferevent.h>
22#include <event2/event.h>
23
24#include "transmission.h"
25#include "bencode.h"
26#include "cache.h"
27#include "completion.h"
28#include "crypto.h" /* tr_sha1() */
29#include "peer-io.h"
30#include "peer-mgr.h"
31#include "peer-msgs.h"
32#include "session.h"
33#include "torrent.h"
34#include "torrent-magnet.h"
35#include "tr-dht.h"
36#include "utils.h"
37#include "version.h"
38
39/**
40***
41**/
42
43enum
44{
45    BT_CHOKE                = 0,
46    BT_UNCHOKE              = 1,
47    BT_INTERESTED           = 2,
48    BT_NOT_INTERESTED       = 3,
49    BT_HAVE                 = 4,
50    BT_BITFIELD             = 5,
51    BT_REQUEST              = 6,
52    BT_PIECE                = 7,
53    BT_CANCEL               = 8,
54    BT_PORT                 = 9,
55
56    BT_FEXT_SUGGEST         = 13,
57    BT_FEXT_HAVE_ALL        = 14,
58    BT_FEXT_HAVE_NONE       = 15,
59    BT_FEXT_REJECT          = 16,
60    BT_FEXT_ALLOWED_FAST    = 17,
61
62    BT_LTEP                 = 20,
63
64    LTEP_HANDSHAKE          = 0,
65
66    UT_PEX_ID               = 1,
67    UT_METADATA_ID          = 3,
68
69    MAX_PEX_PEER_COUNT      = 50,
70
71    MIN_CHOKE_PERIOD_SEC    = 10,
72
73    /* idle seconds before we send a keepalive */
74    KEEPALIVE_INTERVAL_SECS = 100,
75
76    PEX_INTERVAL_SECS       = 90, /* sec between sendPex() calls */
77
78    REQQ                    = 512,
79
80    METADATA_REQQ           = 64,
81
82    /* used in lowering the outMessages queue period */
83    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
84    HIGH_PRIORITY_INTERVAL_SECS = 2,
85    LOW_PRIORITY_INTERVAL_SECS = 10,
86
87    /* number of pieces we'll allow in our fast set */
88    MAX_FAST_SET_SIZE = 3,
89
90    /* defined in BEP #9 */
91    METADATA_MSG_TYPE_REQUEST = 0,
92    METADATA_MSG_TYPE_DATA = 1,
93    METADATA_MSG_TYPE_REJECT = 2
94};
95
96enum
97{
98    AWAITING_BT_LENGTH,
99    AWAITING_BT_ID,
100    AWAITING_BT_MESSAGE,
101    AWAITING_BT_PIECE
102};
103
104/**
105***
106**/
107
108struct peer_request
109{
110    uint32_t    index;
111    uint32_t    offset;
112    uint32_t    length;
113};
114
115static void
116blockToReq( const tr_torrent     * tor,
117            tr_block_index_t       block,
118            struct peer_request  * setme )
119{
120    tr_torrentGetBlockLocation( tor, block, &setme->index,
121                                            &setme->offset,
122                                            &setme->length );
123}
124
125/**
126***
127**/
128
129/* this is raw, unchanged data from the peer regarding
130 * the current message that it's sending us. */
131struct tr_incoming
132{
133    uint8_t                id;
134    uint32_t               length; /* includes the +1 for id length */
135    struct peer_request    blockReq; /* metadata for incoming blocks */
136    struct evbuffer *      block; /* piece data for incoming blocks */
137};
138
139/**
140 * Low-level communication state information about a connected peer.
141 *
142 * This structure remembers the low-level protocol states that we're
143 * in with this peer, such as active requests, pex messages, and so on.
144 * Its fields are all private to peer-msgs.c.
145 *
146 * Data not directly involved with sending & receiving messages is
147 * stored in tr_peer, where it can be accessed by both peermsgs and
148 * the peer manager.
149 *
150 * @see struct peer_atom
151 * @see tr_peer
152 */
153struct tr_peermsgs
154{
155    bool            peerSupportsPex;
156    bool            peerSupportsMetadataXfer;
157    bool            clientSentLtepHandshake;
158    bool            peerSentLtepHandshake;
159
160    /*bool          haveFastSet;*/
161
162    int             desiredRequestCount;
163
164    int             prefetchCount;
165
166    /* how long the outMessages batch should be allowed to grow before
167     * it's flushed -- some messages (like requests >:) should be sent
168     * very quickly; others aren't as urgent. */
169    int8_t          outMessagesBatchPeriod;
170
171    uint8_t         state;
172    uint8_t         ut_pex_id;
173    uint8_t         ut_metadata_id;
174    uint16_t        pexCount;
175    uint16_t        pexCount6;
176
177    size_t          metadata_size_hint;
178#if 0
179    size_t                 fastsetSize;
180    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
181#endif
182
183    tr_peer *              peer;
184
185    tr_torrent *           torrent;
186
187    tr_peer_callback      * callback;
188    void                  * callbackData;
189
190    struct evbuffer *      outMessages; /* all the non-piece messages */
191
192    struct peer_request    peerAskedFor[REQQ];
193
194    int                    peerAskedForMetadata[METADATA_REQQ];
195    int                    peerAskedForMetadataCount;
196
197    tr_pex               * pex;
198    tr_pex               * pex6;
199
200    /*time_t                 clientSentPexAt;*/
201    time_t                 clientSentAnythingAt;
202
203    /* when we started batching the outMessages */
204    time_t                outMessagesBatchedAt;
205
206    struct tr_incoming    incoming;
207
208    /* if the peer supports the Extension Protocol in BEP 10 and
209       supplied a reqq argument, it's stored here. Otherwise, the
210       value is zero and should be ignored. */
211    int64_t               reqq;
212
213    struct event        * pexTimer;
214};
215
216/**
217***
218**/
219
220#if 0
221static tr_bitfield*
222getHave( const struct tr_peermsgs * msgs )
223{
224    if( msgs->peer->have == NULL )
225        msgs->peer->have = tr_bitfieldNew( msgs->torrent->info.pieceCount );
226    return msgs->peer->have;
227}
228#endif
229
230static inline tr_session*
231getSession( struct tr_peermsgs * msgs )
232{
233    return msgs->torrent->session;
234}
235
236/**
237***
238**/
239
240static void
241myDebug( const char * file, int line,
242         const struct tr_peermsgs * msgs,
243         const char * fmt, ... )
244{
245    FILE * fp = tr_getLog( );
246
247    if( fp )
248    {
249        va_list           args;
250        char              timestr[64];
251        struct evbuffer * buf = evbuffer_new( );
252        char *            base = tr_basename( file );
253
254        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
255                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
256                             tr_torrentName( msgs->torrent ),
257                             tr_peerIoGetAddrStr( msgs->peer->io ),
258                             msgs->peer->client );
259        va_start( args, fmt );
260        evbuffer_add_vprintf( buf, fmt, args );
261        va_end( args );
262        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
263        fputs( (const char*)evbuffer_pullup( buf, -1 ), fp );
264
265        tr_free( base );
266        evbuffer_free( buf );
267    }
268}
269
270#define dbgmsg( msgs, ... ) \
271    do { \
272        if( tr_deepLoggingIsActive( ) ) \
273            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
274    } while( 0 )
275
276/**
277***
278**/
279
280static void
281pokeBatchPeriod( tr_peermsgs * msgs, int interval )
282{
283    if( msgs->outMessagesBatchPeriod > interval )
284    {
285        msgs->outMessagesBatchPeriod = interval;
286        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
287    }
288}
289
290static void
291dbgOutMessageLen( tr_peermsgs * msgs )
292{
293    dbgmsg( msgs, "outMessage size is now %zu", evbuffer_get_length( msgs->outMessages ) );
294}
295
296static void
297protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
298{
299    struct evbuffer * out = msgs->outMessages;
300
301    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
302
303    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
304    evbuffer_add_uint8 ( out, BT_FEXT_REJECT );
305    evbuffer_add_uint32( out, req->index );
306    evbuffer_add_uint32( out, req->offset );
307    evbuffer_add_uint32( out, req->length );
308
309    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
310    dbgOutMessageLen( msgs );
311}
312
313static void
314protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
315{
316    struct evbuffer * out = msgs->outMessages;
317
318    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
319    evbuffer_add_uint8 ( out, BT_REQUEST );
320    evbuffer_add_uint32( out, req->index );
321    evbuffer_add_uint32( out, req->offset );
322    evbuffer_add_uint32( out, req->length );
323
324    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
325    dbgOutMessageLen( msgs );
326    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
327}
328
329static void
330protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
331{
332    struct evbuffer * out = msgs->outMessages;
333
334    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
335    evbuffer_add_uint8 ( out, BT_CANCEL );
336    evbuffer_add_uint32( out, req->index );
337    evbuffer_add_uint32( out, req->offset );
338    evbuffer_add_uint32( out, req->length );
339
340    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
341    dbgOutMessageLen( msgs );
342    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
343}
344
345static void
346protocolSendPort(tr_peermsgs *msgs, uint16_t port)
347{
348    struct evbuffer * out = msgs->outMessages;
349
350    dbgmsg( msgs, "sending Port %u", port);
351    evbuffer_add_uint32( out, 3 );
352    evbuffer_add_uint8 ( out, BT_PORT );
353    evbuffer_add_uint16( out, port);
354}
355
356static void
357protocolSendHave( tr_peermsgs * msgs, uint32_t index )
358{
359    struct evbuffer * out = msgs->outMessages;
360
361    evbuffer_add_uint32( out, sizeof(uint8_t) + sizeof(uint32_t) );
362    evbuffer_add_uint8 ( out, BT_HAVE );
363    evbuffer_add_uint32( out, index );
364
365    dbgmsg( msgs, "sending Have %u", index );
366    dbgOutMessageLen( msgs );
367    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
368}
369
370#if 0
371static void
372protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
373{
374    tr_peerIo       * io  = msgs->peer->io;
375    struct evbuffer * out = msgs->outMessages;
376
377    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
378
379    evbuffer_add_uint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
380    evbuffer_add_uint8 ( io, out, BT_FEXT_ALLOWED_FAST );
381    evbuffer_add_uint32( io, out, pieceIndex );
382
383    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
384    dbgOutMessageLen( msgs );
385}
386#endif
387
388static void
389protocolSendChoke( tr_peermsgs * msgs, int choke )
390{
391    struct evbuffer * out = msgs->outMessages;
392
393    evbuffer_add_uint32( out, sizeof( uint8_t ) );
394    evbuffer_add_uint8 ( out, choke ? BT_CHOKE : BT_UNCHOKE );
395
396    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
397    dbgOutMessageLen( msgs );
398    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
399}
400
401static void
402protocolSendHaveAll( tr_peermsgs * msgs )
403{
404    struct evbuffer * out = msgs->outMessages;
405
406    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
407
408    evbuffer_add_uint32( out, sizeof( uint8_t ) );
409    evbuffer_add_uint8 ( out, BT_FEXT_HAVE_ALL );
410
411    dbgmsg( msgs, "sending HAVE_ALL..." );
412    dbgOutMessageLen( msgs );
413    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
414}
415
416static void
417protocolSendHaveNone( tr_peermsgs * msgs )
418{
419    struct evbuffer * out = msgs->outMessages;
420
421    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
422
423    evbuffer_add_uint32( out, sizeof( uint8_t ) );
424    evbuffer_add_uint8 ( out, BT_FEXT_HAVE_NONE );
425
426    dbgmsg( msgs, "sending HAVE_NONE..." );
427    dbgOutMessageLen( msgs );
428    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
429}
430
431/**
432***  EVENTS
433**/
434
435static void
436publish( tr_peermsgs * msgs, tr_peer_event * e )
437{
438    assert( msgs->peer );
439    assert( msgs->peer->msgs == msgs );
440
441    if( msgs->callback != NULL )
442        msgs->callback( msgs->peer, e, msgs->callbackData );
443}
444
445static void
446fireError( tr_peermsgs * msgs, int err )
447{
448    tr_peer_event e = TR_PEER_EVENT_INIT;
449    e.eventType = TR_PEER_ERROR;
450    e.err = err;
451    publish( msgs, &e );
452}
453
454static void
455fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
456{
457    tr_peer_event e = TR_PEER_EVENT_INIT;
458    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
459    e.pieceIndex = req->index;
460    e.offset = req->offset;
461    e.length = req->length;
462    publish( msgs, &e );
463}
464
465static void
466fireGotRej( tr_peermsgs * msgs, const struct peer_request * req )
467{
468    tr_peer_event e = TR_PEER_EVENT_INIT;
469    e.eventType = TR_PEER_CLIENT_GOT_REJ;
470    e.pieceIndex = req->index;
471    e.offset = req->offset;
472    e.length = req->length;
473    publish( msgs, &e );
474}
475
476static void
477fireGotChoke( tr_peermsgs * msgs )
478{
479    tr_peer_event e = TR_PEER_EVENT_INIT;
480    e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
481    publish( msgs, &e );
482}
483
484static void
485fireClientGotHaveAll( tr_peermsgs * msgs )
486{
487    tr_peer_event e = TR_PEER_EVENT_INIT;
488    e.eventType = TR_PEER_CLIENT_GOT_HAVE_ALL;
489    publish( msgs, &e );
490}
491
492static void
493fireClientGotHaveNone( tr_peermsgs * msgs )
494{
495    tr_peer_event e = TR_PEER_EVENT_INIT;
496    e.eventType = TR_PEER_CLIENT_GOT_HAVE_NONE;
497    publish( msgs, &e );
498}
499
500static void
501fireClientGotData( tr_peermsgs * msgs, uint32_t length, int wasPieceData )
502{
503    tr_peer_event e = TR_PEER_EVENT_INIT;
504
505    e.length = length;
506    e.eventType = TR_PEER_CLIENT_GOT_DATA;
507    e.wasPieceData = wasPieceData;
508    publish( msgs, &e );
509}
510
511static void
512fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
513{
514    tr_peer_event e = TR_PEER_EVENT_INIT;
515    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
516    e.pieceIndex = pieceIndex;
517    publish( msgs, &e );
518}
519
520static void
521fireClientGotPort( tr_peermsgs * msgs, tr_port port )
522{
523    tr_peer_event e = TR_PEER_EVENT_INIT;
524    e.eventType = TR_PEER_CLIENT_GOT_PORT;
525    e.port = port;
526    publish( msgs, &e );
527}
528
529static void
530fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
531{
532    tr_peer_event e = TR_PEER_EVENT_INIT;
533    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
534    e.pieceIndex = pieceIndex;
535    publish( msgs, &e );
536}
537
538static void
539fireClientGotBitfield( tr_peermsgs * msgs, tr_bitfield * bitfield )
540{
541    tr_peer_event e = TR_PEER_EVENT_INIT;
542    e.eventType = TR_PEER_CLIENT_GOT_BITFIELD;
543    e.bitfield = bitfield;
544    publish( msgs, &e );
545}
546
547static void
548fireClientGotHave( tr_peermsgs * msgs, tr_piece_index_t index )
549{
550    tr_peer_event e = TR_PEER_EVENT_INIT;
551    e.eventType = TR_PEER_CLIENT_GOT_HAVE;
552    e.pieceIndex = index;
553    publish( msgs, &e );
554}
555
556static void
557firePeerGotData( tr_peermsgs * msgs, uint32_t length, bool wasPieceData )
558{
559    tr_peer_event e = TR_PEER_EVENT_INIT;
560
561    e.length = length;
562    e.eventType = TR_PEER_PEER_GOT_DATA;
563    e.wasPieceData = wasPieceData;
564
565    publish( msgs, &e );
566}
567
568/**
569***  ALLOWED FAST SET
570***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
571**/
572
573#if 0
574size_t
575tr_generateAllowedSet( tr_piece_index_t * setmePieces,
576                       size_t             desiredSetSize,
577                       size_t             pieceCount,
578                       const uint8_t    * infohash,
579                       const tr_address * addr )
580{
581    size_t setSize = 0;
582
583    assert( setmePieces );
584    assert( desiredSetSize <= pieceCount );
585    assert( desiredSetSize );
586    assert( pieceCount );
587    assert( infohash );
588    assert( addr );
589
590    if( addr->type == TR_AF_INET )
591    {
592        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
593        uint8_t x[SHA_DIGEST_LENGTH];
594
595        uint32_t ui32 = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
596        memcpy( w, &ui32, sizeof( uint32_t ) );
597        walk += sizeof( uint32_t );
598        memcpy( walk, infohash, SHA_DIGEST_LENGTH );                 /* (2) */
599        walk += SHA_DIGEST_LENGTH;
600        tr_sha1( x, w, walk-w, NULL );                               /* (3) */
601        assert( sizeof( w ) == walk-w );
602
603        while( setSize<desiredSetSize )
604        {
605            int i;
606            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
607            {
608                size_t k;
609                uint32_t j = i * 4;                                  /* (5) */
610                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
611                uint32_t index = y % pieceCount;                     /* (7) */
612
613                for( k=0; k<setSize; ++k )                           /* (8) */
614                    if( setmePieces[k] == index )
615                        break;
616
617                if( k == setSize )
618                    setmePieces[setSize++] = index;                  /* (9) */
619            }
620
621            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
622        }
623    }
624
625    return setSize;
626}
627
628static void
629updateFastSet( tr_peermsgs * msgs UNUSED )
630{
631    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
632    const int peerIsNeedy = msgs->peer->progress < 0.10;
633
634    if( fext && peerIsNeedy && !msgs->haveFastSet )
635    {
636        size_t i;
637        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
638        const tr_info * inf = &msgs->torrent->info;
639        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
640
641        /* build the fast set */
642        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
643        msgs->haveFastSet = 1;
644
645        /* send it to the peer */
646        for( i=0; i<msgs->fastsetSize; ++i )
647            protocolSendAllowedFast( msgs, msgs->fastset[i] );
648    }
649}
650#endif
651
652/**
653***  INTEREST
654**/
655
656static void
657sendInterest( tr_peermsgs * msgs, bool clientIsInterested )
658{
659    struct evbuffer * out = msgs->outMessages;
660
661    assert( msgs );
662    assert( tr_isBool( clientIsInterested ) );
663
664    msgs->peer->clientIsInterested = clientIsInterested;
665    dbgmsg( msgs, "Sending %s", clientIsInterested ? "Interested" : "Not Interested" );
666    evbuffer_add_uint32( out, sizeof( uint8_t ) );
667    evbuffer_add_uint8 ( out, clientIsInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
668
669    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
670    dbgOutMessageLen( msgs );
671}
672
673static void
674updateInterest( tr_peermsgs * msgs UNUSED )
675{
676    /* FIXME -- might need to poke the mgr on startup */
677}
678
679void
680tr_peerMsgsSetInterested( tr_peermsgs * msgs, int isInterested )
681{
682    assert( tr_isBool( isInterested ) );
683
684    if( isInterested != msgs->peer->clientIsInterested )
685        sendInterest( msgs, isInterested );
686}
687
688static bool
689popNextMetadataRequest( tr_peermsgs * msgs, int * piece )
690{
691    if( msgs->peerAskedForMetadataCount == 0 )
692        return false;
693
694    *piece = msgs->peerAskedForMetadata[0];
695
696    tr_removeElementFromArray( msgs->peerAskedForMetadata, 0, sizeof( int ),
697                               msgs->peerAskedForMetadataCount-- );
698
699    return true;
700}
701
702static bool
703popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
704{
705    if( msgs->peer->pendingReqsToClient == 0 )
706        return false;
707
708    *setme = msgs->peerAskedFor[0];
709
710    tr_removeElementFromArray( msgs->peerAskedFor, 0, sizeof( struct peer_request ),
711                               msgs->peer->pendingReqsToClient-- );
712
713    return true;
714}
715
716static void
717cancelAllRequestsToClient( tr_peermsgs * msgs )
718{
719    struct peer_request req;
720    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
721
722    while( popNextRequest( msgs, &req ))
723        if( mustSendCancel )
724            protocolSendReject( msgs, &req );
725}
726
727void
728tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
729{
730    const time_t now = tr_time( );
731    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
732
733    assert( msgs );
734    assert( msgs->peer );
735    assert( choke == 0 || choke == 1 );
736
737    if( msgs->peer->chokeChangedAt > fibrillationTime )
738    {
739        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
740    }
741    else if( msgs->peer->peerIsChoked != choke )
742    {
743        msgs->peer->peerIsChoked = choke;
744        if( choke )
745            cancelAllRequestsToClient( msgs );
746        protocolSendChoke( msgs, choke );
747        msgs->peer->chokeChangedAt = now;
748    }
749}
750
751/**
752***
753**/
754
755void
756tr_peerMsgsHave( tr_peermsgs * msgs, uint32_t index )
757{
758    protocolSendHave( msgs, index );
759
760    /* since we have more pieces now, we might not be interested in this peer */
761    updateInterest( msgs );
762}
763
764/**
765***
766**/
767
768static bool
769reqIsValid( const tr_peermsgs * peer,
770            uint32_t            index,
771            uint32_t            offset,
772            uint32_t            length )
773{
774    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
775}
776
777static bool
778requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
779{
780    return reqIsValid( msgs, req->index, req->offset, req->length );
781}
782
783void
784tr_peerMsgsCancel( tr_peermsgs * msgs, tr_block_index_t block )
785{
786    struct peer_request req;
787/*fprintf( stderr, "SENDING CANCEL MESSAGE FOR BLOCK %zu\n\t\tFROM PEER %p ------------------------------------\n", (size_t)block, msgs->peer );*/
788    blockToReq( msgs->torrent, block, &req );
789    protocolSendCancel( msgs, &req );
790}
791
792/**
793***
794**/
795
796static void
797sendLtepHandshake( tr_peermsgs * msgs )
798{
799    tr_benc val, *m;
800    char * buf;
801    int len;
802    bool allow_pex;
803    bool allow_metadata_xfer;
804    struct evbuffer * out = msgs->outMessages;
805    const unsigned char * ipv6 = tr_globalIPv6();
806
807    if( msgs->clientSentLtepHandshake )
808        return;
809
810    dbgmsg( msgs, "sending an ltep handshake" );
811    msgs->clientSentLtepHandshake = 1;
812
813    /* decide if we want to advertise metadata xfer support (BEP 9) */
814    if( tr_torrentIsPrivate( msgs->torrent ) )
815        allow_metadata_xfer = 0;
816    else
817        allow_metadata_xfer = 1;
818
819    /* decide if we want to advertise pex support */
820    if( !tr_torrentAllowsPex( msgs->torrent ) )
821        allow_pex = 0;
822    else if( msgs->peerSentLtepHandshake )
823        allow_pex = msgs->peerSupportsPex ? 1 : 0;
824    else
825        allow_pex = 1;
826
827    tr_bencInitDict( &val, 8 );
828    tr_bencDictAddInt( &val, "e", getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED );
829    if( ipv6 != NULL )
830        tr_bencDictAddRaw( &val, "ipv6", ipv6, 16 );
831    if( allow_metadata_xfer && tr_torrentHasMetadata( msgs->torrent )
832                            && ( msgs->torrent->infoDictLength > 0 ) )
833        tr_bencDictAddInt( &val, "metadata_size", msgs->torrent->infoDictLength );
834    tr_bencDictAddInt( &val, "p", tr_sessionGetPublicPeerPort( getSession(msgs) ) );
835    tr_bencDictAddInt( &val, "reqq", REQQ );
836    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
837    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
838    m  = tr_bencDictAddDict( &val, "m", 2 );
839    if( allow_metadata_xfer )
840        tr_bencDictAddInt( m, "ut_metadata", UT_METADATA_ID );
841    if( allow_pex )
842        tr_bencDictAddInt( m, "ut_pex", UT_PEX_ID );
843
844    buf = tr_bencToStr( &val, TR_FMT_BENC, &len );
845
846    evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + len );
847    evbuffer_add_uint8 ( out, BT_LTEP );
848    evbuffer_add_uint8 ( out, LTEP_HANDSHAKE );
849    evbuffer_add       ( out, buf, len );
850    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
851    dbgOutMessageLen( msgs );
852
853    /* cleanup */
854    tr_bencFree( &val );
855    tr_free( buf );
856}
857
858static void
859parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
860{
861    int64_t   i;
862    tr_benc   val, * sub;
863    uint8_t * tmp = tr_new( uint8_t, len );
864    const uint8_t *addr;
865    size_t addr_len;
866    tr_pex pex;
867    int8_t seedProbability = -1;
868
869    memset( &pex, 0, sizeof( tr_pex ) );
870
871    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
872    msgs->peerSentLtepHandshake = 1;
873
874    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
875    {
876        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
877        tr_free( tmp );
878        return;
879    }
880
881    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
882
883    /* does the peer prefer encrypted connections? */
884    if( tr_bencDictFindInt( &val, "e", &i ) ) {
885        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
886                                              : ENCRYPTION_PREFERENCE_NO;
887        if( i )
888            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
889    }
890
891    /* check supported messages for utorrent pex */
892    msgs->peerSupportsPex = 0;
893    msgs->peerSupportsMetadataXfer = 0;
894
895    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
896        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
897            msgs->peerSupportsPex = i != 0;
898            msgs->ut_pex_id = (uint8_t) i;
899            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
900        }
901        if( tr_bencDictFindInt( sub, "ut_metadata", &i ) ) {
902            msgs->peerSupportsMetadataXfer = i != 0;
903            msgs->ut_metadata_id = (uint8_t) i;
904            dbgmsg( msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id );
905        }
906        if( tr_bencDictFindInt( sub, "ut_holepunch", &i ) ) {
907            /* Mysterious µTorrent extension that we don't grok.  However,
908               it implies support for µTP, so use it to indicate that. */
909            tr_peerMgrSetUtpFailed( msgs->torrent,
910                                    tr_peerIoGetAddress( msgs->peer->io, NULL ),
911                                    false );
912        }
913    }
914
915    /* look for metainfo size (BEP 9) */
916    if( tr_bencDictFindInt( &val, "metadata_size", &i ) ) {
917        tr_torrentSetMetadataSizeHint( msgs->torrent, i );
918        msgs->metadata_size_hint = (size_t) i;
919    }
920
921    /* look for upload_only (BEP 21) */
922    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
923        seedProbability = i==0 ? 0 : 100;
924
925    /* get peer's listening port */
926    if( tr_bencDictFindInt( &val, "p", &i ) ) {
927        pex.port = htons( (uint16_t)i );
928        fireClientGotPort( msgs, pex.port );
929        dbgmsg( msgs, "peer's port is now %d", (int)i );
930    }
931
932    if( tr_peerIoIsIncoming( msgs->peer->io )
933        && tr_bencDictFindRaw( &val, "ipv4", &addr, &addr_len )
934        && ( addr_len == 4 ) )
935    {
936        pex.addr.type = TR_AF_INET;
937        memcpy( &pex.addr.addr.addr4, addr, 4 );
938        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
939    }
940
941    if( tr_peerIoIsIncoming( msgs->peer->io )
942        && tr_bencDictFindRaw( &val, "ipv6", &addr, &addr_len )
943        && ( addr_len == 16 ) )
944    {
945        pex.addr.type = TR_AF_INET6;
946        memcpy( &pex.addr.addr.addr6, addr, 16 );
947        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
948    }
949
950    /* get peer's maximum request queue size */
951    if( tr_bencDictFindInt( &val, "reqq", &i ) )
952        msgs->reqq = i;
953
954    tr_bencFree( &val );
955    tr_free( tmp );
956}
957
958static void
959parseUtMetadata( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
960{
961    tr_benc dict;
962    char * msg_end;
963    char * benc_end;
964    int64_t msg_type = -1;
965    int64_t piece = -1;
966    int64_t total_size = 0;
967    uint8_t * tmp = tr_new( uint8_t, msglen );
968
969    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
970    msg_end = (char*)tmp + msglen;
971
972    if( !tr_bencLoad( tmp, msglen, &dict, &benc_end ) )
973    {
974        tr_bencDictFindInt( &dict, "msg_type", &msg_type );
975        tr_bencDictFindInt( &dict, "piece", &piece );
976        tr_bencDictFindInt( &dict, "total_size", &total_size );
977        tr_bencFree( &dict );
978    }
979
980    dbgmsg( msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
981            (int)msg_type, (int)piece, (int)total_size );
982
983    if( msg_type == METADATA_MSG_TYPE_REJECT )
984    {
985        /* NOOP */
986    }
987
988    if( ( msg_type == METADATA_MSG_TYPE_DATA )
989        && ( !tr_torrentHasMetadata( msgs->torrent ) )
990        && ( msg_end - benc_end <= METADATA_PIECE_SIZE )
991        && ( piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size ) )
992    {
993        const int pieceLen = msg_end - benc_end;
994        tr_torrentSetMetadataPiece( msgs->torrent, piece, benc_end, pieceLen );
995    }
996
997    if( msg_type == METADATA_MSG_TYPE_REQUEST )
998    {
999        if( ( piece >= 0 )
1000            && tr_torrentHasMetadata( msgs->torrent )
1001            && !tr_torrentIsPrivate( msgs->torrent )
1002            && ( msgs->peerAskedForMetadataCount < METADATA_REQQ ) )
1003        {
1004            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
1005        }
1006        else
1007        {
1008            tr_benc tmp;
1009            int payloadLen;
1010            char * payload;
1011            struct evbuffer * out = msgs->outMessages;
1012
1013            /* build the rejection message */
1014            tr_bencInitDict( &tmp, 2 );
1015            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1016            tr_bencDictAddInt( &tmp, "piece", piece );
1017            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1018            tr_bencFree( &tmp );
1019
1020            /* write it out as a LTEP message to our outMessages buffer */
1021            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1022            evbuffer_add_uint8 ( out, BT_LTEP );
1023            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1024            evbuffer_add       ( out, payload, payloadLen );
1025            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1026            dbgOutMessageLen( msgs );
1027
1028            tr_free( payload );
1029        }
1030    }
1031
1032    tr_free( tmp );
1033}
1034
1035static void
1036parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1037{
1038    int loaded = 0;
1039    uint8_t * tmp = tr_new( uint8_t, msglen );
1040    tr_benc val;
1041    tr_torrent * tor = msgs->torrent;
1042    const uint8_t * added;
1043    size_t added_len;
1044
1045    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1046
1047    if( tr_torrentAllowsPex( tor )
1048      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1049    {
1050        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1051        {
1052            tr_pex * pex;
1053            size_t i, n;
1054            size_t added_f_len = 0;
1055            const uint8_t * added_f = NULL;
1056
1057            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1058            pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1059
1060            n = MIN( n, MAX_PEX_PEER_COUNT );
1061            for( i=0; i<n; ++i )
1062            {
1063                int seedProbability = -1;
1064                if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1065                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1066            }
1067
1068            tr_free( pex );
1069        }
1070
1071        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1072        {
1073            tr_pex * pex;
1074            size_t i, n;
1075            size_t added_f_len = 0;
1076            const uint8_t * added_f = NULL;
1077
1078            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1079            pex = tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len, &n );
1080
1081            n = MIN( n, MAX_PEX_PEER_COUNT );
1082            for( i=0; i<n; ++i )
1083            {
1084                int seedProbability = -1;
1085                if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1086                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1087            }
1088
1089            tr_free( pex );
1090        }
1091    }
1092
1093    if( loaded )
1094        tr_bencFree( &val );
1095    tr_free( tmp );
1096}
1097
1098static void sendPex( tr_peermsgs * msgs );
1099
1100static void
1101parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer  * inbuf )
1102{
1103    uint8_t ltep_msgid;
1104
1105    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1106    msglen--;
1107
1108    if( ltep_msgid == LTEP_HANDSHAKE )
1109    {
1110        dbgmsg( msgs, "got ltep handshake" );
1111        parseLtepHandshake( msgs, msglen, inbuf );
1112        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1113        {
1114            sendLtepHandshake( msgs );
1115            sendPex( msgs );
1116        }
1117    }
1118    else if( ltep_msgid == UT_PEX_ID )
1119    {
1120        dbgmsg( msgs, "got ut pex" );
1121        msgs->peerSupportsPex = 1;
1122        parseUtPex( msgs, msglen, inbuf );
1123    }
1124    else if( ltep_msgid == UT_METADATA_ID )
1125    {
1126        dbgmsg( msgs, "got ut metadata" );
1127        msgs->peerSupportsMetadataXfer = 1;
1128        parseUtMetadata( msgs, msglen, inbuf );
1129    }
1130    else
1131    {
1132        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1133        evbuffer_drain( inbuf, msglen );
1134    }
1135}
1136
1137static int
1138readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1139{
1140    uint32_t len;
1141
1142    if( inlen < sizeof( len ) )
1143        return READ_LATER;
1144
1145    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1146
1147    if( len == 0 ) /* peer sent us a keepalive message */
1148        dbgmsg( msgs, "got KeepAlive" );
1149    else
1150    {
1151        msgs->incoming.length = len;
1152        msgs->state = AWAITING_BT_ID;
1153    }
1154
1155    return READ_NOW;
1156}
1157
1158static int readBtMessage( tr_peermsgs *, struct evbuffer *, size_t );
1159
1160static int
1161readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1162{
1163    uint8_t id;
1164
1165    if( inlen < sizeof( uint8_t ) )
1166        return READ_LATER;
1167
1168    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1169    msgs->incoming.id = id;
1170    dbgmsg( msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length );
1171
1172    if( id == BT_PIECE )
1173    {
1174        msgs->state = AWAITING_BT_PIECE;
1175        return READ_NOW;
1176    }
1177    else if( msgs->incoming.length != 1 )
1178    {
1179        msgs->state = AWAITING_BT_MESSAGE;
1180        return READ_NOW;
1181    }
1182    else return readBtMessage( msgs, inbuf, inlen - 1 );
1183}
1184
1185static void
1186updatePeerProgress( tr_peermsgs * msgs )
1187{
1188    tr_peerUpdateProgress( msgs->torrent, msgs->peer );
1189
1190    /*updateFastSet( msgs );*/
1191    updateInterest( msgs );
1192}
1193
1194static void
1195prefetchPieces( tr_peermsgs *msgs )
1196{
1197    int i;
1198
1199    if( !getSession(msgs)->isPrefetchEnabled )
1200        return;
1201
1202    /* Maintain 12 prefetched blocks per unchoked peer */
1203    for( i=msgs->prefetchCount; i<msgs->peer->pendingReqsToClient && i<12; ++i )
1204    {
1205        const struct peer_request * req = msgs->peerAskedFor + i;
1206        if( requestIsValid( msgs, req ) )
1207        {
1208            tr_cachePrefetchBlock( getSession(msgs)->cache, msgs->torrent, req->index, req->offset, req->length );
1209            ++msgs->prefetchCount;
1210        }
1211    }
1212}
1213
1214static void
1215peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
1216{
1217    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1218    const int reqIsValid = requestIsValid( msgs, req );
1219    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1220    const int peerIsChoked = msgs->peer->peerIsChoked;
1221
1222    int allow = false;
1223
1224    if( !reqIsValid )
1225        dbgmsg( msgs, "rejecting an invalid request." );
1226    else if( !clientHasPiece )
1227        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1228    else if( peerIsChoked )
1229        dbgmsg( msgs, "rejecting request from choked peer" );
1230    else if( msgs->peer->pendingReqsToClient + 1 >= REQQ )
1231        dbgmsg( msgs, "rejecting request ... reqq is full" );
1232    else
1233        allow = true;
1234
1235    if( allow ) {
1236        msgs->peerAskedFor[msgs->peer->pendingReqsToClient++] = *req;
1237        prefetchPieces( msgs );
1238    } else if( fext ) {
1239        protocolSendReject( msgs, req );
1240    }
1241}
1242
1243static bool
1244messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1245{
1246    switch( id )
1247    {
1248        case BT_CHOKE:
1249        case BT_UNCHOKE:
1250        case BT_INTERESTED:
1251        case BT_NOT_INTERESTED:
1252        case BT_FEXT_HAVE_ALL:
1253        case BT_FEXT_HAVE_NONE:
1254            return len == 1;
1255
1256        case BT_HAVE:
1257        case BT_FEXT_SUGGEST:
1258        case BT_FEXT_ALLOWED_FAST:
1259            return len == 5;
1260
1261        case BT_BITFIELD:
1262            if( tr_torrentHasMetadata( msg->torrent ) )
1263                return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1264            /* we don't know the piece count yet,
1265               so we can only guess whether to send true or false */
1266            if( msg->metadata_size_hint > 0 )
1267                return len <= msg->metadata_size_hint;
1268            return true;
1269
1270        case BT_REQUEST:
1271        case BT_CANCEL:
1272        case BT_FEXT_REJECT:
1273            return len == 13;
1274
1275        case BT_PIECE:
1276            return len > 9 && len <= 16393;
1277
1278        case BT_PORT:
1279            return len == 3;
1280
1281        case BT_LTEP:
1282            return len >= 2;
1283
1284        default:
1285            return false;
1286    }
1287}
1288
1289static int clientGotBlock( tr_peermsgs *               msgs,
1290                           struct evbuffer *           block,
1291                           const struct peer_request * req );
1292
1293static int
1294readBtPiece( tr_peermsgs      * msgs,
1295             struct evbuffer  * inbuf,
1296             size_t             inlen,
1297             size_t           * setme_piece_bytes_read )
1298{
1299    struct peer_request * req = &msgs->incoming.blockReq;
1300
1301    assert( evbuffer_get_length( inbuf ) >= inlen );
1302    dbgmsg( msgs, "In readBtPiece" );
1303
1304    if( !req->length )
1305    {
1306        if( inlen < 8 )
1307            return READ_LATER;
1308
1309        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1310        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1311        req->length = msgs->incoming.length - 9;
1312        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1313        return READ_NOW;
1314    }
1315    else
1316    {
1317        int err;
1318
1319        /* read in another chunk of data */
1320        const size_t nLeft = req->length - evbuffer_get_length( msgs->incoming.block );
1321        size_t n = MIN( nLeft, inlen );
1322        size_t i = n;
1323        void * buf = tr_sessionGetBuffer( getSession( msgs ) );
1324        const size_t buflen = SESSION_BUFFER_SIZE;
1325
1326        while( i > 0 )
1327        {
1328            const size_t thisPass = MIN( i, buflen );
1329            tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass );
1330            evbuffer_add( msgs->incoming.block, buf, thisPass );
1331            i -= thisPass;
1332        }
1333
1334        tr_sessionReleaseBuffer( getSession( msgs ) );
1335        buf = NULL;
1336
1337        fireClientGotData( msgs, n, true );
1338        *setme_piece_bytes_read += n;
1339        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1340               n, req->index, req->offset, req->length,
1341               (int)( req->length - evbuffer_get_length( msgs->incoming.block ) ) );
1342        if( evbuffer_get_length( msgs->incoming.block ) < req->length )
1343            return READ_LATER;
1344
1345        /* we've got the whole block ... process it */
1346        err = clientGotBlock( msgs, msgs->incoming.block, req );
1347
1348        /* cleanup */
1349        evbuffer_free( msgs->incoming.block );
1350        msgs->incoming.block = evbuffer_new( );
1351        req->length = 0;
1352        msgs->state = AWAITING_BT_LENGTH;
1353        return err ? READ_ERR : READ_NOW;
1354    }
1355}
1356
1357static void updateDesiredRequestCount( tr_peermsgs * msgs );
1358
1359static int
1360readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1361{
1362    uint32_t      ui32;
1363    uint32_t      msglen = msgs->incoming.length;
1364    const uint8_t id = msgs->incoming.id;
1365#ifndef NDEBUG
1366    const size_t  startBufLen = evbuffer_get_length( inbuf );
1367#endif
1368    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1369
1370    --msglen; /* id length */
1371
1372    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1373
1374    if( inlen < msglen )
1375        return READ_LATER;
1376
1377    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1378    {
1379        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1380        fireError( msgs, EMSGSIZE );
1381        return READ_ERR;
1382    }
1383
1384    switch( id )
1385    {
1386        case BT_CHOKE:
1387            dbgmsg( msgs, "got Choke" );
1388            msgs->peer->clientIsChoked = 1;
1389            if( !fext )
1390                fireGotChoke( msgs );
1391            break;
1392
1393        case BT_UNCHOKE:
1394            dbgmsg( msgs, "got Unchoke" );
1395            msgs->peer->clientIsChoked = 0;
1396            updateDesiredRequestCount( msgs );
1397            break;
1398
1399        case BT_INTERESTED:
1400            dbgmsg( msgs, "got Interested" );
1401            msgs->peer->peerIsInterested = 1;
1402            break;
1403
1404        case BT_NOT_INTERESTED:
1405            dbgmsg( msgs, "got Not Interested" );
1406            msgs->peer->peerIsInterested = 0;
1407            break;
1408
1409        case BT_HAVE:
1410            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1411            dbgmsg( msgs, "got Have: %u", ui32 );
1412            if( tr_torrentHasMetadata( msgs->torrent )
1413                    && ( ui32 >= msgs->torrent->info.pieceCount ) )
1414            {
1415                fireError( msgs, ERANGE );
1416                return READ_ERR;
1417            }
1418
1419            /* a peer can send the same HAVE message twice... */
1420            if( !tr_bitsetHas( &msgs->peer->have, ui32 ) ) {
1421                tr_bitsetAdd( &msgs->peer->have, ui32 );
1422                fireClientGotHave( msgs, ui32 );
1423            }
1424            updatePeerProgress( msgs );
1425            break;
1426
1427        case BT_BITFIELD: {
1428            tr_bitfield tmp = TR_BITFIELD_INIT;
1429            const size_t bitCount = tr_torrentHasMetadata( msgs->torrent )
1430                                  ? msgs->torrent->info.pieceCount
1431                                  : msglen * 8;
1432            tr_bitfieldConstruct( &tmp, bitCount );
1433            dbgmsg( msgs, "got a bitfield" );
1434            tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp.bits, msglen );
1435            tr_bitsetSetBitfield( &msgs->peer->have, &tmp );
1436            fireClientGotBitfield( msgs, &tmp );
1437            tr_bitfieldDestruct( &tmp );
1438            updatePeerProgress( msgs );
1439            break;
1440        }
1441
1442        case BT_REQUEST:
1443        {
1444            struct peer_request r;
1445            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1446            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1447            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1448            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1449            peerMadeRequest( msgs, &r );
1450            break;
1451        }
1452
1453        case BT_CANCEL:
1454        {
1455            int i;
1456            struct peer_request r;
1457            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1458            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1459            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1460            tr_historyAdd( &msgs->peer->cancelsSentToClient, tr_time( ), 1 );
1461            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1462
1463            for( i=0; i<msgs->peer->pendingReqsToClient; ++i ) {
1464                const struct peer_request * req = msgs->peerAskedFor + i;
1465                if( ( req->index == r.index ) && ( req->offset == r.offset ) && ( req->length == r.length ) )
1466                    break;
1467            }
1468
1469            if( i < msgs->peer->pendingReqsToClient )
1470                tr_removeElementFromArray( msgs->peerAskedFor, i, sizeof( struct peer_request ),
1471                                           msgs->peer->pendingReqsToClient-- );
1472            break;
1473        }
1474
1475        case BT_PIECE:
1476            assert( 0 ); /* handled elsewhere! */
1477            break;
1478
1479        case BT_PORT:
1480            dbgmsg( msgs, "Got a BT_PORT" );
1481            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->dht_port );
1482            if( msgs->peer->dht_port > 0 )
1483                tr_dhtAddNode( getSession(msgs),
1484                               tr_peerAddress( msgs->peer ),
1485                               msgs->peer->dht_port, 0 );
1486            break;
1487
1488        case BT_FEXT_SUGGEST:
1489            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1490            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1491            if( fext )
1492                fireClientGotSuggest( msgs, ui32 );
1493            else {
1494                fireError( msgs, EMSGSIZE );
1495                return READ_ERR;
1496            }
1497            break;
1498
1499        case BT_FEXT_ALLOWED_FAST:
1500            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1501            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1502            if( fext )
1503                fireClientGotAllowedFast( msgs, ui32 );
1504            else {
1505                fireError( msgs, EMSGSIZE );
1506                return READ_ERR;
1507            }
1508            break;
1509
1510        case BT_FEXT_HAVE_ALL:
1511            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1512            if( fext ) {
1513                tr_bitsetSetHaveAll( &msgs->peer->have );
1514                fireClientGotHaveAll( msgs );
1515                updatePeerProgress( msgs );
1516            } else {
1517                fireError( msgs, EMSGSIZE );
1518                return READ_ERR;
1519            }
1520            break;
1521
1522        case BT_FEXT_HAVE_NONE:
1523            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1524            if( fext ) {
1525                tr_bitsetSetHaveNone( &msgs->peer->have );
1526                fireClientGotHaveNone( msgs );
1527                updatePeerProgress( msgs );
1528            } else {
1529                fireError( msgs, EMSGSIZE );
1530                return READ_ERR;
1531            }
1532            break;
1533
1534        case BT_FEXT_REJECT:
1535        {
1536            struct peer_request r;
1537            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1538            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1539            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1540            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1541            if( fext )
1542                fireGotRej( msgs, &r );
1543            else {
1544                fireError( msgs, EMSGSIZE );
1545                return READ_ERR;
1546            }
1547            break;
1548        }
1549
1550        case BT_LTEP:
1551            dbgmsg( msgs, "Got a BT_LTEP" );
1552            parseLtep( msgs, msglen, inbuf );
1553            break;
1554
1555        default:
1556            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1557            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1558            break;
1559    }
1560
1561    assert( msglen + 1 == msgs->incoming.length );
1562    assert( evbuffer_get_length( inbuf ) == startBufLen - msglen );
1563
1564    msgs->state = AWAITING_BT_LENGTH;
1565    return READ_NOW;
1566}
1567
1568static void
1569addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1570{
1571    if( !msgs->peer->blame )
1572         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1573    tr_bitfieldAdd( msgs->peer->blame, index );
1574}
1575
1576/* returns 0 on success, or an errno on failure */
1577static int
1578clientGotBlock( tr_peermsgs                * msgs,
1579                struct evbuffer            * data,
1580                const struct peer_request  * req )
1581{
1582    int err;
1583    tr_torrent * tor = msgs->torrent;
1584    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1585
1586    assert( msgs );
1587    assert( req );
1588
1589    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1590        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1591                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1592        return EMSGSIZE;
1593    }
1594
1595    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1596
1597    if( !tr_peerMgrDidPeerRequest( msgs->torrent, msgs->peer, block ) ) {
1598        dbgmsg( msgs, "we didn't ask for this message..." );
1599        return 0;
1600    }
1601    if( tr_cpPieceIsComplete( &msgs->torrent->completion, req->index ) ) {
1602        dbgmsg( msgs, "we did ask for this message, but the piece is already complete..." );
1603        return 0;
1604    }
1605
1606    /**
1607    ***  Save the block
1608    **/
1609
1610    if(( err = tr_cacheWriteBlock( getSession(msgs)->cache, tor, req->index, req->offset, req->length, data )))
1611        return err;
1612
1613    addPeerToBlamefield( msgs, req->index );
1614    fireGotBlock( msgs, req );
1615    return 0;
1616}
1617
1618static int peerPulse( void * vmsgs );
1619
1620static void
1621didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1622{
1623    tr_peermsgs * msgs = vmsgs;
1624    firePeerGotData( msgs, bytesWritten, wasPieceData );
1625
1626    if ( tr_isPeerIo( io ) && io->userData )
1627        peerPulse( msgs );
1628}
1629
1630static ReadState
1631canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1632{
1633    ReadState         ret;
1634    tr_peermsgs *     msgs = vmsgs;
1635    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1636    const size_t      inlen = evbuffer_get_length( in );
1637
1638    dbgmsg( msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state );
1639
1640    if( !inlen )
1641    {
1642        ret = READ_LATER;
1643    }
1644    else if( msgs->state == AWAITING_BT_PIECE )
1645    {
1646        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1647    }
1648    else switch( msgs->state )
1649    {
1650        case AWAITING_BT_LENGTH:
1651            ret = readBtLength ( msgs, in, inlen ); break;
1652
1653        case AWAITING_BT_ID:
1654            ret = readBtId     ( msgs, in, inlen ); break;
1655
1656        case AWAITING_BT_MESSAGE:
1657            ret = readBtMessage( msgs, in, inlen ); break;
1658
1659        default:
1660            ret = READ_ERR;
1661            assert( 0 );
1662    }
1663
1664    dbgmsg( msgs, "canRead: ret is %d", (int)ret );
1665
1666    /* log the raw data that was read */
1667    if( ( ret != READ_ERR ) && ( evbuffer_get_length( in ) != inlen ) )
1668        fireClientGotData( msgs, inlen - evbuffer_get_length( in ), false );
1669
1670    return ret;
1671}
1672
1673int
1674tr_peerMsgsIsReadingBlock( const tr_peermsgs * msgs, tr_block_index_t block )
1675{
1676    if( msgs->state != AWAITING_BT_PIECE )
1677        return false;
1678
1679    return block == _tr_block( msgs->torrent,
1680                               msgs->incoming.blockReq.index,
1681                               msgs->incoming.blockReq.offset );
1682}
1683
1684/**
1685***
1686**/
1687
1688static void
1689updateDesiredRequestCount( tr_peermsgs * msgs )
1690{
1691    const tr_torrent * const torrent = msgs->torrent;
1692
1693    if( tr_torrentIsSeed( msgs->torrent ) )
1694    {
1695        msgs->desiredRequestCount = 0;
1696    }
1697    else if( msgs->peer->clientIsChoked )
1698    {
1699        msgs->desiredRequestCount = 0;
1700    }
1701    else if( !msgs->peer->clientIsInterested )
1702    {
1703        msgs->desiredRequestCount = 0;
1704    }
1705    else
1706    {
1707        int estimatedBlocksInPeriod;
1708        int rate_Bps;
1709        int irate_Bps;
1710        const int floor = 4;
1711        const int seconds = REQUEST_BUF_SECS;
1712        const uint64_t now = tr_time_msec( );
1713
1714        /* Get the rate limit we should use.
1715         * FIXME: this needs to consider all the other peers as well... */
1716        rate_Bps = tr_peerGetPieceSpeed_Bps( msgs->peer, now, TR_PEER_TO_CLIENT );
1717        if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) )
1718            rate_Bps = MIN( rate_Bps, tr_torrentGetSpeedLimit_Bps( torrent, TR_PEER_TO_CLIENT ) );
1719
1720        /* honor the session limits, if enabled */
1721        if( tr_torrentUsesSessionLimits( torrent ) )
1722            if( tr_sessionGetActiveSpeedLimit_Bps( torrent->session, TR_PEER_TO_CLIENT, &irate_Bps ) )
1723                rate_Bps = MIN( rate_Bps, irate_Bps );
1724
1725        /* use this desired rate to figure out how
1726         * many requests we should send to this peer */
1727        estimatedBlocksInPeriod = ( rate_Bps * seconds ) / torrent->blockSize;
1728        msgs->desiredRequestCount = MAX( floor, estimatedBlocksInPeriod );
1729
1730        /* honor the peer's maximum request count, if specified */
1731        if( msgs->reqq > 0 )
1732            if( msgs->desiredRequestCount > msgs->reqq )
1733                msgs->desiredRequestCount = msgs->reqq;
1734    }
1735}
1736
1737static void
1738updateMetadataRequests( tr_peermsgs * msgs, time_t now )
1739{
1740    int piece;
1741
1742    if( msgs->peerSupportsMetadataXfer
1743        && tr_torrentGetNextMetadataRequest( msgs->torrent, now, &piece ) )
1744    {
1745        tr_benc tmp;
1746        int payloadLen;
1747        char * payload;
1748        struct evbuffer * out = msgs->outMessages;
1749
1750        /* build the data message */
1751        tr_bencInitDict( &tmp, 3 );
1752        tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REQUEST );
1753        tr_bencDictAddInt( &tmp, "piece", piece );
1754        payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1755        tr_bencFree( &tmp );
1756
1757        dbgmsg( msgs, "requesting metadata piece #%d", piece );
1758
1759        /* write it out as a LTEP message to our outMessages buffer */
1760        evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1761        evbuffer_add_uint8 ( out, BT_LTEP );
1762        evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1763        evbuffer_add       ( out, payload, payloadLen );
1764        pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1765        dbgOutMessageLen( msgs );
1766
1767        tr_free( payload );
1768    }
1769}
1770
1771static void
1772updateBlockRequests( tr_peermsgs * msgs )
1773{
1774    if( tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT )
1775        && ( msgs->desiredRequestCount > 0 )
1776        && ( msgs->peer->pendingReqsToPeer <= ( msgs->desiredRequestCount * 0.66 ) ) )
1777    {
1778        int i;
1779        int n;
1780        const int numwant = msgs->desiredRequestCount - msgs->peer->pendingReqsToPeer;
1781        tr_block_index_t * blocks = tr_new( tr_block_index_t, numwant );
1782
1783        tr_peerMgrGetNextRequests( msgs->torrent, msgs->peer, numwant, blocks, &n );
1784
1785        for( i=0; i<n; ++i )
1786        {
1787            struct peer_request req;
1788            blockToReq( msgs->torrent, blocks[i], &req );
1789            protocolSendRequest( msgs, &req );
1790        }
1791
1792        tr_free( blocks );
1793    }
1794}
1795
1796static size_t
1797fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1798{
1799    int piece;
1800    size_t bytesWritten = 0;
1801    struct peer_request req;
1802    const bool haveMessages = evbuffer_get_length( msgs->outMessages ) != 0;
1803    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1804
1805    /**
1806    ***  Protocol messages
1807    **/
1808
1809    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1810    {
1811        dbgmsg( msgs, "started an outMessages batch (length is %zu)", evbuffer_get_length( msgs->outMessages ) );
1812        msgs->outMessagesBatchedAt = now;
1813    }
1814    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1815    {
1816        const size_t len = evbuffer_get_length( msgs->outMessages );
1817        /* flush the protocol messages */
1818        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1819        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, false );
1820        msgs->clientSentAnythingAt = now;
1821        msgs->outMessagesBatchedAt = 0;
1822        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1823        bytesWritten +=  len;
1824    }
1825
1826    /**
1827    ***  Metadata Pieces
1828    **/
1829
1830    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= METADATA_PIECE_SIZE )
1831        && popNextMetadataRequest( msgs, &piece ) )
1832    {
1833        char * data;
1834        int dataLen;
1835        bool ok = false;
1836
1837        data = tr_torrentGetMetadataPiece( msgs->torrent, piece, &dataLen );
1838        if( ( dataLen > 0 ) && ( data != NULL ) )
1839        {
1840            tr_benc tmp;
1841            int payloadLen;
1842            char * payload;
1843            struct evbuffer * out = msgs->outMessages;
1844
1845            /* build the data message */
1846            tr_bencInitDict( &tmp, 3 );
1847            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_DATA );
1848            tr_bencDictAddInt( &tmp, "piece", piece );
1849            tr_bencDictAddInt( &tmp, "total_size", msgs->torrent->infoDictLength );
1850            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1851            tr_bencFree( &tmp );
1852
1853            /* write it out as a LTEP message to our outMessages buffer */
1854            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen + dataLen );
1855            evbuffer_add_uint8 ( out, BT_LTEP );
1856            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1857            evbuffer_add       ( out, payload, payloadLen );
1858            evbuffer_add       ( out, data, dataLen );
1859            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1860            dbgOutMessageLen( msgs );
1861
1862            tr_free( payload );
1863            tr_free( data );
1864
1865            ok = true;
1866        }
1867
1868        if( !ok ) /* send a rejection message */
1869        {
1870            tr_benc tmp;
1871            int payloadLen;
1872            char * payload;
1873            struct evbuffer * out = msgs->outMessages;
1874
1875            /* build the rejection message */
1876            tr_bencInitDict( &tmp, 2 );
1877            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1878            tr_bencDictAddInt( &tmp, "piece", piece );
1879            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1880            tr_bencFree( &tmp );
1881
1882            /* write it out as a LTEP message to our outMessages buffer */
1883            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1884            evbuffer_add_uint8 ( out, BT_LTEP );
1885            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1886            evbuffer_add       ( out, payload, payloadLen );
1887            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1888            dbgOutMessageLen( msgs );
1889
1890            tr_free( payload );
1891        }
1892    }
1893
1894    /**
1895    ***  Data Blocks
1896    **/
1897
1898    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= msgs->torrent->blockSize )
1899        && popNextRequest( msgs, &req ) )
1900    {
1901        --msgs->prefetchCount;
1902
1903        if( requestIsValid( msgs, &req )
1904            && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1905        {
1906            int err;
1907            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
1908            struct evbuffer * out;
1909            struct evbuffer_iovec iovec[1];
1910
1911            out = evbuffer_new( );
1912            evbuffer_expand( out, msglen );
1913
1914            evbuffer_add_uint32( out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1915            evbuffer_add_uint8 ( out, BT_PIECE );
1916            evbuffer_add_uint32( out, req.index );
1917            evbuffer_add_uint32( out, req.offset );
1918
1919            evbuffer_reserve_space( out, req.length, iovec, 1 );
1920            err = tr_cacheReadBlock( getSession(msgs)->cache, msgs->torrent, req.index, req.offset, req.length, iovec[0].iov_base );
1921            iovec[0].iov_len = req.length;
1922            evbuffer_commit_space( out, iovec, 1 );
1923
1924            /* check the piece if it needs checking... */
1925            if( !err && tr_torrentPieceNeedsCheck( msgs->torrent, req.index ) )
1926                if(( err = !tr_torrentCheckPiece( msgs->torrent, req.index )))
1927                    tr_torrentSetLocalError( msgs->torrent, _( "Please Verify Local Data! Piece #%zu is corrupt." ), (size_t)req.index );
1928
1929            if( err )
1930            {
1931                if( fext )
1932                    protocolSendReject( msgs, &req );
1933            }
1934            else
1935            {
1936                const size_t n = evbuffer_get_length( out );
1937                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1938                assert( n == msglen );
1939                tr_peerIoWriteBuf( msgs->peer->io, out, true );
1940                bytesWritten += n;
1941                msgs->clientSentAnythingAt = now;
1942                tr_historyAdd( &msgs->peer->blocksSentToPeer, tr_time( ), 1 );
1943            }
1944
1945            evbuffer_free( out );
1946
1947            if( err )
1948            {
1949                bytesWritten = 0;
1950                msgs = NULL;
1951            }
1952        }
1953        else if( fext ) /* peer needs a reject message */
1954        {
1955            protocolSendReject( msgs, &req );
1956        }
1957
1958        if( msgs != NULL )
1959            prefetchPieces( msgs );
1960    }
1961
1962    /**
1963    ***  Keepalive
1964    **/
1965
1966    if( ( msgs != NULL )
1967        && ( msgs->clientSentAnythingAt != 0 )
1968        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1969    {
1970        dbgmsg( msgs, "sending a keepalive message" );
1971        evbuffer_add_uint32( msgs->outMessages, 0 );
1972        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1973    }
1974
1975    return bytesWritten;
1976}
1977
1978static int
1979peerPulse( void * vmsgs )
1980{
1981    tr_peermsgs * msgs = vmsgs;
1982    const time_t  now = tr_time( );
1983
1984    if ( tr_isPeerIo( msgs->peer->io ) ) {
1985        updateDesiredRequestCount( msgs );
1986        updateBlockRequests( msgs );
1987        updateMetadataRequests( msgs, now );
1988    }
1989
1990    for( ;; )
1991        if( fillOutputBuffer( msgs, now ) < 1 )
1992            break;
1993
1994    return true; /* loop forever */
1995}
1996
1997void
1998tr_peerMsgsPulse( tr_peermsgs * msgs )
1999{
2000    if( msgs != NULL )
2001        peerPulse( msgs );
2002}
2003
2004static void
2005gotError( tr_peerIo * io UNUSED, short what, void * vmsgs )
2006{
2007    if( what & BEV_EVENT_TIMEOUT )
2008        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
2009    if( what & ( BEV_EVENT_EOF | BEV_EVENT_ERROR ) )
2010        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
2011               what, errno, tr_strerror( errno ) );
2012    fireError( vmsgs, ENOTCONN );
2013}
2014
2015static void
2016sendBitfield( tr_peermsgs * msgs )
2017{
2018    struct evbuffer * out = msgs->outMessages;
2019    tr_bitfield * bf = tr_cpCreatePieceBitfield( &msgs->torrent->completion );
2020
2021    evbuffer_add_uint32( out, sizeof( uint8_t ) + bf->byteCount );
2022    evbuffer_add_uint8 ( out, BT_BITFIELD );
2023    evbuffer_add       ( out, bf->bits, bf->byteCount );
2024    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu", evbuffer_get_length( out ) );
2025    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
2026
2027    tr_bitfieldFree( bf );
2028}
2029
2030static void
2031tellPeerWhatWeHave( tr_peermsgs * msgs )
2032{
2033    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
2034
2035    if( fext && ( tr_cpBlockBitset( &msgs->torrent->completion )->haveAll ) )
2036    {
2037        protocolSendHaveAll( msgs );
2038    }
2039    else if( fext && ( tr_cpBlockBitset( &msgs->torrent->completion )->haveNone ) )
2040    {
2041        protocolSendHaveNone( msgs );
2042    }
2043    else
2044    {
2045        sendBitfield( msgs );
2046    }
2047}
2048
2049/**
2050***
2051**/
2052
2053/* some peers give us error messages if we send
2054   more than this many peers in a single pex message
2055   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2056#define MAX_PEX_ADDED 50
2057#define MAX_PEX_DROPPED 50
2058
2059typedef struct
2060{
2061    tr_pex *  added;
2062    tr_pex *  dropped;
2063    tr_pex *  elements;
2064    int       addedCount;
2065    int       droppedCount;
2066    int       elementCount;
2067}
2068PexDiffs;
2069
2070static void
2071pexAddedCb( void * vpex, void * userData )
2072{
2073    PexDiffs * diffs = userData;
2074    tr_pex *   pex = vpex;
2075
2076    if( diffs->addedCount < MAX_PEX_ADDED )
2077    {
2078        diffs->added[diffs->addedCount++] = *pex;
2079        diffs->elements[diffs->elementCount++] = *pex;
2080    }
2081}
2082
2083static inline void
2084pexDroppedCb( void * vpex, void * userData )
2085{
2086    PexDiffs * diffs = userData;
2087    tr_pex *   pex = vpex;
2088
2089    if( diffs->droppedCount < MAX_PEX_DROPPED )
2090    {
2091        diffs->dropped[diffs->droppedCount++] = *pex;
2092    }
2093}
2094
2095static inline void
2096pexElementCb( void * vpex, void * userData )
2097{
2098    PexDiffs * diffs = userData;
2099    tr_pex * pex = vpex;
2100
2101    diffs->elements[diffs->elementCount++] = *pex;
2102}
2103
2104typedef void ( tr_set_func )( void * element, void * userData );
2105
2106/**
2107 * @brief find the differences and commonalities in two sorted sets
2108 * @param a the first set
2109 * @param aCount the number of elements in the set 'a'
2110 * @param b the second set
2111 * @param bCount the number of elements in the set 'b'
2112 * @param compare the sorting method for both sets
2113 * @param elementSize the sizeof the element in the two sorted sets
2114 * @param in_a called for items in set 'a' but not set 'b'
2115 * @param in_b called for items in set 'b' but not set 'a'
2116 * @param in_both called for items that are in both sets
2117 * @param userData user data passed along to in_a, in_b, and in_both
2118 */
2119static void
2120tr_set_compare( const void * va, size_t aCount,
2121                const void * vb, size_t bCount,
2122                int compare( const void * a, const void * b ),
2123                size_t elementSize,
2124                tr_set_func in_a_cb,
2125                tr_set_func in_b_cb,
2126                tr_set_func in_both_cb,
2127                void * userData )
2128{
2129    const uint8_t * a = va;
2130    const uint8_t * b = vb;
2131    const uint8_t * aend = a + elementSize * aCount;
2132    const uint8_t * bend = b + elementSize * bCount;
2133
2134    while( a != aend || b != bend )
2135    {
2136        if( a == aend )
2137        {
2138            ( *in_b_cb )( (void*)b, userData );
2139            b += elementSize;
2140        }
2141        else if( b == bend )
2142        {
2143            ( *in_a_cb )( (void*)a, userData );
2144            a += elementSize;
2145        }
2146        else
2147        {
2148            const int val = ( *compare )( a, b );
2149
2150            if( !val )
2151            {
2152                ( *in_both_cb )( (void*)a, userData );
2153                a += elementSize;
2154                b += elementSize;
2155            }
2156            else if( val < 0 )
2157            {
2158                ( *in_a_cb )( (void*)a, userData );
2159                a += elementSize;
2160            }
2161            else if( val > 0 )
2162            {
2163                ( *in_b_cb )( (void*)b, userData );
2164                b += elementSize;
2165            }
2166        }
2167    }
2168}
2169
2170
2171static void
2172sendPex( tr_peermsgs * msgs )
2173{
2174    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2175    {
2176        PexDiffs diffs;
2177        PexDiffs diffs6;
2178        tr_pex * newPex = NULL;
2179        tr_pex * newPex6 = NULL;
2180        const int newCount = tr_peerMgrGetPeers( msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2181        const int newCount6 = tr_peerMgrGetPeers( msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2182
2183        /* build the diffs */
2184        diffs.added = tr_new( tr_pex, newCount );
2185        diffs.addedCount = 0;
2186        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2187        diffs.droppedCount = 0;
2188        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2189        diffs.elementCount = 0;
2190        tr_set_compare( msgs->pex, msgs->pexCount,
2191                        newPex, newCount,
2192                        tr_pexCompare, sizeof( tr_pex ),
2193                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2194        diffs6.added = tr_new( tr_pex, newCount6 );
2195        diffs6.addedCount = 0;
2196        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2197        diffs6.droppedCount = 0;
2198        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2199        diffs6.elementCount = 0;
2200        tr_set_compare( msgs->pex6, msgs->pexCount6,
2201                        newPex6, newCount6,
2202                        tr_pexCompare, sizeof( tr_pex ),
2203                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2204        dbgmsg(
2205            msgs,
2206            "pex: old peer count %d+%d, new peer count %d+%d, "
2207            "added %d+%d, removed %d+%d",
2208            msgs->pexCount, msgs->pexCount6, newCount, newCount6,
2209            diffs.addedCount, diffs6.addedCount,
2210            diffs.droppedCount, diffs6.droppedCount );
2211
2212        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2213            !diffs6.droppedCount )
2214        {
2215            tr_free( diffs.elements );
2216            tr_free( diffs6.elements );
2217        }
2218        else
2219        {
2220            int  i;
2221            tr_benc val;
2222            char * benc;
2223            int bencLen;
2224            uint8_t * tmp, *walk;
2225            struct evbuffer * out = msgs->outMessages;
2226
2227            /* update peer */
2228            tr_free( msgs->pex );
2229            msgs->pex = diffs.elements;
2230            msgs->pexCount = diffs.elementCount;
2231            tr_free( msgs->pex6 );
2232            msgs->pex6 = diffs6.elements;
2233            msgs->pexCount6 = diffs6.elementCount;
2234
2235            /* build the pex payload */
2236            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2237                                         * speed vs. likelihood? */
2238
2239            if( diffs.addedCount > 0)
2240            {
2241                /* "added" */
2242                tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2243                for( i = 0; i < diffs.addedCount; ++i ) {
2244                    memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2245                    memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2246                }
2247                assert( ( walk - tmp ) == diffs.addedCount * 6 );
2248                tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2249                tr_free( tmp );
2250
2251                /* "added.f"
2252                 * unset each holepunch flag because we don't support it. */
2253                tmp = walk = tr_new( uint8_t, diffs.addedCount );
2254                for( i = 0; i < diffs.addedCount; ++i )
2255                    *walk++ = diffs.added[i].flags & ~ADDED_F_HOLEPUNCH;
2256                assert( ( walk - tmp ) == diffs.addedCount );
2257                tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2258                tr_free( tmp );
2259            }
2260
2261            if( diffs.droppedCount > 0 )
2262            {
2263                /* "dropped" */
2264                tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2265                for( i = 0; i < diffs.droppedCount; ++i ) {
2266                    memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2267                    memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2268                }
2269                assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2270                tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2271                tr_free( tmp );
2272            }
2273
2274            if( diffs6.addedCount > 0 )
2275            {
2276                /* "added6" */
2277                tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2278                for( i = 0; i < diffs6.addedCount; ++i ) {
2279                    memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2280                    walk += 16;
2281                    memcpy( walk, &diffs6.added[i].port, 2 );
2282                    walk += 2;
2283                }
2284                assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2285                tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2286                tr_free( tmp );
2287
2288                /* "added6.f"
2289                 * unset each holepunch flag because we don't support it. */
2290                tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2291                for( i = 0; i < diffs6.addedCount; ++i )
2292                    *walk++ = diffs6.added[i].flags & ~ADDED_F_HOLEPUNCH;
2293                assert( ( walk - tmp ) == diffs6.addedCount );
2294                tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2295                tr_free( tmp );
2296            }
2297
2298            if( diffs6.droppedCount > 0 )
2299            {
2300                /* "dropped6" */
2301                tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2302                for( i = 0; i < diffs6.droppedCount; ++i ) {
2303                    memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2304                    walk += 16;
2305                    memcpy( walk, &diffs6.dropped[i].port, 2 );
2306                    walk += 2;
2307                }
2308                assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2309                tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2310                tr_free( tmp );
2311            }
2312
2313            /* write the pex message */
2314            benc = tr_bencToStr( &val, TR_FMT_BENC, &bencLen );
2315            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + bencLen );
2316            evbuffer_add_uint8 ( out, BT_LTEP );
2317            evbuffer_add_uint8 ( out, msgs->ut_pex_id );
2318            evbuffer_add       ( out, benc, bencLen );
2319            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2320            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", evbuffer_get_length( out ) );
2321            dbgOutMessageLen( msgs );
2322
2323            tr_free( benc );
2324            tr_bencFree( &val );
2325        }
2326
2327        /* cleanup */
2328        tr_free( diffs.added );
2329        tr_free( diffs.dropped );
2330        tr_free( newPex );
2331        tr_free( diffs6.added );
2332        tr_free( diffs6.dropped );
2333        tr_free( newPex6 );
2334
2335        /*msgs->clientSentPexAt = tr_time( );*/
2336    }
2337}
2338
2339static void
2340pexPulse( int foo UNUSED, short bar UNUSED, void * vmsgs )
2341{
2342    struct tr_peermsgs * msgs = vmsgs;
2343
2344    sendPex( msgs );
2345
2346    tr_timerAdd( msgs->pexTimer, PEX_INTERVAL_SECS, 0 );
2347}
2348
2349/**
2350***
2351**/
2352
2353tr_peermsgs*
2354tr_peerMsgsNew( struct tr_torrent    * torrent,
2355                struct tr_peer       * peer,
2356                tr_peer_callback     * callback,
2357                void                 * callbackData )
2358{
2359    tr_peermsgs * m;
2360
2361    assert( peer );
2362    assert( peer->io );
2363
2364    m = tr_new0( tr_peermsgs, 1 );
2365    m->callback = callback;
2366    m->callbackData = callbackData;
2367    m->peer = peer;
2368    m->torrent = torrent;
2369    m->peer->clientIsChoked = 1;
2370    m->peer->peerIsChoked = 1;
2371    m->peer->clientIsInterested = 0;
2372    m->peer->peerIsInterested = 0;
2373    m->state = AWAITING_BT_LENGTH;
2374    m->outMessages = evbuffer_new( );
2375    m->outMessagesBatchedAt = 0;
2376    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2377    m->incoming.block = evbuffer_new( );
2378    m->pexTimer = evtimer_new( torrent->session->event_base, pexPulse, m );
2379    peer->msgs = m;
2380    tr_timerAdd( m->pexTimer, PEX_INTERVAL_SECS, 0 );
2381
2382    if( tr_peerIoSupportsUTP( peer->io ) ) {
2383        const tr_address * addr = tr_peerIoGetAddress( peer->io, NULL );
2384        tr_peerMgrSetUtpSupported( torrent, addr );
2385        tr_peerMgrSetUtpFailed( torrent, addr, false );
2386    }
2387
2388    if( tr_peerIoSupportsLTEP( peer->io ) )
2389        sendLtepHandshake( m );
2390
2391    tellPeerWhatWeHave( m );
2392
2393    if( tr_dhtEnabled( torrent->session ) && tr_peerIoSupportsDHT( peer->io ))
2394    {
2395        /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2396        const struct tr_address *addr = tr_peerIoGetAddress( peer->io, NULL );
2397        if( addr->type == TR_AF_INET || tr_globalIPv6() ) {
2398            protocolSendPort( m, tr_dhtPort( torrent->session ) );
2399        }
2400    }
2401
2402    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2403    updateDesiredRequestCount( m );
2404
2405    return m;
2406}
2407
2408void
2409tr_peerMsgsFree( tr_peermsgs* msgs )
2410{
2411    if( msgs )
2412    {
2413        event_free( msgs->pexTimer );
2414
2415        evbuffer_free( msgs->incoming.block );
2416        evbuffer_free( msgs->outMessages );
2417        tr_free( msgs->pex6 );
2418        tr_free( msgs->pex );
2419
2420        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2421        tr_free( msgs );
2422    }
2423}
Note: See TracBrowser for help on using the repository browser.