source: trunk/libtransmission/peer-msgs.c @ 12918

Last change on this file since 12918 was 12590, checked in by jordan, 10 years ago

(trunk libT) during the extended handshake, don't send the "m" dict if it doesn't have any entries.

  • Property svn:keywords set to Date Rev Author Id
File size: 71.4 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 12590 2011-07-25 22:33:07Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <stdarg.h>
16#include <stdlib.h>
17#include <string.h>
18
19#include <alloca.h>
20
21#include <event2/buffer.h>
22#include <event2/bufferevent.h>
23#include <event2/event.h>
24
25#include "transmission.h"
26#include "bencode.h"
27#include "cache.h"
28#include "completion.h"
29#include "crypto.h" /* tr_sha1() */
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-msgs.h"
33#include "session.h"
34#include "torrent.h"
35#include "torrent-magnet.h"
36#include "tr-dht.h"
37#include "utils.h"
38#include "version.h"
39
40/**
41***
42**/
43
44enum
45{
46    BT_CHOKE                = 0,
47    BT_UNCHOKE              = 1,
48    BT_INTERESTED           = 2,
49    BT_NOT_INTERESTED       = 3,
50    BT_HAVE                 = 4,
51    BT_BITFIELD             = 5,
52    BT_REQUEST              = 6,
53    BT_PIECE                = 7,
54    BT_CANCEL               = 8,
55    BT_PORT                 = 9,
56
57    BT_FEXT_SUGGEST         = 13,
58    BT_FEXT_HAVE_ALL        = 14,
59    BT_FEXT_HAVE_NONE       = 15,
60    BT_FEXT_REJECT          = 16,
61    BT_FEXT_ALLOWED_FAST    = 17,
62
63    BT_LTEP                 = 20,
64
65    LTEP_HANDSHAKE          = 0,
66
67    UT_PEX_ID               = 1,
68    UT_METADATA_ID          = 3,
69
70    MAX_PEX_PEER_COUNT      = 50,
71
72    MIN_CHOKE_PERIOD_SEC    = 10,
73
74    /* idle seconds before we send a keepalive */
75    KEEPALIVE_INTERVAL_SECS = 100,
76
77    PEX_INTERVAL_SECS       = 90, /* sec between sendPex() calls */
78
79    REQQ                    = 512,
80
81    METADATA_REQQ           = 64,
82
83    /* used in lowering the outMessages queue period */
84    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
85    HIGH_PRIORITY_INTERVAL_SECS = 2,
86    LOW_PRIORITY_INTERVAL_SECS = 10,
87
88    /* number of pieces we'll allow in our fast set */
89    MAX_FAST_SET_SIZE = 3,
90
91    /* defined in BEP #9 */
92    METADATA_MSG_TYPE_REQUEST = 0,
93    METADATA_MSG_TYPE_DATA = 1,
94    METADATA_MSG_TYPE_REJECT = 2
95};
96
97enum
98{
99    AWAITING_BT_LENGTH,
100    AWAITING_BT_ID,
101    AWAITING_BT_MESSAGE,
102    AWAITING_BT_PIECE
103};
104
105/**
106***
107**/
108
109struct peer_request
110{
111    uint32_t    index;
112    uint32_t    offset;
113    uint32_t    length;
114};
115
116static void
117blockToReq( const tr_torrent     * tor,
118            tr_block_index_t       block,
119            struct peer_request  * setme )
120{
121    tr_torrentGetBlockLocation( tor, block, &setme->index,
122                                            &setme->offset,
123                                            &setme->length );
124}
125
126/**
127***
128**/
129
130/* this is raw, unchanged data from the peer regarding
131 * the current message that it's sending us. */
132struct tr_incoming
133{
134    uint8_t                id;
135    uint32_t               length; /* includes the +1 for id length */
136    struct peer_request    blockReq; /* metadata for incoming blocks */
137    struct evbuffer *      block; /* piece data for incoming blocks */
138};
139
140/**
141 * Low-level communication state information about a connected peer.
142 *
143 * This structure remembers the low-level protocol states that we're
144 * in with this peer, such as active requests, pex messages, and so on.
145 * Its fields are all private to peer-msgs.c.
146 *
147 * Data not directly involved with sending & receiving messages is
148 * stored in tr_peer, where it can be accessed by both peermsgs and
149 * the peer manager.
150 *
151 * @see struct peer_atom
152 * @see tr_peer
153 */
154struct tr_peermsgs
155{
156    bool            peerSupportsPex;
157    bool            peerSupportsMetadataXfer;
158    bool            clientSentLtepHandshake;
159    bool            peerSentLtepHandshake;
160
161    /*bool          haveFastSet;*/
162
163    int             desiredRequestCount;
164
165    int             prefetchCount;
166
167    /* how long the outMessages batch should be allowed to grow before
168     * it's flushed -- some messages (like requests >:) should be sent
169     * very quickly; others aren't as urgent. */
170    int8_t          outMessagesBatchPeriod;
171
172    uint8_t         state;
173    uint8_t         ut_pex_id;
174    uint8_t         ut_metadata_id;
175    uint16_t        pexCount;
176    uint16_t        pexCount6;
177
178    size_t          metadata_size_hint;
179#if 0
180    size_t                 fastsetSize;
181    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
182#endif
183
184    tr_peer *              peer;
185
186    tr_torrent *           torrent;
187
188    tr_peer_callback      * callback;
189    void                  * callbackData;
190
191    struct evbuffer *      outMessages; /* all the non-piece messages */
192
193    struct peer_request    peerAskedFor[REQQ];
194
195    int                    peerAskedForMetadata[METADATA_REQQ];
196    int                    peerAskedForMetadataCount;
197
198    tr_pex               * pex;
199    tr_pex               * pex6;
200
201    /*time_t                 clientSentPexAt;*/
202    time_t                 clientSentAnythingAt;
203
204    /* when we started batching the outMessages */
205    time_t                outMessagesBatchedAt;
206
207    struct tr_incoming    incoming;
208
209    /* if the peer supports the Extension Protocol in BEP 10 and
210       supplied a reqq argument, it's stored here. Otherwise, the
211       value is zero and should be ignored. */
212    int64_t               reqq;
213
214    struct event        * pexTimer;
215};
216
217/**
218***
219**/
220
221static inline tr_session*
222getSession( struct tr_peermsgs * msgs )
223{
224    return msgs->torrent->session;
225}
226
227/**
228***
229**/
230
231static void
232myDebug( const char * file, int line,
233         const struct tr_peermsgs * msgs,
234         const char * fmt, ... )
235{
236    FILE * fp = tr_getLog( );
237
238    if( fp )
239    {
240        va_list           args;
241        char              timestr[64];
242        struct evbuffer * buf = evbuffer_new( );
243        char *            base = tr_basename( file );
244
245        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
246                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
247                             tr_torrentName( msgs->torrent ),
248                             tr_peerIoGetAddrStr( msgs->peer->io ),
249                             msgs->peer->client );
250        va_start( args, fmt );
251        evbuffer_add_vprintf( buf, fmt, args );
252        va_end( args );
253        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
254        fputs( (const char*)evbuffer_pullup( buf, -1 ), fp );
255
256        tr_free( base );
257        evbuffer_free( buf );
258    }
259}
260
261#define dbgmsg( msgs, ... ) \
262    do { \
263        if( tr_deepLoggingIsActive( ) ) \
264            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
265    } while( 0 )
266
267/**
268***
269**/
270
271static void
272pokeBatchPeriod( tr_peermsgs * msgs, int interval )
273{
274    if( msgs->outMessagesBatchPeriod > interval )
275    {
276        msgs->outMessagesBatchPeriod = interval;
277        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
278    }
279}
280
281static void
282dbgOutMessageLen( tr_peermsgs * msgs )
283{
284    dbgmsg( msgs, "outMessage size is now %zu", evbuffer_get_length( msgs->outMessages ) );
285}
286
287static void
288protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
289{
290    struct evbuffer * out = msgs->outMessages;
291
292    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
293
294    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
295    evbuffer_add_uint8 ( out, BT_FEXT_REJECT );
296    evbuffer_add_uint32( out, req->index );
297    evbuffer_add_uint32( out, req->offset );
298    evbuffer_add_uint32( out, req->length );
299
300    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
301    dbgOutMessageLen( msgs );
302}
303
304static void
305protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
306{
307    struct evbuffer * out = msgs->outMessages;
308
309    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
310    evbuffer_add_uint8 ( out, BT_REQUEST );
311    evbuffer_add_uint32( out, req->index );
312    evbuffer_add_uint32( out, req->offset );
313    evbuffer_add_uint32( out, req->length );
314
315    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
316    dbgOutMessageLen( msgs );
317    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
318}
319
320static void
321protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
322{
323    struct evbuffer * out = msgs->outMessages;
324
325    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
326    evbuffer_add_uint8 ( out, BT_CANCEL );
327    evbuffer_add_uint32( out, req->index );
328    evbuffer_add_uint32( out, req->offset );
329    evbuffer_add_uint32( out, req->length );
330
331    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
332    dbgOutMessageLen( msgs );
333    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
334}
335
336static void
337protocolSendPort(tr_peermsgs *msgs, uint16_t port)
338{
339    struct evbuffer * out = msgs->outMessages;
340
341    dbgmsg( msgs, "sending Port %u", port);
342    evbuffer_add_uint32( out, 3 );
343    evbuffer_add_uint8 ( out, BT_PORT );
344    evbuffer_add_uint16( out, port);
345}
346
347static void
348protocolSendHave( tr_peermsgs * msgs, uint32_t index )
349{
350    struct evbuffer * out = msgs->outMessages;
351
352    evbuffer_add_uint32( out, sizeof(uint8_t) + sizeof(uint32_t) );
353    evbuffer_add_uint8 ( out, BT_HAVE );
354    evbuffer_add_uint32( out, index );
355
356    dbgmsg( msgs, "sending Have %u", index );
357    dbgOutMessageLen( msgs );
358    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
359}
360
361#if 0
362static void
363protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
364{
365    tr_peerIo       * io  = msgs->peer->io;
366    struct evbuffer * out = msgs->outMessages;
367
368    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
369
370    evbuffer_add_uint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
371    evbuffer_add_uint8 ( io, out, BT_FEXT_ALLOWED_FAST );
372    evbuffer_add_uint32( io, out, pieceIndex );
373
374    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
375    dbgOutMessageLen( msgs );
376}
377#endif
378
379static void
380protocolSendChoke( tr_peermsgs * msgs, int choke )
381{
382    struct evbuffer * out = msgs->outMessages;
383
384    evbuffer_add_uint32( out, sizeof( uint8_t ) );
385    evbuffer_add_uint8 ( out, choke ? BT_CHOKE : BT_UNCHOKE );
386
387    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
388    dbgOutMessageLen( msgs );
389    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
390}
391
392static void
393protocolSendHaveAll( tr_peermsgs * msgs )
394{
395    struct evbuffer * out = msgs->outMessages;
396
397    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
398
399    evbuffer_add_uint32( out, sizeof( uint8_t ) );
400    evbuffer_add_uint8 ( out, BT_FEXT_HAVE_ALL );
401
402    dbgmsg( msgs, "sending HAVE_ALL..." );
403    dbgOutMessageLen( msgs );
404    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
405}
406
407static void
408protocolSendHaveNone( tr_peermsgs * msgs )
409{
410    struct evbuffer * out = msgs->outMessages;
411
412    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
413
414    evbuffer_add_uint32( out, sizeof( uint8_t ) );
415    evbuffer_add_uint8 ( out, BT_FEXT_HAVE_NONE );
416
417    dbgmsg( msgs, "sending HAVE_NONE..." );
418    dbgOutMessageLen( msgs );
419    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
420}
421
422/**
423***  EVENTS
424**/
425
426static void
427publish( tr_peermsgs * msgs, tr_peer_event * e )
428{
429    assert( msgs->peer );
430    assert( msgs->peer->msgs == msgs );
431
432    if( msgs->callback != NULL )
433        msgs->callback( msgs->peer, e, msgs->callbackData );
434}
435
436static void
437fireError( tr_peermsgs * msgs, int err )
438{
439    tr_peer_event e = TR_PEER_EVENT_INIT;
440    e.eventType = TR_PEER_ERROR;
441    e.err = err;
442    publish( msgs, &e );
443}
444
445static void
446fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
447{
448    tr_peer_event e = TR_PEER_EVENT_INIT;
449    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
450    e.pieceIndex = req->index;
451    e.offset = req->offset;
452    e.length = req->length;
453    publish( msgs, &e );
454}
455
456static void
457fireGotRej( tr_peermsgs * msgs, const struct peer_request * req )
458{
459    tr_peer_event e = TR_PEER_EVENT_INIT;
460    e.eventType = TR_PEER_CLIENT_GOT_REJ;
461    e.pieceIndex = req->index;
462    e.offset = req->offset;
463    e.length = req->length;
464    publish( msgs, &e );
465}
466
467static void
468fireGotChoke( tr_peermsgs * msgs )
469{
470    tr_peer_event e = TR_PEER_EVENT_INIT;
471    e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
472    publish( msgs, &e );
473}
474
475static void
476fireClientGotHaveAll( tr_peermsgs * msgs )
477{
478    tr_peer_event e = TR_PEER_EVENT_INIT;
479    e.eventType = TR_PEER_CLIENT_GOT_HAVE_ALL;
480    publish( msgs, &e );
481}
482
483static void
484fireClientGotHaveNone( tr_peermsgs * msgs )
485{
486    tr_peer_event e = TR_PEER_EVENT_INIT;
487    e.eventType = TR_PEER_CLIENT_GOT_HAVE_NONE;
488    publish( msgs, &e );
489}
490
491static void
492fireClientGotData( tr_peermsgs * msgs, uint32_t length, int wasPieceData )
493{
494    tr_peer_event e = TR_PEER_EVENT_INIT;
495
496    e.length = length;
497    e.eventType = TR_PEER_CLIENT_GOT_DATA;
498    e.wasPieceData = wasPieceData;
499    publish( msgs, &e );
500}
501
502static void
503fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
504{
505    tr_peer_event e = TR_PEER_EVENT_INIT;
506    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
507    e.pieceIndex = pieceIndex;
508    publish( msgs, &e );
509}
510
511static void
512fireClientGotPort( tr_peermsgs * msgs, tr_port port )
513{
514    tr_peer_event e = TR_PEER_EVENT_INIT;
515    e.eventType = TR_PEER_CLIENT_GOT_PORT;
516    e.port = port;
517    publish( msgs, &e );
518}
519
520static void
521fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
522{
523    tr_peer_event e = TR_PEER_EVENT_INIT;
524    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
525    e.pieceIndex = pieceIndex;
526    publish( msgs, &e );
527}
528
529static void
530fireClientGotBitfield( tr_peermsgs * msgs, tr_bitfield * bitfield )
531{
532    tr_peer_event e = TR_PEER_EVENT_INIT;
533    e.eventType = TR_PEER_CLIENT_GOT_BITFIELD;
534    e.bitfield = bitfield;
535    publish( msgs, &e );
536}
537
538static void
539fireClientGotHave( tr_peermsgs * msgs, tr_piece_index_t index )
540{
541    tr_peer_event e = TR_PEER_EVENT_INIT;
542    e.eventType = TR_PEER_CLIENT_GOT_HAVE;
543    e.pieceIndex = index;
544    publish( msgs, &e );
545}
546
547static void
548firePeerGotData( tr_peermsgs * msgs, uint32_t length, bool wasPieceData )
549{
550    tr_peer_event e = TR_PEER_EVENT_INIT;
551
552    e.length = length;
553    e.eventType = TR_PEER_PEER_GOT_DATA;
554    e.wasPieceData = wasPieceData;
555
556    publish( msgs, &e );
557}
558
559/**
560***  ALLOWED FAST SET
561***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
562**/
563
564#if 0
565size_t
566tr_generateAllowedSet( tr_piece_index_t * setmePieces,
567                       size_t             desiredSetSize,
568                       size_t             pieceCount,
569                       const uint8_t    * infohash,
570                       const tr_address * addr )
571{
572    size_t setSize = 0;
573
574    assert( setmePieces );
575    assert( desiredSetSize <= pieceCount );
576    assert( desiredSetSize );
577    assert( pieceCount );
578    assert( infohash );
579    assert( addr );
580
581    if( addr->type == TR_AF_INET )
582    {
583        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
584        uint8_t x[SHA_DIGEST_LENGTH];
585
586        uint32_t ui32 = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
587        memcpy( w, &ui32, sizeof( uint32_t ) );
588        walk += sizeof( uint32_t );
589        memcpy( walk, infohash, SHA_DIGEST_LENGTH );                 /* (2) */
590        walk += SHA_DIGEST_LENGTH;
591        tr_sha1( x, w, walk-w, NULL );                               /* (3) */
592        assert( sizeof( w ) == walk-w );
593
594        while( setSize<desiredSetSize )
595        {
596            int i;
597            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
598            {
599                size_t k;
600                uint32_t j = i * 4;                                  /* (5) */
601                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
602                uint32_t index = y % pieceCount;                     /* (7) */
603
604                for( k=0; k<setSize; ++k )                           /* (8) */
605                    if( setmePieces[k] == index )
606                        break;
607
608                if( k == setSize )
609                    setmePieces[setSize++] = index;                  /* (9) */
610            }
611
612            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
613        }
614    }
615
616    return setSize;
617}
618
619static void
620updateFastSet( tr_peermsgs * msgs UNUSED )
621{
622    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
623    const int peerIsNeedy = msgs->peer->progress < 0.10;
624
625    if( fext && peerIsNeedy && !msgs->haveFastSet )
626    {
627        size_t i;
628        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
629        const tr_info * inf = &msgs->torrent->info;
630        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
631
632        /* build the fast set */
633        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
634        msgs->haveFastSet = 1;
635
636        /* send it to the peer */
637        for( i=0; i<msgs->fastsetSize; ++i )
638            protocolSendAllowedFast( msgs, msgs->fastset[i] );
639    }
640}
641#endif
642
643/**
644***  INTEREST
645**/
646
647static void
648sendInterest( tr_peermsgs * msgs, bool clientIsInterested )
649{
650    struct evbuffer * out = msgs->outMessages;
651
652    assert( msgs );
653    assert( tr_isBool( clientIsInterested ) );
654
655    msgs->peer->clientIsInterested = clientIsInterested;
656    dbgmsg( msgs, "Sending %s", clientIsInterested ? "Interested" : "Not Interested" );
657    evbuffer_add_uint32( out, sizeof( uint8_t ) );
658    evbuffer_add_uint8 ( out, clientIsInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
659
660    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
661    dbgOutMessageLen( msgs );
662}
663
664static void
665updateInterest( tr_peermsgs * msgs UNUSED )
666{
667    /* FIXME -- might need to poke the mgr on startup */
668}
669
670void
671tr_peerMsgsSetInterested( tr_peermsgs * msgs, bool clientIsInterested )
672{
673    assert( tr_isBool( clientIsInterested ) );
674
675    if( clientIsInterested != msgs->peer->clientIsInterested )
676        sendInterest( msgs, clientIsInterested );
677}
678
679static bool
680popNextMetadataRequest( tr_peermsgs * msgs, int * piece )
681{
682    if( msgs->peerAskedForMetadataCount == 0 )
683        return false;
684
685    *piece = msgs->peerAskedForMetadata[0];
686
687    tr_removeElementFromArray( msgs->peerAskedForMetadata, 0, sizeof( int ),
688                               msgs->peerAskedForMetadataCount-- );
689
690    return true;
691}
692
693static bool
694popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
695{
696    if( msgs->peer->pendingReqsToClient == 0 )
697        return false;
698
699    *setme = msgs->peerAskedFor[0];
700
701    tr_removeElementFromArray( msgs->peerAskedFor, 0, sizeof( struct peer_request ),
702                               msgs->peer->pendingReqsToClient-- );
703
704    return true;
705}
706
707static void
708cancelAllRequestsToClient( tr_peermsgs * msgs )
709{
710    struct peer_request req;
711    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
712
713    while( popNextRequest( msgs, &req ))
714        if( mustSendCancel )
715            protocolSendReject( msgs, &req );
716}
717
718void
719tr_peerMsgsSetChoke( tr_peermsgs * msgs, bool peerIsChoked )
720{
721    const time_t now = tr_time( );
722    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
723
724    assert( msgs );
725    assert( msgs->peer );
726    assert( tr_isBool( peerIsChoked ) );
727
728    if( msgs->peer->chokeChangedAt > fibrillationTime )
729    {
730        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", peerIsChoked );
731    }
732    else if( msgs->peer->peerIsChoked != peerIsChoked )
733    {
734        msgs->peer->peerIsChoked = peerIsChoked;
735        if( peerIsChoked )
736            cancelAllRequestsToClient( msgs );
737        protocolSendChoke( msgs, peerIsChoked );
738        msgs->peer->chokeChangedAt = now;
739    }
740}
741
742/**
743***
744**/
745
746void
747tr_peerMsgsHave( tr_peermsgs * msgs, uint32_t index )
748{
749    protocolSendHave( msgs, index );
750
751    /* since we have more pieces now, we might not be interested in this peer */
752    updateInterest( msgs );
753}
754
755/**
756***
757**/
758
759static bool
760reqIsValid( const tr_peermsgs * peer,
761            uint32_t            index,
762            uint32_t            offset,
763            uint32_t            length )
764{
765    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
766}
767
768static bool
769requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
770{
771    return reqIsValid( msgs, req->index, req->offset, req->length );
772}
773
774void
775tr_peerMsgsCancel( tr_peermsgs * msgs, tr_block_index_t block )
776{
777    struct peer_request req;
778/*fprintf( stderr, "SENDING CANCEL MESSAGE FOR BLOCK %zu\n\t\tFROM PEER %p ------------------------------------\n", (size_t)block, msgs->peer );*/
779    blockToReq( msgs->torrent, block, &req );
780    protocolSendCancel( msgs, &req );
781}
782
783/**
784***
785**/
786
787static void
788sendLtepHandshake( tr_peermsgs * msgs )
789{
790    tr_benc val;
791    bool allow_pex;
792    bool allow_metadata_xfer;
793    struct evbuffer * payload;
794    struct evbuffer * out = msgs->outMessages;
795    const unsigned char * ipv6 = tr_globalIPv6();
796
797    if( msgs->clientSentLtepHandshake )
798        return;
799
800    dbgmsg( msgs, "sending an ltep handshake" );
801    msgs->clientSentLtepHandshake = 1;
802
803    /* decide if we want to advertise metadata xfer support (BEP 9) */
804    if( tr_torrentIsPrivate( msgs->torrent ) )
805        allow_metadata_xfer = 0;
806    else
807        allow_metadata_xfer = 1;
808
809    /* decide if we want to advertise pex support */
810    if( !tr_torrentAllowsPex( msgs->torrent ) )
811        allow_pex = 0;
812    else if( msgs->peerSentLtepHandshake )
813        allow_pex = msgs->peerSupportsPex ? 1 : 0;
814    else
815        allow_pex = 1;
816
817    tr_bencInitDict( &val, 8 );
818    tr_bencDictAddInt( &val, "e", getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED );
819    if( ipv6 != NULL )
820        tr_bencDictAddRaw( &val, "ipv6", ipv6, 16 );
821    if( allow_metadata_xfer && tr_torrentHasMetadata( msgs->torrent )
822                            && ( msgs->torrent->infoDictLength > 0 ) )
823        tr_bencDictAddInt( &val, "metadata_size", msgs->torrent->infoDictLength );
824    tr_bencDictAddInt( &val, "p", tr_sessionGetPublicPeerPort( getSession(msgs) ) );
825    tr_bencDictAddInt( &val, "reqq", REQQ );
826    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
827    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
828    if( allow_metadata_xfer || allow_pex ) {
829        tr_benc * m  = tr_bencDictAddDict( &val, "m", 2 );
830        if( allow_metadata_xfer )
831            tr_bencDictAddInt( m, "ut_metadata", UT_METADATA_ID );
832        if( allow_pex )
833            tr_bencDictAddInt( m, "ut_pex", UT_PEX_ID );
834    }
835
836    payload = tr_bencToBuf( &val, TR_FMT_BENC );
837
838    evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + evbuffer_get_length( payload ) );
839    evbuffer_add_uint8 ( out, BT_LTEP );
840    evbuffer_add_uint8 ( out, LTEP_HANDSHAKE );
841    evbuffer_add_buffer( out, payload );
842    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
843    dbgOutMessageLen( msgs );
844
845    /* cleanup */
846    evbuffer_free( payload );
847    tr_bencFree( &val );
848}
849
850static void
851parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
852{
853    int64_t   i;
854    tr_benc   val, * sub;
855    uint8_t * tmp = tr_new( uint8_t, len );
856    const uint8_t *addr;
857    size_t addr_len;
858    tr_pex pex;
859    int8_t seedProbability = -1;
860
861    memset( &pex, 0, sizeof( tr_pex ) );
862
863    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
864    msgs->peerSentLtepHandshake = 1;
865
866    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
867    {
868        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
869        tr_free( tmp );
870        return;
871    }
872
873    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
874
875    /* does the peer prefer encrypted connections? */
876    if( tr_bencDictFindInt( &val, "e", &i ) ) {
877        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
878                                              : ENCRYPTION_PREFERENCE_NO;
879        if( i )
880            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
881    }
882
883    /* check supported messages for utorrent pex */
884    msgs->peerSupportsPex = 0;
885    msgs->peerSupportsMetadataXfer = 0;
886
887    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
888        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
889            msgs->peerSupportsPex = i != 0;
890            msgs->ut_pex_id = (uint8_t) i;
891            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
892        }
893        if( tr_bencDictFindInt( sub, "ut_metadata", &i ) ) {
894            msgs->peerSupportsMetadataXfer = i != 0;
895            msgs->ut_metadata_id = (uint8_t) i;
896            dbgmsg( msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id );
897        }
898        if( tr_bencDictFindInt( sub, "ut_holepunch", &i ) ) {
899            /* Mysterious µTorrent extension that we don't grok.  However,
900               it implies support for µTP, so use it to indicate that. */
901            tr_peerMgrSetUtpFailed( msgs->torrent,
902                                    tr_peerIoGetAddress( msgs->peer->io, NULL ),
903                                    false );
904        }
905    }
906
907    /* look for metainfo size (BEP 9) */
908    if( tr_bencDictFindInt( &val, "metadata_size", &i ) ) {
909        tr_torrentSetMetadataSizeHint( msgs->torrent, i );
910        msgs->metadata_size_hint = (size_t) i;
911    }
912
913    /* look for upload_only (BEP 21) */
914    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
915        seedProbability = i==0 ? 0 : 100;
916
917    /* get peer's listening port */
918    if( tr_bencDictFindInt( &val, "p", &i ) ) {
919        pex.port = htons( (uint16_t)i );
920        fireClientGotPort( msgs, pex.port );
921        dbgmsg( msgs, "peer's port is now %d", (int)i );
922    }
923
924    if( tr_peerIoIsIncoming( msgs->peer->io )
925        && tr_bencDictFindRaw( &val, "ipv4", &addr, &addr_len )
926        && ( addr_len == 4 ) )
927    {
928        pex.addr.type = TR_AF_INET;
929        memcpy( &pex.addr.addr.addr4, addr, 4 );
930        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
931    }
932
933    if( tr_peerIoIsIncoming( msgs->peer->io )
934        && tr_bencDictFindRaw( &val, "ipv6", &addr, &addr_len )
935        && ( addr_len == 16 ) )
936    {
937        pex.addr.type = TR_AF_INET6;
938        memcpy( &pex.addr.addr.addr6, addr, 16 );
939        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
940    }
941
942    /* get peer's maximum request queue size */
943    if( tr_bencDictFindInt( &val, "reqq", &i ) )
944        msgs->reqq = i;
945
946    tr_bencFree( &val );
947    tr_free( tmp );
948}
949
950static void
951parseUtMetadata( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
952{
953    tr_benc dict;
954    char * msg_end;
955    char * benc_end;
956    int64_t msg_type = -1;
957    int64_t piece = -1;
958    int64_t total_size = 0;
959    uint8_t * tmp = tr_new( uint8_t, msglen );
960
961    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
962    msg_end = (char*)tmp + msglen;
963
964    if( !tr_bencLoad( tmp, msglen, &dict, &benc_end ) )
965    {
966        tr_bencDictFindInt( &dict, "msg_type", &msg_type );
967        tr_bencDictFindInt( &dict, "piece", &piece );
968        tr_bencDictFindInt( &dict, "total_size", &total_size );
969        tr_bencFree( &dict );
970    }
971
972    dbgmsg( msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
973            (int)msg_type, (int)piece, (int)total_size );
974
975    if( msg_type == METADATA_MSG_TYPE_REJECT )
976    {
977        /* NOOP */
978    }
979
980    if( ( msg_type == METADATA_MSG_TYPE_DATA )
981        && ( !tr_torrentHasMetadata( msgs->torrent ) )
982        && ( msg_end - benc_end <= METADATA_PIECE_SIZE )
983        && ( piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size ) )
984    {
985        const int pieceLen = msg_end - benc_end;
986        tr_torrentSetMetadataPiece( msgs->torrent, piece, benc_end, pieceLen );
987    }
988
989    if( msg_type == METADATA_MSG_TYPE_REQUEST )
990    {
991        if( ( piece >= 0 )
992            && tr_torrentHasMetadata( msgs->torrent )
993            && !tr_torrentIsPrivate( msgs->torrent )
994            && ( msgs->peerAskedForMetadataCount < METADATA_REQQ ) )
995        {
996            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
997        }
998        else
999        {
1000            tr_benc tmp;
1001            struct evbuffer * payload;
1002            struct evbuffer * out = msgs->outMessages;
1003
1004            /* build the rejection message */
1005            tr_bencInitDict( &tmp, 2 );
1006            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1007            tr_bencDictAddInt( &tmp, "piece", piece );
1008            payload = tr_bencToBuf( &tmp, TR_FMT_BENC );
1009
1010            /* write it out as a LTEP message to our outMessages buffer */
1011            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + evbuffer_get_length( payload ) );
1012            evbuffer_add_uint8 ( out, BT_LTEP );
1013            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1014            evbuffer_add_buffer( out, payload );
1015            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1016            dbgOutMessageLen( msgs );
1017
1018            /* cleanup */
1019            evbuffer_free( payload );
1020            tr_bencFree( &tmp );
1021        }
1022    }
1023
1024    tr_free( tmp );
1025}
1026
1027static void
1028parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1029{
1030    int loaded = 0;
1031    uint8_t * tmp = tr_new( uint8_t, msglen );
1032    tr_benc val;
1033    tr_torrent * tor = msgs->torrent;
1034    const uint8_t * added;
1035    size_t added_len;
1036
1037    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1038
1039    if( tr_torrentAllowsPex( tor )
1040      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1041    {
1042        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1043        {
1044            tr_pex * pex;
1045            size_t i, n;
1046            size_t added_f_len = 0;
1047            const uint8_t * added_f = NULL;
1048
1049            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1050            pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1051
1052            n = MIN( n, MAX_PEX_PEER_COUNT );
1053            for( i=0; i<n; ++i )
1054            {
1055                int seedProbability = -1;
1056                if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1057                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1058            }
1059
1060            tr_free( pex );
1061        }
1062
1063        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1064        {
1065            tr_pex * pex;
1066            size_t i, n;
1067            size_t added_f_len = 0;
1068            const uint8_t * added_f = NULL;
1069
1070            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1071            pex = tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len, &n );
1072
1073            n = MIN( n, MAX_PEX_PEER_COUNT );
1074            for( i=0; i<n; ++i )
1075            {
1076                int seedProbability = -1;
1077                if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1078                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1079            }
1080
1081            tr_free( pex );
1082        }
1083    }
1084
1085    if( loaded )
1086        tr_bencFree( &val );
1087    tr_free( tmp );
1088}
1089
1090static void sendPex( tr_peermsgs * msgs );
1091
1092static void
1093parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer  * inbuf )
1094{
1095    uint8_t ltep_msgid;
1096
1097    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1098    msglen--;
1099
1100    if( ltep_msgid == LTEP_HANDSHAKE )
1101    {
1102        dbgmsg( msgs, "got ltep handshake" );
1103        parseLtepHandshake( msgs, msglen, inbuf );
1104        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1105        {
1106            sendLtepHandshake( msgs );
1107            sendPex( msgs );
1108        }
1109    }
1110    else if( ltep_msgid == UT_PEX_ID )
1111    {
1112        dbgmsg( msgs, "got ut pex" );
1113        msgs->peerSupportsPex = 1;
1114        parseUtPex( msgs, msglen, inbuf );
1115    }
1116    else if( ltep_msgid == UT_METADATA_ID )
1117    {
1118        dbgmsg( msgs, "got ut metadata" );
1119        msgs->peerSupportsMetadataXfer = 1;
1120        parseUtMetadata( msgs, msglen, inbuf );
1121    }
1122    else
1123    {
1124        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1125        evbuffer_drain( inbuf, msglen );
1126    }
1127}
1128
1129static int
1130readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1131{
1132    uint32_t len;
1133
1134    if( inlen < sizeof( len ) )
1135        return READ_LATER;
1136
1137    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1138
1139    if( len == 0 ) /* peer sent us a keepalive message */
1140        dbgmsg( msgs, "got KeepAlive" );
1141    else
1142    {
1143        msgs->incoming.length = len;
1144        msgs->state = AWAITING_BT_ID;
1145    }
1146
1147    return READ_NOW;
1148}
1149
1150static int readBtMessage( tr_peermsgs *, struct evbuffer *, size_t );
1151
1152static int
1153readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1154{
1155    uint8_t id;
1156
1157    if( inlen < sizeof( uint8_t ) )
1158        return READ_LATER;
1159
1160    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1161    msgs->incoming.id = id;
1162    dbgmsg( msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length );
1163
1164    if( id == BT_PIECE )
1165    {
1166        msgs->state = AWAITING_BT_PIECE;
1167        return READ_NOW;
1168    }
1169    else if( msgs->incoming.length != 1 )
1170    {
1171        msgs->state = AWAITING_BT_MESSAGE;
1172        return READ_NOW;
1173    }
1174    else return readBtMessage( msgs, inbuf, inlen - 1 );
1175}
1176
1177static void
1178updatePeerProgress( tr_peermsgs * msgs )
1179{
1180    tr_peerUpdateProgress( msgs->torrent, msgs->peer );
1181
1182    /*updateFastSet( msgs );*/
1183    updateInterest( msgs );
1184}
1185
1186static void
1187prefetchPieces( tr_peermsgs *msgs )
1188{
1189    int i;
1190
1191    if( !getSession(msgs)->isPrefetchEnabled )
1192        return;
1193
1194    /* Maintain 12 prefetched blocks per unchoked peer */
1195    for( i=msgs->prefetchCount; i<msgs->peer->pendingReqsToClient && i<12; ++i )
1196    {
1197        const struct peer_request * req = msgs->peerAskedFor + i;
1198        if( requestIsValid( msgs, req ) )
1199        {
1200            tr_cachePrefetchBlock( getSession(msgs)->cache, msgs->torrent, req->index, req->offset, req->length );
1201            ++msgs->prefetchCount;
1202        }
1203    }
1204}
1205
1206static void
1207peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
1208{
1209    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1210    const int reqIsValid = requestIsValid( msgs, req );
1211    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1212    const int peerIsChoked = msgs->peer->peerIsChoked;
1213
1214    int allow = false;
1215
1216    if( !reqIsValid )
1217        dbgmsg( msgs, "rejecting an invalid request." );
1218    else if( !clientHasPiece )
1219        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1220    else if( peerIsChoked )
1221        dbgmsg( msgs, "rejecting request from choked peer" );
1222    else if( msgs->peer->pendingReqsToClient + 1 >= REQQ )
1223        dbgmsg( msgs, "rejecting request ... reqq is full" );
1224    else
1225        allow = true;
1226
1227    if( allow ) {
1228        msgs->peerAskedFor[msgs->peer->pendingReqsToClient++] = *req;
1229        prefetchPieces( msgs );
1230    } else if( fext ) {
1231        protocolSendReject( msgs, req );
1232    }
1233}
1234
1235static bool
1236messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1237{
1238    switch( id )
1239    {
1240        case BT_CHOKE:
1241        case BT_UNCHOKE:
1242        case BT_INTERESTED:
1243        case BT_NOT_INTERESTED:
1244        case BT_FEXT_HAVE_ALL:
1245        case BT_FEXT_HAVE_NONE:
1246            return len == 1;
1247
1248        case BT_HAVE:
1249        case BT_FEXT_SUGGEST:
1250        case BT_FEXT_ALLOWED_FAST:
1251            return len == 5;
1252
1253        case BT_BITFIELD:
1254            if( tr_torrentHasMetadata( msg->torrent ) )
1255                return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1256            /* we don't know the piece count yet,
1257               so we can only guess whether to send true or false */
1258            if( msg->metadata_size_hint > 0 )
1259                return len <= msg->metadata_size_hint;
1260            return true;
1261
1262        case BT_REQUEST:
1263        case BT_CANCEL:
1264        case BT_FEXT_REJECT:
1265            return len == 13;
1266
1267        case BT_PIECE:
1268            return len > 9 && len <= 16393;
1269
1270        case BT_PORT:
1271            return len == 3;
1272
1273        case BT_LTEP:
1274            return len >= 2;
1275
1276        default:
1277            return false;
1278    }
1279}
1280
1281static int clientGotBlock( tr_peermsgs *               msgs,
1282                           struct evbuffer *           block,
1283                           const struct peer_request * req );
1284
1285static int
1286readBtPiece( tr_peermsgs      * msgs,
1287             struct evbuffer  * inbuf,
1288             size_t             inlen,
1289             size_t           * setme_piece_bytes_read )
1290{
1291    struct peer_request * req = &msgs->incoming.blockReq;
1292
1293    assert( evbuffer_get_length( inbuf ) >= inlen );
1294    dbgmsg( msgs, "In readBtPiece" );
1295
1296    if( !req->length )
1297    {
1298        if( inlen < 8 )
1299            return READ_LATER;
1300
1301        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1302        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1303        req->length = msgs->incoming.length - 9;
1304        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1305        return READ_NOW;
1306    }
1307    else
1308    {
1309        int err;
1310        size_t n;
1311        size_t nLeft;
1312        struct evbuffer * block_buffer;
1313
1314        if( msgs->incoming.block == NULL )
1315            msgs->incoming.block = evbuffer_new( );
1316        block_buffer = msgs->incoming.block;
1317
1318        /* read in another chunk of data */
1319        nLeft = req->length - evbuffer_get_length( block_buffer );
1320        n = MIN( nLeft, inlen );
1321
1322        tr_peerIoReadBytesToBuf( msgs->peer->io, inbuf, block_buffer, n );
1323
1324        fireClientGotData( msgs, n, true );
1325        *setme_piece_bytes_read += n;
1326        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1327               n, req->index, req->offset, req->length,
1328               (int)( req->length - evbuffer_get_length( block_buffer ) ) );
1329        if( evbuffer_get_length( block_buffer ) < req->length )
1330            return READ_LATER;
1331
1332        /* pass the block along... */
1333        err = clientGotBlock( msgs, block_buffer, req );
1334        evbuffer_drain( block_buffer, evbuffer_get_length( block_buffer ) );
1335
1336        /* cleanup */
1337        req->length = 0;
1338        msgs->state = AWAITING_BT_LENGTH;
1339        return err ? READ_ERR : READ_NOW;
1340    }
1341}
1342
1343static void updateDesiredRequestCount( tr_peermsgs * msgs );
1344
1345static int
1346readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1347{
1348    uint32_t      ui32;
1349    uint32_t      msglen = msgs->incoming.length;
1350    const uint8_t id = msgs->incoming.id;
1351#ifndef NDEBUG
1352    const size_t  startBufLen = evbuffer_get_length( inbuf );
1353#endif
1354    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1355
1356    --msglen; /* id length */
1357
1358    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1359
1360    if( inlen < msglen )
1361        return READ_LATER;
1362
1363    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1364    {
1365        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1366        fireError( msgs, EMSGSIZE );
1367        return READ_ERR;
1368    }
1369
1370    switch( id )
1371    {
1372        case BT_CHOKE:
1373            dbgmsg( msgs, "got Choke" );
1374            msgs->peer->clientIsChoked = 1;
1375            if( !fext )
1376                fireGotChoke( msgs );
1377            break;
1378
1379        case BT_UNCHOKE:
1380            dbgmsg( msgs, "got Unchoke" );
1381            msgs->peer->clientIsChoked = 0;
1382            updateDesiredRequestCount( msgs );
1383            break;
1384
1385        case BT_INTERESTED:
1386            dbgmsg( msgs, "got Interested" );
1387            msgs->peer->peerIsInterested = 1;
1388            break;
1389
1390        case BT_NOT_INTERESTED:
1391            dbgmsg( msgs, "got Not Interested" );
1392            msgs->peer->peerIsInterested = 0;
1393            break;
1394
1395        case BT_HAVE:
1396            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1397            dbgmsg( msgs, "got Have: %u", ui32 );
1398            if( tr_torrentHasMetadata( msgs->torrent )
1399                    && ( ui32 >= msgs->torrent->info.pieceCount ) )
1400            {
1401                fireError( msgs, ERANGE );
1402                return READ_ERR;
1403            }
1404
1405            /* a peer can send the same HAVE message twice... */
1406            if( !tr_bitfieldHas( &msgs->peer->have, ui32 ) ) {
1407                tr_bitfieldAdd( &msgs->peer->have, ui32 );
1408                fireClientGotHave( msgs, ui32 );
1409            }
1410            updatePeerProgress( msgs );
1411            break;
1412
1413        case BT_BITFIELD: {
1414            uint8_t * tmp = tr_new( uint8_t, msglen );
1415            dbgmsg( msgs, "got a bitfield" );
1416            tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1417            tr_bitfieldSetRaw( &msgs->peer->have, tmp, msglen );
1418            fireClientGotBitfield( msgs, &msgs->peer->have );
1419            updatePeerProgress( msgs );
1420            tr_free( tmp );
1421            break;
1422        }
1423
1424        case BT_REQUEST:
1425        {
1426            struct peer_request r;
1427            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1428            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1429            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1430            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1431            peerMadeRequest( msgs, &r );
1432            break;
1433        }
1434
1435        case BT_CANCEL:
1436        {
1437            int i;
1438            struct peer_request r;
1439            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1440            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1441            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1442            tr_historyAdd( &msgs->peer->cancelsSentToClient, tr_time( ), 1 );
1443            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1444
1445            for( i=0; i<msgs->peer->pendingReqsToClient; ++i ) {
1446                const struct peer_request * req = msgs->peerAskedFor + i;
1447                if( ( req->index == r.index ) && ( req->offset == r.offset ) && ( req->length == r.length ) )
1448                    break;
1449            }
1450
1451            if( i < msgs->peer->pendingReqsToClient )
1452                tr_removeElementFromArray( msgs->peerAskedFor, i, sizeof( struct peer_request ),
1453                                           msgs->peer->pendingReqsToClient-- );
1454            break;
1455        }
1456
1457        case BT_PIECE:
1458            assert( 0 ); /* handled elsewhere! */
1459            break;
1460
1461        case BT_PORT:
1462            dbgmsg( msgs, "Got a BT_PORT" );
1463            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->dht_port );
1464            if( msgs->peer->dht_port > 0 )
1465                tr_dhtAddNode( getSession(msgs),
1466                               tr_peerAddress( msgs->peer ),
1467                               msgs->peer->dht_port, 0 );
1468            break;
1469
1470        case BT_FEXT_SUGGEST:
1471            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1472            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1473            if( fext )
1474                fireClientGotSuggest( msgs, ui32 );
1475            else {
1476                fireError( msgs, EMSGSIZE );
1477                return READ_ERR;
1478            }
1479            break;
1480
1481        case BT_FEXT_ALLOWED_FAST:
1482            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1483            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1484            if( fext )
1485                fireClientGotAllowedFast( msgs, ui32 );
1486            else {
1487                fireError( msgs, EMSGSIZE );
1488                return READ_ERR;
1489            }
1490            break;
1491
1492        case BT_FEXT_HAVE_ALL:
1493            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1494            if( fext ) {
1495                tr_bitfieldSetHasAll( &msgs->peer->have );
1496assert( tr_bitfieldHasAll( &msgs->peer->have ) );
1497                fireClientGotHaveAll( msgs );
1498                updatePeerProgress( msgs );
1499            } else {
1500                fireError( msgs, EMSGSIZE );
1501                return READ_ERR;
1502            }
1503            break;
1504
1505        case BT_FEXT_HAVE_NONE:
1506            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1507            if( fext ) {
1508                tr_bitfieldSetHasNone( &msgs->peer->have );
1509                fireClientGotHaveNone( msgs );
1510                updatePeerProgress( msgs );
1511            } else {
1512                fireError( msgs, EMSGSIZE );
1513                return READ_ERR;
1514            }
1515            break;
1516
1517        case BT_FEXT_REJECT:
1518        {
1519            struct peer_request r;
1520            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1521            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1522            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1523            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1524            if( fext )
1525                fireGotRej( msgs, &r );
1526            else {
1527                fireError( msgs, EMSGSIZE );
1528                return READ_ERR;
1529            }
1530            break;
1531        }
1532
1533        case BT_LTEP:
1534            dbgmsg( msgs, "Got a BT_LTEP" );
1535            parseLtep( msgs, msglen, inbuf );
1536            break;
1537
1538        default:
1539            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1540            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1541            break;
1542    }
1543
1544    assert( msglen + 1 == msgs->incoming.length );
1545    assert( evbuffer_get_length( inbuf ) == startBufLen - msglen );
1546
1547    msgs->state = AWAITING_BT_LENGTH;
1548    return READ_NOW;
1549}
1550
1551/* returns 0 on success, or an errno on failure */
1552static int
1553clientGotBlock( tr_peermsgs                * msgs,
1554                struct evbuffer            * data,
1555                const struct peer_request  * req )
1556{
1557    int err;
1558    tr_torrent * tor = msgs->torrent;
1559    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1560
1561    assert( msgs );
1562    assert( req );
1563
1564    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1565        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1566                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1567        return EMSGSIZE;
1568    }
1569
1570    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1571
1572    if( !tr_peerMgrDidPeerRequest( msgs->torrent, msgs->peer, block ) ) {
1573        dbgmsg( msgs, "we didn't ask for this message..." );
1574        return 0;
1575    }
1576    if( tr_cpPieceIsComplete( &msgs->torrent->completion, req->index ) ) {
1577        dbgmsg( msgs, "we did ask for this message, but the piece is already complete..." );
1578        return 0;
1579    }
1580
1581    /**
1582    ***  Save the block
1583    **/
1584
1585    if(( err = tr_cacheWriteBlock( getSession(msgs)->cache, tor, req->index, req->offset, req->length, data )))
1586        return err;
1587
1588    tr_bitfieldAdd( &msgs->peer->blame, req->index );
1589    fireGotBlock( msgs, req );
1590    return 0;
1591}
1592
1593static int peerPulse( void * vmsgs );
1594
1595static void
1596didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1597{
1598    tr_peermsgs * msgs = vmsgs;
1599    firePeerGotData( msgs, bytesWritten, wasPieceData );
1600
1601    if ( tr_isPeerIo( io ) && io->userData )
1602        peerPulse( msgs );
1603}
1604
1605static ReadState
1606canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1607{
1608    ReadState         ret;
1609    tr_peermsgs *     msgs = vmsgs;
1610    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1611    const size_t      inlen = evbuffer_get_length( in );
1612
1613    dbgmsg( msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state );
1614
1615    if( !inlen )
1616    {
1617        ret = READ_LATER;
1618    }
1619    else if( msgs->state == AWAITING_BT_PIECE )
1620    {
1621        ret = readBtPiece( msgs, in, inlen, piece );
1622    }
1623    else switch( msgs->state )
1624    {
1625        case AWAITING_BT_LENGTH:
1626            ret = readBtLength ( msgs, in, inlen ); break;
1627
1628        case AWAITING_BT_ID:
1629            ret = readBtId     ( msgs, in, inlen ); break;
1630
1631        case AWAITING_BT_MESSAGE:
1632            ret = readBtMessage( msgs, in, inlen ); break;
1633
1634        default:
1635            ret = READ_ERR;
1636            assert( 0 );
1637    }
1638
1639    dbgmsg( msgs, "canRead: ret is %d", (int)ret );
1640
1641    /* log the raw data that was read */
1642    if( ( ret != READ_ERR ) && ( evbuffer_get_length( in ) != inlen ) )
1643        fireClientGotData( msgs, inlen - evbuffer_get_length( in ), false );
1644
1645    return ret;
1646}
1647
1648int
1649tr_peerMsgsIsReadingBlock( const tr_peermsgs * msgs, tr_block_index_t block )
1650{
1651    if( msgs->state != AWAITING_BT_PIECE )
1652        return false;
1653
1654    return block == _tr_block( msgs->torrent,
1655                               msgs->incoming.blockReq.index,
1656                               msgs->incoming.blockReq.offset );
1657}
1658
1659/**
1660***
1661**/
1662
1663static void
1664updateDesiredRequestCount( tr_peermsgs * msgs )
1665{
1666    const tr_torrent * const torrent = msgs->torrent;
1667
1668    /* there are lots of reasons we might not want to request any blocks... */
1669    if( tr_torrentIsSeed( torrent ) || !tr_torrentHasMetadata( torrent )
1670                                    || msgs->peer->clientIsChoked
1671                                    || !msgs->peer->clientIsInterested )
1672    {
1673        msgs->desiredRequestCount = 0;
1674    }
1675    else
1676    {
1677        int estimatedBlocksInPeriod;
1678        int rate_Bps;
1679        int irate_Bps;
1680        const int floor = 4;
1681        const int seconds = REQUEST_BUF_SECS;
1682        const uint64_t now = tr_time_msec( );
1683
1684        /* Get the rate limit we should use.
1685         * FIXME: this needs to consider all the other peers as well... */
1686        rate_Bps = tr_peerGetPieceSpeed_Bps( msgs->peer, now, TR_PEER_TO_CLIENT );
1687        if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) )
1688            rate_Bps = MIN( rate_Bps, tr_torrentGetSpeedLimit_Bps( torrent, TR_PEER_TO_CLIENT ) );
1689
1690        /* honor the session limits, if enabled */
1691        if( tr_torrentUsesSessionLimits( torrent ) )
1692            if( tr_sessionGetActiveSpeedLimit_Bps( torrent->session, TR_PEER_TO_CLIENT, &irate_Bps ) )
1693                rate_Bps = MIN( rate_Bps, irate_Bps );
1694
1695        /* use this desired rate to figure out how
1696         * many requests we should send to this peer */
1697        estimatedBlocksInPeriod = ( rate_Bps * seconds ) / torrent->blockSize;
1698        msgs->desiredRequestCount = MAX( floor, estimatedBlocksInPeriod );
1699
1700        /* honor the peer's maximum request count, if specified */
1701        if( msgs->reqq > 0 )
1702            if( msgs->desiredRequestCount > msgs->reqq )
1703                msgs->desiredRequestCount = msgs->reqq;
1704    }
1705}
1706
1707static void
1708updateMetadataRequests( tr_peermsgs * msgs, time_t now )
1709{
1710    int piece;
1711
1712    if( msgs->peerSupportsMetadataXfer
1713        && tr_torrentGetNextMetadataRequest( msgs->torrent, now, &piece ) )
1714    {
1715        tr_benc tmp;
1716        struct evbuffer * payload;
1717        struct evbuffer * out = msgs->outMessages;
1718
1719        /* build the data message */
1720        tr_bencInitDict( &tmp, 3 );
1721        tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REQUEST );
1722        tr_bencDictAddInt( &tmp, "piece", piece );
1723        payload = tr_bencToBuf( &tmp, TR_FMT_BENC );
1724
1725        dbgmsg( msgs, "requesting metadata piece #%d", piece );
1726
1727        /* write it out as a LTEP message to our outMessages buffer */
1728        evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + evbuffer_get_length( payload ) );
1729        evbuffer_add_uint8 ( out, BT_LTEP );
1730        evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1731        evbuffer_add_buffer( out, payload );
1732        pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1733        dbgOutMessageLen( msgs );
1734
1735        /* cleanup */
1736        evbuffer_free( payload );
1737        tr_bencFree( &tmp );
1738    }
1739}
1740
1741static void
1742updateBlockRequests( tr_peermsgs * msgs )
1743{
1744    if( tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT )
1745        && ( msgs->desiredRequestCount > 0 )
1746        && ( msgs->peer->pendingReqsToPeer <= ( msgs->desiredRequestCount * 0.66 ) ) )
1747    {
1748        int i;
1749        int n;
1750        const int numwant = msgs->desiredRequestCount - msgs->peer->pendingReqsToPeer;
1751        tr_block_index_t * blocks = alloca( sizeof( tr_block_index_t ) * numwant );
1752
1753        tr_peerMgrGetNextRequests( msgs->torrent, msgs->peer, numwant, blocks, &n, false );
1754
1755        for( i=0; i<n; ++i )
1756        {
1757            struct peer_request req;
1758            blockToReq( msgs->torrent, blocks[i], &req );
1759            protocolSendRequest( msgs, &req );
1760        }
1761    }
1762}
1763
1764static size_t
1765fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1766{
1767    int piece;
1768    size_t bytesWritten = 0;
1769    struct peer_request req;
1770    const bool haveMessages = evbuffer_get_length( msgs->outMessages ) != 0;
1771    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1772
1773    /**
1774    ***  Protocol messages
1775    **/
1776
1777    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1778    {
1779        dbgmsg( msgs, "started an outMessages batch (length is %zu)", evbuffer_get_length( msgs->outMessages ) );
1780        msgs->outMessagesBatchedAt = now;
1781    }
1782    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1783    {
1784        const size_t len = evbuffer_get_length( msgs->outMessages );
1785        /* flush the protocol messages */
1786        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1787        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, false );
1788        msgs->clientSentAnythingAt = now;
1789        msgs->outMessagesBatchedAt = 0;
1790        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1791        bytesWritten +=  len;
1792    }
1793
1794    /**
1795    ***  Metadata Pieces
1796    **/
1797
1798    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= METADATA_PIECE_SIZE )
1799        && popNextMetadataRequest( msgs, &piece ) )
1800    {
1801        char * data;
1802        int dataLen;
1803        bool ok = false;
1804
1805        data = tr_torrentGetMetadataPiece( msgs->torrent, piece, &dataLen );
1806        if( ( dataLen > 0 ) && ( data != NULL ) )
1807        {
1808            tr_benc tmp;
1809            struct evbuffer * payload;
1810            struct evbuffer * out = msgs->outMessages;
1811
1812            /* build the data message */
1813            tr_bencInitDict( &tmp, 3 );
1814            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_DATA );
1815            tr_bencDictAddInt( &tmp, "piece", piece );
1816            tr_bencDictAddInt( &tmp, "total_size", msgs->torrent->infoDictLength );
1817            payload = tr_bencToBuf( &tmp, TR_FMT_BENC );
1818
1819            /* write it out as a LTEP message to our outMessages buffer */
1820            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + evbuffer_get_length( payload ) + dataLen );
1821            evbuffer_add_uint8 ( out, BT_LTEP );
1822            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1823            evbuffer_add_buffer( out, payload );
1824            evbuffer_add       ( out, data, dataLen );
1825            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1826            dbgOutMessageLen( msgs );
1827
1828            evbuffer_free( payload );
1829            tr_bencFree( &tmp );
1830            tr_free( data );
1831
1832            ok = true;
1833        }
1834
1835        if( !ok ) /* send a rejection message */
1836        {
1837            tr_benc tmp;
1838            struct evbuffer * payload;
1839            struct evbuffer * out = msgs->outMessages;
1840
1841            /* build the rejection message */
1842            tr_bencInitDict( &tmp, 2 );
1843            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1844            tr_bencDictAddInt( &tmp, "piece", piece );
1845            payload = tr_bencToBuf( &tmp, TR_FMT_BENC );
1846
1847            /* write it out as a LTEP message to our outMessages buffer */
1848            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + evbuffer_get_length( payload ) );
1849            evbuffer_add_uint8 ( out, BT_LTEP );
1850            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1851            evbuffer_add_buffer( out, payload );
1852            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1853            dbgOutMessageLen( msgs );
1854
1855            evbuffer_free( payload );
1856            tr_bencFree( &tmp );
1857        }
1858    }
1859
1860    /**
1861    ***  Data Blocks
1862    **/
1863
1864    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= msgs->torrent->blockSize )
1865        && popNextRequest( msgs, &req ) )
1866    {
1867        --msgs->prefetchCount;
1868
1869        if( requestIsValid( msgs, &req )
1870            && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1871        {
1872            int err;
1873            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
1874            struct evbuffer * out;
1875            struct evbuffer_iovec iovec[1];
1876
1877            out = evbuffer_new( );
1878            evbuffer_expand( out, msglen );
1879
1880            evbuffer_add_uint32( out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1881            evbuffer_add_uint8 ( out, BT_PIECE );
1882            evbuffer_add_uint32( out, req.index );
1883            evbuffer_add_uint32( out, req.offset );
1884
1885            evbuffer_reserve_space( out, req.length, iovec, 1 );
1886            err = tr_cacheReadBlock( getSession(msgs)->cache, msgs->torrent, req.index, req.offset, req.length, iovec[0].iov_base );
1887            iovec[0].iov_len = req.length;
1888            evbuffer_commit_space( out, iovec, 1 );
1889
1890            /* check the piece if it needs checking... */
1891            if( !err && tr_torrentPieceNeedsCheck( msgs->torrent, req.index ) )
1892                if(( err = !tr_torrentCheckPiece( msgs->torrent, req.index )))
1893                    tr_torrentSetLocalError( msgs->torrent, _( "Please Verify Local Data! Piece #%zu is corrupt." ), (size_t)req.index );
1894
1895            if( err )
1896            {
1897                if( fext )
1898                    protocolSendReject( msgs, &req );
1899            }
1900            else
1901            {
1902                const size_t n = evbuffer_get_length( out );
1903                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1904                assert( n == msglen );
1905                tr_peerIoWriteBuf( msgs->peer->io, out, true );
1906                bytesWritten += n;
1907                msgs->clientSentAnythingAt = now;
1908                tr_historyAdd( &msgs->peer->blocksSentToPeer, tr_time( ), 1 );
1909            }
1910
1911            evbuffer_free( out );
1912
1913            if( err )
1914            {
1915                bytesWritten = 0;
1916                msgs = NULL;
1917            }
1918        }
1919        else if( fext ) /* peer needs a reject message */
1920        {
1921            protocolSendReject( msgs, &req );
1922        }
1923
1924        if( msgs != NULL )
1925            prefetchPieces( msgs );
1926    }
1927
1928    /**
1929    ***  Keepalive
1930    **/
1931
1932    if( ( msgs != NULL )
1933        && ( msgs->clientSentAnythingAt != 0 )
1934        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1935    {
1936        dbgmsg( msgs, "sending a keepalive message" );
1937        evbuffer_add_uint32( msgs->outMessages, 0 );
1938        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1939    }
1940
1941    return bytesWritten;
1942}
1943
1944static int
1945peerPulse( void * vmsgs )
1946{
1947    tr_peermsgs * msgs = vmsgs;
1948    const time_t  now = tr_time( );
1949
1950    if ( tr_isPeerIo( msgs->peer->io ) ) {
1951        updateDesiredRequestCount( msgs );
1952        updateBlockRequests( msgs );
1953        updateMetadataRequests( msgs, now );
1954    }
1955
1956    for( ;; )
1957        if( fillOutputBuffer( msgs, now ) < 1 )
1958            break;
1959
1960    return true; /* loop forever */
1961}
1962
1963void
1964tr_peerMsgsPulse( tr_peermsgs * msgs )
1965{
1966    if( msgs != NULL )
1967        peerPulse( msgs );
1968}
1969
1970static void
1971gotError( tr_peerIo * io UNUSED, short what, void * vmsgs )
1972{
1973    if( what & BEV_EVENT_TIMEOUT )
1974        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1975    if( what & ( BEV_EVENT_EOF | BEV_EVENT_ERROR ) )
1976        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1977               what, errno, tr_strerror( errno ) );
1978    fireError( vmsgs, ENOTCONN );
1979}
1980
1981static void
1982sendBitfield( tr_peermsgs * msgs )
1983{
1984    size_t byte_count = 0;
1985    struct evbuffer * out = msgs->outMessages;
1986    void * bytes = tr_cpCreatePieceBitfield( &msgs->torrent->completion, &byte_count );
1987
1988    evbuffer_add_uint32( out, sizeof( uint8_t ) + byte_count );
1989    evbuffer_add_uint8 ( out, BT_BITFIELD );
1990    evbuffer_add       ( out, bytes, byte_count );
1991    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu", evbuffer_get_length( out ) );
1992    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1993
1994    tr_free( bytes );
1995}
1996
1997static void
1998tellPeerWhatWeHave( tr_peermsgs * msgs )
1999{
2000    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
2001
2002    if( fext && tr_cpHasAll( &msgs->torrent->completion ) )
2003    {
2004        protocolSendHaveAll( msgs );
2005    }
2006    else if( fext && tr_cpHasNone( &msgs->torrent->completion ) )
2007    {
2008        protocolSendHaveNone( msgs );
2009    }
2010    else if( !tr_cpHasNone( &msgs->torrent->completion ) )
2011    {
2012        sendBitfield( msgs );
2013    }
2014}
2015
2016/**
2017***
2018**/
2019
2020/* some peers give us error messages if we send
2021   more than this many peers in a single pex message
2022   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2023#define MAX_PEX_ADDED 50
2024#define MAX_PEX_DROPPED 50
2025
2026typedef struct
2027{
2028    tr_pex *  added;
2029    tr_pex *  dropped;
2030    tr_pex *  elements;
2031    int       addedCount;
2032    int       droppedCount;
2033    int       elementCount;
2034}
2035PexDiffs;
2036
2037static void
2038pexAddedCb( void * vpex, void * userData )
2039{
2040    PexDiffs * diffs = userData;
2041    tr_pex *   pex = vpex;
2042
2043    if( diffs->addedCount < MAX_PEX_ADDED )
2044    {
2045        diffs->added[diffs->addedCount++] = *pex;
2046        diffs->elements[diffs->elementCount++] = *pex;
2047    }
2048}
2049
2050static inline void
2051pexDroppedCb( void * vpex, void * userData )
2052{
2053    PexDiffs * diffs = userData;
2054    tr_pex *   pex = vpex;
2055
2056    if( diffs->droppedCount < MAX_PEX_DROPPED )
2057    {
2058        diffs->dropped[diffs->droppedCount++] = *pex;
2059    }
2060}
2061
2062static inline void
2063pexElementCb( void * vpex, void * userData )
2064{
2065    PexDiffs * diffs = userData;
2066    tr_pex * pex = vpex;
2067
2068    diffs->elements[diffs->elementCount++] = *pex;
2069}
2070
2071typedef void ( tr_set_func )( void * element, void * userData );
2072
2073/**
2074 * @brief find the differences and commonalities in two sorted sets
2075 * @param a the first set
2076 * @param aCount the number of elements in the set 'a'
2077 * @param b the second set
2078 * @param bCount the number of elements in the set 'b'
2079 * @param compare the sorting method for both sets
2080 * @param elementSize the sizeof the element in the two sorted sets
2081 * @param in_a called for items in set 'a' but not set 'b'
2082 * @param in_b called for items in set 'b' but not set 'a'
2083 * @param in_both called for items that are in both sets
2084 * @param userData user data passed along to in_a, in_b, and in_both
2085 */
2086static void
2087tr_set_compare( const void * va, size_t aCount,
2088                const void * vb, size_t bCount,
2089                int compare( const void * a, const void * b ),
2090                size_t elementSize,
2091                tr_set_func in_a_cb,
2092                tr_set_func in_b_cb,
2093                tr_set_func in_both_cb,
2094                void * userData )
2095{
2096    const uint8_t * a = va;
2097    const uint8_t * b = vb;
2098    const uint8_t * aend = a + elementSize * aCount;
2099    const uint8_t * bend = b + elementSize * bCount;
2100
2101    while( a != aend || b != bend )
2102    {
2103        if( a == aend )
2104        {
2105            ( *in_b_cb )( (void*)b, userData );
2106            b += elementSize;
2107        }
2108        else if( b == bend )
2109        {
2110            ( *in_a_cb )( (void*)a, userData );
2111            a += elementSize;
2112        }
2113        else
2114        {
2115            const int val = ( *compare )( a, b );
2116
2117            if( !val )
2118            {
2119                ( *in_both_cb )( (void*)a, userData );
2120                a += elementSize;
2121                b += elementSize;
2122            }
2123            else if( val < 0 )
2124            {
2125                ( *in_a_cb )( (void*)a, userData );
2126                a += elementSize;
2127            }
2128            else if( val > 0 )
2129            {
2130                ( *in_b_cb )( (void*)b, userData );
2131                b += elementSize;
2132            }
2133        }
2134    }
2135}
2136
2137
2138static void
2139sendPex( tr_peermsgs * msgs )
2140{
2141    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2142    {
2143        PexDiffs diffs;
2144        PexDiffs diffs6;
2145        tr_pex * newPex = NULL;
2146        tr_pex * newPex6 = NULL;
2147        const int newCount = tr_peerMgrGetPeers( msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2148        const int newCount6 = tr_peerMgrGetPeers( msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2149
2150        /* build the diffs */
2151        diffs.added = tr_new( tr_pex, newCount );
2152        diffs.addedCount = 0;
2153        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2154        diffs.droppedCount = 0;
2155        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2156        diffs.elementCount = 0;
2157        tr_set_compare( msgs->pex, msgs->pexCount,
2158                        newPex, newCount,
2159                        tr_pexCompare, sizeof( tr_pex ),
2160                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2161        diffs6.added = tr_new( tr_pex, newCount6 );
2162        diffs6.addedCount = 0;
2163        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2164        diffs6.droppedCount = 0;
2165        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2166        diffs6.elementCount = 0;
2167        tr_set_compare( msgs->pex6, msgs->pexCount6,
2168                        newPex6, newCount6,
2169                        tr_pexCompare, sizeof( tr_pex ),
2170                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2171        dbgmsg(
2172            msgs,
2173            "pex: old peer count %d+%d, new peer count %d+%d, "
2174            "added %d+%d, removed %d+%d",
2175            msgs->pexCount, msgs->pexCount6, newCount, newCount6,
2176            diffs.addedCount, diffs6.addedCount,
2177            diffs.droppedCount, diffs6.droppedCount );
2178
2179        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2180            !diffs6.droppedCount )
2181        {
2182            tr_free( diffs.elements );
2183            tr_free( diffs6.elements );
2184        }
2185        else
2186        {
2187            int  i;
2188            tr_benc val;
2189            uint8_t * tmp, *walk;
2190            struct evbuffer * payload;
2191            struct evbuffer * out = msgs->outMessages;
2192
2193            /* update peer */
2194            tr_free( msgs->pex );
2195            msgs->pex = diffs.elements;
2196            msgs->pexCount = diffs.elementCount;
2197            tr_free( msgs->pex6 );
2198            msgs->pex6 = diffs6.elements;
2199            msgs->pexCount6 = diffs6.elementCount;
2200
2201            /* build the pex payload */
2202            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2203                                         * speed vs. likelihood? */
2204
2205            if( diffs.addedCount > 0)
2206            {
2207                /* "added" */
2208                tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2209                for( i = 0; i < diffs.addedCount; ++i ) {
2210                    memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2211                    memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2212                }
2213                assert( ( walk - tmp ) == diffs.addedCount * 6 );
2214                tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2215                tr_free( tmp );
2216
2217                /* "added.f"
2218                 * unset each holepunch flag because we don't support it. */
2219                tmp = walk = tr_new( uint8_t, diffs.addedCount );
2220                for( i = 0; i < diffs.addedCount; ++i )
2221                    *walk++ = diffs.added[i].flags & ~ADDED_F_HOLEPUNCH;
2222                assert( ( walk - tmp ) == diffs.addedCount );
2223                tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2224                tr_free( tmp );
2225            }
2226
2227            if( diffs.droppedCount > 0 )
2228            {
2229                /* "dropped" */
2230                tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2231                for( i = 0; i < diffs.droppedCount; ++i ) {
2232                    memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2233                    memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2234                }
2235                assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2236                tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2237                tr_free( tmp );
2238            }
2239
2240            if( diffs6.addedCount > 0 )
2241            {
2242                /* "added6" */
2243                tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2244                for( i = 0; i < diffs6.addedCount; ++i ) {
2245                    memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2246                    walk += 16;
2247                    memcpy( walk, &diffs6.added[i].port, 2 );
2248                    walk += 2;
2249                }
2250                assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2251                tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2252                tr_free( tmp );
2253
2254                /* "added6.f"
2255                 * unset each holepunch flag because we don't support it. */
2256                tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2257                for( i = 0; i < diffs6.addedCount; ++i )
2258                    *walk++ = diffs6.added[i].flags & ~ADDED_F_HOLEPUNCH;
2259                assert( ( walk - tmp ) == diffs6.addedCount );
2260                tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2261                tr_free( tmp );
2262            }
2263
2264            if( diffs6.droppedCount > 0 )
2265            {
2266                /* "dropped6" */
2267                tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2268                for( i = 0; i < diffs6.droppedCount; ++i ) {
2269                    memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2270                    walk += 16;
2271                    memcpy( walk, &diffs6.dropped[i].port, 2 );
2272                    walk += 2;
2273                }
2274                assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2275                tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2276                tr_free( tmp );
2277            }
2278
2279            /* write the pex message */
2280            payload = tr_bencToBuf( &val, TR_FMT_BENC );
2281            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + evbuffer_get_length( payload ) );
2282            evbuffer_add_uint8 ( out, BT_LTEP );
2283            evbuffer_add_uint8 ( out, msgs->ut_pex_id );
2284            evbuffer_add_buffer( out, payload );
2285            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2286            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", evbuffer_get_length( out ) );
2287            dbgOutMessageLen( msgs );
2288
2289            evbuffer_free( payload );
2290            tr_bencFree( &val );
2291        }
2292
2293        /* cleanup */
2294        tr_free( diffs.added );
2295        tr_free( diffs.dropped );
2296        tr_free( newPex );
2297        tr_free( diffs6.added );
2298        tr_free( diffs6.dropped );
2299        tr_free( newPex6 );
2300
2301        /*msgs->clientSentPexAt = tr_time( );*/
2302    }
2303}
2304
2305static void
2306pexPulse( int foo UNUSED, short bar UNUSED, void * vmsgs )
2307{
2308    struct tr_peermsgs * msgs = vmsgs;
2309
2310    sendPex( msgs );
2311
2312    assert( msgs->pexTimer != NULL );
2313    tr_timerAdd( msgs->pexTimer, PEX_INTERVAL_SECS, 0 );
2314}
2315
2316/**
2317***
2318**/
2319
2320tr_peermsgs*
2321tr_peerMsgsNew( struct tr_torrent    * torrent,
2322                struct tr_peer       * peer,
2323                tr_peer_callback     * callback,
2324                void                 * callbackData )
2325{
2326    tr_peermsgs * m;
2327
2328    assert( peer );
2329    assert( peer->io );
2330
2331    m = tr_new0( tr_peermsgs, 1 );
2332    m->callback = callback;
2333    m->callbackData = callbackData;
2334    m->peer = peer;
2335    m->torrent = torrent;
2336    m->peer->clientIsChoked = 1;
2337    m->peer->peerIsChoked = 1;
2338    m->peer->clientIsInterested = 0;
2339    m->peer->peerIsInterested = 0;
2340    m->state = AWAITING_BT_LENGTH;
2341    m->outMessages = evbuffer_new( );
2342    m->outMessagesBatchedAt = 0;
2343    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2344    peer->msgs = m;
2345
2346    if( tr_torrentAllowsPex( torrent ) ) {
2347        m->pexTimer = evtimer_new( torrent->session->event_base, pexPulse, m );
2348        tr_timerAdd( m->pexTimer, PEX_INTERVAL_SECS, 0 );
2349    }
2350
2351    if( tr_peerIoSupportsUTP( peer->io ) ) {
2352        const tr_address * addr = tr_peerIoGetAddress( peer->io, NULL );
2353        tr_peerMgrSetUtpSupported( torrent, addr );
2354        tr_peerMgrSetUtpFailed( torrent, addr, false );
2355    }
2356
2357    if( tr_peerIoSupportsLTEP( peer->io ) )
2358        sendLtepHandshake( m );
2359
2360    tellPeerWhatWeHave( m );
2361
2362    if( tr_dhtEnabled( torrent->session ) && tr_peerIoSupportsDHT( peer->io ))
2363    {
2364        /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2365        const struct tr_address *addr = tr_peerIoGetAddress( peer->io, NULL );
2366        if( addr->type == TR_AF_INET || tr_globalIPv6() ) {
2367            protocolSendPort( m, tr_dhtPort( torrent->session ) );
2368        }
2369    }
2370
2371    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2372    updateDesiredRequestCount( m );
2373
2374    return m;
2375}
2376
2377void
2378tr_peerMsgsFree( tr_peermsgs* msgs )
2379{
2380    if( msgs )
2381    {
2382        if( msgs->pexTimer != NULL )
2383            event_free( msgs->pexTimer );
2384
2385        if( msgs->incoming.block != NULL )
2386            evbuffer_free( msgs->incoming.block );
2387
2388        evbuffer_free( msgs->outMessages );
2389        tr_free( msgs->pex6 );
2390        tr_free( msgs->pex );
2391
2392        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2393        tr_free( msgs );
2394    }
2395}
Note: See TracBrowser for help on using the repository browser.